diff options
Diffstat (limited to 'stdlib/source/lux/control/concurrency')
-rw-r--r-- | stdlib/source/lux/control/concurrency/actor.lux | 35 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/atom.lux | 2 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/frp.lux | 42 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/process.lux | 2 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/promise.lux | 16 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/semaphore.lux | 14 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/stm.lux | 32 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/task.lux | 25 |
8 files changed, 82 insertions, 86 deletions
diff --git a/stdlib/source/lux/control/concurrency/actor.lux b/stdlib/source/lux/control/concurrency/actor.lux index 9b20dcfde..3e288ca42 100644 --- a/stdlib/source/lux/control/concurrency/actor.lux +++ b/stdlib/source/lux/control/concurrency/actor.lux @@ -3,15 +3,15 @@ [control monad ["p" parser] ["ex" exception (#+ exception:)]] - ["." io (#- run) ("io/." Monad<IO>)] + ["." io ("io/." monad)] [data ["." product] ["e" error] [text format] [collection - ["." list ("list/." Monoid<List> Monad<List> Fold<List>)]]] - ["." macro (#+ with-gensyms Monad<Meta>) + ["." list ("list/." monoid monad fold)]]] + ["." macro (#+ with-gensyms monad) ["." code] ["s" syntax (#+ syntax: Syntax)] [syntax @@ -23,7 +23,7 @@ abstract]] [// ["." atom (#+ Atom atom)] - ["." promise (#+ Promise Resolver) ("promise/." Monad<Promise>)] + ["." promise (#+ Promise Resolver) ("promise/." monad)] ["." task (#+ Task)]]) (exception: #export poisoned) @@ -33,7 +33,6 @@ (ex.report ["Actor" actor-name] ["Message" message-name])) -## [Types] (with-expansions [<Message> (as-is (-> s (Actor s) (Task s))) <Obituary> (as-is [Text s (List <Message>)]) @@ -90,7 +89,7 @@ (promise.promise [])) process (loop [state init [|mailbox| _] (io.run (atom.read (get@ #mailbox (:representation self))))] - (do promise.Monad<Promise> + (do promise.monad [[head tail] |mailbox| ?state' (handle head state self)] (case ?state' @@ -120,7 +119,7 @@ (All [s] (-> (Message s) (Actor s) (IO Bit))) (if (alive? actor) (let [entry [message (promise.promise [])]] - (do Monad<IO> + (do io.monad [|mailbox|&resolve (atom.read (get@ #mailbox (:representation actor)))] (loop [[|mailbox| resolve] |mailbox|&resolve] (case (promise.poll |mailbox|) @@ -139,7 +138,6 @@ ) ) -## [Values] (def: (default-handle message state self) (All [s] (-> (Message s) s (Actor s) (Task s))) (message state self)) @@ -161,7 +159,6 @@ (task.throw poisoned [])) actor)) -## [Syntax] (do-template [<with> <resolve> <tag> <desc>] [(def: #export (<with> name) (-> Name cs.Annotations cs.Annotations) @@ -170,7 +167,7 @@ (def: #export (<resolve> name) (-> Name (Meta Name)) - (do Monad<Meta> + (do io.monad [[_ annotations _] (macro.find-def name)] (case (macro.get-tag-ann (name-of <tag>) annotations) (#.Some actor-name) @@ -186,7 +183,7 @@ (def: actor-decl^ (Syntax [Text (List Text)]) (p.either (s.form (p.and s.local-identifier (p.some s.local-identifier))) - (p.and s.local-identifier (:: p.Monad<Parser> wrap (list))))) + (p.and s.local-identifier (:: p.monad wrap (list))))) (do-template [<name> <desc>] [(def: #export <name> @@ -227,7 +224,7 @@ Nat ((stop cause state) - (:: promise.Monad<Promise> wrap + (:: promise.monad wrap (log! (if (ex.match? ..poisoned cause) (format "Counter was poisoned: " (%n state)) cause))))) @@ -236,7 +233,7 @@ (List a) ((handle message state self) - (do task.Monad<Task> + (do task.monad [#let [_ (log! "BEFORE")] output (message state self) #let [_ (log! "AFTER")]] @@ -268,7 +265,7 @@ (~ (code.local-identifier messageN)) (~ (code.local-identifier stateN)) (~ (code.local-identifier selfN))) - (do task.Monad<Task> + (do task.monad [] (~ bodyC)))))) #..end (~ (case ?stop @@ -279,7 +276,7 @@ (` (function ((~ g!_) (~ (code.local-identifier causeN)) (~ (code.local-identifier stateN))) - (do promise.Monad<Promise> + (do promise.monad [] (~ bodyC))))))})) (` (def: (~+ (csw.export export)) ((~ g!new) (~ g!init)) @@ -309,7 +306,7 @@ (def: reference^ (s.Syntax [Name (List Text)]) (p.either (s.form (p.and s.identifier (p.some s.local-identifier))) - (p.and s.identifier (:: p.Monad<Parser> wrap (list))))) + (p.and s.identifier (:: p.monad wrap (list))))) (syntax: #export (message: {export csr.export} @@ -367,12 +364,12 @@ (let [[(~ g!task) (~ g!resolve)] (: [(task.Task (~ g!outputT)) (task.Resolver (~ g!outputT))] (task.task []))] - (io.run (do io.Monad<IO> + (io.run (do io.monad [(~ g!sent?) (..send (function ((~ g!_) (~ g!state) (~ g!self)) - (do promise.Monad<Promise> + (do promise.monad [(~ g!return) (: (Task [((~ g!type) (~+ g!actor-refs)) (~ g!outputT)]) - (do task.Monad<Task> + (do task.monad [] (~ body)))] (case (~ g!return) diff --git a/stdlib/source/lux/control/concurrency/atom.lux b/stdlib/source/lux/control/concurrency/atom.lux index b1692b6e3..61152d7b6 100644 --- a/stdlib/source/lux/control/concurrency/atom.lux +++ b/stdlib/source/lux/control/concurrency/atom.lux @@ -46,7 +46,7 @@ "The retries will be done with the new values of the atom, as they show up.")} (All [a] (-> (-> a a) (Atom a) (IO a))) (loop [_ []] - (do io.Monad<IO> + (do io.monad [old (read atom) #let [new (f old)] swapped? (compare-and-swap old new atom)] diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux index 18b385a65..84def78d1 100644 --- a/stdlib/source/lux/control/concurrency/frp.lux +++ b/stdlib/source/lux/control/concurrency/frp.lux @@ -8,14 +8,14 @@ [equivalence (#+ Equivalence)]] ["." io (#+ IO)] [data - [maybe ("maybe/." Functor<Maybe>)] + [maybe ("maybe/." functor)] [collection - [list ("list/." Monoid<List>)]]] + [list ("list/." monoid)]]] [type (#+ :share) abstract]] [// ["." atom (#+ Atom)] - ["." promise (#+ Promise) ("promise/." Functor<Promise>)]]) + ["." promise (#+ Promise) ("promise/." functor)]]) (type: #export (Channel a) {#.doc "An asynchronous channel to distribute values."} @@ -35,7 +35,7 @@ (structure (def: close (loop [_ []] - (do io.Monad<IO> + (do io.monad [current (atom.read source) stopped? (current #.None)] (if stopped? @@ -52,7 +52,7 @@ (def: (feed value) (loop [_ []] - (do io.Monad<IO> + (do io.monad [current (atom.read source) #let [[next resolve-next] (:share [a] {(promise.Resolver (Maybe [a (Channel a)])) @@ -82,7 +82,7 @@ (All [a] (-> (-> a (IO Any)) (Channel a) (IO Any))) (io.io (exec (: (Promise Any) (loop [channel channel] - (do promise.Monad<Promise> + (do promise.monad [cons channel] (case cons (#.Some [head tail]) @@ -93,18 +93,18 @@ (wrap []))))) []))) -(structure: #export _ (Functor Channel) +(structure: #export functor (Functor Channel) (def: (map f) (promise/map (maybe/map (function (_ [head tail]) [(f head) (map f tail)]))))) -(structure: #export _ (Apply Channel) - (def: functor Functor<Channel>) +(structure: #export apply (Apply Channel) + (def: &functor ..functor) (def: (apply ff fa) - (do promise.Monad<Promise> + (do promise.monad [cons-f ff cons-a fa] (case [cons-f cons-a] @@ -114,8 +114,8 @@ _ (wrap #.None))))) -(structure: #export _ (Monad Channel) - (def: functor Functor<Channel>) +(structure: #export monad (Monad Channel) + (def: &functor ..functor) (def: (wrap a) (promise.resolved (#.Some [a (promise.resolved #.None)]))) @@ -128,7 +128,7 @@ (def: #export (filter pass? channel) (All [a] (-> (Predicate a) (Channel a) (Channel a))) - (do promise.Monad<Promise> + (do promise.monad [cons channel] (case cons (#.Some [head tail]) @@ -151,7 +151,7 @@ (All [a b] (-> (-> b a (Promise a)) a (Channel b) (Promise a))) - (do promise.Monad<Promise> + (do promise.monad [cons channel] (case cons #.None @@ -167,7 +167,7 @@ (All [a b] (-> (-> b a (Promise a)) a (Channel b) (Channel a))) - (do promise.Monad<Promise> + (do promise.monad [cons channel] (case cons #.None @@ -182,7 +182,7 @@ (All [a] (-> Nat (IO a) (Channel a))) (let [[output source] (channel [])] (exec (io.run (loop [_ []] - (do io.Monad<IO> + (do io.monad [value action _ (:: source feed value)] (promise.await recur (promise.wait milli-seconds))))) @@ -194,7 +194,7 @@ (def: #export (iterate f init) (All [a] (-> (-> a (Promise (Maybe a))) a (Channel a))) - (do promise.Monad<Promise> + (do promise.monad [?next (f init)] (case ?next (#.Some next) @@ -205,7 +205,7 @@ (def: (distinct' equivalence previous channel) (All [a] (-> (Equivalence a) a (Channel a) (Channel a))) - (do promise.Monad<Promise> + (do promise.monad [cons channel] (case cons (#.Some [head tail]) @@ -218,7 +218,7 @@ (def: #export (distinct equivalence channel) (All [a] (-> (Equivalence a) (Channel a) (Channel a))) - (do promise.Monad<Promise> + (do promise.monad [cons channel] (case cons (#.Some [head tail]) @@ -230,7 +230,7 @@ (def: #export (consume channel) {#.doc "Reads the entirety of a channel's content and returns it as a list."} (All [a] (-> (Channel a) (Promise (List a)))) - (do promise.Monad<Promise> + (do promise.monad [cons channel] (case cons (#.Some [head tail]) @@ -247,6 +247,6 @@ (promise.resolved #.None) (#.Cons head tail) - (promise.resolved (#.Some [head (do promise.Monad<Promise> + (promise.resolved (#.Some [head (do promise.monad [_ (promise.wait milli-seconds)] (sequential milli-seconds tail))])))) diff --git a/stdlib/source/lux/control/concurrency/process.lux b/stdlib/source/lux/control/concurrency/process.lux index a67734747..d1d2ac245 100644 --- a/stdlib/source/lux/control/concurrency/process.lux +++ b/stdlib/source/lux/control/concurrency/process.lux @@ -87,7 +87,7 @@ (def: #export run! (IO Any) (loop [_ []] - (do io.Monad<IO> + (do io.monad [processes (atom.read runner)] (case processes ## And... we're done! diff --git a/stdlib/source/lux/control/concurrency/promise.lux b/stdlib/source/lux/control/concurrency/promise.lux index 33a04190b..244951139 100644 --- a/stdlib/source/lux/control/concurrency/promise.lux +++ b/stdlib/source/lux/control/concurrency/promise.lux @@ -25,7 +25,7 @@ {#.doc "Sets an promise's value if it has not been done yet."} (All [a] (-> (Promise a) (Resolver a))) (function (resolve value) - (do io.Monad<IO> + (do io.monad [(^@ old [_value _observers]) (atom.read promise)] (case _value (#.Some _) @@ -82,14 +82,14 @@ (#.Some _) #1)) -(structure: #export _ (Functor Promise) +(structure: #export functor (Functor Promise) (def: (map f fa) (let [[fb resolve] (..promise [])] (exec (io.run (await (|>> f resolve) fa)) fb)))) -(structure: #export _ (Apply Promise) - (def: functor Functor<Promise>) +(structure: #export apply (Apply Promise) + (def: &functor ..functor) (def: (apply ff fa) (let [[fb resolve] (..promise [])] @@ -98,8 +98,8 @@ ff)) fb)))) -(structure: #export _ (Monad Promise) - (def: functor Functor<Promise>) +(structure: #export monad (Monad Promise) + (def: &functor ..functor) (def: wrap ..resolved) @@ -113,7 +113,7 @@ (def: #export (and left right) {#.doc "Sequencing combinator."} (All [a b] (-> (Promise a) (Promise b) (Promise [a b]))) - (do Monad<Promise> + (do ..monad [a left b right] (wrap [a b]))) @@ -148,7 +148,7 @@ "Returns a Promise that will eventually host its result.")} (All [a] (-> Nat (IO a) (Promise a))) (let [[!out resolve] (..promise [])] - (exec (|> (do io.Monad<IO> + (exec (|> (do io.monad [value computation] (resolve value)) (process.schedule millis-delay) diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux index 46762ecf3..ddc73b300 100644 --- a/stdlib/source/lux/control/concurrency/semaphore.lux +++ b/stdlib/source/lux/control/concurrency/semaphore.lux @@ -29,7 +29,7 @@ (io.run (loop [signal (: (Promise Any) (promise.promise #.None))] - (do io.Monad<IO> + (do io.monad [state (atom.read semaphore) #let [[ready? state'] (: [Bit State] (case (get@ #open-positions state) @@ -50,7 +50,7 @@ (let [semaphore (:representation semaphore)] (promise.future (loop [_ []] - (do io.Monad<IO> + (do io.monad [state (atom.read semaphore) #let [[?signal state'] (: [(Maybe (Promise Any)) State] (case (get@ #waiting-list state) @@ -91,7 +91,7 @@ (def: #export (synchronize mutex procedure) (All [a] (-> Mutex (IO (Promise a)) (Promise a))) - (do promise.Monad<Promise> + (do promise.monad [_ (acquire mutex) output (io.run procedure) _ (release mutex)] @@ -120,15 +120,15 @@ (-> Nat Semaphore (Promise Any)) (loop [step 0] (if (n/< times step) - (do promise.Monad<Promise> + (do promise.monad [_ (signal turnstile)] (recur (inc step))) - (:: promise.Monad<Promise> wrap [])))) + (:: promise.monad wrap [])))) (do-template [<phase> <update> <goal> <turnstile>] [(def: (<phase> (^:representation barrier)) (-> Barrier (Promise Any)) - (do promise.Monad<Promise> + (do promise.monad [#let [limit (refinement.un-refine (get@ #limit barrier)) goal <goal> count (io.run (atom.update <update> (get@ #count barrier)))] @@ -143,7 +143,7 @@ (def: #export (block barrier) (-> Barrier (Promise Any)) - (do promise.Monad<Promise> + (do promise.monad [_ (start barrier)] (end barrier))) ) diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux index 34122abd4..5bb537025 100644 --- a/stdlib/source/lux/control/concurrency/stm.lux +++ b/stdlib/source/lux/control/concurrency/stm.lux @@ -15,7 +15,7 @@ [// ["." atom (#+ Atom atom)] ["." promise (#+ Promise Resolver)] - ["." frp ("frp/." Functor<Channel>)]]) + ["." frp ("frp/." functor)]]) (type: #export (Observer a) (-> a (IO Any))) @@ -39,11 +39,11 @@ (All [a] (-> (Var a) (IO a))) (|> var atom.read - (:: io.Functor<IO> map product.left))) + (:: io.functor map product.left))) (def: (write! new-value (^:representation var)) (All [a] (-> a (Var a) (IO Any))) - (do io.Monad<IO> + (do io.monad [(^@ old [_value _observers]) (atom.read var) succeeded? (atom.compare-and-swap old [new-value _observers] var)] (if succeeded? @@ -55,7 +55,7 @@ (def: #export (follow target) {#.doc "Creates a channel that will receive all changes to the value of the given var."} (All [a] (-> (Var a) (IO (frp.Channel a)))) - (do io.Monad<IO> + (do io.monad [#let [[channel source] (frp.channel []) target (:representation target)] _ (atom.update (function (_ [value observers]) @@ -82,8 +82,8 @@ (list.find (function (_ [_var _original _current]) (is? (:coerce (Var Any) var) (:coerce (Var Any) _var)))) - (:: maybe.Monad<Maybe> map (function (_ [_var _original _current]) - _current)) + (:: maybe.monad map (function (_ [_var _original _current]) + _current)) (:assume) )) @@ -137,8 +137,8 @@ (let [[tx' a] (fa tx)] [tx' (f a)])))) -(structure: #export _ (Apply STM) - (def: functor Functor<STM>) +(structure: #export apply (Apply STM) + (def: &functor ..functor) (def: (apply ff fa) (function (_ tx) @@ -146,8 +146,8 @@ [tx'' a] (fa tx')] [tx'' (f a)])))) -(structure: #export _ (Monad STM) - (def: functor Functor<STM>) +(structure: #export monad (Monad STM) + (def: &functor ..functor) (def: (wrap a) (function (_ tx) [tx a])) @@ -160,7 +160,7 @@ (def: #export (update f var) {#.doc "Will update a Var's value, and return a tuple with the old and the new values."} (All [a] (-> (-> a a) (Var a) (STM [a a]))) - (do Monad<STM> + (do ..monad [a (read var) #let [a' (f a)] _ (write a' var)] @@ -198,12 +198,12 @@ (def: (issue-commit commit) (All [a] (-> (Commit a) (IO Any))) (let [entry [commit (promise.promise [])]] - (do io.Monad<IO> + (do io.monad [|commits|&resolve (atom.read pending-commits)] (loop [[|commits| resolve] |commits|&resolve] (case (promise.poll |commits|) #.None - (do io.Monad<IO> + (do io.monad [resolved? (resolve entry)] (if resolved? (atom.write (product.right entry) pending-commits) @@ -217,14 +217,14 @@ (let [[stm-proc output resolve] commit [finished-tx value] (stm-proc fresh-tx)] (if (can-commit? finished-tx) - (do io.Monad<IO> + (do io.monad [_ (monad.map @ commit-var! finished-tx)] (resolve value)) (issue-commit commit)))) (def: init-processor! (IO Any) - (do io.Monad<IO> + (do io.monad [flag (atom.read commit-processor-flag)] (if flag (wrap []) @@ -247,7 +247,7 @@ "For this reason, it's important to note that transactions must be free from side-effects, such as I/O.")} (All [a] (-> (STM a) (Promise a))) (let [[output resolver] (promise.promise [])] - (exec (io.run (do io.Monad<IO> + (exec (io.run (do io.monad [_ init-processor!] (issue-commit [stm-proc output resolver]))) output))) diff --git a/stdlib/source/lux/control/concurrency/task.lux b/stdlib/source/lux/control/concurrency/task.lux index a5bf17819..1f16da8ca 100644 --- a/stdlib/source/lux/control/concurrency/task.lux +++ b/stdlib/source/lux/control/concurrency/task.lux @@ -29,16 +29,15 @@ (def: #export (throw exception message) (All [e a] (-> (Exception e) e (Task a))) - (:: promise.Monad<Promise> wrap - (ex.throw exception message))) + (:: promise.monad wrap (ex.throw exception message))) (def: #export (try computation) (All [a] (-> (Task a) (Task (Error a)))) - (:: promise.Functor<Promise> map (|>> #error.Success) computation)) + (:: promise.functor map (|>> #error.Success) computation)) -(structure: #export _ (Functor Task) +(structure: #export functor (Functor Task) (def: (map f fa) - (:: promise.Functor<Promise> map + (:: promise.functor map (function (_ fa') (case fa' (#error.Failure error) @@ -48,25 +47,25 @@ (#error.Success (f a)))) fa))) -(structure: #export _ (Apply Task) - (def: functor Functor<Task>) +(structure: #export apply (Apply Task) + (def: &functor ..functor) (def: (apply ff fa) - (do promise.Monad<Promise> + (do promise.monad [ff' ff fa' fa] - (wrap (do error.Monad<Error> + (wrap (do error.monad [f ff' a fa'] (wrap (f a))))))) -(structure: #export _ (Monad Task) - (def: functor Functor<Task>) +(structure: #export monad (Monad Task) + (def: &functor ..functor) (def: wrap return) (def: (join mma) - (do promise.Monad<Promise> + (do promise.monad [mma' mma] (case mma' (#error.Failure error) @@ -81,4 +80,4 @@ (def: #export (from-promise promise) (All [a] (-> (Promise a) (Task a))) - (:: promise.Functor<Promise> map (|>> #error.Success) promise)) + (:: promise.functor map (|>> #error.Success) promise)) |