aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/tool/mediator/parallelism.lux
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/tool/mediator/parallelism.lux')
-rw-r--r--stdlib/source/lux/tool/mediator/parallelism.lux169
1 files changed, 169 insertions, 0 deletions
diff --git a/stdlib/source/lux/tool/mediator/parallelism.lux b/stdlib/source/lux/tool/mediator/parallelism.lux
new file mode 100644
index 000000000..c694f0490
--- /dev/null
+++ b/stdlib/source/lux/tool/mediator/parallelism.lux
@@ -0,0 +1,169 @@
+(.module:
+ [lux (#- Source Module)
+ [control
+ ["." monad (#+ Monad do)]
+ ["ex" exception (#+ exception:)]]
+ [concurrency
+ ["." promise (#+ Promise) ("#/." functor)]
+ ["." task (#+ Task)]
+ ["." stm (#+ Var STM)]]
+ [data
+ ["." error (#+ Error) ("#/." monad)]
+ ["." text ("#/." equivalence)
+ format]
+ [collection
+ ["." list ("#/." functor)]
+ ["." dictionary (#+ Dictionary)]]]
+ ["." io]]
+ ["." // (#+ Source Mediator)
+ [//
+ ["." compiler (#+ Input Output Compilation Compiler)
+ [meta
+ ["." archive (#+ Archive)
+ ["." descriptor (#+ Module Descriptor)]
+ [document (#+ Document)]]
+ [io
+ ["." context]]]]]])
+
+(exception: #export (self-dependency {module Module})
+ (ex.report ["Module" module]))
+
+(exception: #export (circular-dependency {module Module} {dependency Module})
+ (ex.report ["Module" module]
+ ["Dependency" dependency]))
+
+(type: Pending-Compilation
+ (Promise (Error (Ex [d] (Document d)))))
+
+(type: Active-Compilations
+ (Dictionary Module [Descriptor Pending-Compilation]))
+
+(def: (self-dependence? module dependency)
+ (-> Module Module Bit)
+ (text/= module dependency))
+
+(def: (circular-dependence? active dependency)
+ (-> Active-Compilations Module Bit)
+ (case (dictionary.get dependency active)
+ (#.Some [descriptor pending])
+ (case (get@ #descriptor.state descriptor)
+ #.Active
+ true
+
+ _
+ false)
+
+ #.None
+ false))
+
+(def: (ensure-valid-dependencies! active dependencies module)
+ (-> Active-Compilations (List Module) Module (Task Any))
+ (do task.monad
+ [_ (: (Task Any)
+ (if (list.any? (self-dependence? module) dependencies)
+ (task.throw self-dependency module)
+ (wrap [])))]
+ (: (Task Any)
+ (case (list.find (circular-dependence? active) dependencies)
+ (#.Some dependency)
+ (task.throw circular-dependency module dependency)
+
+ #.None
+ (wrap [])))))
+
+(def: (share-compilation archive pending)
+ (-> Active-Compilations Pending-Compilation (Task Archive))
+ (promise/map (|>> (error/map (function (_ document)
+ (archive.add module document archive)))
+ error/join)
+ pending))
+
+(def: (import Monad<!> mediate archive dependencies)
+ (All [!] (-> (Monad !) (Mediator !) Active-Compilations (List Module) (! (List Archive))))
+ (|> dependencies
+ (list/map (mediate archive))
+ (monad.seq Monad<!>)))
+
+(def: (step-compilation archive imports [dependencies process])
+ (All [d o] (-> Archive (List Archive) (Compilation d o)
+ [Archive (Either (Compilation d o)
+ [(Document d) (Output o)])]))
+ (do error.monad
+ [archive' (monad.fold error.monad archive.merge archive imports)
+ outcome (process archive')]
+ (case outcome
+ (#.Right [document output])
+ (do @
+ [archive'' (archive.add module document archive')]
+ (wrap [archive'' (#.Right [document output])]))
+
+ (#.Left continue)
+ (wrap [archive' outcome]))))
+
+(def: (request-compilation file-system sources module compilations)
+ (All [!]
+ (-> (file.System Task) (List Source) Module (Var Active-Compilations)
+ (Task (Either Pending-Compilation
+ [Pending-Compilation Active-Compilations Input]))))
+ (do (:: file-system &monad)
+ [current (|> (stm.read compilations)
+ stm.commit
+ task.from-promise)]
+ (case (dictionary.get module current)
+ (#.Some [descriptor pending])
+ (wrap (#.Left pending))
+
+ #.None
+ (do @
+ [input (context.read file-system sources module)]
+ (do stm.monad
+ [stale (stm.read compilations)]
+ (case (dictionary.get module stale)
+ (#.Some [descriptor pending])
+ (wrap (#.Left [pending current]))
+
+ #.None
+ (do @
+ [#let [base-descriptor {#descriptor.hash (get@ #compiler.hash input)
+ #descriptor.name (get@ #compiler.module input)
+ #descriptor.file (get@ #compiler.file input)
+ #descriptor.references (list)
+ #descriptor.state #.Active}
+ pending (promise.promise (: (Maybe (Error (Ex [d] (Document d))))
+ #.None))]
+ updated (stm.update (dictionary.put (get@ #compiler.module input)
+ [base-descriptor pending])
+ compilations)]
+ (wrap (is? current stale)
+ (#.Right [pending updated input])))))))))
+
+(def: (mediate-compilation Monad<!> mediate compiler input archive pending)
+ (All [! d o] (-> (Monad !) (Mediator ! d o) (Compiler d o) Input Archive Pending-Compilation (Task Archive)))
+ (loop [archive archive
+ compilation (compiler input)]
+ (do Monad<!>
+ [#let [[dependencies process] compilation]
+ _ (ensure-valid-dependencies! active dependencies (get@ #compiler.module input))
+ imports (import @ mediate archive dependencies)
+ [archive' next] (promise/wrap (step-compilation archive imports compilation))]
+ (case next
+ (#.Left continue)
+ (recur archive' continue)
+
+ (#.Right [document output])
+ (exec (io.run (promise.resolve (#error.Success document) pending))
+ (wrap archive'))))))
+
+(def: #export (mediator file-system sources compiler)
+ (//.Instancer Task)
+ (let [compilations (: (Var Active-Compilations)
+ (stm.var (dictionary.new text.hash)))]
+ (function (mediate archive module)
+ (do (:: file-system &monad)
+ [request (request-compilation file-system sources module compilations)]
+ (case request
+ (#.Left pending)
+ (share-compilation archive pending)
+
+ (#.Right [pending active input])
+ (mediate-compilation @ mediate compiler input archive pending))))))