diff options
Diffstat (limited to 'stdlib/source/lux/concurrency/process.lux')
-rw-r--r-- | stdlib/source/lux/concurrency/process.lux | 104 |
1 files changed, 104 insertions, 0 deletions
diff --git a/stdlib/source/lux/concurrency/process.lux b/stdlib/source/lux/concurrency/process.lux new file mode 100644 index 000000000..2ff56c395 --- /dev/null +++ b/stdlib/source/lux/concurrency/process.lux @@ -0,0 +1,104 @@ +(.module: + [lux #* + [control + ["." monad (#+ do)] + ["ex" exception (#+ exception:)]] + [data + [collection + ["." list]]] + [compiler + ["." host]] + ["." io (#+ IO io)] + [host (#+ import: object)]] + [// + ["." atom (#+ Atom)]]) + +(`` (for {(~~ (static host.jvm)) + (as-is (import: java/lang/Object) + + (import: java/lang/Runtime + (#static getRuntime [] Runtime) + (availableProcessors [] int)) + + (import: java/lang/Runnable) + + (import: java/util/concurrent/TimeUnit + (#enum MILLISECONDS)) + + (import: java/util/concurrent/Executor + (execute [Runnable] #io void)) + + (import: (java/util/concurrent/ScheduledFuture a)) + + (import: java/util/concurrent/ScheduledThreadPoolExecutor + (new [int]) + (schedule [Runnable long TimeUnit] #io (ScheduledFuture Object))))} + + (type: Process + {#creation Nat + #delay Nat + #action (IO Any)}) + )) + +(def: #export parallelism + Nat + (`` (for {(~~ (static host.jvm)) + (|> (Runtime::getRuntime []) + (Runtime::availableProcessors []) + .nat)} + 1))) + +(def: runner + (`` (for {(~~ (static host.jvm)) + (ScheduledThreadPoolExecutor::new [(.int ..parallelism)])} + + (: (Atom (List Process)) + (atom.atom (list)))))) + +(def: #export (schedule milli-seconds action) + (-> Nat (IO Any) (IO Any)) + (`` (for {(~~ (static host.jvm)) + (let [runnable (object [] [Runnable] + [] + (Runnable [] (run) void + (io.run action)))] + (case milli-seconds + 0 (Executor::execute [runnable] + runner) + _ (ScheduledThreadPoolExecutor::schedule [runnable (.int milli-seconds) TimeUnit::MILLISECONDS] + runner)))} + (atom.update (|>> (#.Cons {#creation ("lux io current-time") + #delay milli-seconds + #action action})) + runner)))) + +(`` (for {(~~ (static host.jvm)) + (as-is)} + (as-is (exception: #export (cannot-continue-running-processes) "") + + (def: #export run! + (IO Any) + (loop [_ []] + (do io.Monad<IO> + [processes (atom.read runner)] + (case processes + ## And... we're done! + #.Nil + (wrap []) + + _ + (do @ + [#let [now ("lux io current-time") + [ready pending] (list.partition (function (_ process) + (|> (get@ #creation process) + (n/+ (get@ #delay process)) + (n/<= now))) + processes)] + swapped? (atom.compare-and-swap! processes pending runner)] + (if swapped? + (do @ + [_ (monad.seq @ ready)] + (recur [])) + (error! (ex.construct cannot-continue-running-processes [])))) + )))) + ))) |