From 0c77e8ddf20ea95197f31596216ddf702d3898d2 Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Sun, 9 Dec 2018 20:56:39 -0400 Subject: Working session recursion. --- stdlib/source/lux/concurrency/session.lux | 146 +++++++++++++++------------ stdlib/test/test/lux/concurrency/session.lux | 138 +++++++++++++++++-------- 2 files changed, 176 insertions(+), 108 deletions(-) diff --git a/stdlib/source/lux/concurrency/session.lux b/stdlib/source/lux/concurrency/session.lux index 8f896a905..cbbe34325 100644 --- a/stdlib/source/lux/concurrency/session.lux +++ b/stdlib/source/lux/concurrency/session.lux @@ -4,6 +4,7 @@ ["." monad (#+ do)] ["p" parser]] [data + ["." lazy (#+ Lazy)] [text format] [collection @@ -57,8 +58,8 @@ (def: #export end {#.doc (doc "The last step in a session.")} - [End End] - [[] []]) + (Lazy [End End]) + (lazy.freeze [[] []])) (def: #export $end {#.doc (doc (counterpart-doc (name-of ..end)))} @@ -72,34 +73,43 @@ ## Read (Receive) / Write (Send) (type: #export (Read value next) {#.doc (doc "A reading step.")} - (Promise [value next])) + (-> [] (Promise [value next]))) (type: #export (Write value next) {#.doc (doc "A writing step.")} (-> value (Promise next))) -(template: (!write ) - (function (_ value) - (exec (io.run (promise.resolve [value ] channel)) - (promise.promise (#.Some ))))) - -(def: #export receive +(def: #export (receive session) {#.doc (doc "A Read step for 'me', and a Write step for 'you'.")} (All [value mine yours] - (-> [mine yours] [(Read value mine) (Write value yours)])) - (function (_ [mine yours]) - (let [channel (promise.promise #.None)] - [channel - (!write mine yours)]))) - -(def: #export send + (-> (Lazy [mine yours]) + (Lazy [(Read value mine) (Write value yours)]))) + (let [channel (promise.promise #.None)] + (lazy.freeze [(function (_ _) + (let [[mine yours] (lazy.thaw session)] + (do promise.Monad + [value channel] + (wrap [value mine])))) + (function (_ value) + (let [[mine yours] (lazy.thaw session)] + (exec (io.run (promise.resolve value channel)) + (promise.promise (#.Some yours)))))]))) + +(def: #export (send session) {#.doc (doc "A Write step for 'me', and a Read step for 'you'.")} (All [value mine yours] - (-> [mine yours] [(Write value mine) (Read value yours)])) - (function (_ [mine yours]) - (let [channel (promise.promise #.None)] - [(!write yours mine) - channel]))) + (-> (Lazy [mine yours]) + (Lazy [(Write value mine) (Read value yours)]))) + (let [channel (promise.promise #.None)] + (lazy.freeze [(function (_ value) + (let [[mine yours] (lazy.thaw session)] + (exec (io.run (promise.resolve value channel)) + (promise.promise (#.Some mine))))) + (function (_ _) + (let [[mine yours] (lazy.thaw session)] + (do promise.Monad + [value channel] + (wrap [value yours]))))]))) (do-template [ ] [(def: #export ( value [mine yours]) @@ -125,53 +135,61 @@ (type: #export (Join left right) {#.doc (doc "A joining step, which communicates which path was taken.")} - (Promise (| left right))) + (-> [] (Promise (| left right)))) (def: #export (fork left right) {#.doc (doc "A Fork step for 'me', and a Join step for 'you'.")} (All [myL yourL myR yourR] - (-> [myL yourL] [myR yourR] - [(Fork myL myR) (Join yourL yourR)])) - (let [[myL yourL] left - [myR yourR] right - channel (:share [yourL yourR] - {[yourL yourR] - [yourL yourR]} - {(Join yourL yourR) - (promise.promise #.None)})] - [(function (_ signal) - (case signal - (#.Left go) - (exec (io.run (promise.resolve (#.Left yourL) channel)) - (promise.promise (#.Some (go myL)))) - - (#.Right go) - (exec (io.run (promise.resolve (#.Right yourR) channel)) - (promise.promise (#.Some (go myR)))))) - channel])) + (-> (Lazy [myL yourL]) (Lazy [myR yourR]) + (Lazy [(Fork myL myR) (Join yourL yourR)]))) + (let [channel (: (Promise Bit) + (promise.promise #.None))] + (lazy.freeze [(function (_ signal) + (let [[myL yourL] (lazy.thaw left) + [myR yourR] (lazy.thaw right)] + (case signal + (#.Left go) + (exec (io.run (promise.resolve #0 channel)) + (promise.promise (#.Some (go myL)))) + + (#.Right go) + (exec (io.run (promise.resolve #1 channel)) + (promise.promise (#.Some (go myR))))))) + (function (_ _) + (let [[myL yourL] (lazy.thaw left) + [myR yourR] (lazy.thaw right)] + (do promise.Monad + [choice channel] + (wrap (case choice + #0 (#.Left yourL) + #1 (#.Right yourR))))))]))) (def: #export (join left right) {#.doc (doc "A Join step for 'me', and a Fork step for 'you'.")} (All [myL yourL myR yourR] - (-> [myL yourL] [myR yourR] - [(Join myL myR) (Fork yourL yourR)])) - (let [[myL yourL] left - [myR yourR] right - channel (:share [myL myR] - {[myL myR] - [myL myR]} - {(Join myL myR) - (promise.promise #.None)})] - [channel - (function (_ signal) - (case signal - (#.Left go) - (exec (io.run (promise.resolve (#.Left myL) channel)) - (promise.promise (#.Some (go yourL)))) - - (#.Right go) - (exec (io.run (promise.resolve (#.Right myR) channel)) - (promise.promise (#.Some (go yourR))))))])) + (-> (Lazy [myL yourL]) (Lazy [myR yourR]) + (Lazy [(Join myL myR) (Fork yourL yourR)]))) + (let [channel (: (Promise Bit) + (promise.promise #.None))] + (lazy.freeze [(function (_ _) + (let [[myL yourL] (lazy.thaw left) + [myR yourR] (lazy.thaw right)] + (do promise.Monad + [choice channel] + (wrap (case choice + #0 (#.Left myL) + #1 (#.Right myR)))))) + (function (_ signal) + (let [[myL yourL] (lazy.thaw left) + [myR yourR] (lazy.thaw right)] + (case signal + (#.Left go) + (exec (io.run (promise.resolve #0 channel)) + (promise.promise (#.Some (go yourL)))) + + (#.Right go) + (exec (io.run (promise.resolve #1 channel)) + (promise.promise (#.Some (go yourR)))))))]))) (do-template [ <+> <-> ] [(def: #export ( [myL yourL] [myR yourR]) @@ -202,7 +220,7 @@ (def: #export (rec scope) (All [session] - (-> (-> session - session) - session)) - (scope (rec scope))) + (-> (-> (Lazy session) + (Lazy session)) + (Lazy session))) + (lazy.freeze (lazy.thaw (scope (rec scope))))) diff --git a/stdlib/test/test/lux/concurrency/session.lux b/stdlib/test/test/lux/concurrency/session.lux index ad0343d15..ea27bf4b9 100644 --- a/stdlib/test/test/lux/concurrency/session.lux +++ b/stdlib/test/test/lux/concurrency/session.lux @@ -2,9 +2,12 @@ [lux #* [control [monad (#+ do)]] + [data + ["." lazy (#+ Lazy)]] + ["." io (#+ IO)] [concurrency ["." promise] - ["$" session (#+ Session choice: << >> ?? !! +<< -<< \/ /\)]] + ["$" session (#+ Session << >> ?? !! +<< -<< \/ /\ $rec)]] [data [text format]] @@ -12,67 +15,114 @@ ["r" random]]] lux/test) -(choice: Calculation [#Add #Negate]) +(def: $transfer (<| << (!! Int) >>)) -(def: $add (<| (!! Int) (!! Int) (?? Int) >>)) -(def: $negate (<| (!! Int) (?? Int) >>)) +(def: transfer + (Lazy (:~ $transfer)) + (<| $.send $.end)) -(def: (add-session _) - (-> [] (:~ (<< $add))) - (<| $.send $.send $.receive $.end)) +(context: "Transfer." + (do @ + [#let [[$me $you] (lazy.thaw ..transfer)] + expectation r.int] + ($_ seq + (wrap (do promise.Monad + [$me ($me expectation)] + (assert "Client [Transfer]" + true))) + (wrap (do promise.Monad + [[actual $end] ($you [])] + (assert "Server [Transfer]" + (i/= expectation actual)))) + ))) -(def: (negate-session _) - (-> [] (:~ (<< $negate))) - (<| $.send $.receive $.end)) +(def: $endless + (<< ($rec (function (_ $recur) + (<| (!! Int) $recur))))) -(def: $calculation - Session - ($_ \/ - $add - $negate)) +(def: endless + (Lazy (:~ $endless)) + ($.rec (function (_ recur) + (<| $.send recur)))) -(def: (calculation-session _) - (-> [] (:~ (<< $calculation))) - ($_ $.fork - (add-session []) - (negate-session []))) +(context: "Endless." + (do @ + [#let [[$me $you] (lazy.thaw ..endless)] + expectation0 r.int + expectation1 r.int + expectation2 r.int] + ($_ seq + (wrap (do promise.Monad + [$me ($me expectation0) + $me ($me expectation1) + $me ($me expectation2)] + (assert "Client [Endless]" + true))) + (wrap (do promise.Monad + [[actual0 $you] ($you []) + [actual1 $you] ($you []) + [actual2 $you] ($you [])] + (assert "Server [Endless]" + (and (i/= expectation0 actual0) + (i/= expectation1 actual1) + (i/= expectation2 actual2))))) + ))) -(def: (__my-calculation _) - (-> [] (:~ (+<< $calculation))) - ($.my (calculation-session []))) +(def: $calculation + Session + ($rec + (function (_ $recur) + ($_ \/ + (<| (!! Int) (!! Int) (?? Int) $recur) + (<| (!! Int) (?? Int) $recur) + >>)))) -(def: (__your-calculation _) - (-> [] (:~ (-<< $calculation))) - ($.your (calculation-session []))) +(def: calculation + (Lazy (:~ (<< $calculation))) + ($.rec + (function (_ recur) + ($_ $.fork + (<| $.send $.send $.receive recur) + (<| $.send $.receive recur) + $.end)))) -(context: "Sessions." +(context: "Complex session." (do @ - [#let [[$me $you] (calculation-session [])] + [#let [[$me $you] (lazy.thaw calculation)] param r.int - subject r.int] + subject r.int + #let [expectation (i/+ param subject)]] ($_ seq (wrap (do promise.Monad - [$me ($me (#Add id)) + [$me ($me (#.Left id)) $me ($me param) $me ($me subject) - [output $end] $me] + [output $end] ($me [])] (assert "Client [#Add]" - (i/= (i/+ param subject) output)))) + (i/= expectation output)))) (wrap (do promise.Monad - [choice $you] - (case choice - (#Add $you) + [add| ($you [])] + (case add| + (#.Left $add) (do @ - [[param-input $you] $you - [subject-input $you] $you - $end ($you (i/+ param-input subject-input))] + [[param-input $add] ($add []) + [subject-input $add] ($add []) + $recur ($add (i/+ param-input subject-input))] (assert "Server [#Add]" true)) - - (#Negate $you) + + (#.Right $negate|quit) (do @ - [[subject $you] $you - $end ($you (i/* -1 subject))] - (assert "Server [#Negate]" - true))))) + [negate|quit ($negate|quit [])] + (case negate|quit + (#.Left $negate) + (do @ + [[subject $negate] ($negate []) + $recur ($negate (i/* -1 subject))] + (assert "Server [#Negate]" + true)) + + (#.Right $quit) + (assert "Server [#Quit]" + true)))))) ))) -- cgit v1.2.3