aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--stdlib/source/lux/concurrency/session.lux146
-rw-r--r--stdlib/test/test/lux/concurrency/session.lux138
2 files changed, 176 insertions, 108 deletions
diff --git a/stdlib/source/lux/concurrency/session.lux b/stdlib/source/lux/concurrency/session.lux
index 8f896a905..cbbe34325 100644
--- a/stdlib/source/lux/concurrency/session.lux
+++ b/stdlib/source/lux/concurrency/session.lux
@@ -4,6 +4,7 @@
["." monad (#+ do)]
["p" parser]]
[data
+ ["." lazy (#+ Lazy)]
[text
format]
[collection
@@ -57,8 +58,8 @@
(def: #export end
{#.doc (doc "The last step in a session.")}
- [End End]
- [[] []])
+ (Lazy [End End])
+ (lazy.freeze [[] []]))
(def: #export $end
{#.doc (doc (counterpart-doc (name-of ..end)))}
@@ -72,34 +73,43 @@
## Read (Receive) / Write (Send)
(type: #export (Read value next)
{#.doc (doc "A reading step.")}
- (Promise [value next]))
+ (-> [] (Promise [value next])))
(type: #export (Write value next)
{#.doc (doc "A writing step.")}
(-> value (Promise next)))
-(template: (!write <read> <write>)
- (function (_ value)
- (exec (io.run (promise.resolve [value <read>] channel))
- (promise.promise (#.Some <write>)))))
-
-(def: #export receive
+(def: #export (receive session)
{#.doc (doc "A Read step for 'me', and a Write step for 'you'.")}
(All [value mine yours]
- (-> [mine yours] [(Read value mine) (Write value yours)]))
- (function (_ [mine yours])
- (let [channel (promise.promise #.None)]
- [channel
- (!write mine yours)])))
-
-(def: #export send
+ (-> (Lazy [mine yours])
+ (Lazy [(Read value mine) (Write value yours)])))
+ (let [channel (promise.promise #.None)]
+ (lazy.freeze [(function (_ _)
+ (let [[mine yours] (lazy.thaw session)]
+ (do promise.Monad<Promise>
+ [value channel]
+ (wrap [value mine]))))
+ (function (_ value)
+ (let [[mine yours] (lazy.thaw session)]
+ (exec (io.run (promise.resolve value channel))
+ (promise.promise (#.Some yours)))))])))
+
+(def: #export (send session)
{#.doc (doc "A Write step for 'me', and a Read step for 'you'.")}
(All [value mine yours]
- (-> [mine yours] [(Write value mine) (Read value yours)]))
- (function (_ [mine yours])
- (let [channel (promise.promise #.None)]
- [(!write yours mine)
- channel])))
+ (-> (Lazy [mine yours])
+ (Lazy [(Write value mine) (Read value yours)])))
+ (let [channel (promise.promise #.None)]
+ (lazy.freeze [(function (_ value)
+ (let [[mine yours] (lazy.thaw session)]
+ (exec (io.run (promise.resolve value channel))
+ (promise.promise (#.Some mine)))))
+ (function (_ _)
+ (let [[mine yours] (lazy.thaw session)]
+ (do promise.Monad<Promise>
+ [value channel]
+ (wrap [value yours]))))])))
(do-template [<name> <alias> <me> <you> <counterpart>]
[(def: #export (<name> value [mine yours])
@@ -125,53 +135,61 @@
(type: #export (Join left right)
{#.doc (doc "A joining step, which communicates which path was taken.")}
- (Promise (| left right)))
+ (-> [] (Promise (| left right))))
(def: #export (fork left right)
{#.doc (doc "A Fork step for 'me', and a Join step for 'you'.")}
(All [myL yourL myR yourR]
- (-> [myL yourL] [myR yourR]
- [(Fork myL myR) (Join yourL yourR)]))
- (let [[myL yourL] left
- [myR yourR] right
- channel (:share [yourL yourR]
- {[yourL yourR]
- [yourL yourR]}
- {(Join yourL yourR)
- (promise.promise #.None)})]
- [(function (_ signal)
- (case signal
- (#.Left go)
- (exec (io.run (promise.resolve (#.Left yourL) channel))
- (promise.promise (#.Some (go myL))))
-
- (#.Right go)
- (exec (io.run (promise.resolve (#.Right yourR) channel))
- (promise.promise (#.Some (go myR))))))
- channel]))
+ (-> (Lazy [myL yourL]) (Lazy [myR yourR])
+ (Lazy [(Fork myL myR) (Join yourL yourR)])))
+ (let [channel (: (Promise Bit)
+ (promise.promise #.None))]
+ (lazy.freeze [(function (_ signal)
+ (let [[myL yourL] (lazy.thaw left)
+ [myR yourR] (lazy.thaw right)]
+ (case signal
+ (#.Left go)
+ (exec (io.run (promise.resolve #0 channel))
+ (promise.promise (#.Some (go myL))))
+
+ (#.Right go)
+ (exec (io.run (promise.resolve #1 channel))
+ (promise.promise (#.Some (go myR)))))))
+ (function (_ _)
+ (let [[myL yourL] (lazy.thaw left)
+ [myR yourR] (lazy.thaw right)]
+ (do promise.Monad<Promise>
+ [choice channel]
+ (wrap (case choice
+ #0 (#.Left yourL)
+ #1 (#.Right yourR))))))])))
(def: #export (join left right)
{#.doc (doc "A Join step for 'me', and a Fork step for 'you'.")}
(All [myL yourL myR yourR]
- (-> [myL yourL] [myR yourR]
- [(Join myL myR) (Fork yourL yourR)]))
- (let [[myL yourL] left
- [myR yourR] right
- channel (:share [myL myR]
- {[myL myR]
- [myL myR]}
- {(Join myL myR)
- (promise.promise #.None)})]
- [channel
- (function (_ signal)
- (case signal
- (#.Left go)
- (exec (io.run (promise.resolve (#.Left myL) channel))
- (promise.promise (#.Some (go yourL))))
-
- (#.Right go)
- (exec (io.run (promise.resolve (#.Right myR) channel))
- (promise.promise (#.Some (go yourR))))))]))
+ (-> (Lazy [myL yourL]) (Lazy [myR yourR])
+ (Lazy [(Join myL myR) (Fork yourL yourR)])))
+ (let [channel (: (Promise Bit)
+ (promise.promise #.None))]
+ (lazy.freeze [(function (_ _)
+ (let [[myL yourL] (lazy.thaw left)
+ [myR yourR] (lazy.thaw right)]
+ (do promise.Monad<Promise>
+ [choice channel]
+ (wrap (case choice
+ #0 (#.Left myL)
+ #1 (#.Right myR))))))
+ (function (_ signal)
+ (let [[myL yourL] (lazy.thaw left)
+ [myR yourR] (lazy.thaw right)]
+ (case signal
+ (#.Left go)
+ (exec (io.run (promise.resolve #0 channel))
+ (promise.promise (#.Some (go yourL))))
+
+ (#.Right go)
+ (exec (io.run (promise.resolve #1 channel))
+ (promise.promise (#.Some (go yourR)))))))])))
(do-template [<name> <alias> <+> <-> <counterpart>]
[(def: #export (<name> [myL yourL] [myR yourR])
@@ -202,7 +220,7 @@
(def: #export (rec scope)
(All [session]
- (-> (-> session
- session)
- session))
- (scope (rec scope)))
+ (-> (-> (Lazy session)
+ (Lazy session))
+ (Lazy session)))
+ (lazy.freeze (lazy.thaw (scope (rec scope)))))
diff --git a/stdlib/test/test/lux/concurrency/session.lux b/stdlib/test/test/lux/concurrency/session.lux
index ad0343d15..ea27bf4b9 100644
--- a/stdlib/test/test/lux/concurrency/session.lux
+++ b/stdlib/test/test/lux/concurrency/session.lux
@@ -2,9 +2,12 @@
[lux #*
[control
[monad (#+ do)]]
+ [data
+ ["." lazy (#+ Lazy)]]
+ ["." io (#+ IO)]
[concurrency
["." promise]
- ["$" session (#+ Session choice: << >> ?? !! +<< -<< \/ /\)]]
+ ["$" session (#+ Session << >> ?? !! +<< -<< \/ /\ $rec)]]
[data
[text
format]]
@@ -12,67 +15,114 @@
["r" random]]]
lux/test)
-(choice: Calculation [#Add #Negate])
+(def: $transfer (<| << (!! Int) >>))
-(def: $add (<| (!! Int) (!! Int) (?? Int) >>))
-(def: $negate (<| (!! Int) (?? Int) >>))
+(def: transfer
+ (Lazy (:~ $transfer))
+ (<| $.send $.end))
-(def: (add-session _)
- (-> [] (:~ (<< $add)))
- (<| $.send $.send $.receive $.end))
+(context: "Transfer."
+ (do @
+ [#let [[$me $you] (lazy.thaw ..transfer)]
+ expectation r.int]
+ ($_ seq
+ (wrap (do promise.Monad<Promise>
+ [$me ($me expectation)]
+ (assert "Client [Transfer]"
+ true)))
+ (wrap (do promise.Monad<Promise>
+ [[actual $end] ($you [])]
+ (assert "Server [Transfer]"
+ (i/= expectation actual))))
+ )))
-(def: (negate-session _)
- (-> [] (:~ (<< $negate)))
- (<| $.send $.receive $.end))
+(def: $endless
+ (<< ($rec (function (_ $recur)
+ (<| (!! Int) $recur)))))
-(def: $calculation
- Session
- ($_ \/
- $add
- $negate))
+(def: endless
+ (Lazy (:~ $endless))
+ ($.rec (function (_ recur)
+ (<| $.send recur))))
-(def: (calculation-session _)
- (-> [] (:~ (<< $calculation)))
- ($_ $.fork
- (add-session [])
- (negate-session [])))
+(context: "Endless."
+ (do @
+ [#let [[$me $you] (lazy.thaw ..endless)]
+ expectation0 r.int
+ expectation1 r.int
+ expectation2 r.int]
+ ($_ seq
+ (wrap (do promise.Monad<Promise>
+ [$me ($me expectation0)
+ $me ($me expectation1)
+ $me ($me expectation2)]
+ (assert "Client [Endless]"
+ true)))
+ (wrap (do promise.Monad<Promise>
+ [[actual0 $you] ($you [])
+ [actual1 $you] ($you [])
+ [actual2 $you] ($you [])]
+ (assert "Server [Endless]"
+ (and (i/= expectation0 actual0)
+ (i/= expectation1 actual1)
+ (i/= expectation2 actual2)))))
+ )))
-(def: (__my-calculation _)
- (-> [] (:~ (+<< $calculation)))
- ($.my (calculation-session [])))
+(def: $calculation
+ Session
+ ($rec
+ (function (_ $recur)
+ ($_ \/
+ (<| (!! Int) (!! Int) (?? Int) $recur)
+ (<| (!! Int) (?? Int) $recur)
+ >>))))
-(def: (__your-calculation _)
- (-> [] (:~ (-<< $calculation)))
- ($.your (calculation-session [])))
+(def: calculation
+ (Lazy (:~ (<< $calculation)))
+ ($.rec
+ (function (_ recur)
+ ($_ $.fork
+ (<| $.send $.send $.receive recur)
+ (<| $.send $.receive recur)
+ $.end))))
-(context: "Sessions."
+(context: "Complex session."
(do @
- [#let [[$me $you] (calculation-session [])]
+ [#let [[$me $you] (lazy.thaw calculation)]
param r.int
- subject r.int]
+ subject r.int
+ #let [expectation (i/+ param subject)]]
($_ seq
(wrap (do promise.Monad<Promise>
- [$me ($me (#Add id))
+ [$me ($me (#.Left id))
$me ($me param)
$me ($me subject)
- [output $end] $me]
+ [output $end] ($me [])]
(assert "Client [#Add]"
- (i/= (i/+ param subject) output))))
+ (i/= expectation output))))
(wrap (do promise.Monad<Promise>
- [choice $you]
- (case choice
- (#Add $you)
+ [add|<negate|quit> ($you [])]
+ (case add|<negate|quit>
+ (#.Left $add)
(do @
- [[param-input $you] $you
- [subject-input $you] $you
- $end ($you (i/+ param-input subject-input))]
+ [[param-input $add] ($add [])
+ [subject-input $add] ($add [])
+ $recur ($add (i/+ param-input subject-input))]
(assert "Server [#Add]"
true))
-
- (#Negate $you)
+
+ (#.Right $negate|quit)
(do @
- [[subject $you] $you
- $end ($you (i/* -1 subject))]
- (assert "Server [#Negate]"
- true)))))
+ [negate|quit ($negate|quit [])]
+ (case negate|quit
+ (#.Left $negate)
+ (do @
+ [[subject $negate] ($negate [])
+ $recur ($negate (i/* -1 subject))]
+ (assert "Server [#Negate]"
+ true))
+
+ (#.Right $quit)
+ (assert "Server [#Quit]"
+ true))))))
)))