aboutsummaryrefslogtreecommitdiff
path: root/stdlib
diff options
context:
space:
mode:
authorEduardo Julian2018-07-31 20:22:15 -0400
committerEduardo Julian2018-07-31 20:22:15 -0400
commit7e44ee8a2cfb14e35f6283a9eb8d6a2ddc8bd99a (patch)
tree98a12aa5a9fd347c1070612a2a1dae69dae879b1 /stdlib
parenteea58ee669f69fddf2cef9e1675c41959e2e0a55 (diff)
Now implementing process functionality in stdlib instead of the compiler.
Diffstat (limited to '')
-rw-r--r--stdlib/source/lux/compiler/default/phase/extension/analysis/common.lux13
-rw-r--r--stdlib/source/lux/compiler/default/phase/translation/scheme/extension/common.jvm.lux14
-rw-r--r--stdlib/source/lux/compiler/default/phase/translation/scheme/runtime.jvm.lux38
-rw-r--r--stdlib/source/lux/concurrency/process.lux104
-rw-r--r--stdlib/source/lux/concurrency/promise.lux19
-rw-r--r--stdlib/source/lux/host.jvm.lux7
-rw-r--r--stdlib/source/lux/test.lux3
-rw-r--r--stdlib/test/test/lux/compiler/default/phase/analysis/procedure/common.lux15
-rw-r--r--stdlib/test/test/lux/concurrency/stm.lux5
9 files changed, 124 insertions, 94 deletions
diff --git a/stdlib/source/lux/compiler/default/phase/extension/analysis/common.lux b/stdlib/source/lux/compiler/default/phase/extension/analysis/common.lux
index ea5215a55..3272f8a29 100644
--- a/stdlib/source/lux/compiler/default/phase/extension/analysis/common.lux
+++ b/stdlib/source/lux/compiler/default/phase/extension/analysis/common.lux
@@ -280,14 +280,6 @@
(bundle.install "write" box::write)
)))
-(def: bundle::process
- Bundle
- (<| (bundle.prefix "process")
- (|> bundle.empty
- (bundle.install "parallelism" (nullary Nat))
- (bundle.install "schedule" (binary Nat (type (IO Any)) Any))
- )))
-
(def: #export bundle
Bundle
(<| (bundle.prefix "lux")
@@ -299,6 +291,5 @@
(dict.merge bundle::text)
(dict.merge bundle::array)
(dict.merge bundle::box)
- (dict.merge bundle::process)
- (dict.merge bundle::io))
- ))
+ (dict.merge bundle::io)
+ )))
diff --git a/stdlib/source/lux/compiler/default/phase/translation/scheme/extension/common.jvm.lux b/stdlib/source/lux/compiler/default/phase/translation/scheme/extension/common.jvm.lux
index 3aa2b453d..d1576248d 100644
--- a/stdlib/source/lux/compiler/default/phase/translation/scheme/extension/common.jvm.lux
+++ b/stdlib/source/lux/compiler/default/phase/translation/scheme/extension/common.jvm.lux
@@ -290,19 +290,6 @@
(bundle.install "read" (unary box::read))
(bundle.install "write" (binary box::write)))))
-## [[Processes]]
-(def: (process::parallelism-level [])
- Nullary
- (_.int +1))
-
-(def: bundle::process
- Bundle
- (<| (bundle.prefix "process")
- (|> bundle.empty
- (bundle.install "parallelism-level" (nullary process::parallelism-level))
- (bundle.install "schedule" (binary (product.uncurry runtime.process//schedule)))
- )))
-
## [Bundles]
(def: #export bundle
Bundle
@@ -315,5 +302,4 @@
(dict.merge bundle::array)
(dict.merge bundle::io)
(dict.merge bundle::box)
- (dict.merge bundle::process)
)))
diff --git a/stdlib/source/lux/compiler/default/phase/translation/scheme/runtime.jvm.lux b/stdlib/source/lux/compiler/default/phase/translation/scheme/runtime.jvm.lux
index 0e082a5d8..43748c3b1 100644
--- a/stdlib/source/lux/compiler/default/phase/translation/scheme/runtime.jvm.lux
+++ b/stdlib/source/lux/compiler/default/phase/translation/scheme/runtime.jvm.lux
@@ -302,43 +302,6 @@
(def: runtime//io
(_.begin (list @@io//current-time)))
-(def: process//incoming
- Var
- (_.var (name.normalize "process//incoming")))
-
-(runtime: (process//loop _)
- (_.when (_.not/1 (_.null?/1 process//incoming))
- (with-vars [queue process]
- (_.let (list [queue process//incoming])
- (_.begin (list (_.set! process//incoming (_.list/* (list)))
- (_.map/2 (_.lambda [(list process) #.None]
- (_.apply/1 process ..unit))
- queue)
- (process//loop ..unit)))))))
-
-(runtime: (process//schedule milli-seconds procedure)
- (let [process//future (function (_ process)
- (_.set! process//incoming (_.cons/2 process process//incoming)))]
- (_.begin
- (list
- (_.if (_.=/2 (_.int +0) milli-seconds)
- (process//future procedure)
- (with-vars [@start @process @now @ignored]
- (_.let (list [@start (io//current-time ..unit)])
- (_.letrec (list [@process (_.lambda [(list) (#.Some @ignored)]
- (_.let (list [@now (io//current-time ..unit)])
- (_.if (|> @now (_.-/2 @start) (_.>=/2 milli-seconds))
- (_.apply/1 procedure ..unit)
- (process//future @process))))])
- (process//future @process)))))
- ..unit))))
-
-(def: runtime//process
- Computation
- (_.begin (list (_.define process//incoming [(list) #.None] (_.list/* (list)))
- @@process//loop
- @@process//schedule)))
-
(def: runtime
Computation
(_.begin (list @@slice
@@ -349,7 +312,6 @@
runtime//array
runtime//box
runtime//io
- runtime//process
)))
(def: #export translate
diff --git a/stdlib/source/lux/concurrency/process.lux b/stdlib/source/lux/concurrency/process.lux
new file mode 100644
index 000000000..2ff56c395
--- /dev/null
+++ b/stdlib/source/lux/concurrency/process.lux
@@ -0,0 +1,104 @@
+(.module:
+ [lux #*
+ [control
+ ["." monad (#+ do)]
+ ["ex" exception (#+ exception:)]]
+ [data
+ [collection
+ ["." list]]]
+ [compiler
+ ["." host]]
+ ["." io (#+ IO io)]
+ [host (#+ import: object)]]
+ [//
+ ["." atom (#+ Atom)]])
+
+(`` (for {(~~ (static host.jvm))
+ (as-is (import: java/lang/Object)
+
+ (import: java/lang/Runtime
+ (#static getRuntime [] Runtime)
+ (availableProcessors [] int))
+
+ (import: java/lang/Runnable)
+
+ (import: java/util/concurrent/TimeUnit
+ (#enum MILLISECONDS))
+
+ (import: java/util/concurrent/Executor
+ (execute [Runnable] #io void))
+
+ (import: (java/util/concurrent/ScheduledFuture a))
+
+ (import: java/util/concurrent/ScheduledThreadPoolExecutor
+ (new [int])
+ (schedule [Runnable long TimeUnit] #io (ScheduledFuture Object))))}
+
+ (type: Process
+ {#creation Nat
+ #delay Nat
+ #action (IO Any)})
+ ))
+
+(def: #export parallelism
+ Nat
+ (`` (for {(~~ (static host.jvm))
+ (|> (Runtime::getRuntime [])
+ (Runtime::availableProcessors [])
+ .nat)}
+ 1)))
+
+(def: runner
+ (`` (for {(~~ (static host.jvm))
+ (ScheduledThreadPoolExecutor::new [(.int ..parallelism)])}
+
+ (: (Atom (List Process))
+ (atom.atom (list))))))
+
+(def: #export (schedule milli-seconds action)
+ (-> Nat (IO Any) (IO Any))
+ (`` (for {(~~ (static host.jvm))
+ (let [runnable (object [] [Runnable]
+ []
+ (Runnable [] (run) void
+ (io.run action)))]
+ (case milli-seconds
+ 0 (Executor::execute [runnable]
+ runner)
+ _ (ScheduledThreadPoolExecutor::schedule [runnable (.int milli-seconds) TimeUnit::MILLISECONDS]
+ runner)))}
+ (atom.update (|>> (#.Cons {#creation ("lux io current-time")
+ #delay milli-seconds
+ #action action}))
+ runner))))
+
+(`` (for {(~~ (static host.jvm))
+ (as-is)}
+ (as-is (exception: #export (cannot-continue-running-processes) "")
+
+ (def: #export run!
+ (IO Any)
+ (loop [_ []]
+ (do io.Monad<IO>
+ [processes (atom.read runner)]
+ (case processes
+ ## And... we're done!
+ #.Nil
+ (wrap [])
+
+ _
+ (do @
+ [#let [now ("lux io current-time")
+ [ready pending] (list.partition (function (_ process)
+ (|> (get@ #creation process)
+ (n/+ (get@ #delay process))
+ (n/<= now)))
+ processes)]
+ swapped? (atom.compare-and-swap! processes pending runner)]
+ (if swapped?
+ (do @
+ [_ (monad.seq @ ready)]
+ (recur []))
+ (error! (ex.construct cannot-continue-running-processes []))))
+ ))))
+ )))
diff --git a/stdlib/source/lux/concurrency/promise.lux b/stdlib/source/lux/concurrency/promise.lux
index 7062c2082..24f26a24c 100644
--- a/stdlib/source/lux/concurrency/promise.lux
+++ b/stdlib/source/lux/concurrency/promise.lux
@@ -9,13 +9,10 @@
["." function]
[type
abstract]
- ["." io (#+ IO io)]
- [concurrency
- ["." atom (#+ Atom atom)]]])
-
-(def: #export parallelism
- Nat
- ("lux process parallelism"))
+ ["." io (#+ IO io)]]
+ [//
+ ["." process]
+ ["." atom (#+ Atom atom)]])
(abstract: #export (Promise a)
{#.doc "Represents values produced by asynchronous computations (unlike IO, which is synchronous)."}
@@ -147,9 +144,11 @@
{#.doc "Runs an I/O computation on its own process (after a specified delay) and returns a Promise that will eventually host its result."}
(All [a] (-> Nat (IO a) (Promise a)))
(let [!out (promise #.None)]
- (exec ("lux process schedule" millis-delay
- (io (io.run (resolve (io.run computation)
- !out))))
+ (exec (|> (do io.Monad<IO>
+ [value computation]
+ (resolve value !out))
+ (process.schedule millis-delay)
+ io.run)
!out)))
(def: #export future
diff --git a/stdlib/source/lux/host.jvm.lux b/stdlib/source/lux/host.jvm.lux
index 5b91fc526..98cf40bfc 100644
--- a/stdlib/source/lux/host.jvm.lux
+++ b/stdlib/source/lux/host.jvm.lux
@@ -1370,10 +1370,11 @@
{constructor-args (constructor-args^ imports class-vars)}
{methods (p.some (overriden-method-def^ imports))})
{#.doc (doc "Allows defining anonymous classes."
- "The 1st tuple corresponds to parent interfaces."
- "The 2nd tuple corresponds to arguments to the super class constructor."
+ "The 1st tuple corresponds to class-level type-variables."
+ "The 2nd tuple corresponds to parent interfaces."
+ "The 3rd tuple corresponds to arguments to the super class constructor."
"An optional super-class can be specified before the 1st tuple. If not specified, java.lang.Object will be assumed."
- (object [Runnable]
+ (object [] [Runnable]
[]
(Runnable [] (run) void
(exec (do-something some-value)
diff --git a/stdlib/source/lux/test.lux b/stdlib/source/lux/test.lux
index 26e99fbce..5b214579d 100644
--- a/stdlib/source/lux/test.lux
+++ b/stdlib/source/lux/test.lux
@@ -20,6 +20,7 @@
["s" syntax (#+ syntax: Syntax)]
["." code]]
[concurrency
+ ["." process]
["." promise (#+ Promise)]]
["." io (#+ IO io)]])
@@ -240,7 +241,7 @@
(~+ (|> tests
(list/map (function (_ [module-name test desc])
(` [(~ (code.text module-name)) (~ (code.identifier [module-name test])) (~ (code.text desc))])))
- (list.split-all promise.parallelism)
+ (list.split-all process.parallelism)
(list/map (function (_ group)
(list (` [(~ g!successes) (~ g!failures)]) (` ((~! run') (list (~+ group))))
(' #let) (` [(~ g!total-successes) (n/+ (~ g!successes) (~ g!total-successes))
diff --git a/stdlib/test/test/lux/compiler/default/phase/analysis/procedure/common.lux b/stdlib/test/test/lux/compiler/default/phase/analysis/procedure/common.lux
index 8d3e8b8fa..9d733912e 100644
--- a/stdlib/test/test/lux/compiler/default/phase/analysis/procedure/common.lux
+++ b/stdlib/test/test/lux/compiler/default/phase/analysis/procedure/common.lux
@@ -245,21 +245,6 @@
#0)))
))))
-(context: "Process procedures"
- (<| (times 100)
- (do @
- [[primT primC] _primitive.primitive
- timeC (|> r.nat (:: @ map code.nat))]
- ($_ seq
- (test "Can query the level of concurrency."
- (check-success+ "lux process parallelism-level" (list) Nat))
- (test "Can schedule an IO computation to run concurrently at some future time."
- (check-success+ "lux process schedule"
- (list timeC
- (` ([(~' _) (~' _)] (~ primC))))
- Any))
- ))))
-
(context: "IO procedures"
(<| (times 100)
(do @
diff --git a/stdlib/test/test/lux/concurrency/stm.lux b/stdlib/test/test/lux/concurrency/stm.lux
index 3506146f4..ee84f193e 100644
--- a/stdlib/test/test/lux/concurrency/stm.lux
+++ b/stdlib/test/test/lux/concurrency/stm.lux
@@ -10,6 +10,7 @@
[concurrency
["." atom (#+ Atom atom)]
["&" stm]
+ ["." process]
["." promise]
["." frp (#+ Channel)]]
[math
@@ -62,7 +63,7 @@
(list.reverse changes)))))
(wrap (let [_concurrency-var (&.var 0)]
(do promise.Monad<Promise>
- [_ (|> promise.parallelism
+ [_ (|> process.parallelism
(list.n/range 1)
(list/map (function (_ _)
(|> iterations-per-process
@@ -71,6 +72,6 @@
(M.seq @))
last-val (&.commit (&.read _concurrency-var))]
(assert "Can modify STM vars concurrently from multiple threads."
- (|> promise.parallelism
+ (|> process.parallelism
(n/* iterations-per-process)
(n/= last-val))))))))