From 7e44ee8a2cfb14e35f6283a9eb8d6a2ddc8bd99a Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Tue, 31 Jul 2018 20:22:15 -0400 Subject: Now implementing process functionality in stdlib instead of the compiler. --- .../default/phase/extension/analysis/common.lux | 13 +-- .../translation/scheme/extension/common.jvm.lux | 14 --- .../phase/translation/scheme/runtime.jvm.lux | 38 -------- stdlib/source/lux/concurrency/process.lux | 104 +++++++++++++++++++++ stdlib/source/lux/concurrency/promise.lux | 19 ++-- stdlib/source/lux/host.jvm.lux | 7 +- stdlib/source/lux/test.lux | 3 +- .../default/phase/analysis/procedure/common.lux | 15 --- stdlib/test/test/lux/concurrency/stm.lux | 5 +- 9 files changed, 124 insertions(+), 94 deletions(-) create mode 100644 stdlib/source/lux/concurrency/process.lux (limited to 'stdlib') diff --git a/stdlib/source/lux/compiler/default/phase/extension/analysis/common.lux b/stdlib/source/lux/compiler/default/phase/extension/analysis/common.lux index ea5215a55..3272f8a29 100644 --- a/stdlib/source/lux/compiler/default/phase/extension/analysis/common.lux +++ b/stdlib/source/lux/compiler/default/phase/extension/analysis/common.lux @@ -280,14 +280,6 @@ (bundle.install "write" box::write) ))) -(def: bundle::process - Bundle - (<| (bundle.prefix "process") - (|> bundle.empty - (bundle.install "parallelism" (nullary Nat)) - (bundle.install "schedule" (binary Nat (type (IO Any)) Any)) - ))) - (def: #export bundle Bundle (<| (bundle.prefix "lux") @@ -299,6 +291,5 @@ (dict.merge bundle::text) (dict.merge bundle::array) (dict.merge bundle::box) - (dict.merge bundle::process) - (dict.merge bundle::io)) - )) + (dict.merge bundle::io) + ))) diff --git a/stdlib/source/lux/compiler/default/phase/translation/scheme/extension/common.jvm.lux b/stdlib/source/lux/compiler/default/phase/translation/scheme/extension/common.jvm.lux index 3aa2b453d..d1576248d 100644 --- a/stdlib/source/lux/compiler/default/phase/translation/scheme/extension/common.jvm.lux +++ b/stdlib/source/lux/compiler/default/phase/translation/scheme/extension/common.jvm.lux @@ -290,19 +290,6 @@ (bundle.install "read" (unary box::read)) (bundle.install "write" (binary box::write))))) -## [[Processes]] -(def: (process::parallelism-level []) - Nullary - (_.int +1)) - -(def: bundle::process - Bundle - (<| (bundle.prefix "process") - (|> bundle.empty - (bundle.install "parallelism-level" (nullary process::parallelism-level)) - (bundle.install "schedule" (binary (product.uncurry runtime.process//schedule))) - ))) - ## [Bundles] (def: #export bundle Bundle @@ -315,5 +302,4 @@ (dict.merge bundle::array) (dict.merge bundle::io) (dict.merge bundle::box) - (dict.merge bundle::process) ))) diff --git a/stdlib/source/lux/compiler/default/phase/translation/scheme/runtime.jvm.lux b/stdlib/source/lux/compiler/default/phase/translation/scheme/runtime.jvm.lux index 0e082a5d8..43748c3b1 100644 --- a/stdlib/source/lux/compiler/default/phase/translation/scheme/runtime.jvm.lux +++ b/stdlib/source/lux/compiler/default/phase/translation/scheme/runtime.jvm.lux @@ -302,43 +302,6 @@ (def: runtime//io (_.begin (list @@io//current-time))) -(def: process//incoming - Var - (_.var (name.normalize "process//incoming"))) - -(runtime: (process//loop _) - (_.when (_.not/1 (_.null?/1 process//incoming)) - (with-vars [queue process] - (_.let (list [queue process//incoming]) - (_.begin (list (_.set! process//incoming (_.list/* (list))) - (_.map/2 (_.lambda [(list process) #.None] - (_.apply/1 process ..unit)) - queue) - (process//loop ..unit))))))) - -(runtime: (process//schedule milli-seconds procedure) - (let [process//future (function (_ process) - (_.set! process//incoming (_.cons/2 process process//incoming)))] - (_.begin - (list - (_.if (_.=/2 (_.int +0) milli-seconds) - (process//future procedure) - (with-vars [@start @process @now @ignored] - (_.let (list [@start (io//current-time ..unit)]) - (_.letrec (list [@process (_.lambda [(list) (#.Some @ignored)] - (_.let (list [@now (io//current-time ..unit)]) - (_.if (|> @now (_.-/2 @start) (_.>=/2 milli-seconds)) - (_.apply/1 procedure ..unit) - (process//future @process))))]) - (process//future @process))))) - ..unit)))) - -(def: runtime//process - Computation - (_.begin (list (_.define process//incoming [(list) #.None] (_.list/* (list))) - @@process//loop - @@process//schedule))) - (def: runtime Computation (_.begin (list @@slice @@ -349,7 +312,6 @@ runtime//array runtime//box runtime//io - runtime//process ))) (def: #export translate diff --git a/stdlib/source/lux/concurrency/process.lux b/stdlib/source/lux/concurrency/process.lux new file mode 100644 index 000000000..2ff56c395 --- /dev/null +++ b/stdlib/source/lux/concurrency/process.lux @@ -0,0 +1,104 @@ +(.module: + [lux #* + [control + ["." monad (#+ do)] + ["ex" exception (#+ exception:)]] + [data + [collection + ["." list]]] + [compiler + ["." host]] + ["." io (#+ IO io)] + [host (#+ import: object)]] + [// + ["." atom (#+ Atom)]]) + +(`` (for {(~~ (static host.jvm)) + (as-is (import: java/lang/Object) + + (import: java/lang/Runtime + (#static getRuntime [] Runtime) + (availableProcessors [] int)) + + (import: java/lang/Runnable) + + (import: java/util/concurrent/TimeUnit + (#enum MILLISECONDS)) + + (import: java/util/concurrent/Executor + (execute [Runnable] #io void)) + + (import: (java/util/concurrent/ScheduledFuture a)) + + (import: java/util/concurrent/ScheduledThreadPoolExecutor + (new [int]) + (schedule [Runnable long TimeUnit] #io (ScheduledFuture Object))))} + + (type: Process + {#creation Nat + #delay Nat + #action (IO Any)}) + )) + +(def: #export parallelism + Nat + (`` (for {(~~ (static host.jvm)) + (|> (Runtime::getRuntime []) + (Runtime::availableProcessors []) + .nat)} + 1))) + +(def: runner + (`` (for {(~~ (static host.jvm)) + (ScheduledThreadPoolExecutor::new [(.int ..parallelism)])} + + (: (Atom (List Process)) + (atom.atom (list)))))) + +(def: #export (schedule milli-seconds action) + (-> Nat (IO Any) (IO Any)) + (`` (for {(~~ (static host.jvm)) + (let [runnable (object [] [Runnable] + [] + (Runnable [] (run) void + (io.run action)))] + (case milli-seconds + 0 (Executor::execute [runnable] + runner) + _ (ScheduledThreadPoolExecutor::schedule [runnable (.int milli-seconds) TimeUnit::MILLISECONDS] + runner)))} + (atom.update (|>> (#.Cons {#creation ("lux io current-time") + #delay milli-seconds + #action action})) + runner)))) + +(`` (for {(~~ (static host.jvm)) + (as-is)} + (as-is (exception: #export (cannot-continue-running-processes) "") + + (def: #export run! + (IO Any) + (loop [_ []] + (do io.Monad + [processes (atom.read runner)] + (case processes + ## And... we're done! + #.Nil + (wrap []) + + _ + (do @ + [#let [now ("lux io current-time") + [ready pending] (list.partition (function (_ process) + (|> (get@ #creation process) + (n/+ (get@ #delay process)) + (n/<= now))) + processes)] + swapped? (atom.compare-and-swap! processes pending runner)] + (if swapped? + (do @ + [_ (monad.seq @ ready)] + (recur [])) + (error! (ex.construct cannot-continue-running-processes [])))) + )))) + ))) diff --git a/stdlib/source/lux/concurrency/promise.lux b/stdlib/source/lux/concurrency/promise.lux index 7062c2082..24f26a24c 100644 --- a/stdlib/source/lux/concurrency/promise.lux +++ b/stdlib/source/lux/concurrency/promise.lux @@ -9,13 +9,10 @@ ["." function] [type abstract] - ["." io (#+ IO io)] - [concurrency - ["." atom (#+ Atom atom)]]]) - -(def: #export parallelism - Nat - ("lux process parallelism")) + ["." io (#+ IO io)]] + [// + ["." process] + ["." atom (#+ Atom atom)]]) (abstract: #export (Promise a) {#.doc "Represents values produced by asynchronous computations (unlike IO, which is synchronous)."} @@ -147,9 +144,11 @@ {#.doc "Runs an I/O computation on its own process (after a specified delay) and returns a Promise that will eventually host its result."} (All [a] (-> Nat (IO a) (Promise a))) (let [!out (promise #.None)] - (exec ("lux process schedule" millis-delay - (io (io.run (resolve (io.run computation) - !out)))) + (exec (|> (do io.Monad + [value computation] + (resolve value !out)) + (process.schedule millis-delay) + io.run) !out))) (def: #export future diff --git a/stdlib/source/lux/host.jvm.lux b/stdlib/source/lux/host.jvm.lux index 5b91fc526..98cf40bfc 100644 --- a/stdlib/source/lux/host.jvm.lux +++ b/stdlib/source/lux/host.jvm.lux @@ -1370,10 +1370,11 @@ {constructor-args (constructor-args^ imports class-vars)} {methods (p.some (overriden-method-def^ imports))}) {#.doc (doc "Allows defining anonymous classes." - "The 1st tuple corresponds to parent interfaces." - "The 2nd tuple corresponds to arguments to the super class constructor." + "The 1st tuple corresponds to class-level type-variables." + "The 2nd tuple corresponds to parent interfaces." + "The 3rd tuple corresponds to arguments to the super class constructor." "An optional super-class can be specified before the 1st tuple. If not specified, java.lang.Object will be assumed." - (object [Runnable] + (object [] [Runnable] [] (Runnable [] (run) void (exec (do-something some-value) diff --git a/stdlib/source/lux/test.lux b/stdlib/source/lux/test.lux index 26e99fbce..5b214579d 100644 --- a/stdlib/source/lux/test.lux +++ b/stdlib/source/lux/test.lux @@ -20,6 +20,7 @@ ["s" syntax (#+ syntax: Syntax)] ["." code]] [concurrency + ["." process] ["." promise (#+ Promise)]] ["." io (#+ IO io)]]) @@ -240,7 +241,7 @@ (~+ (|> tests (list/map (function (_ [module-name test desc]) (` [(~ (code.text module-name)) (~ (code.identifier [module-name test])) (~ (code.text desc))]))) - (list.split-all promise.parallelism) + (list.split-all process.parallelism) (list/map (function (_ group) (list (` [(~ g!successes) (~ g!failures)]) (` ((~! run') (list (~+ group)))) (' #let) (` [(~ g!total-successes) (n/+ (~ g!successes) (~ g!total-successes)) diff --git a/stdlib/test/test/lux/compiler/default/phase/analysis/procedure/common.lux b/stdlib/test/test/lux/compiler/default/phase/analysis/procedure/common.lux index 8d3e8b8fa..9d733912e 100644 --- a/stdlib/test/test/lux/compiler/default/phase/analysis/procedure/common.lux +++ b/stdlib/test/test/lux/compiler/default/phase/analysis/procedure/common.lux @@ -245,21 +245,6 @@ #0))) )))) -(context: "Process procedures" - (<| (times 100) - (do @ - [[primT primC] _primitive.primitive - timeC (|> r.nat (:: @ map code.nat))] - ($_ seq - (test "Can query the level of concurrency." - (check-success+ "lux process parallelism-level" (list) Nat)) - (test "Can schedule an IO computation to run concurrently at some future time." - (check-success+ "lux process schedule" - (list timeC - (` ([(~' _) (~' _)] (~ primC)))) - Any)) - )))) - (context: "IO procedures" (<| (times 100) (do @ diff --git a/stdlib/test/test/lux/concurrency/stm.lux b/stdlib/test/test/lux/concurrency/stm.lux index 3506146f4..ee84f193e 100644 --- a/stdlib/test/test/lux/concurrency/stm.lux +++ b/stdlib/test/test/lux/concurrency/stm.lux @@ -10,6 +10,7 @@ [concurrency ["." atom (#+ Atom atom)] ["&" stm] + ["." process] ["." promise] ["." frp (#+ Channel)]] [math @@ -62,7 +63,7 @@ (list.reverse changes))))) (wrap (let [_concurrency-var (&.var 0)] (do promise.Monad - [_ (|> promise.parallelism + [_ (|> process.parallelism (list.n/range 1) (list/map (function (_ _) (|> iterations-per-process @@ -71,6 +72,6 @@ (M.seq @)) last-val (&.commit (&.read _concurrency-var))] (assert "Can modify STM vars concurrently from multiple threads." - (|> promise.parallelism + (|> process.parallelism (n/* iterations-per-process) (n/= last-val)))))))) -- cgit v1.2.3