From 8ff8934813562f28f79cc08014947eb282256e6a Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Thu, 23 Feb 2017 19:46:22 -0400 Subject: - Re-designed (and implemented) the primitives for running processes/threads as Lux procedures. --- luxc/src/lux/analyser/proc/common.clj | 31 +++++++++ luxc/src/lux/compiler/jvm.clj | 3 +- luxc/src/lux/compiler/jvm/proc/common.clj | 39 +++++++++++ luxc/src/lux/compiler/jvm/rt.clj | 106 +++++++++++++++++++++++++++++- luxc/src/lux/type.clj | 5 ++ stdlib/source/lux/concurrency/promise.lux | 47 ++----------- 6 files changed, 186 insertions(+), 45 deletions(-) diff --git a/luxc/src/lux/analyser/proc/common.clj b/luxc/src/lux/analyser/proc/common.clj index 51e0f3528..7703aa8a6 100644 --- a/luxc/src/lux/analyser/proc/common.clj +++ b/luxc/src/lux/analyser/proc/common.clj @@ -436,6 +436,30 @@ (return (&/|list (&&/|meta exo-type _cursor (&&/$proc (&/T ["atom" "compare-and-swap"]) (&/|list =atom =old =new) (&/|list))))))))) +(defn ^:private analyse-process-concurrency-level [analyse exo-type ?values] + (|do [:let [(&/$Nil) ?values] + _ (&type/check exo-type &type/Nat) + _cursor &/cursor] + (return (&/|list (&&/|meta exo-type _cursor + (&&/$proc (&/T ["process" "concurrency-level"]) (&/|list) (&/|list))))))) + +(defn ^:private analyse-process-future [analyse exo-type ?values] + (|do [:let [(&/$Cons ?procedure (&/$Nil)) ?values] + =procedure (&&/analyse-1 analyse (&/$AppT &type/IO &type/Top) ?procedure) + _ (&type/check exo-type &/$UnitT) + _cursor &/cursor] + (return (&/|list (&&/|meta exo-type _cursor + (&&/$proc (&/T ["process" "future"]) (&/|list =procedure) (&/|list))))))) + +(defn ^:private analyse-process-schedule [analyse exo-type ?values] + (|do [:let [(&/$Cons ?milliseconds (&/$Cons ?procedure (&/$Nil))) ?values] + =milliseconds (&&/analyse-1 analyse &type/Nat ?milliseconds) + =procedure (&&/analyse-1 analyse (&/$AppT &type/IO &type/Top) ?procedure) + _ (&type/check exo-type &/$UnitT) + _cursor &/cursor] + (return (&/|list (&&/|meta exo-type _cursor + (&&/$proc (&/T ["process" "schedule"]) (&/|list =milliseconds =procedure) (&/|list))))))) + (defn analyse-proc [analyse exo-type category proc ?values] (case category "lux" @@ -595,6 +619,13 @@ "get" (analyse-atom-get analyse exo-type ?values) "compare-and-swap" (analyse-atom-compare-and-swap analyse exo-type ?values) ) + + "process" + (case proc + "concurrency-level" (analyse-process-concurrency-level analyse exo-type ?values) + "future" (analyse-process-future analyse exo-type ?values) + "schedule" (analyse-process-schedule analyse exo-type ?values) + ) ;; else (&/fail-with-loc (str "[Analyser Error] Unknown host procedure: " [category proc])))) diff --git a/luxc/src/lux/compiler/jvm.clj b/luxc/src/lux/compiler/jvm.clj index 5cc3c1f79..7fd764e56 100644 --- a/luxc/src/lux/compiler/jvm.clj +++ b/luxc/src/lux/compiler/jvm.clj @@ -192,7 +192,8 @@ (.visitSource file-name nil))] _ (if (= "lux" name) (|do [_ &&rt/compile-Function-class - _ &&rt/compile-LuxRT-class] + _ &&rt/compile-LuxRT-class + _ &&rt/compile-LuxRunnable-class] (return nil)) (return nil))] (fn [state] diff --git a/luxc/src/lux/compiler/jvm/proc/common.clj b/luxc/src/lux/compiler/jvm/proc/common.clj index dd59a41f0..0afcdc9e0 100644 --- a/luxc/src/lux/compiler/jvm/proc/common.clj +++ b/luxc/src/lux/compiler/jvm/proc/common.clj @@ -826,6 +826,38 @@ &&/wrap-boolean)]] (return nil))) +(defn ^:private compile-process-concurrency-level [compile ?values special-args] + (|do [:let [(&/$Nil) ?values] + ^MethodVisitor *writer* &/get-writer + :let [_ (doto *writer* + (.visitFieldInsn Opcodes/GETSTATIC "lux/LuxRT" "concurrency_level" "I") + (.visitInsn Opcodes/I2L) + &&/wrap-long)]] + (return nil))) + +(defn ^:private compile-process-future [compile ?values special-args] + (|do [:let [(&/$Cons ?procedure (&/$Nil)) ?values] + ^MethodVisitor *writer* &/get-writer + _ (compile ?procedure) + :let [_ (doto *writer* + (.visitTypeInsn Opcodes/CHECKCAST "lux/Function"))] + :let [_ (doto *writer* + (.visitMethodInsn Opcodes/INVOKESTATIC "lux/LuxRT" "future" "(Llux/Function;)Ljava/lang/Object;"))]] + (return nil))) + +(defn ^:private compile-process-schedule [compile ?values special-args] + (|do [:let [(&/$Cons ?milliseconds (&/$Cons ?procedure (&/$Nil))) ?values] + ^MethodVisitor *writer* &/get-writer + _ (compile ?milliseconds) + :let [_ (doto *writer* + &&/unwrap-long)] + _ (compile ?procedure) + :let [_ (doto *writer* + (.visitTypeInsn Opcodes/CHECKCAST "lux/Function"))] + :let [_ (doto *writer* + (.visitMethodInsn Opcodes/INVOKESTATIC "lux/LuxRT" "schedule" "(JLlux/Function;)Ljava/lang/Object;"))]] + (return nil))) + (defn compile-proc [compile category proc ?values special-args] (case category "lux" @@ -985,6 +1017,13 @@ "get" (compile-atom-get compile ?values special-args) "compare-and-swap" (compile-atom-compare-and-swap compile ?values special-args) ) + + "process" + (case proc + "concurrency-level" (compile-process-concurrency-level compile ?values special-args) + "future" (compile-process-future compile ?values special-args) + "schedule" (compile-process-schedule compile ?values special-args) + ) ;; else (&/fail-with-loc (str "[Compiler Error] Unknown procedure: " [category proc])))) diff --git a/luxc/src/lux/compiler/jvm/rt.clj b/luxc/src/lux/compiler/jvm/rt.clj index 7f193a1cd..0f86325f2 100644 --- a/luxc/src/lux/compiler/jvm/rt.clj +++ b/luxc/src/lux/compiler/jvm/rt.clj @@ -25,6 +25,7 @@ (def init-method "") ;; [Resources] +;; Functions (def compile-Function-class (|do [_ (return nil) :let [super-class "java/lang/Object" @@ -65,6 +66,37 @@ (&&/save-class! (second (string/split &&/function-class #"/")) (.toByteArray (doto =class .visitEnd))))) +;; Custom Runnable +(def compile-LuxRunnable-class + (|do [_ (return nil) + :let [=class (doto (new ClassWriter ClassWriter/COMPUTE_MAXS) + (.visit &host/bytecode-version (+ Opcodes/ACC_PUBLIC Opcodes/ACC_FINAL Opcodes/ACC_SUPER) + "lux/LuxRunnable" nil "java/lang/Object" (into-array String ["java/lang/Runnable"]))) + _ (doto (.visitField =class Opcodes/ACC_PUBLIC "procedure" "Llux/Function;" nil nil) + (.visitEnd)) + _ (doto (.visitMethod =class Opcodes/ACC_PUBLIC init-method "(Llux/Function;)V" nil nil) + (.visitCode) + (.visitVarInsn Opcodes/ALOAD 0) + (.visitMethodInsn Opcodes/INVOKESPECIAL "java/lang/Object" init-method "()V") + (.visitVarInsn Opcodes/ALOAD 0) + (.visitVarInsn Opcodes/ALOAD 1) + (.visitFieldInsn Opcodes/PUTFIELD "lux/LuxRunnable" "procedure" "Llux/Function;") + (.visitInsn Opcodes/RETURN) + (.visitMaxs 0 0) + (.visitEnd)) + _ (doto (.visitMethod =class Opcodes/ACC_PUBLIC "run" "()V" nil nil) + (.visitCode) + (.visitVarInsn Opcodes/ALOAD 0) + (.visitFieldInsn Opcodes/GETFIELD "lux/LuxRunnable" "procedure" "Llux/Function;") + (.visitInsn Opcodes/ACONST_NULL) + (.visitMethodInsn Opcodes/INVOKEVIRTUAL "lux/Function" &&/apply-method (&&/apply-signature 1)) + (.visitInsn Opcodes/RETURN) + (.visitMaxs 0 0) + (.visitEnd))]] + (&&/save-class! "LuxRunnable" + (.toByteArray (doto =class .visitEnd))))) + +;; Runtime infrastructure (defn ^:private compile-LuxRT-adt-methods [^ClassWriter =class] (|let [_ (let [$begin (new Label) $not-rec (new Label)] @@ -1281,6 +1313,77 @@ (.visitEnd))) nil)) +(defn ^:private compile-LuxRT-process-methods [^ClassWriter =class] + (do (doto (.visitField =class + (+ Opcodes/ACC_PUBLIC Opcodes/ACC_FINAL Opcodes/ACC_STATIC) + "concurrency_level" "I" nil nil) + (.visitEnd)) + (doto (.visitField =class + (+ Opcodes/ACC_PUBLIC Opcodes/ACC_FINAL Opcodes/ACC_STATIC) + "executor" "Ljava/util/concurrent/ScheduledThreadPoolExecutor;" nil nil) + (.visitEnd)) + (doto (.visitMethod =class Opcodes/ACC_STATIC "" "()V" nil nil) + (.visitCode) + ;; concurrency_level + (.visitMethodInsn Opcodes/INVOKESTATIC "java/lang/Runtime" "getRuntime" "()Ljava/lang/Runtime;") + (.visitMethodInsn Opcodes/INVOKEVIRTUAL "java/lang/Runtime" "availableProcessors" "()I") + (.visitFieldInsn Opcodes/PUTSTATIC "lux/LuxRT" "concurrency_level" "I") + ;; executor + (.visitTypeInsn Opcodes/NEW "java/util/concurrent/ScheduledThreadPoolExecutor") + (.visitInsn Opcodes/DUP) + (.visitFieldInsn Opcodes/GETSTATIC "lux/LuxRT" "concurrency_level" "I") + (.visitMethodInsn Opcodes/INVOKESPECIAL "java/util/concurrent/ScheduledThreadPoolExecutor" "" "(I)V") + (.visitFieldInsn Opcodes/PUTSTATIC "lux/LuxRT" "executor" "Ljava/util/concurrent/ScheduledThreadPoolExecutor;") + ;; DONE + (.visitInsn Opcodes/RETURN) + (.visitMaxs 0 0) + (.visitEnd)) + (doto (.visitMethod =class (+ Opcodes/ACC_PUBLIC Opcodes/ACC_STATIC) "future" "(Llux/Function;)Ljava/lang/Object;" nil nil) + (.visitCode) + (.visitTypeInsn Opcodes/NEW "java/lang/Thread") + (.visitInsn Opcodes/DUP) + (.visitTypeInsn Opcodes/NEW "lux/LuxRunnable") + (.visitInsn Opcodes/DUP) + (.visitVarInsn Opcodes/ALOAD 0) + (.visitMethodInsn Opcodes/INVOKESPECIAL "lux/LuxRunnable" "" "(Llux/Function;)V") + (.visitMethodInsn Opcodes/INVOKESPECIAL "java/lang/Thread" "" "(Ljava/lang/Runnable;)V") + (.visitMethodInsn Opcodes/INVOKEVIRTUAL "java/lang/Thread" "start" "()V") + (.visitLdcInsn &/unit-tag) + (.visitInsn Opcodes/ARETURN) + (.visitMaxs 0 0) + (.visitEnd)) + (let [$immediately (new Label)] + (doto (.visitMethod =class (+ Opcodes/ACC_PUBLIC Opcodes/ACC_STATIC) "schedule" "(JLlux/Function;)Ljava/lang/Object;" nil nil) + (.visitCode) + (.visitVarInsn Opcodes/LLOAD 0) + (.visitLdcInsn (long 0)) + (.visitInsn Opcodes/LCMP) + (.visitJumpInsn Opcodes/IFEQ $immediately) + ;; Schedule for later + (.visitFieldInsn Opcodes/GETSTATIC "lux/LuxRT" "executor" "Ljava/util/concurrent/ScheduledThreadPoolExecutor;") + (.visitTypeInsn Opcodes/NEW "lux/LuxRunnable") + (.visitInsn Opcodes/DUP) + (.visitVarInsn Opcodes/ALOAD 2) + (.visitMethodInsn Opcodes/INVOKESPECIAL "lux/LuxRunnable" "" "(Llux/Function;)V") + (.visitVarInsn Opcodes/LLOAD 0) + (.visitFieldInsn Opcodes/GETSTATIC "java/util/concurrent/TimeUnit" "MILLISECONDS" "Ljava/util/concurrent/TimeUnit;") + (.visitMethodInsn Opcodes/INVOKEVIRTUAL "java/util/concurrent/ScheduledThreadPoolExecutor" "schedule" "(Ljava/lang/Runnable;JLjava/util/concurrent/TimeUnit;)Ljava/util/concurrent/ScheduledFuture;") + (.visitLdcInsn &/unit-tag) + (.visitInsn Opcodes/ARETURN) + ;; Run immediately + (.visitLabel $immediately) + (.visitFieldInsn Opcodes/GETSTATIC "lux/LuxRT" "executor" "Ljava/util/concurrent/ScheduledThreadPoolExecutor;") + (.visitTypeInsn Opcodes/NEW "lux/LuxRunnable") + (.visitInsn Opcodes/DUP) + (.visitVarInsn Opcodes/ALOAD 2) + (.visitMethodInsn Opcodes/INVOKESPECIAL "lux/LuxRunnable" "" "(Llux/Function;)V") + (.visitMethodInsn Opcodes/INVOKEINTERFACE "java/util/concurrent/Executor" "execute" "(Ljava/lang/Runnable;)V") + (.visitLdcInsn &/unit-tag) + (.visitInsn Opcodes/ARETURN) + (.visitMaxs 0 0) + (.visitEnd))) + nil)) + (def compile-LuxRT-class (|do [_ (return nil) :let [full-name &&/lux-utils-class @@ -1344,6 +1447,7 @@ (compile-LuxRT-int-methods) (compile-LuxRT-deg-methods) (compile-LuxRT-real-methods) - (compile-LuxRT-text-methods))]] + (compile-LuxRT-text-methods) + (compile-LuxRT-process-methods))]] (&&/save-class! (second (string/split &&/lux-utils-class #"/")) (.toByteArray (doto =class .visitEnd))))) diff --git a/luxc/src/lux/type.clj b/luxc/src/lux/type.clj index 94c4e2ae7..854472c94 100644 --- a/luxc/src/lux/type.clj +++ b/luxc/src/lux/type.clj @@ -45,6 +45,11 @@ (&/$UnivQ empty-env (&/$BoundT 1)))) +(def Top + (&/$NamedT (&/T ["lux" "Top"]) + (&/$ExQ empty-env + (&/$BoundT 1)))) + (def IO (&/$NamedT (&/T ["lux/codata" "IO"]) (&/$UnivQ empty-env diff --git a/stdlib/source/lux/concurrency/promise.lux b/stdlib/source/lux/concurrency/promise.lux index 3c10e785d..ef7efd923 100644 --- a/stdlib/source/lux/concurrency/promise.lux +++ b/stdlib/source/lux/concurrency/promise.lux @@ -12,47 +12,11 @@ [compiler] (macro ["s" syntax #+ syntax: Syntax]) (concurrency [atom #+ Atom atom]) - host )) -(jvm-import java.lang.Runtime - (#static getRuntime [] Runtime) - (availableProcessors [] int)) - -(jvm-import java.lang.Runnable) - -(jvm-import java.lang.Thread - (new [Runnable]) - (start [] void)) - -(jvm-import java.util.concurrent.Executor - (execute [Runnable] void)) - -(jvm-import java.util.concurrent.TimeUnit - (#enum MILLISECONDS)) - -(jvm-import (java.util.concurrent.ScheduledFuture a)) - -(jvm-import java.util.concurrent.ScheduledThreadPoolExecutor - (new [int]) - (schedule [Runnable long TimeUnit] (ScheduledFuture Object))) - (def: #export concurrency-level Nat - (|> (Runtime.getRuntime []) - (Runtime.availableProcessors []) - int-to-nat)) - -(def: executor - ScheduledThreadPoolExecutor - (ScheduledThreadPoolExecutor.new [(nat-to-int concurrency-level)])) - -(syntax: (runnable expr) - (wrap (list (`' (object [java.lang.Runnable] - [] - (java.lang.Runnable (run) void - (exec (~ expr) - []))))))) + (_lux_proc ["process" "concurrency-level"] [])) (type: (Promise-State a) {#value (Maybe a) @@ -218,18 +182,15 @@ {#;doc "Runs an I/O computation on its own process and returns an Promise that will eventually host its result."} (All [a] (-> (IO a) (Promise a))) (let [!out (promise ($ +0))] - (exec (Thread.start [] (Thread.new [(runnable (io;run (resolve (io;run computation) - !out)))])) + (exec (_lux_proc ["process" "future"] [(io (io;run (resolve (io;run computation) + !out)))]) !out))) (def: #export (wait time) {#;doc "Returns a Promise that will be resolved after the specified amount of milliseconds."} (-> Nat (Promise Unit)) (let [!out (promise Unit)] - (exec (ScheduledThreadPoolExecutor.schedule [(runnable (io;run (resolve [] !out))) - (nat-to-int time) - TimeUnit.MILLISECONDS] - executor) + (exec (_lux_proc ["process" "schedule"] [time (resolve [] !out)]) !out))) (def: #export (time-out time promise) -- cgit v1.2.3