From 7e44ee8a2cfb14e35f6283a9eb8d6a2ddc8bd99a Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Tue, 31 Jul 2018 20:22:15 -0400 Subject: Now implementing process functionality in stdlib instead of the compiler. --- luxc/src/lux/analyser/proc/common.clj | 19 ---- luxc/src/lux/compiler/jvm.clj | 3 +- luxc/src/lux/compiler/jvm/proc/common.clj | 28 ------ luxc/src/lux/compiler/jvm/rt.clj | 90 +----------------- .../common-lisp/procedure/common.jvm.lux | 19 +--- .../lang/translation/common-lisp/runtime.jvm.lux | 44 +-------- .../lang/translation/js/procedure/common.jvm.lux | 20 ---- .../lang/translation/jvm/procedure/common.jvm.lux | 23 ----- .../luxc/lang/translation/jvm/runtime.jvm.lux | 92 +----------------- .../lang/translation/lua/procedure/common.jvm.lux | 18 ---- .../luxc/lang/translation/lua/runtime.jvm.lux | 45 --------- .../lang/translation/php/procedure/common.jvm.lux | 18 ---- .../luxc/lang/translation/php/runtime.jvm.lux | 25 ----- .../translation/python/procedure/common.jvm.lux | 18 ---- .../luxc/lang/translation/python/runtime.jvm.lux | 19 ---- .../lang/translation/r/procedure/common.jvm.lux | 18 ---- .../source/luxc/lang/translation/r/runtime.jvm.lux | 57 ----------- .../lang/translation/ruby/procedure/common.jvm.lux | 18 ---- .../luxc/lang/translation/ruby/runtime.jvm.lux | 14 --- .../test/test/luxc/lang/translation/common.lux | 31 ------ .../default/phase/extension/analysis/common.lux | 13 +-- .../translation/scheme/extension/common.jvm.lux | 14 --- .../phase/translation/scheme/runtime.jvm.lux | 38 -------- stdlib/source/lux/concurrency/process.lux | 104 +++++++++++++++++++++ stdlib/source/lux/concurrency/promise.lux | 19 ++-- stdlib/source/lux/host.jvm.lux | 7 +- stdlib/source/lux/test.lux | 3 +- .../default/phase/analysis/procedure/common.lux | 15 --- stdlib/test/test/lux/concurrency/stm.lux | 5 +- 29 files changed, 132 insertions(+), 705 deletions(-) create mode 100644 stdlib/source/lux/concurrency/process.lux diff --git a/luxc/src/lux/analyser/proc/common.clj b/luxc/src/lux/analyser/proc/common.clj index 38f6bc6c3..df6fb0051 100644 --- a/luxc/src/lux/analyser/proc/common.clj +++ b/luxc/src/lux/analyser/proc/common.clj @@ -322,22 +322,6 @@ (return (&/|list (&&/|meta exo-type _cursor (&&/$proc (&/T ["box" "write"]) (&/|list valueA boxA) (&/|list))))))))))) -(defn ^:private analyse-process-parallelism [analyse exo-type ?values] - (|do [:let [(&/$Nil) ?values] - _ (&type/check exo-type &type/Nat) - _cursor &/cursor] - (return (&/|list (&&/|meta exo-type _cursor - (&&/$proc (&/T ["process" "parallelism"]) (&/|list) (&/|list))))))) - -(defn ^:private analyse-process-schedule [analyse exo-type ?values] - (|do [:let [(&/$Cons ?milliseconds (&/$Cons ?procedure (&/$Nil))) ?values] - =milliseconds (&&/analyse-1 analyse &type/Nat ?milliseconds) - =procedure (&&/analyse-1 analyse (&/$Apply &type/Any &type/IO) ?procedure) - _ (&type/check exo-type &type/Any) - _cursor &/cursor] - (return (&/|list (&&/|meta exo-type _cursor - (&&/$proc (&/T ["process" "schedule"]) (&/|list =milliseconds =procedure) (&/|list))))))) - (defn analyse-proc [analyse exo-type proc ?values] (try (case proc "lux is" (analyse-lux-is analyse exo-type ?values) @@ -397,9 +381,6 @@ "lux frac max" (analyse-frac-max analyse exo-type ?values) "lux frac int" (analyse-frac-int analyse exo-type ?values) - "lux process parallelism" (analyse-process-parallelism analyse exo-type ?values) - "lux process schedule" (analyse-process-schedule analyse exo-type ?values) - ;; else (&/fail-with-loc (str "[Analyser Error] Unknown host procedure: " proc))) (catch Exception ex diff --git a/luxc/src/lux/compiler/jvm.clj b/luxc/src/lux/compiler/jvm.clj index 60a2c7d2e..8e2966b52 100644 --- a/luxc/src/lux/compiler/jvm.clj +++ b/luxc/src/lux/compiler/jvm.clj @@ -197,8 +197,7 @@ (.visitSource file-name nil))] _ (if (= "lux" name) (|do [_ &&rt/compile-Function-class - _ &&rt/compile-LuxRT-class - _ &&rt/compile-LuxRunnable-class] + _ &&rt/compile-LuxRT-class] (return nil)) (return nil))] (fn [state] diff --git a/luxc/src/lux/compiler/jvm/proc/common.clj b/luxc/src/lux/compiler/jvm/proc/common.clj index 6f05b3e52..6e89155bc 100644 --- a/luxc/src/lux/compiler/jvm/proc/common.clj +++ b/luxc/src/lux/compiler/jvm/proc/common.clj @@ -505,28 +505,6 @@ (.visitLdcInsn &/unit-tag))]] (return nil))) -(defn ^:private compile-process-parallelism [compile ?values special-args] - (|do [:let [(&/$Nil) ?values] - ^MethodVisitor *writer* &/get-writer - :let [_ (doto *writer* - (.visitFieldInsn Opcodes/GETSTATIC "lux/LuxRT" "concurrency_level" "I") - (.visitInsn Opcodes/I2L) - &&/wrap-long)]] - (return nil))) - -(defn ^:private compile-process-schedule [compile ?values special-args] - (|do [:let [(&/$Cons ?milliseconds (&/$Cons ?procedure (&/$Nil))) ?values] - ^MethodVisitor *writer* &/get-writer - _ (compile ?milliseconds) - :let [_ (doto *writer* - &&/unwrap-long)] - _ (compile ?procedure) - :let [_ (doto *writer* - (.visitTypeInsn Opcodes/CHECKCAST "lux/Function"))] - :let [_ (doto *writer* - (.visitMethodInsn Opcodes/INVOKESTATIC "lux/LuxRT" "schedule" "(JLlux/Function;)Ljava/lang/Object;"))]] - (return nil))) - (defn compile-proc [compile category proc ?values special-args] (case category "lux" @@ -607,11 +585,5 @@ "write" (compile-box-write compile ?values special-args) ) - "process" - (case proc - "parallelism" (compile-process-parallelism compile ?values special-args) - "schedule" (compile-process-schedule compile ?values special-args) - ) - ;; else (&/fail-with-loc (str "[Compiler Error] Unknown procedure: " [category proc])))) diff --git a/luxc/src/lux/compiler/jvm/rt.clj b/luxc/src/lux/compiler/jvm/rt.clj index 0f9c4cb86..7dd08dc62 100644 --- a/luxc/src/lux/compiler/jvm/rt.clj +++ b/luxc/src/lux/compiler/jvm/rt.clj @@ -66,36 +66,6 @@ (&&/save-class! (second (string/split &&/function-class #"/")) (.toByteArray (doto =class .visitEnd))))) -;; Custom Runnable -(def compile-LuxRunnable-class - (|do [_ (return nil) - :let [=class (doto (new ClassWriter ClassWriter/COMPUTE_MAXS) - (.visit &host/bytecode-version (+ Opcodes/ACC_PUBLIC Opcodes/ACC_FINAL Opcodes/ACC_SUPER) - "lux/LuxRunnable" nil "java/lang/Object" (into-array String ["java/lang/Runnable"]))) - _ (doto (.visitField =class Opcodes/ACC_PUBLIC "procedure" "Llux/Function;" nil nil) - (.visitEnd)) - _ (doto (.visitMethod =class Opcodes/ACC_PUBLIC init-method "(Llux/Function;)V" nil nil) - (.visitCode) - (.visitVarInsn Opcodes/ALOAD 0) - (.visitMethodInsn Opcodes/INVOKESPECIAL "java/lang/Object" init-method "()V") - (.visitVarInsn Opcodes/ALOAD 0) - (.visitVarInsn Opcodes/ALOAD 1) - (.visitFieldInsn Opcodes/PUTFIELD "lux/LuxRunnable" "procedure" "Llux/Function;") - (.visitInsn Opcodes/RETURN) - (.visitMaxs 0 0) - (.visitEnd)) - _ (doto (.visitMethod =class Opcodes/ACC_PUBLIC "run" "()V" nil nil) - (.visitCode) - (.visitVarInsn Opcodes/ALOAD 0) - (.visitFieldInsn Opcodes/GETFIELD "lux/LuxRunnable" "procedure" "Llux/Function;") - (.visitInsn Opcodes/ACONST_NULL) - (.visitMethodInsn Opcodes/INVOKEVIRTUAL "lux/Function" &&/apply-method (&&/apply-signature 1)) - (.visitInsn Opcodes/RETURN) - (.visitMaxs 0 0) - (.visitEnd))]] - (&&/save-class! "LuxRunnable" - (.toByteArray (doto =class .visitEnd))))) - ;; Runtime infrastructure (defn ^:private compile-LuxRT-adt-methods [^ClassWriter =class] (|let [_ (let [$begin (new Label) @@ -415,63 +385,6 @@ (.visitEnd))) nil)) -(defn ^:private compile-LuxRT-process-methods [^ClassWriter =class] - (do (doto (.visitField =class - (+ Opcodes/ACC_PUBLIC Opcodes/ACC_FINAL Opcodes/ACC_STATIC) - "concurrency_level" "I" nil nil) - (.visitEnd)) - (doto (.visitField =class - (+ Opcodes/ACC_PUBLIC Opcodes/ACC_FINAL Opcodes/ACC_STATIC) - "executor" "Ljava/util/concurrent/ScheduledThreadPoolExecutor;" nil nil) - (.visitEnd)) - (doto (.visitMethod =class Opcodes/ACC_STATIC "" "()V" nil nil) - (.visitCode) - ;; concurrency_level - (.visitMethodInsn Opcodes/INVOKESTATIC "java/lang/Runtime" "getRuntime" "()Ljava/lang/Runtime;") - (.visitMethodInsn Opcodes/INVOKEVIRTUAL "java/lang/Runtime" "availableProcessors" "()I") - (.visitFieldInsn Opcodes/PUTSTATIC "lux/LuxRT" "concurrency_level" "I") - ;; executor - (.visitTypeInsn Opcodes/NEW "java/util/concurrent/ScheduledThreadPoolExecutor") - (.visitInsn Opcodes/DUP) - (.visitFieldInsn Opcodes/GETSTATIC "lux/LuxRT" "concurrency_level" "I") - (.visitMethodInsn Opcodes/INVOKESPECIAL "java/util/concurrent/ScheduledThreadPoolExecutor" "" "(I)V") - (.visitFieldInsn Opcodes/PUTSTATIC "lux/LuxRT" "executor" "Ljava/util/concurrent/ScheduledThreadPoolExecutor;") - ;; DONE - (.visitInsn Opcodes/RETURN) - (.visitMaxs 0 0) - (.visitEnd)) - (let [$immediately (new Label)] - (doto (.visitMethod =class (+ Opcodes/ACC_PUBLIC Opcodes/ACC_STATIC) "schedule" "(JLlux/Function;)Ljava/lang/Object;" nil nil) - (.visitCode) - (.visitVarInsn Opcodes/LLOAD 0) - (.visitLdcInsn (long 0)) - (.visitInsn Opcodes/LCMP) - (.visitJumpInsn Opcodes/IFEQ $immediately) - ;; Schedule for later - (.visitFieldInsn Opcodes/GETSTATIC "lux/LuxRT" "executor" "Ljava/util/concurrent/ScheduledThreadPoolExecutor;") - (.visitTypeInsn Opcodes/NEW "lux/LuxRunnable") - (.visitInsn Opcodes/DUP) - (.visitVarInsn Opcodes/ALOAD 2) - (.visitMethodInsn Opcodes/INVOKESPECIAL "lux/LuxRunnable" "" "(Llux/Function;)V") - (.visitVarInsn Opcodes/LLOAD 0) - (.visitFieldInsn Opcodes/GETSTATIC "java/util/concurrent/TimeUnit" "MILLISECONDS" "Ljava/util/concurrent/TimeUnit;") - (.visitMethodInsn Opcodes/INVOKEVIRTUAL "java/util/concurrent/ScheduledThreadPoolExecutor" "schedule" "(Ljava/lang/Runnable;JLjava/util/concurrent/TimeUnit;)Ljava/util/concurrent/ScheduledFuture;") - (.visitLdcInsn &/unit-tag) - (.visitInsn Opcodes/ARETURN) - ;; Run immediately - (.visitLabel $immediately) - (.visitFieldInsn Opcodes/GETSTATIC "lux/LuxRT" "executor" "Ljava/util/concurrent/ScheduledThreadPoolExecutor;") - (.visitTypeInsn Opcodes/NEW "lux/LuxRunnable") - (.visitInsn Opcodes/DUP) - (.visitVarInsn Opcodes/ALOAD 2) - (.visitMethodInsn Opcodes/INVOKESPECIAL "lux/LuxRunnable" "" "(Llux/Function;)V") - (.visitMethodInsn Opcodes/INVOKEINTERFACE "java/util/concurrent/Executor" "execute" "(Ljava/lang/Runnable;)V") - (.visitLdcInsn &/unit-tag) - (.visitInsn Opcodes/ARETURN) - (.visitMaxs 0 0) - (.visitEnd))) - nil)) - (def compile-LuxRT-class (|do [_ (return nil) :let [full-name &&/lux-utils-class @@ -577,7 +490,6 @@ (compile-LuxRT-adt-methods) (compile-LuxRT-int-methods) (compile-LuxRT-frac-methods) - (compile-LuxRT-text-methods) - (compile-LuxRT-process-methods))]] + (compile-LuxRT-text-methods))]] (&&/save-class! (second (string/split &&/lux-utils-class #"/")) (.toByteArray (doto =class .visitEnd))))) diff --git a/new-luxc/source/luxc/lang/translation/common-lisp/procedure/common.jvm.lux b/new-luxc/source/luxc/lang/translation/common-lisp/procedure/common.jvm.lux index 585292af0..54a4336fb 100644 --- a/new-luxc/source/luxc/lang/translation/common-lisp/procedure/common.jvm.lux +++ b/new-luxc/source/luxc/lang/translation/common-lisp/procedure/common.jvm.lux @@ -350,22 +350,6 @@ (install "read" (unary box//read)) (install "write" (binary box//write))))) -## [[Processes]] -(def: (process//parallelism-level []) - Nullary - (_.int 1)) - -(def: (process//schedule [milli-secondsO procedureO]) - Binary - (runtimeT.process//schedule milli-secondsO procedureO)) - -(def: process-procs - Bundle - (<| (prefix "process") - (|> (dict.new text.Hash) - (install "parallelism-level" (nullary process//parallelism-level)) - (install "schedule" (binary process//schedule))))) - ## [Bundles] (def: #export procedures Bundle @@ -378,5 +362,4 @@ (dict.merge array-procs) (dict.merge io-procs) (dict.merge box-procs) - (dict.merge process-procs)) - )) + ))) diff --git a/new-luxc/source/luxc/lang/translation/common-lisp/runtime.jvm.lux b/new-luxc/source/luxc/lang/translation/common-lisp/runtime.jvm.lux index c54fde7ce..3e8d2c514 100644 --- a/new-luxc/source/luxc/lang/translation/common-lisp/runtime.jvm.lux +++ b/new-luxc/source/luxc/lang/translation/common-lisp/runtime.jvm.lux @@ -296,46 +296,6 @@ (_.progn (list @@io//exit @@io//current-time))) -(def: process//incoming - SVar - (_.var (lang.normalize-name "process//incoming"))) - -(runtime: (process//loop _) - (_.if (_.not (_.null (@@ process//incoming))) - (with-vars [queue process] - (_.let (list [queue (@@ process//incoming)]) - (_.progn (list (_.setq! process//incoming (_.list (list))) - (_.map/3 _.nil - (_.lambda (_.poly (list process)) - (_.funcall (list ..unit) (@@ process))) - (@@ queue)) - (process//loop ..unit))))) - ..unit)) - -(runtime: (process//schedule milli-seconds procedure) - (_.progn - (list - (_.if (_.= (_.int 0) (@@ milli-seconds)) - (_.setq! process//incoming (_.cons (@@ procedure) (@@ process//incoming))) - (with-vars [start scheduled now diff _ignored] - (_.let (list [start (io//current-time ..unit)]) - (_.labels (list [scheduled [(_.poly+ (list) _ignored) - (_.let (list [now (io//current-time ..unit)] - [diff (|> (@@ now) (_.- (@@ start)))]) - (_.if (|> (@@ diff) (_.>= (@@ milli-seconds))) - (_.funcall (list ..unit) (@@ procedure)) - (process//schedule (|> (@@ milli-seconds) (_.- (@@ diff))) - (_.function (@@ scheduled)))))]]) - (_.setq! process//incoming (_.cons (_.function (@@ scheduled)) - (@@ process//incoming))))))) - ..unit))) - -(def: runtime//process - Runtime - (_.progn (list (_.defparameter process//incoming (_.list (list))) - @@process//loop - @@process//schedule))) - (def: runtime Runtime (_.progn (list runtime//lux @@ -344,9 +304,7 @@ runtime//text runtime//array runtime//box - runtime//io - runtime//process)) - ) + runtime//io))) (def: #export artifact Text (format prefix //.file-extension)) diff --git a/new-luxc/source/luxc/lang/translation/js/procedure/common.jvm.lux b/new-luxc/source/luxc/lang/translation/js/procedure/common.jvm.lux index cca49372b..df1be8508 100644 --- a/new-luxc/source/luxc/lang/translation/js/procedure/common.jvm.lux +++ b/new-luxc/source/luxc/lang/translation/js/procedure/common.jvm.lux @@ -320,17 +320,6 @@ Binary (void (format (box//read boxJS) " = " valueJS))) -## [[Processes]] -(def: (process//parallelism-level []) - Nullary - (frac//to-int "1")) - -(def: (process//schedule [milli-secondsJS procedureJS]) - Binary - (format "setTimeout(" - "function() {" procedureJS "(null)" "}" - "," (int//to-frac milli-secondsJS) ")")) - ## [Bundles] (def: lux-procs Bundle @@ -427,14 +416,6 @@ (install "read" (unary box//read)) (install "write" (binary box//write))))) -(def: process-procs - Bundle - (<| (prefix "process") - (|> (dict.new text.Hash) - (install "parallelism-level" (nullary process//parallelism-level)) - (install "schedule" (binary process//schedule)) - ))) - (def: #export procedures Bundle (<| (prefix "lux") @@ -446,5 +427,4 @@ (dict.merge array-procs) (dict.merge io-procs) (dict.merge box-procs) - (dict.merge process-procs) ))) diff --git a/new-luxc/source/luxc/lang/translation/jvm/procedure/common.jvm.lux b/new-luxc/source/luxc/lang/translation/jvm/procedure/common.jvm.lux index a6b037947..49b1971f1 100644 --- a/new-luxc/source/luxc/lang/translation/jvm/procedure/common.jvm.lux +++ b/new-luxc/source/luxc/lang/translation/jvm/procedure/common.jvm.lux @@ -433,20 +433,6 @@ (_.int 0) valueI _.AASTORE unitI)) -## [[Processes]] -(def: (process//parallelism-level []) - Nullary - (|>> (_.INVOKESTATIC "java.lang.Runtime" "getRuntime" ($t.method (list) (#.Some ($t.class "java.lang.Runtime" (list))) (list)) #0) - (_.INVOKEVIRTUAL "java.lang.Runtime" "availableProcessors" ($t.method (list) (#.Some $t.int) (list)) #0) - lux-intI)) - -(def: (process//schedule [millisecondsI procedureI]) - Binary - (|>> millisecondsI (_.unwrap #$.Long) - procedureI (_.CHECKCAST hostL.function-class) - (_.INVOKESTATIC hostL.runtime-class "schedule" - ($t.method (list $t.long $Function) (#.Some $Object) (list)) #0))) - ## [Bundles] (def: lux-procs Bundle @@ -543,14 +529,6 @@ (install "read" (unary box//read)) (install "write" (binary box//write))))) -(def: process-procs - Bundle - (<| (prefix "process") - (|> (dict.new text.Hash) - (install "parallelism-level" (nullary process//parallelism-level)) - (install "schedule" (binary process//schedule)) - ))) - (def: #export procedures Bundle (<| (prefix "lux") @@ -562,5 +540,4 @@ (dict.merge array-procs) (dict.merge io-procs) (dict.merge box-procs) - (dict.merge process-procs) ))) diff --git a/new-luxc/source/luxc/lang/translation/jvm/runtime.jvm.lux b/new-luxc/source/luxc/lang/translation/jvm/runtime.jvm.lux index 28bce7d28..eec57610d 100644 --- a/new-luxc/source/luxc/lang/translation/jvm/runtime.jvm.lux +++ b/new-luxc/source/luxc/lang/translation/jvm/runtime.jvm.lux @@ -34,7 +34,6 @@ (def: #export $Function $.Type ($t.class //.function-class (list))) (def: $Throwable $.Type ($t.class "java.lang.Throwable" (list))) (def: $Runtime $.Type ($t.class "java.lang.Runtime" (list))) -(def: $Runnable $.Type ($t.class "java.lang.Runnable" (list))) (def: #export logI Inst @@ -373,64 +372,6 @@ _.ARETURN))) ))) -(def: process-methods - Def - (let [executor-class "java.util.concurrent.ScheduledThreadPoolExecutor" - executorT ($t.class executor-class (list)) - executor-field "executor" - endI (|>> (_.string //.unit) - _.ARETURN) - runnableI (: (-> Inst Inst) - (function (_ functionI) - (|>> (_.NEW //.runnable-class) - _.DUP - functionI - (_.INVOKESPECIAL //.runnable-class "" ($t.method (list $Function) #.None (list)) #0)))) - threadI (: (-> Inst Inst) - (function (_ runnableI) - (|>> (_.NEW "java.lang.Thread") - _.DUP - runnableI - (_.INVOKESPECIAL "java.lang.Thread" "" ($t.method (list $Runnable) #.None (list)) #0))))] - (|>> ($d.field #$.Public ($.++F $.finalF $.staticF) executor-field executorT) - ($d.method #$.Public $.staticM "" ($t.method (list) #.None (list)) - (let [parallelism-levelI (|>> (_.INVOKESTATIC "java.lang.Runtime" "getRuntime" ($t.method (list) (#.Some $Runtime) (list)) #0) - (_.INVOKEVIRTUAL "java.lang.Runtime" "availableProcessors" ($t.method (list) (#.Some $t.int) (list)) #0)) - executorI (|>> (_.NEW executor-class) - _.DUP - parallelism-levelI - (_.INVOKESPECIAL executor-class "" ($t.method (list $t.int) #.None (list)) #0))] - (|>> executorI - (_.PUTSTATIC //.runtime-class executor-field executorT) - _.RETURN))) - ($d.method #$.Public $.staticM "schedule" - ($t.method (list $t.long $Function) (#.Some $Object) (list)) - (let [delayI (_.LLOAD 0) - immediacy-checkI (|>> delayI - (_.long +0) - _.LCMP) - time-unit-class "java.util.concurrent.TimeUnit" - time-unitT ($t.class time-unit-class (list)) - futureT ($t.class "java.util.concurrent.ScheduledFuture" (list)) - executorI (_.GETSTATIC //.runtime-class executor-field executorT) - schedule-laterI (|>> executorI - (runnableI (_.ALOAD 2)) - delayI - (_.GETSTATIC time-unit-class "MILLISECONDS" time-unitT) - (_.INVOKEVIRTUAL executor-class "schedule" ($t.method (list $Runnable $t.long time-unitT) (#.Some futureT) (list)) #0)) - schedule-immediatelyI (|>> executorI - (runnableI (_.ALOAD 2)) - (_.INVOKEVIRTUAL executor-class "execute" ($t.method (list $Runnable) #.None (list)) #0))] - (<| _.with-label (function (_ @immediately)) - (|>> immediacy-checkI - (_.IFEQ @immediately) - schedule-laterI - endI - (_.label @immediately) - schedule-immediatelyI - endI)))) - ))) - (def: translate-runtime (Operation ByteCode) (let [bytecode ($d.class #$.V1_6 #$.Public $.finalC //.runtime-class (list) ["java.lang.Object" (list)] (list) @@ -438,8 +379,7 @@ frac-methods text-methods pm-methods - io-methods - process-methods))] + io-methods))] (do phase.Monad [_ (translation.execute! [//.runtime-class bytecode])] (wrap bytecode)))) @@ -474,33 +414,9 @@ [_ (translation.execute! [//.function-class bytecode])] (wrap bytecode)))) -(def: translate-runnable - (Operation ByteCode) - (let [procedure-field "procedure" - bytecode ($d.class #$.V1_6 #$.Public $.finalC //.runnable-class (list) ["java.lang.Object" (list)] (list ["java.lang.Runnable" (list)]) - (|>> ($d.field #$.Public $.finalF procedure-field $Function) - ($d.method #$.Public $.noneM "" ($t.method (list $Function) #.None (list)) - (|>> (_.ALOAD 0) - (_.INVOKESPECIAL "java.lang.Object" "" ($t.method (list) #.None (list)) #0) - (_.ALOAD 0) - (_.ALOAD 1) - (_.PUTFIELD //.runnable-class procedure-field $Function) - _.RETURN)) - ($d.method #$.Public $.noneM "run" ($t.method (list) #.None (list)) - (|>> (_.ALOAD 0) - (_.GETFIELD //.runnable-class procedure-field $Function) - _.NULL - (_.INVOKEVIRTUAL //.function-class apply-method (apply-signature 1) #0) - _.RETURN)) - ))] - (do phase.Monad - [_ (translation.execute! [//.runnable-class bytecode])] - (wrap bytecode)))) - (def: #export translate - (Operation [ByteCode ByteCode ByteCode]) + (Operation [ByteCode ByteCode]) (do phase.Monad [runtime-bc translate-runtime - function-bc translate-function - runnable-bc translate-runnable] - (wrap [runtime-bc function-bc runnable-bc]))) + function-bc translate-function] + (wrap [runtime-bc function-bc]))) diff --git a/new-luxc/source/luxc/lang/translation/lua/procedure/common.jvm.lux b/new-luxc/source/luxc/lang/translation/lua/procedure/common.jvm.lux index 356adb5c3..4b128f946 100644 --- a/new-luxc/source/luxc/lang/translation/lua/procedure/common.jvm.lux +++ b/new-luxc/source/luxc/lang/translation/lua/procedure/common.jvm.lux @@ -319,15 +319,6 @@ Binary (runtimeT.box//write valueO boxO)) -## [[Processes]] -(def: (process//parallelism-level []) - Nullary - (lua.int 1)) - -(def: (process//schedule [milli-secondsO procedureO]) - Binary - (runtimeT.process//schedule milli-secondsO procedureO)) - ## [Bundles] (def: lux-procs Bundle @@ -424,14 +415,6 @@ (install "read" (unary box//read)) (install "write" (binary box//write))))) -(def: process-procs - Bundle - (<| (prefix "process") - (|> (dict.new text.Hash) - (install "parallelism-level" (nullary process//parallelism-level)) - (install "schedule" (binary process//schedule)) - ))) - (def: #export procedures Bundle (<| (prefix "lux") @@ -443,5 +426,4 @@ (dict.merge array-procs) (dict.merge io-procs) (dict.merge box-procs) - (dict.merge process-procs) ))) diff --git a/new-luxc/source/luxc/lang/translation/lua/runtime.jvm.lux b/new-luxc/source/luxc/lang/translation/lua/runtime.jvm.lux index 3a18f98e7..ce9c37db5 100644 --- a/new-luxc/source/luxc/lang/translation/lua/runtime.jvm.lux +++ b/new-luxc/source/luxc/lang/translation/lua/runtime.jvm.lux @@ -258,50 +258,6 @@ Runtime (format @@box//write)) -(def: process//incoming - Text - (lang.normalize-name "process//incoming")) - -(runtime: (process//loop _) - (let [migrate-incoming! (lua.block! (list (lua.for-step! "idx" (lua.int 1) (lua.length process//incoming) (lua.int 1) - (lua.apply "table.insert" (list "queue" (lua.nth "idx" process//incoming)))) - (lua.set! process//incoming (lua.array (list))))) - consume-queue! (lua.block! (list (lua.local! "survivors" (#.Some (lua.array (list)))) - (lua.local! "active_processes" (#.Some (lua.length "queue"))) - (lua.for-step! "idx" (lua.int 1) "active_processes" (lua.int 1) - (lua.block! (list (lua.local! "process" (#.Some (lua.nth "idx" "queue"))) - (lua.when! (lua.apply "coroutine.resume" (list "process")) - (lua.apply "table.insert" (list "survivors" "process")))))) - (lua.set! "queue" "survivors"))) - loop-body! (lua.block! (list migrate-incoming! - consume-queue!))] - (lua.block! (list (lua.local! "queue" (#.Some (lua.array (list)))) - loop-body! - (lua.while! (|> (lua.length "queue") (lua.> (lua.int 0))) - loop-body!))))) - -(runtime: (process//schedule milli-seconds procedure) - (let [now (lua.apply "os.time" (list))] - (lua.if! (lua.= (lua.int 0) milli-seconds) - (lua.apply "table.insert" (list process//incoming - (lua.function (list) - (lua.return! (lua.apply procedure (list unit)))))) - (lua.block! (list (lua.local! "start" (#.Some now)) - (lua.local! "seconds" (#.Some (lua.// (lua.int 1_000) - milli-seconds))) - (lua.apply "table.insert" (list process//incoming - (lua.function (list) - (lua.block! (list (lua.while! (lua.< "seconds" (lua.- "start" now)) - (lua.apply "coroutine.yield" (list))) - (lua.return! (lua.apply procedure (list unit)))))))) - (lua.return! unit)))))) - -(def: runtime//process - Runtime - (format (lua.global! process//incoming (#.Some (lua.array (list)))) - @@process//loop - @@process//schedule)) - (runtime: (lua//get object field) (lua.block! (list (lua.local! "value" (#.Some (lua.nth field object))) (lua.if! (lua.= lua.nil "value") @@ -325,7 +281,6 @@ runtime//text runtime//array runtime//box - runtime//process runtime//lua)) (def: #export artifact Text (format prefix ".lua")) diff --git a/new-luxc/source/luxc/lang/translation/php/procedure/common.jvm.lux b/new-luxc/source/luxc/lang/translation/php/procedure/common.jvm.lux index e195130c5..774c28acf 100644 --- a/new-luxc/source/luxc/lang/translation/php/procedure/common.jvm.lux +++ b/new-luxc/source/luxc/lang/translation/php/procedure/common.jvm.lux @@ -338,23 +338,6 @@ ## (install "current-time" (nullary (function (_ _) ## (runtimeT.io//current-time! runtimeT.unit))))))) -## ## [[Processes]] -## (def: (process//parallelism-level []) -## Nullary -## (_.int 1)) - -## (def: (process//schedule [milli-secondsO procedureO]) -## Binary -## (runtimeT.process//schedule milli-secondsO procedureO)) - -## (def: process-procs -## Bundle -## (<| (prefix "process") -## (|> (dict.new text.Hash) -## (install "parallelism-level" (nullary process//parallelism-level)) -## (install "schedule" (binary process//schedule)) -## ))) - ## [Bundles] (def: #export procedures Bundle @@ -367,5 +350,4 @@ ## (dict.merge text-procs) ## (dict.merge array-procs) ## (dict.merge io-procs) - ## (dict.merge process-procs) ))) diff --git a/new-luxc/source/luxc/lang/translation/php/runtime.jvm.lux b/new-luxc/source/luxc/lang/translation/php/runtime.jvm.lux index c57bc3d80..d33cdb76c 100644 --- a/new-luxc/source/luxc/lang/translation/php/runtime.jvm.lux +++ b/new-luxc/source/luxc/lang/translation/php/runtime.jvm.lux @@ -299,30 +299,6 @@ ## @@array//get ## @@array//put)) -## (runtime: (process//future procedure) -## ($_ _.then! -## (_.import! "threading") -## (let [params (_.dict (list [(_.string "target") procedure]))] -## (_.do! (|> (_.global "threading") -## (_.send-keyword (list) params "Thread") -## (_.send (list) "start")))) -## (_.return! ..unit))) - -## (runtime: (process//schedule milli-seconds procedure) -## ($_ _.then! -## (_.import! "threading") -## (let [seconds (|> milli-seconds (_./ (_.float 1_000.0)))] -## (_.do! (|> (_.global "threading") -## (_.send (list seconds procedure) "Timer") -## (_.send (list) "start")))) -## (_.return! ..unit))) - -## (def: runtime//process -## Runtime -## ($_ _.then! -## @@process//future -## @@process//schedule)) - (def: check-necessary-conditions! Statement (let [condition (_.= (_.int 8) @@ -342,7 +318,6 @@ ## runtime//text ## runtime//array ## runtime//io - ## runtime//process )) (def: #export artifact Text (format prefix //.extension)) diff --git a/new-luxc/source/luxc/lang/translation/python/procedure/common.jvm.lux b/new-luxc/source/luxc/lang/translation/python/procedure/common.jvm.lux index 9a70c8c92..a760dc3a2 100644 --- a/new-luxc/source/luxc/lang/translation/python/procedure/common.jvm.lux +++ b/new-luxc/source/luxc/lang/translation/python/procedure/common.jvm.lux @@ -379,23 +379,6 @@ (install "read" (unary box//read)) (install "write" (binary box//write))))) -## [[Processes]] -(def: (process//parallelism-level []) - Nullary - (python.int 1)) - -(def: (process//schedule [milli-secondsO procedureO]) - Binary - (runtimeT.process//schedule milli-secondsO procedureO)) - -(def: process-procs - Bundle - (<| (prefix "process") - (|> (dict.new text.Hash) - (install "parallelism-level" (nullary process//parallelism-level)) - (install "schedule" (binary process//schedule)) - ))) - ## [Bundles] (def: #export procedures Bundle @@ -408,5 +391,4 @@ (dict.merge array-procs) (dict.merge io-procs) (dict.merge box-procs) - (dict.merge process-procs) ))) diff --git a/new-luxc/source/luxc/lang/translation/python/runtime.jvm.lux b/new-luxc/source/luxc/lang/translation/python/runtime.jvm.lux index 571835b79..2cfe7eb1e 100644 --- a/new-luxc/source/luxc/lang/translation/python/runtime.jvm.lux +++ b/new-luxc/source/luxc/lang/translation/python/runtime.jvm.lux @@ -342,24 +342,6 @@ Runtime @@box//write) -(runtime: (process//schedule milli-seconds procedure) - ($_ python.then! - (python.import! "threading") - (python.if! (python.= (python.int 0) milli-seconds) - (let [params (python.dict (list [(python.string "target") procedure]))] - (python.do! (|> (python.global "threading") - (python.send-keyword (list) params "Thread") - (python.send (list) "start")))) - (let [seconds (|> milli-seconds (python./ (python.float 1_000.0)))] - (python.do! (|> (python.global "threading") - (python.send (list seconds procedure) "Timer") - (python.send (list) "start"))))) - (python.return! ..unit))) - -(def: runtime//process - Runtime - @@process//schedule) - (def: runtime Runtime ($_ python.then! @@ -371,7 +353,6 @@ runtime//array runtime//box runtime//io - runtime//process )) (def: #export artifact Text (format prefix ".py")) diff --git a/new-luxc/source/luxc/lang/translation/r/procedure/common.jvm.lux b/new-luxc/source/luxc/lang/translation/r/procedure/common.jvm.lux index d8f4f4662..d8b383ff2 100644 --- a/new-luxc/source/luxc/lang/translation/r/procedure/common.jvm.lux +++ b/new-luxc/source/luxc/lang/translation/r/procedure/common.jvm.lux @@ -371,23 +371,6 @@ (install "read" (unary box//read)) (install "write" (binary box//write))))) -## [[Processes]] -(def: (process//parallelism-level []) - Nullary - (r.int 1)) - -(def: (process//schedule [milli-secondsO procedureO]) - Binary - (runtimeT.process//schedule milli-secondsO procedureO)) - -(def: process-procs - Bundle - (<| (prefix "process") - (|> (dict.new text.Hash) - (install "parallelism-level" (nullary process//parallelism-level)) - (install "schedule" (binary process//schedule)) - ))) - ## [Bundles] (def: #export procedures Bundle @@ -400,5 +383,4 @@ (dict.merge array-procs) (dict.merge io-procs) (dict.merge box-procs) - (dict.merge process-procs) ))) diff --git a/new-luxc/source/luxc/lang/translation/r/runtime.jvm.lux b/new-luxc/source/luxc/lang/translation/r/runtime.jvm.lux index ee46836cb..dff7c4ae1 100644 --- a/new-luxc/source/luxc/lang/translation/r/runtime.jvm.lux +++ b/new-luxc/source/luxc/lang/translation/r/runtime.jvm.lux @@ -774,62 +774,6 @@ ($_ r.then @@box//write)) -(def: process//incoming - SVar - (r.var (lang.normalize-name "process//incoming"))) - -(def: (list-append! value rlist) - (-> Expression SVar Expression) - (r.set-nth! (|> (@@ rlist) r.length (r.+ (r.int 1))) - value - rlist)) - -(runtime: (process//loop _) - (let [empty (r.list (list))] - (with-vars [queue process] - (let [migrate-incoming! ($_ r.then - (r.set! queue empty) - (<| (r.for-in process (@@ process//incoming)) - (list-append! (@@ process) queue)) - (r.set! process//incoming empty)) - consume-queue! (<| (r.for-in process (@@ queue)) - (r.apply (list ..unit) (@@ process)))] - ($_ r.then - migrate-incoming! - consume-queue! - (r.when (|> (r.length (@@ queue)) (r.> (r.int 0))) - (process//loop ..unit))))))) - -(runtime: (process//schedule milli-seconds procedure) - (r.if (r.= (r.int 0) (@@ milli-seconds)) - ($_ r.then - (list-append! (@@ procedure) process//incoming) - ..unit) - (let [to-seconds (|>> (r./ (r.float 1_000.0))) - to-millis (|>> (r.* (r.float 1_000.0)))] - (with-vars [start now seconds _arg elapsed-time] - ($_ r.then - (r.set! start current-time-float) - (r.set! seconds (to-seconds (int//to-float (@@ milli-seconds)))) - (list-append! (r.function (list _arg) - ($_ r.then - (r.set! now current-time-float) - (r.set! elapsed-time (|> (@@ now) (r.- (@@ start)))) - (r.if (|> (@@ elapsed-time) (r.>= (@@ seconds))) - (@@ procedure) - (process//schedule (to-millis (@@ elapsed-time)) - (@@ procedure))))) - process//incoming) - ..unit))))) - -(def: runtime//process - Runtime - ($_ r.then - (r.set! process//incoming (r.list (list))) - @@process//loop - @@process//schedule - )) - (def: runtime Runtime ($_ r.then @@ -846,7 +790,6 @@ runtime//array runtime//box runtime//io - runtime//process )) (def: #export artifact Text (format prefix ".r")) diff --git a/new-luxc/source/luxc/lang/translation/ruby/procedure/common.jvm.lux b/new-luxc/source/luxc/lang/translation/ruby/procedure/common.jvm.lux index 96d42a4a9..1f995b44b 100644 --- a/new-luxc/source/luxc/lang/translation/ruby/procedure/common.jvm.lux +++ b/new-luxc/source/luxc/lang/translation/ruby/procedure/common.jvm.lux @@ -412,23 +412,6 @@ (install "read" (unary box//read)) (install "write" (binary box//write))))) -## [[Processes]] -(def: (process//parallelism-level []) - Nullary - (ruby.int 1)) - -(def: (process//schedule [milli-secondsO procedureO]) - Binary - (runtimeT.process//schedule milli-secondsO procedureO)) - -(def: process-procs - Bundle - (<| (prefix "process") - (|> (dict.new text.Hash) - (install "parallelism-level" (nullary process//parallelism-level)) - (install "schedule" (binary process//schedule)) - ))) - ## [Bundles] (def: #export procedures Bundle @@ -441,5 +424,4 @@ (dict.merge array-procs) (dict.merge io-procs) (dict.merge box-procs) - (dict.merge process-procs) ))) diff --git a/new-luxc/source/luxc/lang/translation/ruby/runtime.jvm.lux b/new-luxc/source/luxc/lang/translation/ruby/runtime.jvm.lux index 32ab5b10c..02de3dc7b 100644 --- a/new-luxc/source/luxc/lang/translation/ruby/runtime.jvm.lux +++ b/new-luxc/source/luxc/lang/translation/ruby/runtime.jvm.lux @@ -230,19 +230,6 @@ Runtime (format @@box//write)) -(runtime: (process//schedule milli-seconds procedure) - (ruby.block! - (list (format "(Thread.new {" - (ruby.when! (ruby.not (ruby.= (ruby.int 0) milli-seconds)) - (ruby.statement (ruby.apply "sleep" (list (ruby./ (ruby.float 1_000.0) milli-seconds))))) - (ruby.statement (ruby.call (list ..unit) procedure)) - "})") - (ruby.return! ..unit)))) - -(def: runtime//process - Runtime - @@process//schedule) - (def: runtime Runtime (format runtime//lux "\n" @@ -251,7 +238,6 @@ runtime//text "\n" runtime//array "\n" runtime//box "\n" - runtime//process "\n" )) (def: #export artifact Text (format prefix ".rb")) diff --git a/new-luxc/test/test/luxc/lang/translation/common.lux b/new-luxc/test/test/luxc/lang/translation/common.lux index 5425be2ea..7fe49fae2 100644 --- a/new-luxc/test/test/luxc/lang/translation/common.lux +++ b/new-luxc/test/test/luxc/lang/translation/common.lux @@ -414,36 +414,6 @@ #0)))) ))) -(def: (process-spec run) - (-> Runner Test) - ($_ seq - (test "Can query the concurrency level of the machine." - (|> (run (#synthesis.Extension "lux process parallelism-level" (list))) - (case> (#e.Success valueV) - (n/>= 1 (:coerce Nat valueV)) - - (#e.Error error) - (exec (log! error) - #0)))) - (do r.Monad - [delay (|> r.nat (:: @ map (n/% 10))) - message (r.ascii/upper-alpha 5)] - (test "Can schedule I/O operations for future execution." - (|> (run (#synthesis.Extension "lux process schedule" - (list (synthesis.i64 delay) - (synthesis.function/abstraction - {#synthesis.environment (list) - #synthesis.arity 1 - #synthesis.body (#synthesis.Extension "lux io log" - (list (synthesis.text (format "SCHEDULE: " message))))})))) - (case> (#e.Success valueV) - #1 - - (#e.Error error) - (exec (log! error) - #0))))) - )) - (def: (all-specs run) (-> Runner Test) ($_ seq @@ -454,7 +424,6 @@ (array-spec run) (io-spec run) (box-spec run) - (process-spec run) )) (context: "[JVM] Common procedures." diff --git a/stdlib/source/lux/compiler/default/phase/extension/analysis/common.lux b/stdlib/source/lux/compiler/default/phase/extension/analysis/common.lux index ea5215a55..3272f8a29 100644 --- a/stdlib/source/lux/compiler/default/phase/extension/analysis/common.lux +++ b/stdlib/source/lux/compiler/default/phase/extension/analysis/common.lux @@ -280,14 +280,6 @@ (bundle.install "write" box::write) ))) -(def: bundle::process - Bundle - (<| (bundle.prefix "process") - (|> bundle.empty - (bundle.install "parallelism" (nullary Nat)) - (bundle.install "schedule" (binary Nat (type (IO Any)) Any)) - ))) - (def: #export bundle Bundle (<| (bundle.prefix "lux") @@ -299,6 +291,5 @@ (dict.merge bundle::text) (dict.merge bundle::array) (dict.merge bundle::box) - (dict.merge bundle::process) - (dict.merge bundle::io)) - )) + (dict.merge bundle::io) + ))) diff --git a/stdlib/source/lux/compiler/default/phase/translation/scheme/extension/common.jvm.lux b/stdlib/source/lux/compiler/default/phase/translation/scheme/extension/common.jvm.lux index 3aa2b453d..d1576248d 100644 --- a/stdlib/source/lux/compiler/default/phase/translation/scheme/extension/common.jvm.lux +++ b/stdlib/source/lux/compiler/default/phase/translation/scheme/extension/common.jvm.lux @@ -290,19 +290,6 @@ (bundle.install "read" (unary box::read)) (bundle.install "write" (binary box::write))))) -## [[Processes]] -(def: (process::parallelism-level []) - Nullary - (_.int +1)) - -(def: bundle::process - Bundle - (<| (bundle.prefix "process") - (|> bundle.empty - (bundle.install "parallelism-level" (nullary process::parallelism-level)) - (bundle.install "schedule" (binary (product.uncurry runtime.process//schedule))) - ))) - ## [Bundles] (def: #export bundle Bundle @@ -315,5 +302,4 @@ (dict.merge bundle::array) (dict.merge bundle::io) (dict.merge bundle::box) - (dict.merge bundle::process) ))) diff --git a/stdlib/source/lux/compiler/default/phase/translation/scheme/runtime.jvm.lux b/stdlib/source/lux/compiler/default/phase/translation/scheme/runtime.jvm.lux index 0e082a5d8..43748c3b1 100644 --- a/stdlib/source/lux/compiler/default/phase/translation/scheme/runtime.jvm.lux +++ b/stdlib/source/lux/compiler/default/phase/translation/scheme/runtime.jvm.lux @@ -302,43 +302,6 @@ (def: runtime//io (_.begin (list @@io//current-time))) -(def: process//incoming - Var - (_.var (name.normalize "process//incoming"))) - -(runtime: (process//loop _) - (_.when (_.not/1 (_.null?/1 process//incoming)) - (with-vars [queue process] - (_.let (list [queue process//incoming]) - (_.begin (list (_.set! process//incoming (_.list/* (list))) - (_.map/2 (_.lambda [(list process) #.None] - (_.apply/1 process ..unit)) - queue) - (process//loop ..unit))))))) - -(runtime: (process//schedule milli-seconds procedure) - (let [process//future (function (_ process) - (_.set! process//incoming (_.cons/2 process process//incoming)))] - (_.begin - (list - (_.if (_.=/2 (_.int +0) milli-seconds) - (process//future procedure) - (with-vars [@start @process @now @ignored] - (_.let (list [@start (io//current-time ..unit)]) - (_.letrec (list [@process (_.lambda [(list) (#.Some @ignored)] - (_.let (list [@now (io//current-time ..unit)]) - (_.if (|> @now (_.-/2 @start) (_.>=/2 milli-seconds)) - (_.apply/1 procedure ..unit) - (process//future @process))))]) - (process//future @process))))) - ..unit)))) - -(def: runtime//process - Computation - (_.begin (list (_.define process//incoming [(list) #.None] (_.list/* (list))) - @@process//loop - @@process//schedule))) - (def: runtime Computation (_.begin (list @@slice @@ -349,7 +312,6 @@ runtime//array runtime//box runtime//io - runtime//process ))) (def: #export translate diff --git a/stdlib/source/lux/concurrency/process.lux b/stdlib/source/lux/concurrency/process.lux new file mode 100644 index 000000000..2ff56c395 --- /dev/null +++ b/stdlib/source/lux/concurrency/process.lux @@ -0,0 +1,104 @@ +(.module: + [lux #* + [control + ["." monad (#+ do)] + ["ex" exception (#+ exception:)]] + [data + [collection + ["." list]]] + [compiler + ["." host]] + ["." io (#+ IO io)] + [host (#+ import: object)]] + [// + ["." atom (#+ Atom)]]) + +(`` (for {(~~ (static host.jvm)) + (as-is (import: java/lang/Object) + + (import: java/lang/Runtime + (#static getRuntime [] Runtime) + (availableProcessors [] int)) + + (import: java/lang/Runnable) + + (import: java/util/concurrent/TimeUnit + (#enum MILLISECONDS)) + + (import: java/util/concurrent/Executor + (execute [Runnable] #io void)) + + (import: (java/util/concurrent/ScheduledFuture a)) + + (import: java/util/concurrent/ScheduledThreadPoolExecutor + (new [int]) + (schedule [Runnable long TimeUnit] #io (ScheduledFuture Object))))} + + (type: Process + {#creation Nat + #delay Nat + #action (IO Any)}) + )) + +(def: #export parallelism + Nat + (`` (for {(~~ (static host.jvm)) + (|> (Runtime::getRuntime []) + (Runtime::availableProcessors []) + .nat)} + 1))) + +(def: runner + (`` (for {(~~ (static host.jvm)) + (ScheduledThreadPoolExecutor::new [(.int ..parallelism)])} + + (: (Atom (List Process)) + (atom.atom (list)))))) + +(def: #export (schedule milli-seconds action) + (-> Nat (IO Any) (IO Any)) + (`` (for {(~~ (static host.jvm)) + (let [runnable (object [] [Runnable] + [] + (Runnable [] (run) void + (io.run action)))] + (case milli-seconds + 0 (Executor::execute [runnable] + runner) + _ (ScheduledThreadPoolExecutor::schedule [runnable (.int milli-seconds) TimeUnit::MILLISECONDS] + runner)))} + (atom.update (|>> (#.Cons {#creation ("lux io current-time") + #delay milli-seconds + #action action})) + runner)))) + +(`` (for {(~~ (static host.jvm)) + (as-is)} + (as-is (exception: #export (cannot-continue-running-processes) "") + + (def: #export run! + (IO Any) + (loop [_ []] + (do io.Monad + [processes (atom.read runner)] + (case processes + ## And... we're done! + #.Nil + (wrap []) + + _ + (do @ + [#let [now ("lux io current-time") + [ready pending] (list.partition (function (_ process) + (|> (get@ #creation process) + (n/+ (get@ #delay process)) + (n/<= now))) + processes)] + swapped? (atom.compare-and-swap! processes pending runner)] + (if swapped? + (do @ + [_ (monad.seq @ ready)] + (recur [])) + (error! (ex.construct cannot-continue-running-processes [])))) + )))) + ))) diff --git a/stdlib/source/lux/concurrency/promise.lux b/stdlib/source/lux/concurrency/promise.lux index 7062c2082..24f26a24c 100644 --- a/stdlib/source/lux/concurrency/promise.lux +++ b/stdlib/source/lux/concurrency/promise.lux @@ -9,13 +9,10 @@ ["." function] [type abstract] - ["." io (#+ IO io)] - [concurrency - ["." atom (#+ Atom atom)]]]) - -(def: #export parallelism - Nat - ("lux process parallelism")) + ["." io (#+ IO io)]] + [// + ["." process] + ["." atom (#+ Atom atom)]]) (abstract: #export (Promise a) {#.doc "Represents values produced by asynchronous computations (unlike IO, which is synchronous)."} @@ -147,9 +144,11 @@ {#.doc "Runs an I/O computation on its own process (after a specified delay) and returns a Promise that will eventually host its result."} (All [a] (-> Nat (IO a) (Promise a))) (let [!out (promise #.None)] - (exec ("lux process schedule" millis-delay - (io (io.run (resolve (io.run computation) - !out)))) + (exec (|> (do io.Monad + [value computation] + (resolve value !out)) + (process.schedule millis-delay) + io.run) !out))) (def: #export future diff --git a/stdlib/source/lux/host.jvm.lux b/stdlib/source/lux/host.jvm.lux index 5b91fc526..98cf40bfc 100644 --- a/stdlib/source/lux/host.jvm.lux +++ b/stdlib/source/lux/host.jvm.lux @@ -1370,10 +1370,11 @@ {constructor-args (constructor-args^ imports class-vars)} {methods (p.some (overriden-method-def^ imports))}) {#.doc (doc "Allows defining anonymous classes." - "The 1st tuple corresponds to parent interfaces." - "The 2nd tuple corresponds to arguments to the super class constructor." + "The 1st tuple corresponds to class-level type-variables." + "The 2nd tuple corresponds to parent interfaces." + "The 3rd tuple corresponds to arguments to the super class constructor." "An optional super-class can be specified before the 1st tuple. If not specified, java.lang.Object will be assumed." - (object [Runnable] + (object [] [Runnable] [] (Runnable [] (run) void (exec (do-something some-value) diff --git a/stdlib/source/lux/test.lux b/stdlib/source/lux/test.lux index 26e99fbce..5b214579d 100644 --- a/stdlib/source/lux/test.lux +++ b/stdlib/source/lux/test.lux @@ -20,6 +20,7 @@ ["s" syntax (#+ syntax: Syntax)] ["." code]] [concurrency + ["." process] ["." promise (#+ Promise)]] ["." io (#+ IO io)]]) @@ -240,7 +241,7 @@ (~+ (|> tests (list/map (function (_ [module-name test desc]) (` [(~ (code.text module-name)) (~ (code.identifier [module-name test])) (~ (code.text desc))]))) - (list.split-all promise.parallelism) + (list.split-all process.parallelism) (list/map (function (_ group) (list (` [(~ g!successes) (~ g!failures)]) (` ((~! run') (list (~+ group)))) (' #let) (` [(~ g!total-successes) (n/+ (~ g!successes) (~ g!total-successes)) diff --git a/stdlib/test/test/lux/compiler/default/phase/analysis/procedure/common.lux b/stdlib/test/test/lux/compiler/default/phase/analysis/procedure/common.lux index 8d3e8b8fa..9d733912e 100644 --- a/stdlib/test/test/lux/compiler/default/phase/analysis/procedure/common.lux +++ b/stdlib/test/test/lux/compiler/default/phase/analysis/procedure/common.lux @@ -245,21 +245,6 @@ #0))) )))) -(context: "Process procedures" - (<| (times 100) - (do @ - [[primT primC] _primitive.primitive - timeC (|> r.nat (:: @ map code.nat))] - ($_ seq - (test "Can query the level of concurrency." - (check-success+ "lux process parallelism-level" (list) Nat)) - (test "Can schedule an IO computation to run concurrently at some future time." - (check-success+ "lux process schedule" - (list timeC - (` ([(~' _) (~' _)] (~ primC)))) - Any)) - )))) - (context: "IO procedures" (<| (times 100) (do @ diff --git a/stdlib/test/test/lux/concurrency/stm.lux b/stdlib/test/test/lux/concurrency/stm.lux index 3506146f4..ee84f193e 100644 --- a/stdlib/test/test/lux/concurrency/stm.lux +++ b/stdlib/test/test/lux/concurrency/stm.lux @@ -10,6 +10,7 @@ [concurrency ["." atom (#+ Atom atom)] ["&" stm] + ["." process] ["." promise] ["." frp (#+ Channel)]] [math @@ -62,7 +63,7 @@ (list.reverse changes))))) (wrap (let [_concurrency-var (&.var 0)] (do promise.Monad - [_ (|> promise.parallelism + [_ (|> process.parallelism (list.n/range 1) (list/map (function (_ _) (|> iterations-per-process @@ -71,6 +72,6 @@ (M.seq @)) last-val (&.commit (&.read _concurrency-var))] (assert "Can modify STM vars concurrently from multiple threads." - (|> promise.parallelism + (|> process.parallelism (n/* iterations-per-process) (n/= last-val)))))))) -- cgit v1.2.3