From 9d20deda529f590c3092ac24546ba31da3c8f643 Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Thu, 18 Aug 2022 03:13:52 -0400 Subject: Added support for event-loop concurrency. --- .../library/lux/control/concurrency/event.lux | 107 +++++++++++++++++++++ .../library/lux/control/concurrency/thread.lux | 79 +++++---------- .../language/lux/phase/generation/js/loop.lux | 4 +- .../language/lux/phase/generation/js/runtime.lux | 92 +++++++++--------- .../language/lux/phase/generation/js/when.lux | 6 +- stdlib/source/library/lux/world/environment.lux | 4 +- stdlib/source/library/lux/world/file.lux | 2 +- 7 files changed, 184 insertions(+), 110 deletions(-) create mode 100644 stdlib/source/library/lux/control/concurrency/event.lux (limited to 'stdlib/source/library') diff --git a/stdlib/source/library/lux/control/concurrency/event.lux b/stdlib/source/library/lux/control/concurrency/event.lux new file mode 100644 index 000000000..680bf50dd --- /dev/null +++ b/stdlib/source/library/lux/control/concurrency/event.lux @@ -0,0 +1,107 @@ +... https://en.wikipedia.org/wiki/Event_loop +(.require + [library + [lux (.except loop) + [abstract + ["[0]" monad (.only do)]] + [control + ["[0]" io (.only IO)] + ["[0]" try (.only Try)] + ["[0]" exception (.only Exception)]] + [data + [text + ["%" \\format]] + [collection + ["[0]" list]]] + [math + [number + ["n" nat]]] + [world + [time + ["[0]" instant (.only Instant) (.use "[1]#[0]" order)] + ["[0]" duration]]]]] + [// + ["[0]" atom (.only Atom)]]) + +(def Action + (type_literal (IO Any))) + +(type Event + (Record + [#when Instant + #what Action])) + +(def Scheduler + (type_literal (-> Nat Action (IO Any)))) + +(def Loop + (type_literal (IO (Try Nat)))) + +(exception.def .public (error_during_execution [loop error]) + (Exception [Text Text]) + (exception.report + (list ["Loop" (%.text loop)] + ["Error" error]))) + +(def (execute! loop action) + (-> Text Action (Try Any)) + (when (try (io.run! action)) + {try.#Failure error} + (exception.except ..error_during_execution [loop error]) + + success + success)) + +(exception.def .public (already_started loop) + (Exception Text) + (exception.report + (list ["Loop" (%.text loop)]))) + +(def .public (loop name) + (-> Text [Scheduler Loop]) + (let [state (is (Atom [Bit (List Event)]) + (atom.atom [false (list)]))] + [(is Scheduler + (function (schedule! milli_seconds action) + (do io.monad + [now instant.now + _ (atom.update! (function (_ [stated? events]) + [stated? + (list.partial [#when (instant.after (duration.of_millis (.int milli_seconds)) + now) + #what action] + events)]) + state)] + (in [])))) + (is Loop + (.loop (retry! [_ []]) + (do [! io.monad] + [started?,events (atom.read! state) + .let [[started? events] started?,events]] + (if started? + (in (exception.except ..already_started [name])) + (do ! + [swapped? (atom.compare_and_swap! started?,events [true events] state)] + (if swapped? + (.loop (again [events_processed 0]) + (do ! + [started?,events (atom.read! state) + .let [[started? events] started?,events]] + (when events + ... And... we're done! + {.#End} + (in {try.#Success events_processed}) + + _ + (do ! + [now instant.now + .let [[pending ready] (list.partition (function (_ thread) + (instant#< (the #when thread) now)) + events)] + swapped? (atom.compare_and_swap! started?,events [started? pending] state)] + (if swapped? + (do [! (try.with !)] + [_ (monad.each ! (|>> (the #what) (..execute! name) io.io) ready)] + (again (n.+ (list.size ready) events_processed))) + (again events_processed)))))) + (retry! [])))))))])) diff --git a/stdlib/source/library/lux/control/concurrency/thread.lux b/stdlib/source/library/lux/control/concurrency/thread.lux index def0f230d..5b260b4ea 100644 --- a/stdlib/source/library/lux/control/concurrency/thread.lux +++ b/stdlib/source/library/lux/control/concurrency/thread.lux @@ -7,7 +7,6 @@ ["[0]" monad (.only do)]] [control ["[0]" try] - ["[0]" exception] ["[0]" io (.only IO io)]] [data ["[0]" text] @@ -22,9 +21,11 @@ ["[0]" configuration]] [world [time - ["[0]" instant]]]]] + ["[0]" instant (.only Instant) (.use "[1]#[0]" order)] + ["[0]" duration]]]]] [// - ["[0]" atom (.only Atom)]]) + ["[0]" atom (.only Atom)] + ["[0]" event]]) (with_expansions [ (these (ffi.import java/lang/Object "[1]::[0]") @@ -68,11 +69,7 @@ (start [] "io" "?" Any)) ... Default - (type Thread - (Record - [#creation Nat - #delay Nat - #action (IO Any)])) + (these) )) (def .public parallelism @@ -102,12 +99,9 @@ @.python (these) ... Default - (these (def started? - (Atom Bit) - (atom.atom false)) - (def runner - (Atom (List Thread)) - (atom.atom (list)))))) + (these (def schedule!,run! + (let [[module _] (symbol .._)] + (event.loop module)))))) (def (execute! action) (-> (IO Any) Any) @@ -147,13 +141,8 @@ (in [])) ... Default - (do [! io.monad] - [now (at ! each (|>> instant.millis .nat) instant.now) - _ (atom.update! (|>> {.#Item [#creation now - #delay milli_seconds - #action action]}) - ..runner)] - (in []))))) + (let [[schedule! run!] ..schedule!,run!] + (schedule! milli_seconds action))))) (for @.old (these) @.jvm (these) @@ -161,39 +150,15 @@ @.python (these) ... Default - (these (exception.def .public cannot_continue_running_threads) - - ... https://en.wikipedia.org/wiki/Event_loop - ... Starts the event-loop. - (def .public run! - (IO Any) - (do [! io.monad] - [started? (atom.read! ..started?)] - (if started? - (in []) - (do ! - [_ (atom.write! true ..started?)] - (loop (again [_ []]) - (do ! - [threads (atom.read! ..runner)] - (when threads - ... And... we're done! - {.#End} - (in []) - - _ - (do ! - [now (at ! each (|>> instant.millis .nat) instant.now) - .let [[ready pending] (list.partition (function (_ thread) - (|> (the #creation thread) - (n.+ (the #delay thread)) - (n.<= now))) - threads)] - swapped? (atom.compare_and_swap! threads pending ..runner)] - (if swapped? - (do ! - [_ (monad.each ! (|>> (the #action) ..execute! io.io) ready)] - (again [])) - (panic! (exception.error ..cannot_continue_running_threads [])))) - ))))))) - )) + (def .public run! + (IO Any) + (let [[schedule! run!] ..schedule!,run!] + (do io.monad + [outcome run!] + (when outcome + {try.#Success _} + (in []) + + {try.#Failure error} + (in (debug.log! error)))))) + ) diff --git a/stdlib/source/library/lux/meta/compiler/language/lux/phase/generation/js/loop.lux b/stdlib/source/library/lux/meta/compiler/language/lux/phase/generation/js/loop.lux index c475281e4..92811921c 100644 --- a/stdlib/source/library/lux/meta/compiler/language/lux/phase/generation/js/loop.lux +++ b/stdlib/source/library/lux/meta/compiler/language/lux/phase/generation/js/loop.lux @@ -45,7 +45,7 @@ (all _.then (if initial? (_.define $binding binding) - (_.set $binding binding)) + (_.statement (_.set $binding binding))) body )) @@ -56,7 +56,7 @@ (let [variable (//when.register (n.+ offset register))] (if initial? (_.define variable (_.at (_.i32 (.int register)) $iteration)) - (_.set variable (_.at (_.i32 (.int register)) $iteration)))))) + (_.statement (_.set variable (_.at (_.i32 (.int register)) $iteration))))))) list.reversed (list#mix _.then body) (_.then (_.define $iteration (_.array bindings)))))) diff --git a/stdlib/source/library/lux/meta/compiler/language/lux/phase/generation/js/runtime.lux b/stdlib/source/library/lux/meta/compiler/language/lux/phase/generation/js/runtime.lux index 8848c781d..506287a9c 100644 --- a/stdlib/source/library/lux/meta/compiler/language/lux/phase/generation/js/runtime.lux +++ b/stdlib/source/library/lux/meta/compiler/language/lux/phase/generation/js/runtime.lux @@ -143,8 +143,8 @@ tuple)) (with_expansions [ (these (all _.then - (_.set lefts (_.- last_index_right lefts)) - (_.set tuple (_.at last_index_right tuple))))] + (_.statement (_.set lefts (_.- last_index_right lefts))) + (_.statement (_.set tuple (_.at last_index_right tuple)))))] (runtime (tuple//left lefts tuple) (with_vars [last_index_right] @@ -182,9 +182,9 @@ (with_vars [tag is_last value] (_.closure (list tag is_last value) (all _.then - (_.set (_.the ..variant_tag_field @this) tag) - (_.set (_.the ..variant_flag_field @this) is_last) - (_.set (_.the ..variant_value_field @this) value) + (_.statement (_.set (_.the ..variant_tag_field @this) tag)) + (_.statement (_.set (_.the ..variant_flag_field @this) is_last)) + (_.statement (_.set (_.the ..variant_value_field @this) value)) ))))) (def .public (variant tag last? value) @@ -199,10 +199,10 @@ actual::value (|> sum (_.the ..variant_value_field)) is_last? (_.= ..unit actual::right?) recur! (all _.then - (_.set expected::lefts (|> expected::lefts - (_.- actual::lefts) - (_.- (_.i32 +1)))) - (_.set sum actual::value))] + (_.statement (_.set expected::lefts (|> expected::lefts + (_.- actual::lefts) + (_.- (_.i32 +1))))) + (_.statement (_.set sum actual::value)))] (<| (_.while (_.boolean true)) (_.if (_.= expected::lefts actual::lefts) (_.if (_.= expected::right? actual::right?) @@ -260,8 +260,8 @@ (..last_index inputs) (_.>= (_.i32 +0) idx) (_.-- idx) - (_.set output (..some (_.array (list (_.at idx inputs) - output))))) + (_.statement (_.set output (..some (_.array (list (_.at idx inputs) + output)))))) (_.return output)))) (def runtime//lux @@ -280,8 +280,8 @@ (with_vars [high low] (_.closure (list high low) (all _.then - (_.set (_.the ..i64_high_field @this) high) - (_.set (_.the ..i64_low_field @this) low) + (_.statement (_.set (_.the ..i64_high_field @this) high)) + (_.statement (_.set (_.the ..i64_low_field @this) low)) ))))) (def .public (i64 high low) @@ -308,7 +308,7 @@ (def (cap_shift! shift) (-> Var Statement) - (_.set shift (|> shift (_.bit_and (_.i32 +63))))) + (_.statement (_.set shift (|> shift (_.bit_and (_.i32 +63)))))) (def (no_shift! shift input) (-> Var Var (-> Expression Expression)) @@ -458,18 +458,18 @@ (_.define x16 (|> (high_16 x00) (_.+ l16) (_.+ r16))) - (_.set x00 (low_16 x00)) + (_.statement (_.set x00 (low_16 x00))) (_.define x32 (|> (high_16 x16) (_.+ l32) (_.+ r32))) - (_.set x16 (low_16 x16)) + (_.statement (_.set x16 (low_16 x16))) (_.define x48 (|> (high_16 x32) (_.+ l48) (_.+ r48) low_16)) - (_.set x32 (low_16 x32)) + (_.statement (_.set x32 (low_16 x32))) (_.return (..i64 (_.bit_or (up_16 x48) x32) (_.bit_or (up_16 x16) x00))) @@ -527,26 +527,28 @@ (_.define x00 (_.* l00 r00)) (_.define x16 (high_16 x00)) - (_.set x00 (low_16 x00)) + (_.statement (_.set x00 (low_16 x00))) - (_.set x16 (|> x16 (_.+ (_.* l16 r00)))) - (_.define x32 (high_16 x16)) (_.set x16 (low_16 x16)) - (_.set x16 (|> x16 (_.+ (_.* l00 r16)))) - (_.set x32 (|> x32 (_.+ (high_16 x16)))) (_.set x16 (low_16 x16)) - - (_.set x32 (|> x32 (_.+ (_.* l32 r00)))) - (_.define x48 (high_16 x32)) (_.set x32 (low_16 x32)) - (_.set x32 (|> x32 (_.+ (_.* l16 r16)))) - (_.set x48 (|> x48 (_.+ (high_16 x32)))) (_.set x32 (low_16 x32)) - (_.set x32 (|> x32 (_.+ (_.* l00 r32)))) - (_.set x48 (|> x48 (_.+ (high_16 x32)))) (_.set x32 (low_16 x32)) + (_.statement (_.set x16 (|> x16 (_.+ (_.* l16 r00))))) + (_.define x32 (high_16 x16)) + (_.statement (_.set x16 (|> x16 low_16 (_.+ (_.* l00 r16))))) + (_.statement (_.set x32 (|> x32 (_.+ (high_16 x16))))) + (_.statement (_.set x16 (low_16 x16))) + + (_.statement (_.set x32 (|> x32 (_.+ (_.* l32 r00))))) + (_.define x48 (high_16 x32)) + (_.statement (_.set x32 (|> x32 low_16 (_.+ (_.* l16 r16))))) + (_.statement (_.set x48 (|> x48 (_.+ (high_16 x32))))) + (_.statement (_.set x32 (|> x32 low_16 (_.+ (_.* l00 r32))))) + (_.statement (_.set x48 (|> x48 (_.+ (high_16 x32))))) + (_.statement (_.set x32 (low_16 x32))) - (_.set x48 (|> x48 - (_.+ (_.* l48 r00)) - (_.+ (_.* l32 r16)) - (_.+ (_.* l16 r32)) - (_.+ (_.* l00 r48)) - low_16)) + (_.statement (_.set x48 (|> x48 + (_.+ (_.* l48 r00)) + (_.+ (_.* l32 r16)) + (_.+ (_.* l16 r32)) + (_.+ (_.* l00 r48)) + low_16))) (_.return (..i64 (_.bit_or (up_16 x48) x32) (_.bit_or (up_16 x16) x00))) @@ -641,14 +643,14 @@ (i64::< approximate_remainder remainder)) (all _.then - (_.set approximate (_.- delta approximate)) - (_.set approximate_result approximate_result') - (_.set approximate_remainder approx_remainder))) - (_.set result (i64::+ (_.? (i64::= i64::zero approximate_result) - i64::one - approximate_result) - result)) - (_.set remainder (i64::- approximate_remainder remainder)))))) + (_.statement (_.set approximate (_.- delta approximate))) + (_.statement (_.set approximate_result approximate_result')) + (_.statement (_.set approximate_remainder approx_remainder)))) + (_.statement (_.set result (i64::+ (_.? (i64::= i64::zero approximate_result) + i64::one + approximate_result) + result))) + (_.statement (_.set remainder (i64::- approximate_remainder remainder))))))) (_.return result))))) (runtime @@ -761,7 +763,7 @@ (runtime (js//set object field input) (all _.then - (_.set (_.at field object) input) + (_.statement (_.set (_.at field object) input)) (_.return object))) (runtime @@ -781,7 +783,7 @@ (runtime (array//write idx value array) (all _.then - (_.set (_.at (_.the ..i64_low_field idx) array) value) + (_.statement (_.set (_.at (_.the ..i64_low_field idx) array) value)) (_.return array))) (runtime diff --git a/stdlib/source/library/lux/meta/compiler/language/lux/phase/generation/js/when.lux b/stdlib/source/library/lux/meta/compiler/language/lux/phase/generation/js/when.lux index 7487beb55..c7790d6d1 100644 --- a/stdlib/source/library/lux/meta/compiler/language/lux/phase/generation/js/when.lux +++ b/stdlib/source/library/lux/meta/compiler/language/lux/phase/generation/js/when.lux @@ -140,7 +140,7 @@ (def restore_cursor! Statement - (_.set @cursor (|> @savepoint (_.do "pop" (list))))) + (_.statement (_.set @cursor (|> @savepoint (_.do "pop" (list)))))) (def fail_pm! _.break) @@ -154,8 +154,8 @@ [(def ( simple? idx) (-> Bit Nat Statement) (all _.then - (_.set @temp (//runtime.sum//get ..peek_cursor - (|> idx .int _.i32))) + (_.statement (_.set @temp (//runtime.sum//get ..peek_cursor + (|> idx .int _.i32)))) (.if simple? (_.when (_.= _.null @temp) ..fail_pm!) diff --git a/stdlib/source/library/lux/world/environment.lux b/stdlib/source/library/lux/world/environment.lux index 69d8dce95..31dafca6d 100644 --- a/stdlib/source/library/lux/world/environment.lux +++ b/stdlib/source/library/lux/world/environment.lux @@ -375,7 +375,7 @@ ) @.python (os/path::expanduser "~") @.lua (..run_command "~" "echo ~") - @.ruby (io.io (Dir::home)) + @.ruby (Dir::home) ... @.php (do io.monad ... [output (..getenv/1 ["HOME"])] ... (in (if (bit#= false (as Bit output)) @@ -409,7 +409,7 @@ (if (same? default on_windows) (..run_command default "pwd") (in on_windows))) - @.ruby (io.io (FileUtils::pwd)) + @.ruby (FileUtils::pwd) ... @.php (do io.monad ... [output (..getcwd [])] ... (in (if (bit#= false (as Bit output)) diff --git a/stdlib/source/library/lux/world/file.lux b/stdlib/source/library/lux/world/file.lux index 16e205fe7..554d8c4f6 100644 --- a/stdlib/source/library/lux/world/file.lux +++ b/stdlib/source/library/lux/world/file.lux @@ -672,7 +672,7 @@ (def ruby_separator Text - (..RubyFile::SEPARATOR)) + (io.run! (..RubyFile::SEPARATOR))) (`` (def .public default (System IO) -- cgit v1.2.3