aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/control/concurrency/process.lux
diff options
context:
space:
mode:
authorEduardo Julian2018-12-15 12:39:15 -0400
committerEduardo Julian2018-12-15 12:39:15 -0400
commitf6e280bd4ab41d12083c0eef2c823ad3962d6a04 (patch)
tree41173ee305fcf42736ae96cf7e61d6fac11b6175 /stdlib/source/lux/control/concurrency/process.lux
parent07426c47503a84666a9a7824d76e8d5730492d75 (diff)
Moved the "lux/concurrency" modules under "lux/control".
Diffstat (limited to 'stdlib/source/lux/control/concurrency/process.lux')
-rw-r--r--stdlib/source/lux/control/concurrency/process.lux110
1 files changed, 110 insertions, 0 deletions
diff --git a/stdlib/source/lux/control/concurrency/process.lux b/stdlib/source/lux/control/concurrency/process.lux
new file mode 100644
index 000000000..a67734747
--- /dev/null
+++ b/stdlib/source/lux/control/concurrency/process.lux
@@ -0,0 +1,110 @@
+(.module:
+ [lux #*
+ [control
+ ["." monad (#+ do)]
+ ["ex" exception (#+ exception:)]]
+ [data
+ [collection
+ ["." list]]]
+ [platform
+ [compiler
+ ["." host]]]
+ ["." io (#+ IO io)]
+ [host (#+ import: object)]]
+ [//
+ ["." atom (#+ Atom)]])
+
+(`` (for {(~~ (static host.jvm))
+ (as-is (import: java/lang/Object)
+
+ (import: java/lang/Runtime
+ (#static getRuntime [] Runtime)
+ (availableProcessors [] int))
+
+ (import: java/lang/Runnable)
+
+ (import: java/util/concurrent/TimeUnit
+ (#enum MILLISECONDS))
+
+ (import: java/util/concurrent/Executor
+ (execute [Runnable] #io void))
+
+ (import: (java/util/concurrent/ScheduledFuture a))
+
+ (import: java/util/concurrent/ScheduledThreadPoolExecutor
+ (new [int])
+ (schedule [Runnable long TimeUnit] #io (ScheduledFuture Object))))}
+
+ ## Default
+ (type: Process
+ {#creation Nat
+ #delay Nat
+ #action (IO Any)})
+ ))
+
+(def: #export parallelism
+ Nat
+ (`` (for {(~~ (static host.jvm))
+ (|> (Runtime::getRuntime)
+ (Runtime::availableProcessors)
+ .nat)}
+
+ ## Default
+ 1)))
+
+(def: runner
+ (`` (for {(~~ (static host.jvm))
+ (ScheduledThreadPoolExecutor::new (.int ..parallelism))}
+
+ ## Default
+ (: (Atom (List Process))
+ (atom.atom (list))))))
+
+(def: #export (schedule milli-seconds action)
+ (-> Nat (IO Any) (IO Any))
+ (`` (for {(~~ (static host.jvm))
+ (let [runnable (object [] [Runnable]
+ []
+ (Runnable [] (run) void
+ (io.run action)))]
+ (case milli-seconds
+ 0 (Executor::execute runnable runner)
+ _ (ScheduledThreadPoolExecutor::schedule runnable (.int milli-seconds) TimeUnit::MILLISECONDS
+ runner)))}
+
+ ## Default
+ (atom.update (|>> (#.Cons {#creation ("lux io current-time")
+ #delay milli-seconds
+ #action action}))
+ runner))))
+
+(`` (for {(~~ (static host.jvm))
+ (as-is)}
+
+ ## Default
+ (as-is (exception: #export (cannot-continue-running-processes) "")
+
+ (def: #export run!
+ (IO Any)
+ (loop [_ []]
+ (do io.Monad<IO>
+ [processes (atom.read runner)]
+ (case processes
+ ## And... we're done!
+ #.Nil
+ (wrap [])
+
+ _
+ (do @
+ [#let [now ("lux io current-time")
+ [ready pending] (list.partition (function (_ process)
+ (|> (get@ #creation process)
+ (n/+ (get@ #delay process))
+ (n/<= now)))
+ processes)]
+ swapped? (atom.compare-and-swap! processes pending runner)]
+ (if swapped?
+ (monad.seq @ ready)
+ (error! (ex.construct cannot-continue-running-processes []))))
+ ))))
+ )))