diff options
author | Eduardo Julian | 2018-12-15 12:39:15 -0400 |
---|---|---|
committer | Eduardo Julian | 2018-12-15 12:39:15 -0400 |
commit | f6e280bd4ab41d12083c0eef2c823ad3962d6a04 (patch) | |
tree | 41173ee305fcf42736ae96cf7e61d6fac11b6175 /stdlib/source/lux/control/concurrency/process.lux | |
parent | 07426c47503a84666a9a7824d76e8d5730492d75 (diff) |
Moved the "lux/concurrency" modules under "lux/control".
Diffstat (limited to 'stdlib/source/lux/control/concurrency/process.lux')
-rw-r--r-- | stdlib/source/lux/control/concurrency/process.lux | 110 |
1 files changed, 110 insertions, 0 deletions
diff --git a/stdlib/source/lux/control/concurrency/process.lux b/stdlib/source/lux/control/concurrency/process.lux new file mode 100644 index 000000000..a67734747 --- /dev/null +++ b/stdlib/source/lux/control/concurrency/process.lux @@ -0,0 +1,110 @@ +(.module: + [lux #* + [control + ["." monad (#+ do)] + ["ex" exception (#+ exception:)]] + [data + [collection + ["." list]]] + [platform + [compiler + ["." host]]] + ["." io (#+ IO io)] + [host (#+ import: object)]] + [// + ["." atom (#+ Atom)]]) + +(`` (for {(~~ (static host.jvm)) + (as-is (import: java/lang/Object) + + (import: java/lang/Runtime + (#static getRuntime [] Runtime) + (availableProcessors [] int)) + + (import: java/lang/Runnable) + + (import: java/util/concurrent/TimeUnit + (#enum MILLISECONDS)) + + (import: java/util/concurrent/Executor + (execute [Runnable] #io void)) + + (import: (java/util/concurrent/ScheduledFuture a)) + + (import: java/util/concurrent/ScheduledThreadPoolExecutor + (new [int]) + (schedule [Runnable long TimeUnit] #io (ScheduledFuture Object))))} + + ## Default + (type: Process + {#creation Nat + #delay Nat + #action (IO Any)}) + )) + +(def: #export parallelism + Nat + (`` (for {(~~ (static host.jvm)) + (|> (Runtime::getRuntime) + (Runtime::availableProcessors) + .nat)} + + ## Default + 1))) + +(def: runner + (`` (for {(~~ (static host.jvm)) + (ScheduledThreadPoolExecutor::new (.int ..parallelism))} + + ## Default + (: (Atom (List Process)) + (atom.atom (list)))))) + +(def: #export (schedule milli-seconds action) + (-> Nat (IO Any) (IO Any)) + (`` (for {(~~ (static host.jvm)) + (let [runnable (object [] [Runnable] + [] + (Runnable [] (run) void + (io.run action)))] + (case milli-seconds + 0 (Executor::execute runnable runner) + _ (ScheduledThreadPoolExecutor::schedule runnable (.int milli-seconds) TimeUnit::MILLISECONDS + runner)))} + + ## Default + (atom.update (|>> (#.Cons {#creation ("lux io current-time") + #delay milli-seconds + #action action})) + runner)))) + +(`` (for {(~~ (static host.jvm)) + (as-is)} + + ## Default + (as-is (exception: #export (cannot-continue-running-processes) "") + + (def: #export run! + (IO Any) + (loop [_ []] + (do io.Monad<IO> + [processes (atom.read runner)] + (case processes + ## And... we're done! + #.Nil + (wrap []) + + _ + (do @ + [#let [now ("lux io current-time") + [ready pending] (list.partition (function (_ process) + (|> (get@ #creation process) + (n/+ (get@ #delay process)) + (n/<= now))) + processes)] + swapped? (atom.compare-and-swap! processes pending runner)] + (if swapped? + (monad.seq @ ready) + (error! (ex.construct cannot-continue-running-processes [])))) + )))) + ))) |