aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/concurrency/process.lux
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/concurrency/process.lux')
-rw-r--r--stdlib/source/lux/concurrency/process.lux104
1 files changed, 104 insertions, 0 deletions
diff --git a/stdlib/source/lux/concurrency/process.lux b/stdlib/source/lux/concurrency/process.lux
new file mode 100644
index 000000000..2ff56c395
--- /dev/null
+++ b/stdlib/source/lux/concurrency/process.lux
@@ -0,0 +1,104 @@
+(.module:
+ [lux #*
+ [control
+ ["." monad (#+ do)]
+ ["ex" exception (#+ exception:)]]
+ [data
+ [collection
+ ["." list]]]
+ [compiler
+ ["." host]]
+ ["." io (#+ IO io)]
+ [host (#+ import: object)]]
+ [//
+ ["." atom (#+ Atom)]])
+
+(`` (for {(~~ (static host.jvm))
+ (as-is (import: java/lang/Object)
+
+ (import: java/lang/Runtime
+ (#static getRuntime [] Runtime)
+ (availableProcessors [] int))
+
+ (import: java/lang/Runnable)
+
+ (import: java/util/concurrent/TimeUnit
+ (#enum MILLISECONDS))
+
+ (import: java/util/concurrent/Executor
+ (execute [Runnable] #io void))
+
+ (import: (java/util/concurrent/ScheduledFuture a))
+
+ (import: java/util/concurrent/ScheduledThreadPoolExecutor
+ (new [int])
+ (schedule [Runnable long TimeUnit] #io (ScheduledFuture Object))))}
+
+ (type: Process
+ {#creation Nat
+ #delay Nat
+ #action (IO Any)})
+ ))
+
+(def: #export parallelism
+ Nat
+ (`` (for {(~~ (static host.jvm))
+ (|> (Runtime::getRuntime [])
+ (Runtime::availableProcessors [])
+ .nat)}
+ 1)))
+
+(def: runner
+ (`` (for {(~~ (static host.jvm))
+ (ScheduledThreadPoolExecutor::new [(.int ..parallelism)])}
+
+ (: (Atom (List Process))
+ (atom.atom (list))))))
+
+(def: #export (schedule milli-seconds action)
+ (-> Nat (IO Any) (IO Any))
+ (`` (for {(~~ (static host.jvm))
+ (let [runnable (object [] [Runnable]
+ []
+ (Runnable [] (run) void
+ (io.run action)))]
+ (case milli-seconds
+ 0 (Executor::execute [runnable]
+ runner)
+ _ (ScheduledThreadPoolExecutor::schedule [runnable (.int milli-seconds) TimeUnit::MILLISECONDS]
+ runner)))}
+ (atom.update (|>> (#.Cons {#creation ("lux io current-time")
+ #delay milli-seconds
+ #action action}))
+ runner))))
+
+(`` (for {(~~ (static host.jvm))
+ (as-is)}
+ (as-is (exception: #export (cannot-continue-running-processes) "")
+
+ (def: #export run!
+ (IO Any)
+ (loop [_ []]
+ (do io.Monad<IO>
+ [processes (atom.read runner)]
+ (case processes
+ ## And... we're done!
+ #.Nil
+ (wrap [])
+
+ _
+ (do @
+ [#let [now ("lux io current-time")
+ [ready pending] (list.partition (function (_ process)
+ (|> (get@ #creation process)
+ (n/+ (get@ #delay process))
+ (n/<= now)))
+ processes)]
+ swapped? (atom.compare-and-swap! processes pending runner)]
+ (if swapped?
+ (do @
+ [_ (monad.seq @ ready)]
+ (recur []))
+ (error! (ex.construct cannot-continue-running-processes []))))
+ ))))
+ )))