aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--stdlib/source/library/lux/control/concurrency/actor.lux4
-rw-r--r--stdlib/source/library/lux/control/concurrency/async.lux110
-rw-r--r--stdlib/source/library/lux/control/concurrency/atom.lux104
-rw-r--r--stdlib/source/library/lux/control/concurrency/frp.lux93
-rw-r--r--stdlib/source/library/lux/control/concurrency/semaphore.lux2
-rw-r--r--stdlib/source/library/lux/control/concurrency/stm.lux119
-rw-r--r--stdlib/source/library/lux/control/concurrency/thread.lux16
-rw-r--r--stdlib/source/library/lux/control/thread.lux55
-rw-r--r--stdlib/source/library/lux/tool/compiler/default/platform.lux20
-rw-r--r--stdlib/source/library/lux/tool/compiler/meta/io/archive.lux12
-rw-r--r--stdlib/source/test/lux/control/concurrency/frp.lux10
-rw-r--r--stdlib/source/test/lux/control/thread.lux4
12 files changed, 312 insertions, 237 deletions
diff --git a/stdlib/source/library/lux/control/concurrency/actor.lux b/stdlib/source/library/lux/control/concurrency/actor.lux
index 06eb74e44..8334f6f41 100644
--- a/stdlib/source/library/lux/control/concurrency/actor.lux
+++ b/stdlib/source/library/lux/control/concurrency/actor.lux
@@ -33,7 +33,7 @@
[//
["[0]" atom {"+" Atom atom}]
["[0]" async {"+" Async Resolver} ("[1]#[0]" monad)]
- ["[0]" frp {"+" Channel}]])
+ ["[0]" frp {"+" Channel Channel'}]])
(exception: .public poisoned)
(exception: .public dead)
@@ -350,7 +350,7 @@
(def: stop! false)
(def: .public (observe! action channel actor)
- (All (_ e s) (-> (-> e Stop (Mail s)) (Channel e) (Actor s) (IO Any)))
+ (All (_ r w s) (-> (-> r Stop (Mail s)) (Channel' r w) (Actor s) (IO Any)))
(let [signal (is (Atom Bit)
(atom.atom ..continue!))
stop (is Stop
diff --git a/stdlib/source/library/lux/control/concurrency/async.lux b/stdlib/source/library/lux/control/concurrency/async.lux
index 6070efb3e..e8edda452 100644
--- a/stdlib/source/library/lux/control/concurrency/async.lux
+++ b/stdlib/source/library/lux/control/concurrency/async.lux
@@ -8,26 +8,34 @@
[control
["[0]" pipe]
["[0]" function]
- ["[0]" io {"+" IO io}]]
+ ["[0]" io {"+" IO io}]
+ ["[0]" maybe ("[1]#[0]" functor)]]
[data
["[0]" product]]
[macro
["^" pattern]]
[type {"+" sharing}
- [abstract {"-" pattern}]]]]
+ [abstract {"-" pattern}]
+ ["[0]" variance {"+" Mutable}]]]]
[//
["[0]" thread]
["[0]" atom {"+" Atom atom}]])
-(abstract: .public (Async a)
+(abstract: .public (Async'' a)
(Atom [(Maybe a) (List (-> a (IO Any)))])
- (type: .public (Resolver a)
- (-> a (IO Bit)))
+ (type: .public (Async' r w)
+ (Async'' (Mutable r w)))
+
+ (type: .public (Async a)
+ (Async'' (Mutable a a)))
+
+ (type: .public (Resolver w)
+ (-> w (IO Bit)))
... Sets an async's value if it has not been done yet.
(def: (resolver async)
- (All (_ a) (-> (Async a) (Resolver a)))
+ (All (_ r w) (-> (Async' r w) (Resolver w)))
(function (resolve value)
(let [async (representation async)]
(do [! io.monad]
@@ -38,50 +46,49 @@
{.#None}
(do !
- [.let [new [{.#Some value} {.#None}]]
- succeeded? (atom.compare_and_swap! old new async)]
+ [succeeded? (atom.compare_and_swap! old [{.#Some (variance.write value)} (list)] async)]
(if succeeded?
(do !
- [_ (monad.each ! (function (_ f) (f value))
+ [_ (monad.each ! (function.on (variance.write value))
_observers)]
(in #1))
(resolve value))))))))
(def: .public (resolved value)
(All (_ a) (-> a (Async a)))
- (abstraction (atom [{.#Some value} (list)])))
+ (abstraction (atom [{.#Some (variance.write value)} (list)])))
(def: .public (async _)
- (All (_ a) (-> Any [(Async a) (Resolver a)]))
+ (All (_ r w) (-> Any [(Async' r w) (Resolver w)]))
(let [async (abstraction (atom [{.#None} (list)]))]
[async (..resolver async)]))
(def: .public value
- (All (_ a) (-> (Async a) (IO (Maybe a))))
+ (All (_ r w) (-> (Async' r w) (IO (Maybe r))))
(|>> representation
atom.read!
- (# io.functor each product.left)))
+ (# io.functor each (|>> product.left
+ (maybe#each (|>> variance.read))))))
(def: .public (upon! f async)
- (All (_ a) (-> (-> a (IO Any)) (Async a) (IO Any)))
+ (All (_ r w) (-> (-> r (IO Any)) (Async' r w) (IO Any)))
(do [! io.monad]
[.let [async (representation async)]
(^.let old [_value _observers]) (atom.read! async)]
(case _value
{.#Some value}
- (f value)
+ (f (variance.read value))
{.#None}
- (let [new [_value {.#Item f _observers}]]
- (do !
- [swapped? (atom.compare_and_swap! old new async)]
- (if swapped?
- (in [])
- (upon! f (abstraction async))))))))
+ (do !
+ [swapped? (atom.compare_and_swap! old [_value {.#Item (|>> variance.read f) _observers}] async)]
+ (if swapped?
+ (in [])
+ (upon! f (abstraction async)))))))
)
(def: .public resolved?
- (All (_ a) (-> (Async a) (IO Bit)))
+ (All (_ r w) (-> (Async' r w) (IO Bit)))
(|>> ..value
(# io.functor each
(|>> (pipe.case
@@ -95,7 +102,12 @@
(Functor Async)
(def: (each f fa)
- (let [[fb resolve] (..async [])]
+ (let [[fb resolve] (sharing [a b]
+ (-> a b)
+ f
+
+ [(Async b) (Resolver b)]
+ (..async []))]
(exec
(io.run! (..upon! (|>> f resolve) fa))
fb))))
@@ -106,7 +118,12 @@
(def: &functor ..functor)
(def: (on fa ff)
- (let [[fb resolve] (..async [])]
+ (let [[fb resolve] (sharing [a b]
+ (Async (-> a b))
+ ff
+
+ [(Async b) (Resolver b)]
+ (..async []))]
(exec
(io.run! (..upon! (function (_ f)
(..upon! (|>> f resolve) fa))
@@ -121,19 +138,23 @@
(def: in ..resolved)
(def: (conjoint mma)
- (let [[ma resolve] (async [])]
+ (let [[ma resolve] (sharing [a]
+ (Async (Async a))
+ mma
+
+ [(Async a) (Resolver a)]
+ (..async []))]
(exec
(io.run! (..upon! (..upon! resolve) mma))
ma))))
(def: .public (and left right)
- (All (_ a b) (-> (Async a) (Async b) (Async [a b])))
- (let [[read! write!] (sharing [a b]
- [(Async a) (Async b)]
+ (All (_ lr lw rr rw) (-> (Async' lr lw) (Async' rr rw) (Async [lr rr])))
+ (let [[read! write!] (sharing [lr lw rr rw]
+ [(Async' lr lw) (Async' rr rw)]
[left right]
- [(Async [a b])
- (Resolver [a b])]
+ [(Async [lr rr]) (Resolver [lr rr])]
(..async []))
_ (io.run! (..upon! (function (_ left)
(..upon! (function (_ right)
@@ -143,8 +164,13 @@
read!))
(def: .public (or left right)
- (All (_ a b) (-> (Async a) (Async b) (Async (Or a b))))
- (let [[a|b resolve] (..async [])]
+ (All (_ lr lw rr rw) (-> (Async' lr lw) (Async' rr rw) (Async (Or lr rr))))
+ (let [[left|right resolve] (sharing [lr lw rr rw]
+ [(Async' lr lw) (Async' rr rw)]
+ [left right]
+
+ [(Async (Or lr rr)) (Resolver (Or lr rr))]
+ (..async []))]
(with_expansions
[<sides> (template [<async> <tag>]
[(io.run! (upon! (|>> {<tag>} resolve) <async>))]
@@ -154,11 +180,16 @@
)]
(exec
<sides>
- a|b))))
+ left|right))))
(def: .public (either left right)
- (All (_ a) (-> (Async a) (Async a) (Async a)))
- (let [[left||right resolve] (..async [])]
+ (All (_ a lw rw) (-> (Async' a lw) (Async' a rw) (Async a)))
+ (let [[left||right resolve] (sharing [a lw rw]
+ [(Async' a lw) (Async' a rw)]
+ [left right]
+
+ [(Async a) (Resolver a)]
+ (..async []))]
(`` (exec
(~~ (template [<async>]
[(io.run! (upon! resolve <async>))]
@@ -169,7 +200,12 @@
(def: .public (schedule! milli_seconds computation)
(All (_ a) (-> Nat (IO a) (Async a)))
- (let [[!out resolve] (..async [])]
+ (let [[!out resolve] (sharing [a]
+ (IO a)
+ computation
+
+ [(Async a) (Resolver a)]
+ (..async []))]
(exec
(|> (do io.monad
[value computation]
@@ -191,6 +227,6 @@
(..after milli_seconds []))
(def: .public (within milli_seconds async)
- (All (_ a) (-> Nat (Async a) (Async (Maybe a))))
+ (All (_ r w) (-> Nat (Async' r w) (Async (Maybe r))))
(..or (..delay milli_seconds)
async))
diff --git a/stdlib/source/library/lux/control/concurrency/atom.lux b/stdlib/source/library/lux/control/concurrency/atom.lux
index bb8c732a4..08533b6a5 100644
--- a/stdlib/source/library/lux/control/concurrency/atom.lux
+++ b/stdlib/source/library/lux/control/concurrency/atom.lux
@@ -11,76 +11,68 @@
[data
["[0]" product]
[collection
- ["[0]" array]]]
+ ["[0]" array "_"
+ ["[1]" \\unsafe]]]]
[type
- [abstract {"-" pattern}]]]])
+ [abstract {"-" pattern}]
+ ["[0]" variance {"+" Mutable}]]]])
(with_expansions [<jvm> (these (ffi.import: (java/util/concurrent/atomic/AtomicReference a)
"[1]::[0]"
(new [a])
- (get [] "io" a)
+ (get [] a)
(compareAndSet [a a] boolean)))]
(for @.old <jvm>
@.jvm <jvm>
(these)))
-(with_expansions [<new> (for @.js "js array new"
- @.python "python array new"
- @.lua "lua array new"
- @.ruby "ruby array new"
- @.php "php array new"
- @.scheme "scheme array new"
- (these))
- <write> (for @.js "js array write"
- @.python "python array write"
- @.lua "lua array write"
- @.ruby "ruby array write"
- @.php "php array write"
- @.scheme "scheme array write"
- (these))
-
- <read> (for @.js "js array read"
- @.python "python array read"
- @.lua "lua array read"
- @.ruby "ruby array read"
- @.php "php array read"
- @.scheme "scheme array read"
- (these))]
- (abstract: .public (Atom a)
- (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference a)]
- (for @.old <jvm>
- @.jvm <jvm>
- (array.Array a)))
+(abstract: .public (Atom'' a)
+ (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference a)]
+ (for @.old <jvm>
+ @.jvm <jvm>
+ (array.Array a)))
- (def: .public (atom value)
- (All (_ a) (-> a (Atom a)))
- (abstraction (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference::new value)]
- (for @.old <jvm>
- @.jvm <jvm>
- (<write> 0 value (<new> 1))))))
+ (type: .public (Atom' r w)
+ (Atom'' (Mutable r w)))
- (def: .public (read! atom)
- (All (_ a) (-> (Atom a) (IO a)))
- (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference::get (representation atom))]
- (for @.old <jvm>
- @.jvm <jvm>
- (io.io (<read> 0 (representation atom))))))
+ (type: .public (Atom a)
+ (Atom'' (Mutable a a)))
- (def: .public (compare_and_swap! current new atom)
- (All (_ a) (-> a a (Atom a) (IO Bit)))
- (io.io (with_expansions [<jvm> (ffi.of_boolean (java/util/concurrent/atomic/AtomicReference::compareAndSet current new (representation atom)))]
- (for @.old <jvm>
- @.jvm <jvm>
- (let [old (<read> 0 (representation atom))]
- (if (same? old current)
- (exec
- (<write> 0 new (representation atom))
- true)
- false))))))
- ))
+ (def: .public (atom value)
+ (All (_ a) (-> a (Atom a)))
+ (abstraction
+ (with_expansions [<jvm> (as_expected (java/util/concurrent/atomic/AtomicReference::new value))]
+ (for @.old <jvm>
+ @.jvm <jvm>
+ (array.has! 0 value (array.empty 1))))))
+
+ (def: .public (read! atom)
+ (All (_ r w) (-> (Atom' r w) (IO r)))
+ (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference::get (representation atom))]
+ (io.io (for @.old (variance.read <jvm>)
+ @.jvm <jvm>
+ (variance.read (array.item 0 (representation atom)))))))
+
+ (def: .public (compare_and_swap! current new atom)
+ (All (_ r w) (-> r w (Atom' r w) (IO Bit)))
+ (io.io (for @.old (ffi.of_boolean
+ (java/util/concurrent/atomic/AtomicReference::compareAndSet (variance.write
+ (`` (as (~~ (type_of new))
+ current)))
+ (variance.write new)
+ (representation atom)))
+ @.jvm (ffi.of_boolean
+ (java/util/concurrent/atomic/AtomicReference::compareAndSet current new (representation atom)))
+ (let [old (array.item 0 (representation atom))]
+ (if (same? old current)
+ (exec
+ (array.has! 0 new (representation atom))
+ true)
+ false)))))
+ )
(def: .public (update! f atom)
- (All (_ a) (-> (-> a a) (Atom a) (IO [a a])))
+ (All (_ r w) (-> (-> r w) (Atom' r w) (IO [r w])))
(loop (again [_ []])
(do io.monad
[old (read! atom)
@@ -91,7 +83,7 @@
(again [])))))
(def: .public (write! value atom)
- (All (_ a) (-> a (Atom a) (IO a)))
+ (All (_ r w) (-> w (Atom' r w) (IO r)))
(|> atom
(..update! (function.constant value))
(io#each product.left)))
diff --git a/stdlib/source/library/lux/control/concurrency/frp.lux b/stdlib/source/library/lux/control/concurrency/frp.lux
index cb0613fae..92aab2a89 100644
--- a/stdlib/source/library/lux/control/concurrency/frp.lux
+++ b/stdlib/source/library/lux/control/concurrency/frp.lux
@@ -12,28 +12,35 @@
["[0]" exception {"+" exception:}]
["[0]" io {"+" IO io}]]
[type {"+" sharing}
- [abstract {"-" pattern}]]]]
+ [abstract {"-" pattern}]
+ ["[0]" variance {"+" Mutable}]]]]
[//
["[0]" atom {"+" Atom}]
- ["[0]" async {"+" Async} ("[1]#[0]" monad)]])
+ ["[0]" async {"+" Async Async'} ("[1]#[0]" monad)]])
+
+(type: .public (Channel'' a)
+ (Async (Maybe [a (Channel'' a)])))
+
+(type: .public (Channel' r w)
+ (Channel'' (Mutable r w)))
(type: .public (Channel a)
- (Async (Maybe [a (Channel a)])))
+ (Channel'' (Mutable a a)))
(exception: .public channel_is_already_closed)
-(type: .public (Sink a)
+(type: .public (Sink w)
(Interface
(is (IO (Try Any))
close)
- (is (-> a (IO (Try Any)))
+ (is (-> w (IO (Try Any)))
feed)))
-(def: (sink resolve)
+(def: (sink resolution)
(All (_ a)
- (-> (async.Resolver (Maybe [a (Channel a)]))
+ (-> (async.Resolver (Maybe [(Mutable a a) (Channel a)]))
(Sink a)))
- (let [sink (atom.atom resolve)]
+ (let [sink (atom.atom resolution)]
(implementation
(def: close
(loop (again [_ []])
@@ -57,13 +64,13 @@
(do [! io.monad]
[current (atom.read! sink)
.let [[next resolve_next] (sharing [a]
- (async.Resolver (Maybe [a (Channel a)]))
+ (async.Resolver (Maybe [(Mutable a a) (Channel a)]))
current
- [(Async (Maybe [a (Channel a)]))
- (async.Resolver (Maybe [a (Channel a)]))]
+ [(Channel a)
+ (async.Resolver (Maybe [(Mutable a a) (Channel a)]))]
(async.async []))]
- fed? (current {.#Some [value next]})]
+ fed? (current {.#Some [(variance.write value) next]})]
(if fed?
... I fed the sink.
(do !
@@ -90,7 +97,8 @@
(async#each
(maybe#each
(function (_ [head tail])
- [(f head) (each f tail)])))))
+ [(variance.write (f (variance.read head)))
+ (each f tail)])))))
(implementation: .public apply
(Apply Channel)
@@ -103,7 +111,8 @@
item_a fa]
(case [item_f item_a]
[{.#Some [head_f tail_f]} {.#Some [head_a tail_a]}]
- (in {.#Some [(head_f head_a) (on tail_a tail_f)]})
+ (in {.#Some [(variance.write ((variance.read head_f) (variance.read head_a)))
+ (on tail_a tail_f)]})
_
(in {.#None})))))
@@ -118,10 +127,15 @@
(def: &functor ..functor)
(def: (in a)
- (async.resolved {.#Some [a ..empty]}))
+ (async.resolved {.#Some [(variance.write a) ..empty]}))
(def: (conjoint mma)
- (let [[output sink] (channel [])]
+ (let [[output sink] (sharing [a]
+ (Channel (Channel a))
+ mma
+
+ [(Channel a) (Sink a)]
+ (channel []))]
(exec
(is (Async Any)
(loop (again [mma mma])
@@ -132,12 +146,12 @@
(do !
[_ (loop (again [ma ma])
(do !
- [?ma ma]
+ [?ma (variance.read ma)]
(case ?ma
{.#Some [a ma']}
(exec
- (io.run! (# sink feed a))
- (again ma'))
+ (io.run! (# sink feed (variance.read a)))
+ (again (variance.write ma')))
{.#None}
(in []))))]
@@ -151,7 +165,7 @@
(-> a (IO (Maybe Any))))
(def: .public (subscribe! subscriber channel)
- (All (_ a) (-> (Subscriber a) (Channel a) (IO Any)))
+ (All (_ r w) (-> (Subscriber r) (Channel' r w) (IO Any)))
(io (exec
(is (Async Any)
(loop (again [channel channel])
@@ -159,7 +173,7 @@
[item channel]
(case item
{.#Some [head tail]}
- (case (io.run! (subscriber head))
+ (case (io.run! (subscriber (variance.read head)))
{.#Some _}
(again tail)
@@ -177,7 +191,7 @@
(case item
{.#Some [head tail]}
(let [tail' (only pass? tail)]
- (if (pass? head)
+ (if (pass? (variance.read head))
(in {.#Some [head tail']})
tail'))
@@ -187,7 +201,7 @@
(def: .public (of_async async)
(All (_ a) (-> (Async a) (Channel a)))
(async#each (function (_ value)
- {.#Some [value ..empty]})
+ {.#Some [(variance.write value) ..empty]})
async))
(def: .public (mix f init channel)
@@ -202,7 +216,7 @@
{.#Some [head tail]}
(do !
- [init' (f head init)]
+ [init' (f (variance.read head) init)]
(mix f init' tail)))))
(def: .public (mixes f init channel)
@@ -211,7 +225,7 @@
(Channel a)))
(<| async#in
{.#Some}
- [init]
+ [(variance.write init)]
(do [! async.monad]
[item channel]
(case item
@@ -220,7 +234,7 @@
{.#Some [head tail]}
(do !
- [init' (f head init)]
+ [init' (f (variance.read head) init)]
(mixes f init' tail))))))
(def: .public (poll milli_seconds action)
@@ -243,12 +257,12 @@
(All (_ s o) (-> (-> s (Async (Maybe [s o]))) s (Channel o)))
(do async.monad
[?next (f init)]
- (case ?next
- {.#Some [state output]}
- (in {.#Some [output (iterations f state)]})
-
- {.#None}
- (in {.#None}))))
+ (in (case ?next
+ {.#Some [state output]}
+ {.#Some [(variance.write output) (iterations f state)]}
+
+ {.#None}
+ {.#None}))))
(def: (distinct' equivalence previous channel)
(All (_ a) (-> (Equivalence a) a (Channel a) (Channel a)))
@@ -256,9 +270,9 @@
[item channel]
(case item
{.#Some [head tail]}
- (if (# equivalence = previous head)
+ (if (# equivalence = previous (variance.read head))
(distinct' equivalence previous tail)
- (in {.#Some [head (distinct' equivalence head tail)]}))
+ (in {.#Some [head (distinct' equivalence (variance.read head) tail)]}))
{.#None}
(in {.#None}))))
@@ -269,7 +283,7 @@
[item channel]
(in (case item
{.#Some [head tail]}
- {.#Some [head (distinct' equivalence head tail)]}
+ {.#Some [head (distinct' equivalence (variance.read head) tail)]}
{.#None}
{.#None}))))
@@ -280,7 +294,7 @@
[item channel]
(case item
{.#Some [head tail]}
- (# ! each (|>> {.#Item head})
+ (# ! each (|>> {.#Item (variance.read head)})
(list tail))
{.#None}
@@ -293,6 +307,7 @@
..empty
{.#Item head tail}
- (async.resolved {.#Some [head (do async.monad
- [_ (async.delay milli_seconds)]
- (sequential milli_seconds tail))]})))
+ (async.resolved {.#Some [(variance.write head)
+ (do async.monad
+ [_ (async.delay milli_seconds)]
+ (sequential milli_seconds tail))]})))
diff --git a/stdlib/source/library/lux/control/concurrency/semaphore.lux b/stdlib/source/library/lux/control/concurrency/semaphore.lux
index 4b5f886a4..05ce1aef6 100644
--- a/stdlib/source/library/lux/control/concurrency/semaphore.lux
+++ b/stdlib/source/library/lux/control/concurrency/semaphore.lux
@@ -106,7 +106,7 @@
(|>> representation ..wait!))
(def: release!
- (-> Mutex (Async Any))
+ (-> Mutex (Async (Try Int)))
(|>> representation ..signal!))
(def: .public (synchronize! mutex procedure)
diff --git a/stdlib/source/library/lux/control/concurrency/stm.lux b/stdlib/source/library/lux/control/concurrency/stm.lux
index be8203dde..918283f70 100644
--- a/stdlib/source/library/lux/control/concurrency/stm.lux
+++ b/stdlib/source/library/lux/control/concurrency/stm.lux
@@ -15,8 +15,9 @@
["[0]" list]]]
[macro
["^" pattern]]
- [type
- [abstract {"-" pattern}]]]]
+ [type {"+" sharing}
+ [abstract {"-" pattern}]
+ ["[0]" variance {"+" Mutable}]]]]
[//
["[0]" atom {"+" Atom atom}]
["[0]" async {"+" Async Resolver}]
@@ -25,70 +26,82 @@
(type: (Observer a)
(-> a (IO Any)))
-(abstract: .public (Var a)
+(abstract: .public (Var'' a)
(Atom [a (List (Sink a))])
+ (type: .public (Var' r w)
+ (Var'' (Mutable r w)))
+
+ (type: .public (Var a)
+ (Var'' (Mutable a a)))
+
(def: .public (var value)
(All (_ a) (-> a (Var a)))
- (abstraction (atom.atom [value (list)])))
+ (abstraction (atom.atom [(variance.write value) (list)])))
(def: read!
- (All (_ a) (-> (Var a) a))
- (|>> representation atom.read! io.run! product.left))
-
- (def: (un_follow! sink var)
- (All (_ a) (-> (Sink a) (Var a) (IO Any)))
- (do io.monad
- [_ (atom.update! (function (_ [value observers])
- [value (list.only (|>> (same? sink) not) observers)])
- (representation var))]
- (in [])))
+ (All (_ r w) (-> (Var' r w) r))
+ (|>> representation atom.read! io.run! product.left variance.read))
(def: (write! new_value var)
- (All (_ a) (-> a (Var a) (IO Any)))
+ (All (_ r w) (-> w (Var' r w) (IO Any)))
(do [! io.monad]
[.let [var' (representation var)]
- (^.let old [old_value observers]) (atom.read! var')
- succeeded? (atom.compare_and_swap! old [new_value observers] var')]
+ (^.let old [_ observers]) (atom.read! var')
+ succeeded? (atom.compare_and_swap! old [(variance.write new_value) observers] var')]
(if succeeded?
(do !
- [_ (monad.each ! (function (_ sink)
- (do !
- [result (# sink feed new_value)]
- (case result
- {try.#Success _}
- (in [])
-
- {try.#Failure _}
- (un_follow! sink var))))
- observers)]
+ [banned (monad.only ! (function (_ sink)
+ (do !
+ [result (# sink feed (variance.write new_value))]
+ (in (case result
+ {try.#Success _}
+ false
+
+ {try.#Failure _}
+ true))))
+ observers)
+ _ (atom.update! (function (_ [value audience])
+ (|> audience
+ (list.only (function (_ it)
+ (not (list.any? (same? it) banned))))
+ [value]))
+ var')]
(in []))
(write! new_value var))))
(def: .public (changes target)
(All (_ a) (-> (Var a) (IO [(Channel a) (Sink a)])))
(do io.monad
- [.let [[channel sink] (frp.channel [])]
+ [.let [[channel sink] (sharing [a]
+ (Var a)
+ target
+
+ [(Channel a) (Sink a)]
+ (frp.channel []))]
_ (atom.update! (function (_ [value observers])
- [value {.#Item sink observers}])
+ [value {.#Item (implementation
+ (def: close (# sink close))
+ (def: feed (|>> variance.read (# sink feed))))
+ observers}])
(representation target))]
(in [channel sink])))
)
-(type: (Tx_Frame a)
+(type: (Tx_Frame r w)
(Record
- [#var (Var a)
- #original a
- #current a]))
+ [#var (Var' r w)
+ #original r
+ #current w]))
(type: Tx
- (List (Ex (_ a) (Tx_Frame a))))
+ (List (Ex (_ r w) (Tx_Frame r w))))
(type: .public (STM a)
(-> Tx [Tx a]))
(def: (var_value var tx)
- (All (_ a) (-> (Var a) Tx (Maybe a)))
+ (All (_ r w) (-> (Var' r w) Tx (Maybe r)))
(|> tx
(list.example (function (_ [_var _original _current])
(same? (as (Var Any) var)
@@ -98,7 +111,7 @@
as_expected))
(def: .public (read var)
- (All (_ a) (-> (Var a) (STM a)))
+ (All (_ r w) (-> (Var' r w) (STM r)))
(function (_ tx)
(case (var_value var tx)
{.#Some value}
@@ -106,11 +119,14 @@
{.#None}
(let [value (..read! var)]
- [{.#Item [var value value] tx}
+ [{.#Item [#var var
+ #original value
+ #current (as_expected value)]
+ tx}
value]))))
(def: (with_updated_var var value tx)
- (All (_ a) (-> (Var a) a Tx Tx))
+ (All (_ r w) (-> (Var' r w) w Tx Tx))
(case tx
{.#End}
{.#End}
@@ -118,9 +134,9 @@
{.#Item [_var _original _current] tx'}
(if (same? (as (Var Any) var)
(as (Var Any) _var))
- {.#Item [#var (as (Var Any) _var)
- #original (as Any _original)
- #current (as Any value)]
+ {.#Item [#var _var
+ #original _original
+ #current (as_expected value)]
tx'}
{.#Item [#var _var
#original _original
@@ -128,7 +144,7 @@
(with_updated_var var value tx')})))
(def: .public (write value var)
- (All (_ a) (-> a (Var a) (STM Any)))
+ (All (_ r w) (-> w (Var' r w) (STM Any)))
(function (_ tx)
(case (var_value var tx)
{.#Some _}
@@ -136,7 +152,10 @@
[]]
{.#None}
- [{.#Item [var (..read! var) value] tx}
+ [{.#Item [#var var
+ #original (..read! var)
+ #current value]
+ tx}
[]])))
(implementation: .public functor
@@ -173,12 +192,12 @@
(ma tx')))))
(def: .public (update f var)
- (All (_ a) (-> (-> a a) (Var a) (STM [a a])))
+ (All (_ r w) (-> (-> r w) (Var' r w) (STM [r w])))
(do ..monad
- [a (..read var)
- .let [a' (f a)]
- _ (..write a' var)]
- (in [a a'])))
+ [before (..read var)
+ .let [after (f before)]
+ _ (..write after var)]
+ (in [before after])))
(def: (can_commit? tx)
(-> Tx Bit)
@@ -187,8 +206,8 @@
tx))
(def: (commit_var! [_var _original _current])
- (-> (Ex (_ a) (Tx_Frame a)) (IO Any))
- (if (same? _original _current)
+ (-> (Ex (_ r w) (Tx_Frame r w)) (IO Any))
+ (if (same? (as Any _original) (as Any _current))
(io [])
(..write! _current _var)))
diff --git a/stdlib/source/library/lux/control/concurrency/thread.lux b/stdlib/source/library/lux/control/concurrency/thread.lux
index db9c06846..f8d92bc77 100644
--- a/stdlib/source/library/lux/control/concurrency/thread.lux
+++ b/stdlib/source/library/lux/control/concurrency/thread.lux
@@ -122,14 +122,14 @@
(def: .public (schedule! milli_seconds action)
(-> Nat (IO Any) (IO Any))
- (with_expansions [<jvm> (these (let [runnable (ffi.object [] [java/lang/Runnable]
- []
- (java/lang/Runnable [] (run self []) void
- (..execute! action)))]
- (case milli_seconds
- 0 (java/util/concurrent/Executor::execute runnable runner)
- _ (java/util/concurrent/ScheduledThreadPoolExecutor::schedule runnable (.int milli_seconds) java/util/concurrent/TimeUnit::MILLISECONDS
- runner))))]
+ (with_expansions [<jvm> (let [runnable (ffi.object [] [java/lang/Runnable]
+ []
+ (java/lang/Runnable [] (run self []) void
+ (..execute! action)))]
+ (case milli_seconds
+ 0 (java/util/concurrent/Executor::execute runnable runner)
+ _ (java/util/concurrent/ScheduledThreadPoolExecutor::schedule runnable (.int milli_seconds) java/util/concurrent/TimeUnit::MILLISECONDS
+ runner)))]
(for @.old <jvm>
@.jvm <jvm>
diff --git a/stdlib/source/library/lux/control/thread.lux b/stdlib/source/library/lux/control/thread.lux
index 4b5fdfc35..6a65b2650 100644
--- a/stdlib/source/library/lux/control/thread.lux
+++ b/stdlib/source/library/lux/control/thread.lux
@@ -10,48 +10,46 @@
["[0]" io {"+" IO}]]
[data
[collection
- ["[0]" array {"+" Array}]]]
+ ["[0]" array "_"
+ ["[1]" \\unsafe {"+" Array}]]]]
[type
- [abstract {"-" pattern}]]]])
+ [abstract {"-" pattern}]
+ ["[0]" variance {"+" Mutable}]]]])
(type: .public (Thread ! a)
(-> ! a))
-(abstract: .public (Box t v)
- (Array v)
+(abstract: .public (Box'' t a)
+ (Array a)
+
+ (type: .public (Box' t r w)
+ (Box'' t (Mutable r w)))
+
+ (type: .public (Box t a)
+ (Box'' t (Mutable a a)))
(def: .public (box init)
(All (_ a) (-> a (All (_ !) (Thread ! (Box ! a)))))
(function (_ !)
(|> (array.empty 1)
- (array.has! 0 init)
+ (array.has! 0 (variance.write init))
abstraction)))
(def: .public (read! box)
- (All (_ ! a) (-> (Box ! a) (Thread ! a)))
+ (All (_ ! r w) (-> (Box' ! r w) (Thread ! r)))
(function (_ !)
- (for @.old
- ("jvm aaload" (representation box) 0)
-
- @.jvm
- ("jvm array read object"
- (|> 0
- (as (Primitive "java.lang.Long"))
- "jvm object cast"
- "jvm conversion long-to-int")
- (representation box))
-
- @.js ("js array read" 0 (representation box))
- @.python ("python array read" 0 (representation box))
- @.lua ("lua array read" 0 (representation box))
- @.ruby ("ruby array read" 0 (representation box))
- @.php ("php array read" 0 (representation box))
- @.scheme ("scheme array read" 0 (representation box)))))
+ (|> box
+ representation
+ (array.item 0)
+ variance.read)))
(def: .public (write! value box)
- (All (_ a) (-> a (All (_ !) (-> (Box ! a) (Thread ! Any)))))
+ (All (_ r w) (-> w (All (_ !) (-> (Box' ! r w) (Thread ! Any)))))
(function (_ !)
- (|> box representation (array.has! 0 value) abstraction)))
+ (|> box
+ representation
+ (array.has! 0 (variance.write value))
+ abstraction)))
)
(def: .public (result thread)
@@ -97,8 +95,9 @@
((ffa !) !))))
(def: .public (update! f box)
- (All (_ a !) (-> (-> a a) (Box ! a) (Thread ! a)))
+ (All (_ ! r w) (-> (-> r w) (Box' ! r w) (Thread ! [r w])))
(do ..monad
[old (read! box)
- _ (write! (f old) box)]
- (in old)))
+ .let [new (f old)]
+ _ (write! new box)]
+ (in [old new])))
diff --git a/stdlib/source/library/lux/tool/compiler/default/platform.lux b/stdlib/source/library/lux/tool/compiler/default/platform.lux
index bf809bbb5..1facd15f6 100644
--- a/stdlib/source/library/lux/tool/compiler/default/platform.lux
+++ b/stdlib/source/library/lux/tool/compiler/default/platform.lux
@@ -115,7 +115,10 @@
(let [system (the #&file_system platform)
write_artifact! (is (-> [artifact.ID (Maybe Text) Binary] (Action Any))
(function (_ [artifact_id custom content])
- (cache/artifact.cache! system context @module artifact_id content)))]
+ (with_expansions [<it> (cache/artifact.cache! system context @module artifact_id content)]
+ (for @.old (as (Async (Try Any))
+ <it>)
+ <it>))))]
(do [! ..monad]
[_ (is (Async (Try Any))
(cache/module.enable! async.monad system context @module))
@@ -132,12 +135,15 @@
(is (Action (List Any)))))
document (# async.monad in
(document.marked? key (the [archive.#module module.#document] entry)))]
- (|> [(|> entry
- (the archive.#module)
- (has module.#document document))
- (the archive.#registry entry)]
- (_.result (..writer format))
- (cache/module.cache! system context @module)))))
+ (with_expansions [<it> (|> [(|> entry
+ (the archive.#module)
+ (has module.#document document))
+ (the archive.#registry entry)]
+ (_.result (..writer format))
+ (cache/module.cache! system context @module))]
+ (for @.old (as (Async (Try Any))
+ <it>)
+ <it>)))))
... TODO: Inline ASAP
(def: initialize_buffer!
diff --git a/stdlib/source/library/lux/tool/compiler/meta/io/archive.lux b/stdlib/source/library/lux/tool/compiler/meta/io/archive.lux
index 26298275f..bd1c6aa1e 100644
--- a/stdlib/source/library/lux/tool/compiler/meta/io/archive.lux
+++ b/stdlib/source/library/lux/tool/compiler/meta/io/archive.lux
@@ -332,9 +332,15 @@
(list.only (|>> product.left (dictionary.key? purge) not))
(monad.each ! (function (_ [module_name [@module entry]])
(do !
- [[entry bundles] (..load_definitions fs context @module host_environment entry)]
- (in [[module_name entry]
- bundles])))))]
+ [[entry bundles] (with_expansions [<it> (..load_definitions fs context @module host_environment entry)]
+ (for @.old (as (Async (Try [(archive.Entry .Module) Bundles]))
+ <it>)
+ <it>))]
+ (in (with_expansions [<it> [[module_name entry]
+ bundles]]
+ (for @.old (as [[descriptor.Module (archive.Entry .Module)] Bundles]
+ <it>)
+ <it>)))))))]
(in it)))
(def: (load_every_reserved_module customs configuration host_environment fs context import contexts archive)
diff --git a/stdlib/source/test/lux/control/concurrency/frp.lux b/stdlib/source/test/lux/control/concurrency/frp.lux
index 3c804559e..90d5f17fe 100644
--- a/stdlib/source/test/lux/control/concurrency/frp.lux
+++ b/stdlib/source/test/lux/control/concurrency/frp.lux
@@ -19,7 +19,9 @@
[math
["[0]" random]
[number
- ["n" nat]]]]]
+ ["n" nat]]]
+ [type
+ ["[0]" variance]]]]
[\\library
["[0]" /
[//
@@ -41,7 +43,7 @@
(in (case [?left ?right]
[{.#Some {.#Some [left _]}}
{.#Some {.#Some [right _]}}]
- (== left right)
+ (== (variance.read left) (variance.read right))
_
false))))))
@@ -59,7 +61,7 @@
(in {.#End})
{.#Some [head tail]}
- (# ! each (|>> {.#Item head})
+ (# ! each (|>> {.#Item (variance.read head)})
(take_amount (-- amount_of_polls) [channel sink]))))))
(def: .public test
@@ -97,7 +99,7 @@
[?actual (async.value channel)]
(in (case ?actual
{.#Some {.#Some [actual _]}}
- (n.= sample actual)
+ (n.= sample (variance.read actual))
_
false))))
diff --git a/stdlib/source/test/lux/control/thread.lux b/stdlib/source/test/lux/control/thread.lux
index 5da8a5e95..cfff55018 100644
--- a/stdlib/source/test/lux/control/thread.lux
+++ b/stdlib/source/test/lux/control/thread.lux
@@ -77,6 +77,6 @@
(/.result (is (All (_ !) (Thread ! Nat))
(do /.monad
[box (/.box sample)
- old (/.update! (n.* factor) box)]
- (/.read! box))))))))
+ [old new] (/.update! (n.* factor) box)]
+ (in new))))))))
))))