diff options
Diffstat (limited to '')
-rw-r--r-- | stdlib/source/lux/control/concurrency/atom.lux | 15 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/promise.lux | 18 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/semaphore.lux | 111 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/thread.lux | 4 |
4 files changed, 66 insertions, 82 deletions
diff --git a/stdlib/source/lux/control/concurrency/atom.lux b/stdlib/source/lux/control/concurrency/atom.lux index b82a24cca..5f3719ba8 100644 --- a/stdlib/source/lux/control/concurrency/atom.lux +++ b/stdlib/source/lux/control/concurrency/atom.lux @@ -6,8 +6,9 @@ [monad (#+ do)]] [control ["." function] - ["." io (#- run)]] + ["." io (#- run) ("#\." functor)]] [data + ["." product] [collection ["." array]]] [type @@ -71,16 +72,18 @@ {#.doc (doc "Updates an atom by applying a function to its current value." "If it fails to update it (because some other process wrote to it first), it will retry until it succeeds." "The retries will be done with the new values of the atom, as they show up.")} - (All [a] (-> (-> a a) (Atom a) (IO a))) + (All [a] (-> (-> a a) (Atom a) (IO [a a]))) (loop [_ []] (do io.monad [old (read atom) #let [new (f old)] - swapped? (compare_and_swap old new atom)] + swapped? (..compare_and_swap old new atom)] (if swapped? - (wrap new) + (wrap [old new]) (recur []))))) (def: #export (write value atom) - (All [a] (-> a (Atom a) (IO Any))) - (..update (function.constant value) atom)) + (All [a] (-> a (Atom a) (IO a))) + (|> atom + (..update (function.constant value)) + (io\map product.left))) diff --git a/stdlib/source/lux/control/concurrency/promise.lux b/stdlib/source/lux/control/concurrency/promise.lux index 96822700d..6f8a35f96 100644 --- a/stdlib/source/lux/control/concurrency/promise.lux +++ b/stdlib/source/lux/control/concurrency/promise.lux @@ -93,7 +93,7 @@ (def: (map f fa) (let [[fb resolve] (..promise [])] - (exec (io.run (await (|>> f resolve) fa)) + (exec (io.run (..await (|>> f resolve) fa)) fb)))) (structure: #export apply @@ -103,9 +103,9 @@ (def: (apply ff fa) (let [[fb resolve] (..promise [])] - (exec (io.run (await (function (_ f) - (await (|>> f resolve) fa)) - ff)) + (exec (io.run (..await (function (_ f) + (..await (|>> f resolve) fa)) + ff)) fb)))) (structure: #export monad @@ -117,9 +117,7 @@ (def: (join mma) (let [[ma resolve] (promise [])] - (exec (io.run (await (function (_ ma') - (await resolve ma')) - mma)) + (exec (io.run (..await (..await resolve) mma)) ma)))) (def: #export (and left right) @@ -171,17 +169,17 @@ {#.doc (doc "Runs an I/O computation on its own thread." "Returns a Promise that will eventually host its result.")} (All [a] (-> (IO a) (Promise a))) - (schedule 0)) + (..schedule 0)) (def: #export (delay time_millis value) {#.doc "Delivers a value after a certain period has passed."} (All [a] (-> Nat a (Promise a))) - (schedule time_millis (io value))) + (..schedule time_millis (io value))) (def: #export (wait time_millis) {#.doc "Returns a promise that will be resolved after the specified amount of milliseconds."} (-> Nat (Promise Any)) - (delay time_millis [])) + (..delay time_millis [])) (def: #export (time_out time_millis promise) {#.doc "Wait for a promise to be resolved within the specified amount of milliseconds."} diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux index 5be5582de..0e8fa2b94 100644 --- a/stdlib/source/lux/control/concurrency/semaphore.lux +++ b/stdlib/source/lux/control/concurrency/semaphore.lux @@ -3,6 +3,7 @@ [abstract [monad (#+ do)]] [control + [pipe (#+ if>)] ["." io (#+ IO)] ["." try (#+ Try)] ["." exception (#+ exception:)]] @@ -48,23 +49,19 @@ (let [semaphore (:representation semaphore) [signal sink] (: [(Promise Any) (Resolver Any)] (promise.promise []))] - (exec (promise.future - (loop [_ []] + (exec (io.run + (with_expansions [<had_open_position?> (as_is (get@ #open_positions) (i.> -1))] (do io.monad - [state (atom.read semaphore) - #let [[ready? state'] (: [Bit State] - (if (i.> +0 (get@ #open_positions state)) - [true (|> state - (update@ #open_positions dec))] - [false (|> state - (update@ #open_positions dec) - (update@ #waiting_list (queue.push sink)))]))] - success? (atom.compare_and_swap state state' semaphore)] - (if success? - (if ready? - (sink []) - (wrap false)) - (recur []))))) + [[_ state'] (atom.update (|>> (update@ #open_positions dec) + (if> [<had_open_position?>] + [] + [(update@ #waiting_list (queue.push sink))])) + semaphore)] + (with_expansions [<go_ahead> (sink []) + <get_in_line> (wrap false)] + (if (|> state' <had_open_position?>) + <go_ahead> + <get_in_line>))))) signal))) (exception: #export (semaphore_is_maxed_out {max_positions Nat}) @@ -75,42 +72,25 @@ (Ex [k] (-> Semaphore (Promise (Try Int)))) (let [semaphore (:representation semaphore)] (promise.future - (loop [_ []] - (do {! io.monad} - [state (atom.read semaphore) - #let [[?sink state' maxed_out?] (: [(Maybe (Resolver Any)) State Bit] - (case (queue.peek (get@ #waiting_list state)) - #.None - (if (n.= (get@ #max_positions state) - (.nat (get@ #open_positions state))) - [#.None - state - true] - [#.None - (update@ #open_positions inc state) - false]) - - (#.Some head) - [(#.Some head) - (|> state - (update@ #open_positions inc) - (update@ #waiting_list queue.pop)) - false]))]] - (if maxed_out? - (wrap (exception.throw ..semaphore_is_maxed_out [(get@ #max_positions state)])) - (do ! - [#let [open_positions (get@ #open_positions state')] - success? (atom.compare_and_swap state state' semaphore)] - (if success? - (do ! - [_ (case ?sink - #.None - (wrap true) - - (#.Some sink) - (sink []))] - (wrap (#try.Success open_positions))) - (recur []))))))))) + (do {! io.monad} + [[pre post] (atom.update (function (_ state) + (if (i.= (.int (get@ #max_positions state)) + (get@ #open_positions state)) + state + (|> state + (update@ #open_positions inc) + (update@ #waiting_list queue.pop)))) + semaphore)] + (if (is? pre post) + (wrap (exception.throw ..semaphore_is_maxed_out [(get@ #max_positions pre)])) + (do ! + [_ (case (queue.peek (get@ #waiting_list pre)) + #.None + (wrap true) + + (#.Some sink) + (sink []))] + (wrap (#try.Success (get@ #open_positions post))))))))) ) (abstract: #export Mutex @@ -124,23 +104,26 @@ (def: acquire (-> Mutex (Promise Any)) - (|>> :representation wait)) + (|>> :representation ..wait)) (def: release (-> Mutex (Promise Any)) - (|>> :representation signal)) + (|>> :representation ..signal)) (def: #export (synchronize mutex procedure) (All [a] (-> Mutex (IO (Promise a)) (Promise a))) (do promise.monad - [_ (acquire mutex) + [_ (..acquire mutex) output (io.run procedure) - _ (release mutex)] + _ (..release mutex)] (wrap output))) ) -(def: #export limit (refinement.refinement (n.> 0))) -(type: #export Limit (:~ (refinement.type limit))) +(def: #export limit + (refinement.refinement (n.> 0))) + +(type: #export Limit + (:~ (refinement.type limit))) (abstract: #export Barrier {#limit Limit @@ -154,15 +137,15 @@ (-> Limit Barrier) (:abstraction {#limit limit #count (atom.atom 0) - #start_turnstile (semaphore 0) - #end_turnstile (semaphore 0)})) + #start_turnstile (..semaphore 0) + #end_turnstile (..semaphore 0)})) (def: (un_block times turnstile) (-> Nat Semaphore (Promise Any)) (loop [step 0] (if (n.< times step) (do promise.monad - [_ (..signal turnstile)] + [outcome (..signal turnstile)] (recur (inc step))) (\ promise.monad wrap [])))) @@ -172,11 +155,11 @@ (do promise.monad [#let [limit (refinement.un_refine (get@ #limit barrier)) goal <goal> - count (io.run (atom.update <update> (get@ #count barrier))) + [_ count] (io.run (atom.update <update> (get@ #count barrier))) reached? (n.= goal count)]] (if reached? - (un_block limit (get@ <turnstile> barrier)) - (wait (get@ <turnstile> barrier)))))] + (..un_block (dec limit) (get@ <turnstile> barrier)) + (..wait (get@ <turnstile> barrier)))))] [start inc limit #start_turnstile] [end dec 0 #end_turnstile] diff --git a/stdlib/source/lux/control/concurrency/thread.lux b/stdlib/source/lux/control/concurrency/thread.lux index 2ae0afec9..a34e050d5 100644 --- a/stdlib/source/lux/control/concurrency/thread.lux +++ b/stdlib/source/lux/control/concurrency/thread.lux @@ -5,7 +5,7 @@ [abstract ["." monad (#+ do)]] [control - ["ex" exception (#+ exception:)] + ["." exception (#+ exception:)] ["." io (#+ IO io)]] [data [collection @@ -153,6 +153,6 @@ (do ! [_ (monad.map ! (get@ #action) ready)] (run! [])) - (error! (ex.construct ..cannot_continue_running_threads [])))) + (error! (exception.construct ..cannot_continue_running_threads [])))) ))) )) |