From a2d994a3f7a39964452df7523f69e16b10b266f9 Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Tue, 5 Apr 2022 07:16:43 -0400 Subject: Properly handling variance for remaining mutable types. --- .../library/lux/control/concurrency/actor.lux | 4 +- .../library/lux/control/concurrency/async.lux | 110 ++++++++++++------- .../library/lux/control/concurrency/atom.lux | 104 +++++++++--------- .../source/library/lux/control/concurrency/frp.lux | 93 +++++++++------- .../library/lux/control/concurrency/semaphore.lux | 2 +- .../source/library/lux/control/concurrency/stm.lux | 119 ++++++++++++--------- .../library/lux/control/concurrency/thread.lux | 16 +-- stdlib/source/library/lux/control/thread.lux | 55 +++++----- .../library/lux/tool/compiler/default/platform.lux | 20 ++-- .../library/lux/tool/compiler/meta/io/archive.lux | 12 ++- stdlib/source/test/lux/control/concurrency/frp.lux | 10 +- stdlib/source/test/lux/control/thread.lux | 4 +- 12 files changed, 312 insertions(+), 237 deletions(-) (limited to 'stdlib/source') diff --git a/stdlib/source/library/lux/control/concurrency/actor.lux b/stdlib/source/library/lux/control/concurrency/actor.lux index 06eb74e44..8334f6f41 100644 --- a/stdlib/source/library/lux/control/concurrency/actor.lux +++ b/stdlib/source/library/lux/control/concurrency/actor.lux @@ -33,7 +33,7 @@ [// ["[0]" atom {"+" Atom atom}] ["[0]" async {"+" Async Resolver} ("[1]#[0]" monad)] - ["[0]" frp {"+" Channel}]]) + ["[0]" frp {"+" Channel Channel'}]]) (exception: .public poisoned) (exception: .public dead) @@ -350,7 +350,7 @@ (def: stop! false) (def: .public (observe! action channel actor) - (All (_ e s) (-> (-> e Stop (Mail s)) (Channel e) (Actor s) (IO Any))) + (All (_ r w s) (-> (-> r Stop (Mail s)) (Channel' r w) (Actor s) (IO Any))) (let [signal (is (Atom Bit) (atom.atom ..continue!)) stop (is Stop diff --git a/stdlib/source/library/lux/control/concurrency/async.lux b/stdlib/source/library/lux/control/concurrency/async.lux index 6070efb3e..e8edda452 100644 --- a/stdlib/source/library/lux/control/concurrency/async.lux +++ b/stdlib/source/library/lux/control/concurrency/async.lux @@ -8,26 +8,34 @@ [control ["[0]" pipe] ["[0]" function] - ["[0]" io {"+" IO io}]] + ["[0]" io {"+" IO io}] + ["[0]" maybe ("[1]#[0]" functor)]] [data ["[0]" product]] [macro ["^" pattern]] [type {"+" sharing} - [abstract {"-" pattern}]]]] + [abstract {"-" pattern}] + ["[0]" variance {"+" Mutable}]]]] [// ["[0]" thread] ["[0]" atom {"+" Atom atom}]]) -(abstract: .public (Async a) +(abstract: .public (Async'' a) (Atom [(Maybe a) (List (-> a (IO Any)))]) - (type: .public (Resolver a) - (-> a (IO Bit))) + (type: .public (Async' r w) + (Async'' (Mutable r w))) + + (type: .public (Async a) + (Async'' (Mutable a a))) + + (type: .public (Resolver w) + (-> w (IO Bit))) ... Sets an async's value if it has not been done yet. (def: (resolver async) - (All (_ a) (-> (Async a) (Resolver a))) + (All (_ r w) (-> (Async' r w) (Resolver w))) (function (resolve value) (let [async (representation async)] (do [! io.monad] @@ -38,50 +46,49 @@ {.#None} (do ! - [.let [new [{.#Some value} {.#None}]] - succeeded? (atom.compare_and_swap! old new async)] + [succeeded? (atom.compare_and_swap! old [{.#Some (variance.write value)} (list)] async)] (if succeeded? (do ! - [_ (monad.each ! (function (_ f) (f value)) + [_ (monad.each ! (function.on (variance.write value)) _observers)] (in #1)) (resolve value)))))))) (def: .public (resolved value) (All (_ a) (-> a (Async a))) - (abstraction (atom [{.#Some value} (list)]))) + (abstraction (atom [{.#Some (variance.write value)} (list)]))) (def: .public (async _) - (All (_ a) (-> Any [(Async a) (Resolver a)])) + (All (_ r w) (-> Any [(Async' r w) (Resolver w)])) (let [async (abstraction (atom [{.#None} (list)]))] [async (..resolver async)])) (def: .public value - (All (_ a) (-> (Async a) (IO (Maybe a)))) + (All (_ r w) (-> (Async' r w) (IO (Maybe r)))) (|>> representation atom.read! - (# io.functor each product.left))) + (# io.functor each (|>> product.left + (maybe#each (|>> variance.read)))))) (def: .public (upon! f async) - (All (_ a) (-> (-> a (IO Any)) (Async a) (IO Any))) + (All (_ r w) (-> (-> r (IO Any)) (Async' r w) (IO Any))) (do [! io.monad] [.let [async (representation async)] (^.let old [_value _observers]) (atom.read! async)] (case _value {.#Some value} - (f value) + (f (variance.read value)) {.#None} - (let [new [_value {.#Item f _observers}]] - (do ! - [swapped? (atom.compare_and_swap! old new async)] - (if swapped? - (in []) - (upon! f (abstraction async)))))))) + (do ! + [swapped? (atom.compare_and_swap! old [_value {.#Item (|>> variance.read f) _observers}] async)] + (if swapped? + (in []) + (upon! f (abstraction async))))))) ) (def: .public resolved? - (All (_ a) (-> (Async a) (IO Bit))) + (All (_ r w) (-> (Async' r w) (IO Bit))) (|>> ..value (# io.functor each (|>> (pipe.case @@ -95,7 +102,12 @@ (Functor Async) (def: (each f fa) - (let [[fb resolve] (..async [])] + (let [[fb resolve] (sharing [a b] + (-> a b) + f + + [(Async b) (Resolver b)] + (..async []))] (exec (io.run! (..upon! (|>> f resolve) fa)) fb)))) @@ -106,7 +118,12 @@ (def: &functor ..functor) (def: (on fa ff) - (let [[fb resolve] (..async [])] + (let [[fb resolve] (sharing [a b] + (Async (-> a b)) + ff + + [(Async b) (Resolver b)] + (..async []))] (exec (io.run! (..upon! (function (_ f) (..upon! (|>> f resolve) fa)) @@ -121,19 +138,23 @@ (def: in ..resolved) (def: (conjoint mma) - (let [[ma resolve] (async [])] + (let [[ma resolve] (sharing [a] + (Async (Async a)) + mma + + [(Async a) (Resolver a)] + (..async []))] (exec (io.run! (..upon! (..upon! resolve) mma)) ma)))) (def: .public (and left right) - (All (_ a b) (-> (Async a) (Async b) (Async [a b]))) - (let [[read! write!] (sharing [a b] - [(Async a) (Async b)] + (All (_ lr lw rr rw) (-> (Async' lr lw) (Async' rr rw) (Async [lr rr]))) + (let [[read! write!] (sharing [lr lw rr rw] + [(Async' lr lw) (Async' rr rw)] [left right] - [(Async [a b]) - (Resolver [a b])] + [(Async [lr rr]) (Resolver [lr rr])] (..async [])) _ (io.run! (..upon! (function (_ left) (..upon! (function (_ right) @@ -143,8 +164,13 @@ read!)) (def: .public (or left right) - (All (_ a b) (-> (Async a) (Async b) (Async (Or a b)))) - (let [[a|b resolve] (..async [])] + (All (_ lr lw rr rw) (-> (Async' lr lw) (Async' rr rw) (Async (Or lr rr)))) + (let [[left|right resolve] (sharing [lr lw rr rw] + [(Async' lr lw) (Async' rr rw)] + [left right] + + [(Async (Or lr rr)) (Resolver (Or lr rr))] + (..async []))] (with_expansions [ (template [ ] [(io.run! (upon! (|>> {} resolve) ))] @@ -154,11 +180,16 @@ )] (exec - a|b)))) + left|right)))) (def: .public (either left right) - (All (_ a) (-> (Async a) (Async a) (Async a))) - (let [[left||right resolve] (..async [])] + (All (_ a lw rw) (-> (Async' a lw) (Async' a rw) (Async a))) + (let [[left||right resolve] (sharing [a lw rw] + [(Async' a lw) (Async' a rw)] + [left right] + + [(Async a) (Resolver a)] + (..async []))] (`` (exec (~~ (template [] [(io.run! (upon! resolve ))] @@ -169,7 +200,12 @@ (def: .public (schedule! milli_seconds computation) (All (_ a) (-> Nat (IO a) (Async a))) - (let [[!out resolve] (..async [])] + (let [[!out resolve] (sharing [a] + (IO a) + computation + + [(Async a) (Resolver a)] + (..async []))] (exec (|> (do io.monad [value computation] @@ -191,6 +227,6 @@ (..after milli_seconds [])) (def: .public (within milli_seconds async) - (All (_ a) (-> Nat (Async a) (Async (Maybe a)))) + (All (_ r w) (-> Nat (Async' r w) (Async (Maybe r)))) (..or (..delay milli_seconds) async)) diff --git a/stdlib/source/library/lux/control/concurrency/atom.lux b/stdlib/source/library/lux/control/concurrency/atom.lux index bb8c732a4..08533b6a5 100644 --- a/stdlib/source/library/lux/control/concurrency/atom.lux +++ b/stdlib/source/library/lux/control/concurrency/atom.lux @@ -11,76 +11,68 @@ [data ["[0]" product] [collection - ["[0]" array]]] + ["[0]" array "_" + ["[1]" \\unsafe]]]] [type - [abstract {"-" pattern}]]]]) + [abstract {"-" pattern}] + ["[0]" variance {"+" Mutable}]]]]) (with_expansions [ (these (ffi.import: (java/util/concurrent/atomic/AtomicReference a) "[1]::[0]" (new [a]) - (get [] "io" a) + (get [] a) (compareAndSet [a a] boolean)))] (for @.old @.jvm (these))) -(with_expansions [ (for @.js "js array new" - @.python "python array new" - @.lua "lua array new" - @.ruby "ruby array new" - @.php "php array new" - @.scheme "scheme array new" - (these)) - (for @.js "js array write" - @.python "python array write" - @.lua "lua array write" - @.ruby "ruby array write" - @.php "php array write" - @.scheme "scheme array write" - (these)) - - (for @.js "js array read" - @.python "python array read" - @.lua "lua array read" - @.ruby "ruby array read" - @.php "php array read" - @.scheme "scheme array read" - (these))] - (abstract: .public (Atom a) - (with_expansions [ (java/util/concurrent/atomic/AtomicReference a)] - (for @.old - @.jvm - (array.Array a))) +(abstract: .public (Atom'' a) + (with_expansions [ (java/util/concurrent/atomic/AtomicReference a)] + (for @.old + @.jvm + (array.Array a))) - (def: .public (atom value) - (All (_ a) (-> a (Atom a))) - (abstraction (with_expansions [ (java/util/concurrent/atomic/AtomicReference::new value)] - (for @.old - @.jvm - ( 0 value ( 1)))))) + (type: .public (Atom' r w) + (Atom'' (Mutable r w))) - (def: .public (read! atom) - (All (_ a) (-> (Atom a) (IO a))) - (with_expansions [ (java/util/concurrent/atomic/AtomicReference::get (representation atom))] - (for @.old - @.jvm - (io.io ( 0 (representation atom)))))) + (type: .public (Atom a) + (Atom'' (Mutable a a))) - (def: .public (compare_and_swap! current new atom) - (All (_ a) (-> a a (Atom a) (IO Bit))) - (io.io (with_expansions [ (ffi.of_boolean (java/util/concurrent/atomic/AtomicReference::compareAndSet current new (representation atom)))] - (for @.old - @.jvm - (let [old ( 0 (representation atom))] - (if (same? old current) - (exec - ( 0 new (representation atom)) - true) - false)))))) - )) + (def: .public (atom value) + (All (_ a) (-> a (Atom a))) + (abstraction + (with_expansions [ (as_expected (java/util/concurrent/atomic/AtomicReference::new value))] + (for @.old + @.jvm + (array.has! 0 value (array.empty 1)))))) + + (def: .public (read! atom) + (All (_ r w) (-> (Atom' r w) (IO r))) + (with_expansions [ (java/util/concurrent/atomic/AtomicReference::get (representation atom))] + (io.io (for @.old (variance.read ) + @.jvm + (variance.read (array.item 0 (representation atom))))))) + + (def: .public (compare_and_swap! current new atom) + (All (_ r w) (-> r w (Atom' r w) (IO Bit))) + (io.io (for @.old (ffi.of_boolean + (java/util/concurrent/atomic/AtomicReference::compareAndSet (variance.write + (`` (as (~~ (type_of new)) + current))) + (variance.write new) + (representation atom))) + @.jvm (ffi.of_boolean + (java/util/concurrent/atomic/AtomicReference::compareAndSet current new (representation atom))) + (let [old (array.item 0 (representation atom))] + (if (same? old current) + (exec + (array.has! 0 new (representation atom)) + true) + false))))) + ) (def: .public (update! f atom) - (All (_ a) (-> (-> a a) (Atom a) (IO [a a]))) + (All (_ r w) (-> (-> r w) (Atom' r w) (IO [r w]))) (loop (again [_ []]) (do io.monad [old (read! atom) @@ -91,7 +83,7 @@ (again []))))) (def: .public (write! value atom) - (All (_ a) (-> a (Atom a) (IO a))) + (All (_ r w) (-> w (Atom' r w) (IO r))) (|> atom (..update! (function.constant value)) (io#each product.left))) diff --git a/stdlib/source/library/lux/control/concurrency/frp.lux b/stdlib/source/library/lux/control/concurrency/frp.lux index cb0613fae..92aab2a89 100644 --- a/stdlib/source/library/lux/control/concurrency/frp.lux +++ b/stdlib/source/library/lux/control/concurrency/frp.lux @@ -12,28 +12,35 @@ ["[0]" exception {"+" exception:}] ["[0]" io {"+" IO io}]] [type {"+" sharing} - [abstract {"-" pattern}]]]] + [abstract {"-" pattern}] + ["[0]" variance {"+" Mutable}]]]] [// ["[0]" atom {"+" Atom}] - ["[0]" async {"+" Async} ("[1]#[0]" monad)]]) + ["[0]" async {"+" Async Async'} ("[1]#[0]" monad)]]) + +(type: .public (Channel'' a) + (Async (Maybe [a (Channel'' a)]))) + +(type: .public (Channel' r w) + (Channel'' (Mutable r w))) (type: .public (Channel a) - (Async (Maybe [a (Channel a)]))) + (Channel'' (Mutable a a))) (exception: .public channel_is_already_closed) -(type: .public (Sink a) +(type: .public (Sink w) (Interface (is (IO (Try Any)) close) - (is (-> a (IO (Try Any))) + (is (-> w (IO (Try Any))) feed))) -(def: (sink resolve) +(def: (sink resolution) (All (_ a) - (-> (async.Resolver (Maybe [a (Channel a)])) + (-> (async.Resolver (Maybe [(Mutable a a) (Channel a)])) (Sink a))) - (let [sink (atom.atom resolve)] + (let [sink (atom.atom resolution)] (implementation (def: close (loop (again [_ []]) @@ -57,13 +64,13 @@ (do [! io.monad] [current (atom.read! sink) .let [[next resolve_next] (sharing [a] - (async.Resolver (Maybe [a (Channel a)])) + (async.Resolver (Maybe [(Mutable a a) (Channel a)])) current - [(Async (Maybe [a (Channel a)])) - (async.Resolver (Maybe [a (Channel a)]))] + [(Channel a) + (async.Resolver (Maybe [(Mutable a a) (Channel a)]))] (async.async []))] - fed? (current {.#Some [value next]})] + fed? (current {.#Some [(variance.write value) next]})] (if fed? ... I fed the sink. (do ! @@ -90,7 +97,8 @@ (async#each (maybe#each (function (_ [head tail]) - [(f head) (each f tail)]))))) + [(variance.write (f (variance.read head))) + (each f tail)]))))) (implementation: .public apply (Apply Channel) @@ -103,7 +111,8 @@ item_a fa] (case [item_f item_a] [{.#Some [head_f tail_f]} {.#Some [head_a tail_a]}] - (in {.#Some [(head_f head_a) (on tail_a tail_f)]}) + (in {.#Some [(variance.write ((variance.read head_f) (variance.read head_a))) + (on tail_a tail_f)]}) _ (in {.#None}))))) @@ -118,10 +127,15 @@ (def: &functor ..functor) (def: (in a) - (async.resolved {.#Some [a ..empty]})) + (async.resolved {.#Some [(variance.write a) ..empty]})) (def: (conjoint mma) - (let [[output sink] (channel [])] + (let [[output sink] (sharing [a] + (Channel (Channel a)) + mma + + [(Channel a) (Sink a)] + (channel []))] (exec (is (Async Any) (loop (again [mma mma]) @@ -132,12 +146,12 @@ (do ! [_ (loop (again [ma ma]) (do ! - [?ma ma] + [?ma (variance.read ma)] (case ?ma {.#Some [a ma']} (exec - (io.run! (# sink feed a)) - (again ma')) + (io.run! (# sink feed (variance.read a))) + (again (variance.write ma'))) {.#None} (in []))))] @@ -151,7 +165,7 @@ (-> a (IO (Maybe Any)))) (def: .public (subscribe! subscriber channel) - (All (_ a) (-> (Subscriber a) (Channel a) (IO Any))) + (All (_ r w) (-> (Subscriber r) (Channel' r w) (IO Any))) (io (exec (is (Async Any) (loop (again [channel channel]) @@ -159,7 +173,7 @@ [item channel] (case item {.#Some [head tail]} - (case (io.run! (subscriber head)) + (case (io.run! (subscriber (variance.read head))) {.#Some _} (again tail) @@ -177,7 +191,7 @@ (case item {.#Some [head tail]} (let [tail' (only pass? tail)] - (if (pass? head) + (if (pass? (variance.read head)) (in {.#Some [head tail']}) tail')) @@ -187,7 +201,7 @@ (def: .public (of_async async) (All (_ a) (-> (Async a) (Channel a))) (async#each (function (_ value) - {.#Some [value ..empty]}) + {.#Some [(variance.write value) ..empty]}) async)) (def: .public (mix f init channel) @@ -202,7 +216,7 @@ {.#Some [head tail]} (do ! - [init' (f head init)] + [init' (f (variance.read head) init)] (mix f init' tail))))) (def: .public (mixes f init channel) @@ -211,7 +225,7 @@ (Channel a))) (<| async#in {.#Some} - [init] + [(variance.write init)] (do [! async.monad] [item channel] (case item @@ -220,7 +234,7 @@ {.#Some [head tail]} (do ! - [init' (f head init)] + [init' (f (variance.read head) init)] (mixes f init' tail)))))) (def: .public (poll milli_seconds action) @@ -243,12 +257,12 @@ (All (_ s o) (-> (-> s (Async (Maybe [s o]))) s (Channel o))) (do async.monad [?next (f init)] - (case ?next - {.#Some [state output]} - (in {.#Some [output (iterations f state)]}) - - {.#None} - (in {.#None})))) + (in (case ?next + {.#Some [state output]} + {.#Some [(variance.write output) (iterations f state)]} + + {.#None} + {.#None})))) (def: (distinct' equivalence previous channel) (All (_ a) (-> (Equivalence a) a (Channel a) (Channel a))) @@ -256,9 +270,9 @@ [item channel] (case item {.#Some [head tail]} - (if (# equivalence = previous head) + (if (# equivalence = previous (variance.read head)) (distinct' equivalence previous tail) - (in {.#Some [head (distinct' equivalence head tail)]})) + (in {.#Some [head (distinct' equivalence (variance.read head) tail)]})) {.#None} (in {.#None})))) @@ -269,7 +283,7 @@ [item channel] (in (case item {.#Some [head tail]} - {.#Some [head (distinct' equivalence head tail)]} + {.#Some [head (distinct' equivalence (variance.read head) tail)]} {.#None} {.#None})))) @@ -280,7 +294,7 @@ [item channel] (case item {.#Some [head tail]} - (# ! each (|>> {.#Item head}) + (# ! each (|>> {.#Item (variance.read head)}) (list tail)) {.#None} @@ -293,6 +307,7 @@ ..empty {.#Item head tail} - (async.resolved {.#Some [head (do async.monad - [_ (async.delay milli_seconds)] - (sequential milli_seconds tail))]}))) + (async.resolved {.#Some [(variance.write head) + (do async.monad + [_ (async.delay milli_seconds)] + (sequential milli_seconds tail))]}))) diff --git a/stdlib/source/library/lux/control/concurrency/semaphore.lux b/stdlib/source/library/lux/control/concurrency/semaphore.lux index 4b5f886a4..05ce1aef6 100644 --- a/stdlib/source/library/lux/control/concurrency/semaphore.lux +++ b/stdlib/source/library/lux/control/concurrency/semaphore.lux @@ -106,7 +106,7 @@ (|>> representation ..wait!)) (def: release! - (-> Mutex (Async Any)) + (-> Mutex (Async (Try Int))) (|>> representation ..signal!)) (def: .public (synchronize! mutex procedure) diff --git a/stdlib/source/library/lux/control/concurrency/stm.lux b/stdlib/source/library/lux/control/concurrency/stm.lux index be8203dde..918283f70 100644 --- a/stdlib/source/library/lux/control/concurrency/stm.lux +++ b/stdlib/source/library/lux/control/concurrency/stm.lux @@ -15,8 +15,9 @@ ["[0]" list]]] [macro ["^" pattern]] - [type - [abstract {"-" pattern}]]]] + [type {"+" sharing} + [abstract {"-" pattern}] + ["[0]" variance {"+" Mutable}]]]] [// ["[0]" atom {"+" Atom atom}] ["[0]" async {"+" Async Resolver}] @@ -25,70 +26,82 @@ (type: (Observer a) (-> a (IO Any))) -(abstract: .public (Var a) +(abstract: .public (Var'' a) (Atom [a (List (Sink a))]) + (type: .public (Var' r w) + (Var'' (Mutable r w))) + + (type: .public (Var a) + (Var'' (Mutable a a))) + (def: .public (var value) (All (_ a) (-> a (Var a))) - (abstraction (atom.atom [value (list)]))) + (abstraction (atom.atom [(variance.write value) (list)]))) (def: read! - (All (_ a) (-> (Var a) a)) - (|>> representation atom.read! io.run! product.left)) - - (def: (un_follow! sink var) - (All (_ a) (-> (Sink a) (Var a) (IO Any))) - (do io.monad - [_ (atom.update! (function (_ [value observers]) - [value (list.only (|>> (same? sink) not) observers)]) - (representation var))] - (in []))) + (All (_ r w) (-> (Var' r w) r)) + (|>> representation atom.read! io.run! product.left variance.read)) (def: (write! new_value var) - (All (_ a) (-> a (Var a) (IO Any))) + (All (_ r w) (-> w (Var' r w) (IO Any))) (do [! io.monad] [.let [var' (representation var)] - (^.let old [old_value observers]) (atom.read! var') - succeeded? (atom.compare_and_swap! old [new_value observers] var')] + (^.let old [_ observers]) (atom.read! var') + succeeded? (atom.compare_and_swap! old [(variance.write new_value) observers] var')] (if succeeded? (do ! - [_ (monad.each ! (function (_ sink) - (do ! - [result (# sink feed new_value)] - (case result - {try.#Success _} - (in []) - - {try.#Failure _} - (un_follow! sink var)))) - observers)] + [banned (monad.only ! (function (_ sink) + (do ! + [result (# sink feed (variance.write new_value))] + (in (case result + {try.#Success _} + false + + {try.#Failure _} + true)))) + observers) + _ (atom.update! (function (_ [value audience]) + (|> audience + (list.only (function (_ it) + (not (list.any? (same? it) banned)))) + [value])) + var')] (in [])) (write! new_value var)))) (def: .public (changes target) (All (_ a) (-> (Var a) (IO [(Channel a) (Sink a)]))) (do io.monad - [.let [[channel sink] (frp.channel [])] + [.let [[channel sink] (sharing [a] + (Var a) + target + + [(Channel a) (Sink a)] + (frp.channel []))] _ (atom.update! (function (_ [value observers]) - [value {.#Item sink observers}]) + [value {.#Item (implementation + (def: close (# sink close)) + (def: feed (|>> variance.read (# sink feed)))) + observers}]) (representation target))] (in [channel sink]))) ) -(type: (Tx_Frame a) +(type: (Tx_Frame r w) (Record - [#var (Var a) - #original a - #current a])) + [#var (Var' r w) + #original r + #current w])) (type: Tx - (List (Ex (_ a) (Tx_Frame a)))) + (List (Ex (_ r w) (Tx_Frame r w)))) (type: .public (STM a) (-> Tx [Tx a])) (def: (var_value var tx) - (All (_ a) (-> (Var a) Tx (Maybe a))) + (All (_ r w) (-> (Var' r w) Tx (Maybe r))) (|> tx (list.example (function (_ [_var _original _current]) (same? (as (Var Any) var) @@ -98,7 +111,7 @@ as_expected)) (def: .public (read var) - (All (_ a) (-> (Var a) (STM a))) + (All (_ r w) (-> (Var' r w) (STM r))) (function (_ tx) (case (var_value var tx) {.#Some value} @@ -106,11 +119,14 @@ {.#None} (let [value (..read! var)] - [{.#Item [var value value] tx} + [{.#Item [#var var + #original value + #current (as_expected value)] + tx} value])))) (def: (with_updated_var var value tx) - (All (_ a) (-> (Var a) a Tx Tx)) + (All (_ r w) (-> (Var' r w) w Tx Tx)) (case tx {.#End} {.#End} @@ -118,9 +134,9 @@ {.#Item [_var _original _current] tx'} (if (same? (as (Var Any) var) (as (Var Any) _var)) - {.#Item [#var (as (Var Any) _var) - #original (as Any _original) - #current (as Any value)] + {.#Item [#var _var + #original _original + #current (as_expected value)] tx'} {.#Item [#var _var #original _original @@ -128,7 +144,7 @@ (with_updated_var var value tx')}))) (def: .public (write value var) - (All (_ a) (-> a (Var a) (STM Any))) + (All (_ r w) (-> w (Var' r w) (STM Any))) (function (_ tx) (case (var_value var tx) {.#Some _} @@ -136,7 +152,10 @@ []] {.#None} - [{.#Item [var (..read! var) value] tx} + [{.#Item [#var var + #original (..read! var) + #current value] + tx} []]))) (implementation: .public functor @@ -173,12 +192,12 @@ (ma tx'))))) (def: .public (update f var) - (All (_ a) (-> (-> a a) (Var a) (STM [a a]))) + (All (_ r w) (-> (-> r w) (Var' r w) (STM [r w]))) (do ..monad - [a (..read var) - .let [a' (f a)] - _ (..write a' var)] - (in [a a']))) + [before (..read var) + .let [after (f before)] + _ (..write after var)] + (in [before after]))) (def: (can_commit? tx) (-> Tx Bit) @@ -187,8 +206,8 @@ tx)) (def: (commit_var! [_var _original _current]) - (-> (Ex (_ a) (Tx_Frame a)) (IO Any)) - (if (same? _original _current) + (-> (Ex (_ r w) (Tx_Frame r w)) (IO Any)) + (if (same? (as Any _original) (as Any _current)) (io []) (..write! _current _var))) diff --git a/stdlib/source/library/lux/control/concurrency/thread.lux b/stdlib/source/library/lux/control/concurrency/thread.lux index db9c06846..f8d92bc77 100644 --- a/stdlib/source/library/lux/control/concurrency/thread.lux +++ b/stdlib/source/library/lux/control/concurrency/thread.lux @@ -122,14 +122,14 @@ (def: .public (schedule! milli_seconds action) (-> Nat (IO Any) (IO Any)) - (with_expansions [ (these (let [runnable (ffi.object [] [java/lang/Runnable] - [] - (java/lang/Runnable [] (run self []) void - (..execute! action)))] - (case milli_seconds - 0 (java/util/concurrent/Executor::execute runnable runner) - _ (java/util/concurrent/ScheduledThreadPoolExecutor::schedule runnable (.int milli_seconds) java/util/concurrent/TimeUnit::MILLISECONDS - runner))))] + (with_expansions [ (let [runnable (ffi.object [] [java/lang/Runnable] + [] + (java/lang/Runnable [] (run self []) void + (..execute! action)))] + (case milli_seconds + 0 (java/util/concurrent/Executor::execute runnable runner) + _ (java/util/concurrent/ScheduledThreadPoolExecutor::schedule runnable (.int milli_seconds) java/util/concurrent/TimeUnit::MILLISECONDS + runner)))] (for @.old @.jvm diff --git a/stdlib/source/library/lux/control/thread.lux b/stdlib/source/library/lux/control/thread.lux index 4b5fdfc35..6a65b2650 100644 --- a/stdlib/source/library/lux/control/thread.lux +++ b/stdlib/source/library/lux/control/thread.lux @@ -10,48 +10,46 @@ ["[0]" io {"+" IO}]] [data [collection - ["[0]" array {"+" Array}]]] + ["[0]" array "_" + ["[1]" \\unsafe {"+" Array}]]]] [type - [abstract {"-" pattern}]]]]) + [abstract {"-" pattern}] + ["[0]" variance {"+" Mutable}]]]]) (type: .public (Thread ! a) (-> ! a)) -(abstract: .public (Box t v) - (Array v) +(abstract: .public (Box'' t a) + (Array a) + + (type: .public (Box' t r w) + (Box'' t (Mutable r w))) + + (type: .public (Box t a) + (Box'' t (Mutable a a))) (def: .public (box init) (All (_ a) (-> a (All (_ !) (Thread ! (Box ! a))))) (function (_ !) (|> (array.empty 1) - (array.has! 0 init) + (array.has! 0 (variance.write init)) abstraction))) (def: .public (read! box) - (All (_ ! a) (-> (Box ! a) (Thread ! a))) + (All (_ ! r w) (-> (Box' ! r w) (Thread ! r))) (function (_ !) - (for @.old - ("jvm aaload" (representation box) 0) - - @.jvm - ("jvm array read object" - (|> 0 - (as (Primitive "java.lang.Long")) - "jvm object cast" - "jvm conversion long-to-int") - (representation box)) - - @.js ("js array read" 0 (representation box)) - @.python ("python array read" 0 (representation box)) - @.lua ("lua array read" 0 (representation box)) - @.ruby ("ruby array read" 0 (representation box)) - @.php ("php array read" 0 (representation box)) - @.scheme ("scheme array read" 0 (representation box))))) + (|> box + representation + (array.item 0) + variance.read))) (def: .public (write! value box) - (All (_ a) (-> a (All (_ !) (-> (Box ! a) (Thread ! Any))))) + (All (_ r w) (-> w (All (_ !) (-> (Box' ! r w) (Thread ! Any))))) (function (_ !) - (|> box representation (array.has! 0 value) abstraction))) + (|> box + representation + (array.has! 0 (variance.write value)) + abstraction))) ) (def: .public (result thread) @@ -97,8 +95,9 @@ ((ffa !) !)))) (def: .public (update! f box) - (All (_ a !) (-> (-> a a) (Box ! a) (Thread ! a))) + (All (_ ! r w) (-> (-> r w) (Box' ! r w) (Thread ! [r w]))) (do ..monad [old (read! box) - _ (write! (f old) box)] - (in old))) + .let [new (f old)] + _ (write! new box)] + (in [old new]))) diff --git a/stdlib/source/library/lux/tool/compiler/default/platform.lux b/stdlib/source/library/lux/tool/compiler/default/platform.lux index bf809bbb5..1facd15f6 100644 --- a/stdlib/source/library/lux/tool/compiler/default/platform.lux +++ b/stdlib/source/library/lux/tool/compiler/default/platform.lux @@ -115,7 +115,10 @@ (let [system (the #&file_system platform) write_artifact! (is (-> [artifact.ID (Maybe Text) Binary] (Action Any)) (function (_ [artifact_id custom content]) - (cache/artifact.cache! system context @module artifact_id content)))] + (with_expansions [ (cache/artifact.cache! system context @module artifact_id content)] + (for @.old (as (Async (Try Any)) + ) + ))))] (do [! ..monad] [_ (is (Async (Try Any)) (cache/module.enable! async.monad system context @module)) @@ -132,12 +135,15 @@ (is (Action (List Any))))) document (# async.monad in (document.marked? key (the [archive.#module module.#document] entry)))] - (|> [(|> entry - (the archive.#module) - (has module.#document document)) - (the archive.#registry entry)] - (_.result (..writer format)) - (cache/module.cache! system context @module))))) + (with_expansions [ (|> [(|> entry + (the archive.#module) + (has module.#document document)) + (the archive.#registry entry)] + (_.result (..writer format)) + (cache/module.cache! system context @module))] + (for @.old (as (Async (Try Any)) + ) + ))))) ... TODO: Inline ASAP (def: initialize_buffer! diff --git a/stdlib/source/library/lux/tool/compiler/meta/io/archive.lux b/stdlib/source/library/lux/tool/compiler/meta/io/archive.lux index 26298275f..bd1c6aa1e 100644 --- a/stdlib/source/library/lux/tool/compiler/meta/io/archive.lux +++ b/stdlib/source/library/lux/tool/compiler/meta/io/archive.lux @@ -332,9 +332,15 @@ (list.only (|>> product.left (dictionary.key? purge) not)) (monad.each ! (function (_ [module_name [@module entry]]) (do ! - [[entry bundles] (..load_definitions fs context @module host_environment entry)] - (in [[module_name entry] - bundles])))))] + [[entry bundles] (with_expansions [ (..load_definitions fs context @module host_environment entry)] + (for @.old (as (Async (Try [(archive.Entry .Module) Bundles])) + ) + ))] + (in (with_expansions [ [[module_name entry] + bundles]] + (for @.old (as [[descriptor.Module (archive.Entry .Module)] Bundles] + ) + )))))))] (in it))) (def: (load_every_reserved_module customs configuration host_environment fs context import contexts archive) diff --git a/stdlib/source/test/lux/control/concurrency/frp.lux b/stdlib/source/test/lux/control/concurrency/frp.lux index 3c804559e..90d5f17fe 100644 --- a/stdlib/source/test/lux/control/concurrency/frp.lux +++ b/stdlib/source/test/lux/control/concurrency/frp.lux @@ -19,7 +19,9 @@ [math ["[0]" random] [number - ["n" nat]]]]] + ["n" nat]]] + [type + ["[0]" variance]]]] [\\library ["[0]" / [// @@ -41,7 +43,7 @@ (in (case [?left ?right] [{.#Some {.#Some [left _]}} {.#Some {.#Some [right _]}}] - (== left right) + (== (variance.read left) (variance.read right)) _ false)))))) @@ -59,7 +61,7 @@ (in {.#End}) {.#Some [head tail]} - (# ! each (|>> {.#Item head}) + (# ! each (|>> {.#Item (variance.read head)}) (take_amount (-- amount_of_polls) [channel sink])))))) (def: .public test @@ -97,7 +99,7 @@ [?actual (async.value channel)] (in (case ?actual {.#Some {.#Some [actual _]}} - (n.= sample actual) + (n.= sample (variance.read actual)) _ false)))) diff --git a/stdlib/source/test/lux/control/thread.lux b/stdlib/source/test/lux/control/thread.lux index 5da8a5e95..cfff55018 100644 --- a/stdlib/source/test/lux/control/thread.lux +++ b/stdlib/source/test/lux/control/thread.lux @@ -77,6 +77,6 @@ (/.result (is (All (_ !) (Thread ! Nat)) (do /.monad [box (/.box sample) - old (/.update! (n.* factor) box)] - (/.read! box)))))))) + [old new] (/.update! (n.* factor) box)] + (in new)))))))) )))) -- cgit v1.2.3