aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/control/concurrency
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/control/concurrency')
-rw-r--r--stdlib/source/lux/control/concurrency/actor.lux389
-rw-r--r--stdlib/source/lux/control/concurrency/atom.lux102
-rw-r--r--stdlib/source/lux/control/concurrency/frp.lux295
-rw-r--r--stdlib/source/lux/control/concurrency/promise.lux199
-rw-r--r--stdlib/source/lux/control/concurrency/semaphore.lux173
-rw-r--r--stdlib/source/lux/control/concurrency/stm.lux273
-rw-r--r--stdlib/source/lux/control/concurrency/thread.lux169
7 files changed, 0 insertions, 1600 deletions
diff --git a/stdlib/source/lux/control/concurrency/actor.lux b/stdlib/source/lux/control/concurrency/actor.lux
deleted file mode 100644
index 9e17193b2..000000000
--- a/stdlib/source/lux/control/concurrency/actor.lux
+++ /dev/null
@@ -1,389 +0,0 @@
-(.module: {#.doc "The actor model of concurrency."}
- [lux #*
- [abstract
- monad]
- [control
- [pipe (#+ case>)]
- ["." function]
- ["." try (#+ Try)]
- ["." exception (#+ exception:)]
- ["." io (#+ IO io)]
- ["<>" parser
- ["<.>" code (#+ Parser)]]]
- [data
- ["." product]
- [text
- ["%" format (#+ format)]]
- [collection
- ["." list ("#\." monoid monad fold)]]]
- ["." macro (#+ with_gensyms)
- ["." code]
- [syntax (#+ syntax:)
- ["|.|" input]
- ["|.|" export]
- ["|.|" annotations]]]
- [math
- [number
- ["n" nat]]]
- ["." meta (#+ monad)
- ["." annotation]]
- [type (#+ :share)
- ["." abstract (#+ abstract: :representation :abstraction)]]]
- [//
- ["." atom (#+ Atom atom)]
- ["." promise (#+ Promise Resolver) ("#\." monad)]
- ["." frp (#+ Channel)]])
-
-(exception: #export poisoned)
-(exception: #export dead)
-
-(with_expansions
- [<Mail> (as_is (-> s (Actor s) (Promise (Try s))))
- <Obituary> (as_is [Text s (List <Mail>)])
- <Mailbox> (as_is (Rec Mailbox
- [(Promise [<Mail> Mailbox])
- (Resolver [<Mail> Mailbox])]))]
-
- (def: (pending [read write])
- (All [a]
- (-> (Rec Mailbox
- [(Promise [a Mailbox])
- (Resolver [a Mailbox])])
- (IO (List a))))
- (do {! io.monad}
- [current (promise.poll read)]
- (case current
- (#.Some [head tail])
- (\ ! map (|>> (#.Cons head))
- (pending tail))
-
- #.None
- (wrap #.Nil))))
-
- (abstract: #export (Actor s)
- {#obituary [(Promise <Obituary>)
- (Resolver <Obituary>)]
- #mailbox (Atom <Mailbox>)}
-
- (type: #export (Mail s)
- <Mail>)
-
- (type: #export (Obituary s)
- <Obituary>)
-
- (type: #export (Behavior o s)
- {#.doc "An actor's behavior when mail is received and when a fatal error occurs."}
- {#on_init (-> o s)
- #on_mail (-> (Mail s) s (Actor s) (Promise (Try s)))})
-
- (def: #export (spawn! behavior init)
- {#.doc "Given a behavior and initial state, spawns an actor and returns it."}
- (All [o s] (-> (Behavior o s) o (IO (Actor s))))
- (io (let [[on_init on_mail] behavior
- self (:share [o s]
- (Behavior o s)
- behavior
-
- (Actor s)
- (:abstraction {#obituary (promise.promise [])
- #mailbox (atom (promise.promise []))}))
- process (loop [state (on_init init)
- [|mailbox| _] (io.run (atom.read (get@ #mailbox (:representation self))))]
- (do {! promise.monad}
- [[head tail] |mailbox|
- ?state' (on_mail head state self)]
- (case ?state'
- (#try.Failure error)
- (let [[_ resolve] (get@ #obituary (:representation self))]
- (exec (io.run
- (do io.monad
- [pending (..pending tail)]
- (resolve [error state (#.Cons head pending)])))
- (wrap [])))
-
- (#try.Success state')
- (recur state' tail))))]
- self)))
-
- (def: #export (alive? actor)
- (All [s] (-> (Actor s) (IO Bit)))
- (let [[obituary _] (get@ #obituary (:representation actor))]
- (|> obituary
- promise.poll
- (\ io.functor map
- (|>> (case> #.None
- yes
-
- _
- no))))))
-
- (def: #export (obituary actor)
- (All [s] (-> (Actor s) (IO (Maybe (Obituary s)))))
- (let [[obituary _] (get@ #obituary (:representation actor))]
- (promise.poll obituary)))
-
- (def: #export await
- (All [s] (-> (Actor s) (Promise (Obituary s))))
- (|>> :representation
- (get@ #obituary)
- product.left))
-
- (def: #export (mail! mail actor)
- {#.doc "Send mail to an actor.."}
- (All [s] (-> (Mail s) (Actor s) (IO (Try Any))))
- (do {! io.monad}
- [alive? (..alive? actor)]
- (if alive?
- (let [entry [mail (promise.promise [])]]
- (do !
- [|mailbox|&resolve (atom.read (get@ #mailbox (:representation actor)))]
- (loop [[|mailbox| resolve] |mailbox|&resolve]
- (do !
- [|mailbox| (promise.poll |mailbox|)]
- (case |mailbox|
- #.None
- (do !
- [resolved? (resolve entry)]
- (if resolved?
- (do !
- [_ (atom.write (product.right entry) (get@ #mailbox (:representation actor)))]
- (wrap (exception.return [])))
- (recur |mailbox|&resolve)))
-
- (#.Some [_ |mailbox|'])
- (recur |mailbox|'))))))
- (wrap (exception.throw ..dead [])))))
-
- (type: #export (Message s o)
- (-> s (Actor s) (Promise (Try [s o]))))
-
- (def: (mail message)
- (All [s o] (-> (Message s o) [(Promise (Try o)) (Mail s)]))
- (let [[promise resolve] (:share [s o]
- (Message s o)
- message
-
- [(Promise (Try o))
- (Resolver (Try o))]
- (promise.promise []))]
- [promise
- (function (_ state self)
- (do {! promise.monad}
- [outcome (message state self)]
- (case outcome
- (#try.Success [state' return])
- (exec (io.run (resolve (#try.Success return)))
- (promise.resolved (#try.Success state')))
-
- (#try.Failure error)
- (exec (io.run (resolve (#try.Failure error)))
- (promise.resolved (#try.Failure error))))))]))
-
- (def: #export (tell! message actor)
- {#.doc "Communicate with an actor through message passing."}
- (All [s o] (-> (Message s o) (Actor s) (Promise (Try o))))
- (let [[promise mail] (..mail message)]
- (do promise.monad
- [outcome (promise.future (..mail! mail actor))]
- (case outcome
- (#try.Success)
- promise
-
- (#try.Failure error)
- (wrap (#try.Failure error))))))
- )
- )
-
-(def: (default_on_mail mail state self)
- (All [s] (-> (Mail s) s (Actor s) (Promise (Try s))))
- (mail state self))
-
-(def: #export default
- (All [s] (Behavior s s))
- {#on_init function.identity
- #on_mail ..default_on_mail})
-
-(def: #export (poison! actor)
- {#.doc (doc "Kills the actor by sending mail that will kill it upon processing,"
- "but allows the actor to handle previous mail.")}
- (All [s] (-> (Actor s) (IO (Try Any))))
- (..mail! (function (_ state self)
- (promise.resolved (exception.throw ..poisoned [])))
- actor))
-
-(def: actor_decl^
- (Parser [Text (List Text)])
- (<>.either (<code>.form (<>.and <code>.local_identifier (<>.some <code>.local_identifier)))
- (<>.and <code>.local_identifier (\ <>.monad wrap (list)))))
-
-(type: On_MailC
- [[Text Text Text] Code])
-
-(type: BehaviorC
- [(Maybe On_MailC) (List Code)])
-
-(def: argument
- (Parser Text)
- <code>.local_identifier)
-
-(def: behavior^
- (Parser BehaviorC)
- (let [on_mail_args ($_ <>.and ..argument ..argument ..argument)]
- ($_ <>.and
- (<>.maybe (<code>.form (<>.and (<code>.form (<>.after (<code>.this! (' on_mail)) on_mail_args))
- <code>.any)))
- (<>.some <code>.any))))
-
-(def: (on_mail g!_ ?on_mail)
- (-> Code (Maybe On_MailC) Code)
- (case ?on_mail
- #.None
- (` (~! ..default_on_mail))
-
- (#.Some [[mailN stateN selfN] bodyC])
- (` (function ((~ g!_)
- (~ (code.local_identifier mailN))
- (~ (code.local_identifier stateN))
- (~ (code.local_identifier selfN)))
- (~ bodyC)))))
-
-(with_expansions [<examples> (as_is (actor: #export (Stack a)
- (List a)
-
- ((on_mail mail state self)
- (do (try.with promise.monad)
- [#let [_ (log! "BEFORE")]
- output (mail state self)
- #let [_ (log! "AFTER")]]
- (wrap output)))
-
- (message: #export (push {value a} state self (List a))
- (let [state' (#.Cons value state)]
- (promise.resolved (#try.Success [state' state'])))))
-
- (actor: #export Counter
- Nat
-
- (message: #export (count! {increment Nat} state self Any)
- (let [state' (n.+ increment state)]
- (promise.resolved (#try.Success [state' state']))))
-
- (message: #export (read! state self Nat)
- (promise.resolved (#try.Success [state state])))))]
- (syntax: #export (actor:
- {export |export|.parser}
- {[name vars] actor_decl^}
- {annotations (<>.default |annotations|.empty |annotations|.parser)}
- state_type
- {[?on_mail messages] behavior^})
- {#.doc (doc "Defines an actor, with its behavior and internal state."
- "Messages for the actor must be defined after the on_mail handler."
- <examples>)}
- (with_gensyms [g!_]
- (do meta.monad
- [g!type (macro.gensym (format name "_abstract_type"))
- #let [g!actor (code.local_identifier name)
- g!vars (list\map code.local_identifier vars)]]
- (wrap (list (` ((~! abstract:) (~+ (|export|.format export)) ((~ g!type) (~+ g!vars))
- (~ state_type)
-
- (def: (~+ (|export|.format export)) (~ g!actor)
- (All [(~+ g!vars)]
- (..Behavior (~ state_type) ((~ g!type) (~+ g!vars))))
- {#..on_init (|>> ((~! abstract.:abstraction) (~ g!type)))
- #..on_mail (~ (..on_mail g!_ ?on_mail))})
-
- (~+ messages))))))))
-
- (syntax: #export (actor {[state_type init] (<code>.record (<>.and <code>.any <code>.any))}
- {[?on_mail messages] behavior^})
- (with_gensyms [g!_]
- (wrap (list (` (: ((~! io.IO) (..Actor (~ state_type)))
- (..spawn! (: (..Behavior (~ state_type) (~ state_type))
- {#..on_init (|>>)
- #..on_mail (~ (..on_mail g!_ ?on_mail))})
- (: (~ state_type)
- (~ init)))))))))
-
- (type: Signature
- {#vars (List Text)
- #name Text
- #inputs (List |input|.Input)
- #state Text
- #self Text
- #output Code})
-
- (def: signature^
- (Parser Signature)
- (<code>.form ($_ <>.and
- (<>.default (list) (<code>.tuple (<>.some <code>.local_identifier)))
- <code>.local_identifier
- (<>.some |input|.parser)
- <code>.local_identifier
- <code>.local_identifier
- <code>.any)))
-
- (def: reference^
- (Parser [Name (List Text)])
- (<>.either (<code>.form (<>.and <code>.identifier (<>.some <code>.local_identifier)))
- (<>.and <code>.identifier (\ <>.monad wrap (list)))))
-
- (syntax: #export (message:
- {export |export|.parser}
- {signature signature^}
- {annotations (<>.default |annotations|.empty |annotations|.parser)}
- body)
- {#.doc (doc "A message can access the actor's state through the state parameter."
- "A message can also access the actor itself through the self parameter."
- "A message's output must be a promise containing a 2-tuple with the updated state and a return value."
- "A message may succeed or fail (in case of failure, the actor dies)."
-
- <examples>)}
- (with_gensyms [g!_ g!return]
- (do meta.monad
- [actor_scope abstract.current
- #let [g!type (code.local_identifier (get@ #abstract.name actor_scope))
- g!message (code.local_identifier (get@ #name signature))
- g!actor_vars (get@ #abstract.type_vars actor_scope)
- g!all_vars (|> signature (get@ #vars) (list\map code.local_identifier) (list\compose g!actor_vars))
- g!inputsC (|> signature (get@ #inputs) (list\map product.left))
- g!inputsT (|> signature (get@ #inputs) (list\map product.right))
- g!state (|> signature (get@ #state) code.local_identifier)
- g!self (|> signature (get@ #self) code.local_identifier)]]
- (wrap (list (` (def: (~+ (|export|.format export)) ((~ g!message) (~+ g!inputsC))
- (~ (|annotations|.format annotations))
- (All [(~+ g!all_vars)]
- (-> (~+ g!inputsT)
- (..Message (~ (get@ #abstract.abstraction actor_scope))
- (~ (get@ #output signature)))))
- (function ((~ g!_) (~ g!state) (~ g!self))
- (let [(~ g!state) (:as (~ (get@ #abstract.representation actor_scope))
- (~ g!state))]
- (|> (~ body)
- (: ((~! promise.Promise) ((~! try.Try) [(~ (get@ #abstract.representation actor_scope))
- (~ (get@ #output signature))])))
- (:as ((~! promise.Promise) ((~! try.Try) [(~ (get@ #abstract.abstraction actor_scope))
- (~ (get@ #output signature))]))))))))
- ))))))
-
-(type: #export Stop
- (IO Any))
-
-(def: continue! true)
-(def: stop! false)
-
-(def: #export (observe action channel actor)
- (All [e s] (-> (-> e Stop (Mail s)) (Channel e) (Actor s) (IO Any)))
- (let [signal (: (Atom Bit)
- (atom.atom ..continue!))
- stop (: Stop
- (atom.write ..stop! signal))]
- (frp.subscribe (function (_ event)
- (do {! io.monad}
- [continue? (atom.read signal)]
- (if continue?
- (do !
- [outcome (..mail! (action event stop) actor)]
- (wrap (try.to_maybe outcome)))
- (wrap #.None))))
- channel)))
diff --git a/stdlib/source/lux/control/concurrency/atom.lux b/stdlib/source/lux/control/concurrency/atom.lux
deleted file mode 100644
index e3b711785..000000000
--- a/stdlib/source/lux/control/concurrency/atom.lux
+++ /dev/null
@@ -1,102 +0,0 @@
-(.module:
- [lux #*
- ["." ffi]
- ["@" target]
- [abstract
- [monad (#+ do)]]
- [control
- ["." function]
- ["." io (#- run) ("#\." functor)]]
- [data
- ["." product]
- [collection
- ["." array]]]
- [type
- abstract]])
-
-(with_expansions [<jvm> (as_is (ffi.import: (java/util/concurrent/atomic/AtomicReference a)
- ["#::."
- (new [a])
- (get [] a)
- (compareAndSet [a a] boolean)]))]
- (for {@.old <jvm>
- @.jvm <jvm>}
- (as_is)))
-
-(with_expansions [<new> (for {@.js "js array new"
- @.python "python array new"
- @.lua "lua array new"
- @.ruby "ruby array new"
- @.php "php array new"
- @.scheme "scheme array new"}
- (as_is))
- <write> (for {@.js "js array write"
- @.python "python array write"
- @.lua "lua array write"
- @.ruby "ruby array write"
- @.php "php array write"
- @.scheme "scheme array write"}
- (as_is))
-
- <read> (for {@.js "js array read"
- @.python "python array read"
- @.lua "lua array read"
- @.ruby "ruby array read"
- @.php "php array read"
- @.scheme "scheme array read"}
- (as_is))]
- (abstract: #export (Atom a)
- (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference a)]
- (for {@.old <jvm>
- @.jvm <jvm>}
- (array.Array a)))
-
- {#.doc "Atomic references that are safe to mutate concurrently."}
-
- (def: #export (atom value)
- (All [a] (-> a (Atom a)))
- (:abstraction (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference::new value)]
- (for {@.old <jvm>
- @.jvm <jvm>}
- (<write> 0 value (<new> 1))))))
-
- (def: #export (read atom)
- (All [a] (-> (Atom a) (IO a)))
- (io (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference::get (:representation atom))]
- (for {@.old <jvm>
- @.jvm <jvm>}
- (<read> 0 (:representation atom))))))
-
- (def: #export (compare_and_swap current new atom)
- {#.doc (doc "Only mutates an atom if you can present its current value."
- "That guarantees that atom was not updated since you last read from it.")}
- (All [a] (-> a a (Atom a) (IO Bit)))
- (io (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference::compareAndSet current new (:representation atom))]
- (for {@.old <jvm>
- @.jvm <jvm>}
- (let [old (<read> 0 (:representation atom))]
- (if (is? old current)
- (exec (<write> 0 new (:representation atom))
- true)
- false))))))
- ))
-
-(def: #export (update f atom)
- {#.doc (doc "Updates an atom by applying a function to its current value."
- "If it fails to update it (because some other process wrote to it first), it will retry until it succeeds."
- "The retries will be done with the new values of the atom, as they show up.")}
- (All [a] (-> (-> a a) (Atom a) (IO [a a])))
- (loop [_ []]
- (do io.monad
- [old (read atom)
- #let [new (f old)]
- swapped? (..compare_and_swap old new atom)]
- (if swapped?
- (wrap [old new])
- (recur [])))))
-
-(def: #export (write value atom)
- (All [a] (-> a (Atom a) (IO a)))
- (|> atom
- (..update (function.constant value))
- (io\map product.left)))
diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux
deleted file mode 100644
index 452c153f1..000000000
--- a/stdlib/source/lux/control/concurrency/frp.lux
+++ /dev/null
@@ -1,295 +0,0 @@
-(.module:
- [lux #*
- [abstract
- [predicate (#+ Predicate)]
- [equivalence (#+ Equivalence)]
- [functor (#+ Functor)]
- [apply (#+ Apply)]
- ["." monad (#+ Monad do)]]
- [control
- ["." try (#+ Try)]
- ["." exception (#+ exception:)]
- ["." io (#+ IO io)]]
- [data
- ["." maybe ("#\." functor)]]
- [type (#+ :share)
- abstract]]
- [//
- ["." atom (#+ Atom)]
- ["." promise (#+ Promise) ("#\." functor)]])
-
-(type: #export (Channel a)
- {#.doc "An asynchronous channel to distribute values."}
- (Promise (Maybe [a (Channel a)])))
-
-(exception: #export channel_is_already_closed)
-
-(interface: #export (Sink a)
- (: (IO (Try Any))
- close)
- (: (-> a (IO (Try Any)))
- feed))
-
-(def: (sink resolve)
- (All [a]
- (-> (promise.Resolver (Maybe [a (Channel a)]))
- (Sink a)))
- (let [sink (atom.atom resolve)]
- (implementation
- (def: close
- (loop [_ []]
- (do {! io.monad}
- [current (atom.read sink)
- stopped? (current #.None)]
- (if stopped?
- ## I closed the sink.
- (wrap (exception.return []))
- ## Someone else interacted with the sink.
- (do !
- [latter (atom.read sink)]
- (if (is? current latter)
- ## Someone else closed the sink.
- (wrap (exception.throw ..channel_is_already_closed []))
- ## Someone else fed the sink while I was closing it.
- (recur [])))))))
-
- (def: (feed value)
- (loop [_ []]
- (do {! io.monad}
- [current (atom.read sink)
- #let [[next resolve_next] (:share [a]
- (promise.Resolver (Maybe [a (Channel a)]))
- current
-
- [(Promise (Maybe [a (Channel a)]))
- (promise.Resolver (Maybe [a (Channel a)]))]
- (promise.promise []))]
- fed? (current (#.Some [value next]))]
- (if fed?
- ## I fed the sink.
- (do !
- [_ (atom.compare_and_swap current resolve_next sink)]
- (wrap (exception.return [])))
- ## Someone else interacted with the sink.
- (do !
- [latter (atom.read sink)]
- (if (is? current latter)
- ## Someone else closed the sink while I was feeding it.
- (wrap (exception.throw ..channel_is_already_closed []))
- ## Someone else fed the sink.
- (recur []))))))))))
-
-(def: #export (channel _)
- (All [a] (-> Any [(Channel a) (Sink a)]))
- (let [[promise resolve] (promise.promise [])]
- [promise (..sink resolve)]))
-
-(implementation: #export functor
- (Functor Channel)
-
- (def: (map f)
- (promise\map
- (maybe\map
- (function (_ [head tail])
- [(f head) (map f tail)])))))
-
-(implementation: #export apply
- (Apply Channel)
-
- (def: &functor ..functor)
-
- (def: (apply ff fa)
- (do promise.monad
- [cons_f ff
- cons_a fa]
- (case [cons_f cons_a]
- [(#.Some [head_f tail_f]) (#.Some [head_a tail_a])]
- (wrap (#.Some [(head_f head_a) (apply tail_f tail_a)]))
-
- _
- (wrap #.None)))))
-
-(def: empty
- Channel
- (promise.resolved #.None))
-
-(implementation: #export monad
- (Monad Channel)
-
- (def: &functor ..functor)
-
- (def: (wrap a)
- (promise.resolved (#.Some [a ..empty])))
-
- (def: (join mma)
- (let [[output sink] (channel [])]
- (exec (: (Promise Any)
- (loop [mma mma]
- (do {! promise.monad}
- [?mma mma]
- (case ?mma
- (#.Some [ma mma'])
- (do !
- [_ (loop [ma ma]
- (do !
- [?ma ma]
- (case ?ma
- (#.Some [a ma'])
- (exec (io.run (\ sink feed a))
- (recur ma'))
-
- #.None
- (wrap []))))]
- (recur mma'))
-
- #.None
- (wrap (: Any (io.run (\ sink close))))))))
- output))))
-
-(type: #export (Subscriber a)
- (-> a (IO (Maybe Any))))
-
-(def: #export (subscribe subscriber channel)
- (All [a] (-> (Subscriber a) (Channel a) (IO Any)))
- (io (exec (: (Promise Any)
- (loop [channel channel]
- (do promise.monad
- [cons channel]
- (case cons
- (#.Some [head tail])
- (case (io.run (subscriber head))
- (#.Some _)
- (recur tail)
-
- #.None
- (wrap []))
-
- #.None
- (wrap [])))))
- [])))
-
-(def: #export (filter pass? channel)
- (All [a] (-> (Predicate a) (Channel a) (Channel a)))
- (do promise.monad
- [cons channel]
- (case cons
- (#.Some [head tail])
- (let [tail' (filter pass? tail)]
- (if (pass? head)
- (wrap (#.Some [head tail']))
- tail'))
-
- #.None
- (wrap #.None))))
-
-(def: #export (from_promise promise)
- (All [a] (-> (Promise a) (Channel a)))
- (promise\map (function (_ value)
- (#.Some [value ..empty]))
- promise))
-
-(def: #export (fold f init channel)
- {#.doc "Asynchronous fold over channels."}
- (All [a b]
- (-> (-> b a (Promise a)) a (Channel b)
- (Promise a)))
- (do {! promise.monad}
- [cons channel]
- (case cons
- #.None
- (wrap init)
-
- (#.Some [head tail])
- (do !
- [init' (f head init)]
- (fold f init' tail)))))
-
-(def: #export (folds f init channel)
- {#.doc "A channel of folds."}
- (All [a b]
- (-> (-> b a (Promise a)) a (Channel b)
- (Channel a)))
- (do {! promise.monad}
- [cons channel]
- (case cons
- #.None
- (wrap (#.Some [init (wrap #.None)]))
-
- (#.Some [head tail])
- (do !
- [init' (f head init)]
- (wrap (#.Some [init (folds f init' tail)]))))))
-
-(def: #export (poll milli_seconds action)
- (All [a]
- (-> Nat (IO a) [(Channel a) (Sink a)]))
- (let [[output sink] (channel [])]
- (exec (io.run (loop [_ []]
- (do io.monad
- [value action
- _ (\ sink feed value)]
- (promise.await recur (promise.wait milli_seconds)))))
- [output sink])))
-
-(def: #export (periodic milli_seconds)
- (-> Nat [(Channel Any) (Sink Any)])
- (..poll milli_seconds (io [])))
-
-(def: #export (iterate f init)
- (All [s o] (-> (-> s (Promise (Maybe [s o]))) s (Channel o)))
- (do promise.monad
- [?next (f init)]
- (case ?next
- (#.Some [state output])
- (wrap (#.Some [output (iterate f state)]))
-
- #.None
- (wrap #.None))))
-
-(def: (distinct' equivalence previous channel)
- (All [a] (-> (Equivalence a) a (Channel a) (Channel a)))
- (do promise.monad
- [cons channel]
- (case cons
- (#.Some [head tail])
- (if (\ equivalence = previous head)
- (distinct' equivalence previous tail)
- (wrap (#.Some [head (distinct' equivalence head tail)])))
-
- #.None
- (wrap #.None))))
-
-(def: #export (distinct equivalence channel)
- (All [a] (-> (Equivalence a) (Channel a) (Channel a)))
- (do promise.monad
- [cons channel]
- (case cons
- (#.Some [head tail])
- (wrap (#.Some [head (distinct' equivalence head tail)]))
-
- #.None
- (wrap #.None))))
-
-(def: #export (consume channel)
- {#.doc "Reads the entirety of a channel's content and returns it as a list."}
- (All [a] (-> (Channel a) (Promise (List a))))
- (do {! promise.monad}
- [cons channel]
- (case cons
- (#.Some [head tail])
- (\ ! map (|>> (#.Cons head))
- (consume tail))
-
- #.None
- (wrap #.Nil))))
-
-(def: #export (sequential milli_seconds values)
- (All [a] (-> Nat (List a) (Channel a)))
- (case values
- #.Nil
- ..empty
-
- (#.Cons head tail)
- (promise.resolved (#.Some [head (do promise.monad
- [_ (promise.wait milli_seconds)]
- (sequential milli_seconds tail))]))))
diff --git a/stdlib/source/lux/control/concurrency/promise.lux b/stdlib/source/lux/control/concurrency/promise.lux
deleted file mode 100644
index 8e0acf8b9..000000000
--- a/stdlib/source/lux/control/concurrency/promise.lux
+++ /dev/null
@@ -1,199 +0,0 @@
-(.module:
- [lux (#- and or)
- [abstract
- [functor (#+ Functor)]
- [apply (#+ Apply)]
- ["." monad (#+ Monad do)]]
- [control
- [pipe (#+ case>)]
- ["." function]
- ["." io (#+ IO io)]]
- [data
- ["." product]]
- [type (#+ :share)
- abstract]]
- [//
- ["." thread]
- ["." atom (#+ Atom atom)]])
-
-(abstract: #export (Promise a)
- (Atom [(Maybe a) (List (-> a (IO Any)))])
-
- {#.doc "Represents values produced by asynchronous computations (unlike IO, which is synchronous)."}
-
- (type: #export (Resolver a)
- (-> a (IO Bit)))
-
- (def: (resolver promise)
- {#.doc "Sets an promise's value if it has not been done yet."}
- (All [a] (-> (Promise a) (Resolver a)))
- (function (resolve value)
- (let [promise (:representation promise)]
- (do {! io.monad}
- [(^@ old [_value _observers]) (atom.read promise)]
- (case _value
- (#.Some _)
- (wrap #0)
-
- #.None
- (do !
- [#let [new [(#.Some value) #.None]]
- succeeded? (atom.compare_and_swap old new promise)]
- (if succeeded?
- (do !
- [_ (monad.map ! (function (_ f) (f value))
- _observers)]
- (wrap #1))
- (resolve value))))))))
-
- (def: #export (resolved value)
- (All [a] (-> a (Promise a)))
- (:abstraction (atom [(#.Some value) (list)])))
-
- (def: #export (promise _)
- (All [a] (-> Any [(Promise a) (Resolver a)]))
- (let [promise (:abstraction (atom [#.None (list)]))]
- [promise (..resolver promise)]))
-
- (def: #export poll
- {#.doc "Polls a promise's value."}
- (All [a] (-> (Promise a) (IO (Maybe a))))
- (|>> :representation
- atom.read
- (\ io.functor map product.left)))
-
- (def: #export (await f promise)
- (All [a] (-> (-> a (IO Any)) (Promise a) (IO Any)))
- (do {! io.monad}
- [#let [promise (:representation promise)]
- (^@ old [_value _observers]) (atom.read promise)]
- (case _value
- (#.Some value)
- (f value)
-
- #.None
- (let [new [_value (#.Cons f _observers)]]
- (do !
- [swapped? (atom.compare_and_swap old new promise)]
- (if swapped?
- (wrap [])
- (await f (:abstraction promise))))))))
- )
-
-(def: #export resolved?
- {#.doc "Checks whether a promise's value has already been resolved."}
- (All [a] (-> (Promise a) (IO Bit)))
- (|>> ..poll
- (\ io.functor map
- (|>> (case> #.None
- #0
-
- (#.Some _)
- #1)))))
-
-(implementation: #export functor
- (Functor Promise)
-
- (def: (map f fa)
- (let [[fb resolve] (..promise [])]
- (exec (io.run (..await (|>> f resolve) fa))
- fb))))
-
-(implementation: #export apply
- (Apply Promise)
-
- (def: &functor ..functor)
-
- (def: (apply ff fa)
- (let [[fb resolve] (..promise [])]
- (exec (io.run (..await (function (_ f)
- (..await (|>> f resolve) fa))
- ff))
- fb))))
-
-(implementation: #export monad
- (Monad Promise)
-
- (def: &functor ..functor)
-
- (def: wrap ..resolved)
-
- (def: (join mma)
- (let [[ma resolve] (promise [])]
- (exec (io.run (..await (..await resolve) mma))
- ma))))
-
-(def: #export (and left right)
- {#.doc "Sequencing combinator."}
- (All [a b] (-> (Promise a) (Promise b) (Promise [a b])))
- (let [[read! write!] (:share [a b]
- [(Promise a) (Promise b)]
- [left right]
-
- [(Promise [a b])
- (Resolver [a b])]
- (..promise []))
- _ (io.run (..await (function (_ left)
- (..await (function (_ right)
- (write! [left right]))
- right))
- left))]
- read!))
-
-(def: #export (or left right)
- {#.doc "Heterogeneous alternative combinator."}
- (All [a b] (-> (Promise a) (Promise b) (Promise (| a b))))
- (let [[a|b resolve] (..promise [])]
- (with_expansions
- [<sides> (template [<promise> <tag>]
- [(io.run (await (|>> <tag> resolve) <promise>))]
-
- [left #.Left]
- [right #.Right]
- )]
- (exec <sides>
- a|b))))
-
-(def: #export (either left right)
- {#.doc "Homogeneous alternative combinator."}
- (All [a] (-> (Promise a) (Promise a) (Promise a)))
- (let [[left||right resolve] (..promise [])]
- (`` (exec (~~ (template [<promise>]
- [(io.run (await resolve <promise>))]
-
- [left]
- [right]))
- left||right))))
-
-(def: #export (schedule millis_delay computation)
- {#.doc (doc "Runs an I/O computation on its own thread (after a specified delay)."
- "Returns a Promise that will eventually host its result.")}
- (All [a] (-> Nat (IO a) (Promise a)))
- (let [[!out resolve] (..promise [])]
- (exec (|> (do io.monad
- [value computation]
- (resolve value))
- (thread.schedule millis_delay)
- io.run)
- !out)))
-
-(def: #export future
- {#.doc (doc "Runs an I/O computation on its own thread."
- "Returns a Promise that will eventually host its result.")}
- (All [a] (-> (IO a) (Promise a)))
- (..schedule 0))
-
-(def: #export (delay time_millis value)
- {#.doc "Delivers a value after a certain period has passed."}
- (All [a] (-> Nat a (Promise a)))
- (..schedule time_millis (io value)))
-
-(def: #export (wait time_millis)
- {#.doc "Returns a promise that will be resolved after the specified amount of milliseconds."}
- (-> Nat (Promise Any))
- (..delay time_millis []))
-
-(def: #export (time_out time_millis promise)
- {#.doc "Wait for a promise to be resolved within the specified amount of milliseconds."}
- (All [a] (-> Nat (Promise a) (Promise (Maybe a))))
- (..or (wait time_millis) promise))
diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux
deleted file mode 100644
index 0e8fa2b94..000000000
--- a/stdlib/source/lux/control/concurrency/semaphore.lux
+++ /dev/null
@@ -1,173 +0,0 @@
-(.module:
- [lux #*
- [abstract
- [monad (#+ do)]]
- [control
- [pipe (#+ if>)]
- ["." io (#+ IO)]
- ["." try (#+ Try)]
- ["." exception (#+ exception:)]]
- [data
- [text
- ["%" format (#+ format)]]
- [collection
- ["." queue (#+ Queue)]]]
- [math
- [number
- ["n" nat]
- ["i" int]]]
- [type
- abstract
- ["." refinement]]]
- [//
- ["." atom (#+ Atom)]
- ["." promise (#+ Promise Resolver)]])
-
-(type: State
- {#max_positions Nat
- #open_positions Int
- #waiting_list (Queue (Resolver Any))})
-
-(abstract: #export Semaphore
- (Atom State)
-
- {#.doc "A tool for controlling access to resources by multiple concurrent processes."}
-
- (def: most_positions_possible
- (.nat (\ i.interval top)))
-
- (def: #export (semaphore initial_open_positions)
- (-> Nat Semaphore)
- (let [max_positions (n.min initial_open_positions
- ..most_positions_possible)]
- (:abstraction (atom.atom {#max_positions max_positions
- #open_positions (.int max_positions)
- #waiting_list queue.empty}))))
-
- (def: #export (wait semaphore)
- (Ex [k] (-> Semaphore (Promise Any)))
- (let [semaphore (:representation semaphore)
- [signal sink] (: [(Promise Any) (Resolver Any)]
- (promise.promise []))]
- (exec (io.run
- (with_expansions [<had_open_position?> (as_is (get@ #open_positions) (i.> -1))]
- (do io.monad
- [[_ state'] (atom.update (|>> (update@ #open_positions dec)
- (if> [<had_open_position?>]
- []
- [(update@ #waiting_list (queue.push sink))]))
- semaphore)]
- (with_expansions [<go_ahead> (sink [])
- <get_in_line> (wrap false)]
- (if (|> state' <had_open_position?>)
- <go_ahead>
- <get_in_line>)))))
- signal)))
-
- (exception: #export (semaphore_is_maxed_out {max_positions Nat})
- (exception.report
- ["Max Positions" (%.nat max_positions)]))
-
- (def: #export (signal semaphore)
- (Ex [k] (-> Semaphore (Promise (Try Int))))
- (let [semaphore (:representation semaphore)]
- (promise.future
- (do {! io.monad}
- [[pre post] (atom.update (function (_ state)
- (if (i.= (.int (get@ #max_positions state))
- (get@ #open_positions state))
- state
- (|> state
- (update@ #open_positions inc)
- (update@ #waiting_list queue.pop))))
- semaphore)]
- (if (is? pre post)
- (wrap (exception.throw ..semaphore_is_maxed_out [(get@ #max_positions pre)]))
- (do !
- [_ (case (queue.peek (get@ #waiting_list pre))
- #.None
- (wrap true)
-
- (#.Some sink)
- (sink []))]
- (wrap (#try.Success (get@ #open_positions post)))))))))
- )
-
-(abstract: #export Mutex
- Semaphore
-
- {#.doc "A mutual-exclusion lock that can only be acquired by one process at a time."}
-
- (def: #export (mutex _)
- (-> Any Mutex)
- (:abstraction (semaphore 1)))
-
- (def: acquire
- (-> Mutex (Promise Any))
- (|>> :representation ..wait))
-
- (def: release
- (-> Mutex (Promise Any))
- (|>> :representation ..signal))
-
- (def: #export (synchronize mutex procedure)
- (All [a] (-> Mutex (IO (Promise a)) (Promise a)))
- (do promise.monad
- [_ (..acquire mutex)
- output (io.run procedure)
- _ (..release mutex)]
- (wrap output)))
- )
-
-(def: #export limit
- (refinement.refinement (n.> 0)))
-
-(type: #export Limit
- (:~ (refinement.type limit)))
-
-(abstract: #export Barrier
- {#limit Limit
- #count (Atom Nat)
- #start_turnstile Semaphore
- #end_turnstile Semaphore}
-
- {#.doc "A barrier that blocks all processes from proceeding until a given number of processes are parked at the barrier."}
-
- (def: #export (barrier limit)
- (-> Limit Barrier)
- (:abstraction {#limit limit
- #count (atom.atom 0)
- #start_turnstile (..semaphore 0)
- #end_turnstile (..semaphore 0)}))
-
- (def: (un_block times turnstile)
- (-> Nat Semaphore (Promise Any))
- (loop [step 0]
- (if (n.< times step)
- (do promise.monad
- [outcome (..signal turnstile)]
- (recur (inc step)))
- (\ promise.monad wrap []))))
-
- (template [<phase> <update> <goal> <turnstile>]
- [(def: (<phase> (^:representation barrier))
- (-> Barrier (Promise Any))
- (do promise.monad
- [#let [limit (refinement.un_refine (get@ #limit barrier))
- goal <goal>
- [_ count] (io.run (atom.update <update> (get@ #count barrier)))
- reached? (n.= goal count)]]
- (if reached?
- (..un_block (dec limit) (get@ <turnstile> barrier))
- (..wait (get@ <turnstile> barrier)))))]
-
- [start inc limit #start_turnstile]
- [end dec 0 #end_turnstile]
- )
-
- (def: #export (block barrier)
- (-> Barrier (Promise Any))
- (do promise.monad
- [_ (..start barrier)]
- (..end barrier)))
- )
diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux
deleted file mode 100644
index d375059a4..000000000
--- a/stdlib/source/lux/control/concurrency/stm.lux
+++ /dev/null
@@ -1,273 +0,0 @@
-(.module:
- [lux #*
- [abstract
- [functor (#+ Functor)]
- [apply (#+ Apply)]
- ["." monad (#+ Monad do)]]
- [control
- ["." io (#+ IO io)]
- ["." try]]
- [data
- ["." product]
- ["." maybe]
- [collection
- ["." list]]]
- [type
- abstract]]
- [//
- ["." atom (#+ Atom atom)]
- ["." promise (#+ Promise Resolver)]
- ["." frp (#+ Channel Sink)]])
-
-(type: (Observer a)
- (-> a (IO Any)))
-
-(abstract: #export (Var a)
- (Atom [a (List (Sink a))])
-
- {#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."}
-
- (def: #export (var value)
- {#.doc "Creates a new STM var, with a default value."}
- (All [a] (-> a (Var a)))
- (:abstraction (atom.atom [value (list)])))
-
- (def: read!
- (All [a] (-> (Var a) a))
- (|>> :representation atom.read io.run product.left))
-
- (def: (un_follow sink var)
- (All [a] (-> (Sink a) (Var a) (IO Any)))
- (do io.monad
- [_ (atom.update (function (_ [value observers])
- [value (list.filter (|>> (is? sink) not) observers)])
- (:representation var))]
- (wrap [])))
-
- (def: (write! new_value var)
- (All [a] (-> a (Var a) (IO Any)))
- (do {! io.monad}
- [#let [var' (:representation var)]
- (^@ old [old_value observers]) (atom.read var')
- succeeded? (atom.compare_and_swap old [new_value observers] var')]
- (if succeeded?
- (do !
- [_ (monad.map ! (function (_ sink)
- (do !
- [result (\ sink feed new_value)]
- (case result
- (#try.Success _)
- (wrap [])
-
- (#try.Failure _)
- (un_follow sink var))))
- observers)]
- (wrap []))
- (write! new_value var))))
-
- (def: #export (follow target)
- {#.doc "Creates a channel that will receive all changes to the value of the given var."}
- (All [a] (-> (Var a) (IO [(Channel a) (Sink a)])))
- (do io.monad
- [#let [[channel sink] (frp.channel [])]
- _ (atom.update (function (_ [value observers])
- [value (#.Cons sink observers)])
- (:representation target))]
- (wrap [channel sink])))
- )
-
-(type: (Tx_Frame a)
- {#var (Var a)
- #original a
- #current a})
-
-(type: Tx
- (List (Ex [a] (Tx_Frame a))))
-
-(type: #export (STM a)
- {#.doc "A computation which updates a transaction and produces a value."}
- (-> Tx [Tx a]))
-
-(def: (find_var_value var tx)
- (All [a] (-> (Var a) Tx (Maybe a)))
- (|> tx
- (list.find (function (_ [_var _original _current])
- (is? (:as (Var Any) var)
- (:as (Var Any) _var))))
- (\ maybe.monad map (function (_ [_var _original _current])
- _current))
- (:assume)
- ))
-
-(def: #export (read var)
- (All [a] (-> (Var a) (STM a)))
- (function (_ tx)
- (case (find_var_value var tx)
- (#.Some value)
- [tx value]
-
- #.None
- (let [value (..read! var)]
- [(#.Cons [var value value] tx)
- value]))))
-
-(def: (update_tx_value var value tx)
- (All [a] (-> (Var a) a Tx Tx))
- (case tx
- #.Nil
- #.Nil
-
- (#.Cons [_var _original _current] tx')
- (if (is? (:as (Var Any) var)
- (:as (Var Any) _var))
- (#.Cons {#var (:as (Var Any) _var)
- #original (:as Any _original)
- #current (:as Any value)}
- tx')
- (#.Cons {#var _var
- #original _original
- #current _current}
- (update_tx_value var value tx')))))
-
-(def: #export (write value var)
- {#.doc "Writes value to var."}
- (All [a] (-> a (Var a) (STM Any)))
- (function (_ tx)
- (case (find_var_value var tx)
- (#.Some _)
- [(update_tx_value var value tx)
- []]
-
- #.None
- [(#.Cons [var (..read! var) value] tx)
- []])))
-
-(implementation: #export functor
- (Functor STM)
-
- (def: (map f fa)
- (function (_ tx)
- (let [[tx' a] (fa tx)]
- [tx' (f a)]))))
-
-(implementation: #export apply
- (Apply STM)
-
- (def: &functor ..functor)
-
- (def: (apply ff fa)
- (function (_ tx)
- (let [[tx' f] (ff tx)
- [tx'' a] (fa tx')]
- [tx'' (f a)]))))
-
-(implementation: #export monad
- (Monad STM)
-
- (def: &functor ..functor)
-
- (def: (wrap a)
- (function (_ tx)
- [tx a]))
-
- (def: (join mma)
- (function (_ tx)
- (let [[tx' ma] (mma tx)]
- (ma tx')))))
-
-(def: #export (update f var)
- {#.doc "Will update a Var's value, and return a tuple with the old and the new values."}
- (All [a] (-> (-> a a) (Var a) (STM [a a])))
- (do ..monad
- [a (..read var)
- #let [a' (f a)]
- _ (..write a' var)]
- (wrap [a a'])))
-
-(def: (can_commit? tx)
- (-> Tx Bit)
- (list.every? (function (_ [_var _original _current])
- (is? _original (..read! _var)))
- tx))
-
-(def: (commit_var! [_var _original _current])
- (-> (Ex [a] (Tx_Frame a)) (IO Any))
- (if (is? _original _current)
- (io [])
- (..write! _current _var)))
-
-(def: fresh_tx Tx (list))
-
-(type: (Commit a)
- [(STM a)
- (Promise a)
- (Resolver a)])
-
-(def: pending_commits
- (Atom (Rec Commits
- [(Promise [(Ex [a] (Commit a)) Commits])
- (Resolver [(Ex [a] (Commit a)) Commits])]))
- (atom (promise.promise [])))
-
-(def: commit_processor_flag
- (Atom Bit)
- (atom #0))
-
-(def: (issue_commit commit)
- (All [a] (-> (Commit a) (IO Any)))
- (let [entry [commit (promise.promise [])]]
- (do {! io.monad}
- [|commits|&resolve (atom.read pending_commits)]
- (loop [[|commits| resolve] |commits|&resolve]
- (do !
- [|commits| (promise.poll |commits|)]
- (case |commits|
- #.None
- (do io.monad
- [resolved? (resolve entry)]
- (if resolved?
- (atom.write (product.right entry) pending_commits)
- (recur |commits|&resolve)))
-
- (#.Some [head tail])
- (recur tail)))))))
-
-(def: (process_commit commit)
- (All [a] (-> (Commit a) (IO Any)))
- (let [[stm_proc output resolve] commit
- [finished_tx value] (stm_proc fresh_tx)]
- (if (can_commit? finished_tx)
- (do {! io.monad}
- [_ (monad.map ! commit_var! finished_tx)]
- (resolve value))
- (issue_commit commit))))
-
-(def: init_processor!
- (IO Any)
- (do {! io.monad}
- [flag (atom.read commit_processor_flag)]
- (if flag
- (wrap [])
- (do !
- [was_first? (atom.compare_and_swap flag #1 commit_processor_flag)]
- (if was_first?
- (do !
- [[promise resolve] (atom.read pending_commits)]
- (promise.await (function (recur [head [tail _resolve]])
- (do !
- [_ (process_commit head)]
- (promise.await recur tail)))
- promise))
- (wrap [])))
- )))
-
-(def: #export (commit stm_proc)
- {#.doc (doc "Commits a transaction and returns its result (asynchronously)."
- "Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first."
- "For this reason, it's important to note that transactions must be free from side-effects, such as I/O.")}
- (All [a] (-> (STM a) (Promise a)))
- (let [[output resolver] (promise.promise [])]
- (exec (io.run (do io.monad
- [_ init_processor!]
- (issue_commit [stm_proc output resolver])))
- output)))
diff --git a/stdlib/source/lux/control/concurrency/thread.lux b/stdlib/source/lux/control/concurrency/thread.lux
deleted file mode 100644
index d6dc71c37..000000000
--- a/stdlib/source/lux/control/concurrency/thread.lux
+++ /dev/null
@@ -1,169 +0,0 @@
-(.module:
- [lux #*
- ["@" target]
- ["." ffi]
- [abstract
- ["." monad (#+ do)]]
- [control
- ["." try]
- ["." exception (#+ exception:)]
- ["." io (#+ IO io)]]
- [data
- ["." text]
- [collection
- ["." list]]]
- [math
- [number
- ["n" nat]
- ["f" frac]]]
- [time
- ["." instant]]]
- [//
- ["." atom (#+ Atom)]])
-
-(with_expansions [<jvm> (as_is (ffi.import: java/lang/Object)
-
- (ffi.import: java/lang/Runtime
- ["#::."
- (#static getRuntime [] java/lang/Runtime)
- (availableProcessors [] int)])
-
- (ffi.import: java/lang/Runnable)
-
- (ffi.import: java/util/concurrent/TimeUnit
- ["#::."
- (#enum MILLISECONDS)])
-
- (ffi.import: java/util/concurrent/Executor
- ["#::."
- (execute [java/lang/Runnable] #io void)])
-
- (ffi.import: (java/util/concurrent/ScheduledFuture a))
-
- (ffi.import: java/util/concurrent/ScheduledThreadPoolExecutor
- ["#::."
- (new [int])
- (schedule [java/lang/Runnable long java/util/concurrent/TimeUnit] #io (java/util/concurrent/ScheduledFuture java/lang/Object))]))]
- (for {@.old (as_is <jvm>)
- @.jvm (as_is <jvm>)
-
- @.js
- (as_is (ffi.import: (setTimeout [ffi.Function ffi.Number] #io Any)))
-
- @.python
- (ffi.import: threading/Timer
- ["#::."
- (new [ffi.Float ffi.Function])
- (start [] #io #? Any)])}
-
- ## Default
- (type: Thread
- {#creation Nat
- #delay Nat
- #action (IO Any)})
- ))
-
-(def: #export parallelism
- Nat
- (with_expansions [<jvm> (|> (java/lang/Runtime::getRuntime)
- (java/lang/Runtime::availableProcessors)
- .nat)]
- (for {@.old <jvm>
- @.jvm <jvm>}
- ## Default
- 1)))
-
-(with_expansions [<jvm> (as_is (def: runner
- java/util/concurrent/ScheduledThreadPoolExecutor
- (java/util/concurrent/ScheduledThreadPoolExecutor::new (.int ..parallelism))))]
- (for {@.old <jvm>
- @.jvm <jvm>
- @.js (as_is)
- @.python (as_is)}
-
- ## Default
- (def: runner
- (Atom (List Thread))
- (atom.atom (list)))))
-
-(def: (execute! action)
- (-> (IO Any) Any)
- (case (try (io.run action))
- (#try.Failure error)
- (exec
- ("lux io log" ($_ "lux text concat"
- "ERROR DURING THREAD EXECUTION:" text.new_line
- error))
- [])
-
- (#try.Success _)
- []))
-
-(def: #export (schedule milli_seconds action)
- (-> Nat (IO Any) (IO Any))
- (with_expansions [<jvm> (as_is (let [runnable (ffi.object [] [java/lang/Runnable]
- []
- (java/lang/Runnable [] (run self) void
- (..execute! action)))]
- (case milli_seconds
- 0 (java/util/concurrent/Executor::execute runnable runner)
- _ (java/util/concurrent/ScheduledThreadPoolExecutor::schedule runnable (.int milli_seconds) java/util/concurrent/TimeUnit::MILLISECONDS
- runner))))]
- (for {@.old <jvm>
- @.jvm <jvm>
-
- @.js
- (..setTimeout [(ffi.closure [] (..execute! action))
- (n.frac milli_seconds)])
-
- @.python
- (do io.monad
- [_ (|> (ffi.lambda [] (..execute! action))
- [(|> milli_seconds n.frac (f./ +1,000.0))]
- threading/Timer::new
- (threading/Timer::start []))]
- (wrap []))}
-
- ## Default
- (do {! io.monad}
- [now (\ ! map (|>> instant.to_millis .nat) instant.now)
- _ (atom.update (|>> (#.Cons {#creation now
- #delay milli_seconds
- #action action}))
- ..runner)]
- (wrap [])))))
-
-(for {@.old (as_is)
- @.jvm (as_is)
- @.js (as_is)
- @.python (as_is)}
-
- ## Default
- (as_is (exception: #export cannot_continue_running_threads)
-
- (def: #export run!
- (IO Any)
- (loop [_ []]
- (do {! io.monad}
- [threads (atom.read ..runner)]
- (case threads
- ## And... we're done!
- #.Nil
- (wrap [])
-
- _
- (do !
- [now (\ ! map (|>> instant.to_millis .nat) instant.now)
- #let [[ready pending] (list.partition (function (_ thread)
- (|> (get@ #creation thread)
- (n.+ (get@ #delay thread))
- (n.<= now)))
- threads)]
- swapped? (atom.compare_and_swap threads pending ..runner)]
- (if swapped?
- (do !
- [_ (monad.map ! (|>> (get@ #action) ..execute! io.io) ready)]
- (recur []))
- (error! (exception.construct ..cannot_continue_running_threads []))))
- ))))
- ))