aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/control/concurrency
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/control/concurrency')
-rw-r--r--stdlib/source/lux/control/concurrency/actor.lux35
-rw-r--r--stdlib/source/lux/control/concurrency/atom.lux2
-rw-r--r--stdlib/source/lux/control/concurrency/frp.lux42
-rw-r--r--stdlib/source/lux/control/concurrency/process.lux2
-rw-r--r--stdlib/source/lux/control/concurrency/promise.lux16
-rw-r--r--stdlib/source/lux/control/concurrency/semaphore.lux14
-rw-r--r--stdlib/source/lux/control/concurrency/stm.lux32
-rw-r--r--stdlib/source/lux/control/concurrency/task.lux25
8 files changed, 82 insertions, 86 deletions
diff --git a/stdlib/source/lux/control/concurrency/actor.lux b/stdlib/source/lux/control/concurrency/actor.lux
index 9b20dcfde..3e288ca42 100644
--- a/stdlib/source/lux/control/concurrency/actor.lux
+++ b/stdlib/source/lux/control/concurrency/actor.lux
@@ -3,15 +3,15 @@
[control monad
["p" parser]
["ex" exception (#+ exception:)]]
- ["." io (#- run) ("io/." Monad<IO>)]
+ ["." io ("io/." monad)]
[data
["." product]
["e" error]
[text
format]
[collection
- ["." list ("list/." Monoid<List> Monad<List> Fold<List>)]]]
- ["." macro (#+ with-gensyms Monad<Meta>)
+ ["." list ("list/." monoid monad fold)]]]
+ ["." macro (#+ with-gensyms monad)
["." code]
["s" syntax (#+ syntax: Syntax)]
[syntax
@@ -23,7 +23,7 @@
abstract]]
[//
["." atom (#+ Atom atom)]
- ["." promise (#+ Promise Resolver) ("promise/." Monad<Promise>)]
+ ["." promise (#+ Promise Resolver) ("promise/." monad)]
["." task (#+ Task)]])
(exception: #export poisoned)
@@ -33,7 +33,6 @@
(ex.report ["Actor" actor-name]
["Message" message-name]))
-## [Types]
(with-expansions
[<Message> (as-is (-> s (Actor s) (Task s)))
<Obituary> (as-is [Text s (List <Message>)])
@@ -90,7 +89,7 @@
(promise.promise []))
process (loop [state init
[|mailbox| _] (io.run (atom.read (get@ #mailbox (:representation self))))]
- (do promise.Monad<Promise>
+ (do promise.monad
[[head tail] |mailbox|
?state' (handle head state self)]
(case ?state'
@@ -120,7 +119,7 @@
(All [s] (-> (Message s) (Actor s) (IO Bit)))
(if (alive? actor)
(let [entry [message (promise.promise [])]]
- (do Monad<IO>
+ (do io.monad
[|mailbox|&resolve (atom.read (get@ #mailbox (:representation actor)))]
(loop [[|mailbox| resolve] |mailbox|&resolve]
(case (promise.poll |mailbox|)
@@ -139,7 +138,6 @@
)
)
-## [Values]
(def: (default-handle message state self)
(All [s] (-> (Message s) s (Actor s) (Task s)))
(message state self))
@@ -161,7 +159,6 @@
(task.throw poisoned []))
actor))
-## [Syntax]
(do-template [<with> <resolve> <tag> <desc>]
[(def: #export (<with> name)
(-> Name cs.Annotations cs.Annotations)
@@ -170,7 +167,7 @@
(def: #export (<resolve> name)
(-> Name (Meta Name))
- (do Monad<Meta>
+ (do io.monad
[[_ annotations _] (macro.find-def name)]
(case (macro.get-tag-ann (name-of <tag>) annotations)
(#.Some actor-name)
@@ -186,7 +183,7 @@
(def: actor-decl^
(Syntax [Text (List Text)])
(p.either (s.form (p.and s.local-identifier (p.some s.local-identifier)))
- (p.and s.local-identifier (:: p.Monad<Parser> wrap (list)))))
+ (p.and s.local-identifier (:: p.monad wrap (list)))))
(do-template [<name> <desc>]
[(def: #export <name>
@@ -227,7 +224,7 @@
Nat
((stop cause state)
- (:: promise.Monad<Promise> wrap
+ (:: promise.monad wrap
(log! (if (ex.match? ..poisoned cause)
(format "Counter was poisoned: " (%n state))
cause)))))
@@ -236,7 +233,7 @@
(List a)
((handle message state self)
- (do task.Monad<Task>
+ (do task.monad
[#let [_ (log! "BEFORE")]
output (message state self)
#let [_ (log! "AFTER")]]
@@ -268,7 +265,7 @@
(~ (code.local-identifier messageN))
(~ (code.local-identifier stateN))
(~ (code.local-identifier selfN)))
- (do task.Monad<Task>
+ (do task.monad
[]
(~ bodyC))))))
#..end (~ (case ?stop
@@ -279,7 +276,7 @@
(` (function ((~ g!_)
(~ (code.local-identifier causeN))
(~ (code.local-identifier stateN)))
- (do promise.Monad<Promise>
+ (do promise.monad
[]
(~ bodyC))))))}))
(` (def: (~+ (csw.export export)) ((~ g!new) (~ g!init))
@@ -309,7 +306,7 @@
(def: reference^
(s.Syntax [Name (List Text)])
(p.either (s.form (p.and s.identifier (p.some s.local-identifier)))
- (p.and s.identifier (:: p.Monad<Parser> wrap (list)))))
+ (p.and s.identifier (:: p.monad wrap (list)))))
(syntax: #export (message:
{export csr.export}
@@ -367,12 +364,12 @@
(let [[(~ g!task) (~ g!resolve)] (: [(task.Task (~ g!outputT))
(task.Resolver (~ g!outputT))]
(task.task []))]
- (io.run (do io.Monad<IO>
+ (io.run (do io.monad
[(~ g!sent?) (..send (function ((~ g!_) (~ g!state) (~ g!self))
- (do promise.Monad<Promise>
+ (do promise.monad
[(~ g!return) (: (Task [((~ g!type) (~+ g!actor-refs))
(~ g!outputT)])
- (do task.Monad<Task>
+ (do task.monad
[]
(~ body)))]
(case (~ g!return)
diff --git a/stdlib/source/lux/control/concurrency/atom.lux b/stdlib/source/lux/control/concurrency/atom.lux
index b1692b6e3..61152d7b6 100644
--- a/stdlib/source/lux/control/concurrency/atom.lux
+++ b/stdlib/source/lux/control/concurrency/atom.lux
@@ -46,7 +46,7 @@
"The retries will be done with the new values of the atom, as they show up.")}
(All [a] (-> (-> a a) (Atom a) (IO a)))
(loop [_ []]
- (do io.Monad<IO>
+ (do io.monad
[old (read atom)
#let [new (f old)]
swapped? (compare-and-swap old new atom)]
diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux
index 18b385a65..84def78d1 100644
--- a/stdlib/source/lux/control/concurrency/frp.lux
+++ b/stdlib/source/lux/control/concurrency/frp.lux
@@ -8,14 +8,14 @@
[equivalence (#+ Equivalence)]]
["." io (#+ IO)]
[data
- [maybe ("maybe/." Functor<Maybe>)]
+ [maybe ("maybe/." functor)]
[collection
- [list ("list/." Monoid<List>)]]]
+ [list ("list/." monoid)]]]
[type (#+ :share)
abstract]]
[//
["." atom (#+ Atom)]
- ["." promise (#+ Promise) ("promise/." Functor<Promise>)]])
+ ["." promise (#+ Promise) ("promise/." functor)]])
(type: #export (Channel a)
{#.doc "An asynchronous channel to distribute values."}
@@ -35,7 +35,7 @@
(structure
(def: close
(loop [_ []]
- (do io.Monad<IO>
+ (do io.monad
[current (atom.read source)
stopped? (current #.None)]
(if stopped?
@@ -52,7 +52,7 @@
(def: (feed value)
(loop [_ []]
- (do io.Monad<IO>
+ (do io.monad
[current (atom.read source)
#let [[next resolve-next] (:share [a]
{(promise.Resolver (Maybe [a (Channel a)]))
@@ -82,7 +82,7 @@
(All [a] (-> (-> a (IO Any)) (Channel a) (IO Any)))
(io.io (exec (: (Promise Any)
(loop [channel channel]
- (do promise.Monad<Promise>
+ (do promise.monad
[cons channel]
(case cons
(#.Some [head tail])
@@ -93,18 +93,18 @@
(wrap [])))))
[])))
-(structure: #export _ (Functor Channel)
+(structure: #export functor (Functor Channel)
(def: (map f)
(promise/map
(maybe/map
(function (_ [head tail])
[(f head) (map f tail)])))))
-(structure: #export _ (Apply Channel)
- (def: functor Functor<Channel>)
+(structure: #export apply (Apply Channel)
+ (def: &functor ..functor)
(def: (apply ff fa)
- (do promise.Monad<Promise>
+ (do promise.monad
[cons-f ff
cons-a fa]
(case [cons-f cons-a]
@@ -114,8 +114,8 @@
_
(wrap #.None)))))
-(structure: #export _ (Monad Channel)
- (def: functor Functor<Channel>)
+(structure: #export monad (Monad Channel)
+ (def: &functor ..functor)
(def: (wrap a)
(promise.resolved (#.Some [a (promise.resolved #.None)])))
@@ -128,7 +128,7 @@
(def: #export (filter pass? channel)
(All [a] (-> (Predicate a) (Channel a) (Channel a)))
- (do promise.Monad<Promise>
+ (do promise.monad
[cons channel]
(case cons
(#.Some [head tail])
@@ -151,7 +151,7 @@
(All [a b]
(-> (-> b a (Promise a)) a (Channel b)
(Promise a)))
- (do promise.Monad<Promise>
+ (do promise.monad
[cons channel]
(case cons
#.None
@@ -167,7 +167,7 @@
(All [a b]
(-> (-> b a (Promise a)) a (Channel b)
(Channel a)))
- (do promise.Monad<Promise>
+ (do promise.monad
[cons channel]
(case cons
#.None
@@ -182,7 +182,7 @@
(All [a] (-> Nat (IO a) (Channel a)))
(let [[output source] (channel [])]
(exec (io.run (loop [_ []]
- (do io.Monad<IO>
+ (do io.monad
[value action
_ (:: source feed value)]
(promise.await recur (promise.wait milli-seconds)))))
@@ -194,7 +194,7 @@
(def: #export (iterate f init)
(All [a] (-> (-> a (Promise (Maybe a))) a (Channel a)))
- (do promise.Monad<Promise>
+ (do promise.monad
[?next (f init)]
(case ?next
(#.Some next)
@@ -205,7 +205,7 @@
(def: (distinct' equivalence previous channel)
(All [a] (-> (Equivalence a) a (Channel a) (Channel a)))
- (do promise.Monad<Promise>
+ (do promise.monad
[cons channel]
(case cons
(#.Some [head tail])
@@ -218,7 +218,7 @@
(def: #export (distinct equivalence channel)
(All [a] (-> (Equivalence a) (Channel a) (Channel a)))
- (do promise.Monad<Promise>
+ (do promise.monad
[cons channel]
(case cons
(#.Some [head tail])
@@ -230,7 +230,7 @@
(def: #export (consume channel)
{#.doc "Reads the entirety of a channel's content and returns it as a list."}
(All [a] (-> (Channel a) (Promise (List a))))
- (do promise.Monad<Promise>
+ (do promise.monad
[cons channel]
(case cons
(#.Some [head tail])
@@ -247,6 +247,6 @@
(promise.resolved #.None)
(#.Cons head tail)
- (promise.resolved (#.Some [head (do promise.Monad<Promise>
+ (promise.resolved (#.Some [head (do promise.monad
[_ (promise.wait milli-seconds)]
(sequential milli-seconds tail))]))))
diff --git a/stdlib/source/lux/control/concurrency/process.lux b/stdlib/source/lux/control/concurrency/process.lux
index a67734747..d1d2ac245 100644
--- a/stdlib/source/lux/control/concurrency/process.lux
+++ b/stdlib/source/lux/control/concurrency/process.lux
@@ -87,7 +87,7 @@
(def: #export run!
(IO Any)
(loop [_ []]
- (do io.Monad<IO>
+ (do io.monad
[processes (atom.read runner)]
(case processes
## And... we're done!
diff --git a/stdlib/source/lux/control/concurrency/promise.lux b/stdlib/source/lux/control/concurrency/promise.lux
index 33a04190b..244951139 100644
--- a/stdlib/source/lux/control/concurrency/promise.lux
+++ b/stdlib/source/lux/control/concurrency/promise.lux
@@ -25,7 +25,7 @@
{#.doc "Sets an promise's value if it has not been done yet."}
(All [a] (-> (Promise a) (Resolver a)))
(function (resolve value)
- (do io.Monad<IO>
+ (do io.monad
[(^@ old [_value _observers]) (atom.read promise)]
(case _value
(#.Some _)
@@ -82,14 +82,14 @@
(#.Some _)
#1))
-(structure: #export _ (Functor Promise)
+(structure: #export functor (Functor Promise)
(def: (map f fa)
(let [[fb resolve] (..promise [])]
(exec (io.run (await (|>> f resolve) fa))
fb))))
-(structure: #export _ (Apply Promise)
- (def: functor Functor<Promise>)
+(structure: #export apply (Apply Promise)
+ (def: &functor ..functor)
(def: (apply ff fa)
(let [[fb resolve] (..promise [])]
@@ -98,8 +98,8 @@
ff))
fb))))
-(structure: #export _ (Monad Promise)
- (def: functor Functor<Promise>)
+(structure: #export monad (Monad Promise)
+ (def: &functor ..functor)
(def: wrap ..resolved)
@@ -113,7 +113,7 @@
(def: #export (and left right)
{#.doc "Sequencing combinator."}
(All [a b] (-> (Promise a) (Promise b) (Promise [a b])))
- (do Monad<Promise>
+ (do ..monad
[a left
b right]
(wrap [a b])))
@@ -148,7 +148,7 @@
"Returns a Promise that will eventually host its result.")}
(All [a] (-> Nat (IO a) (Promise a)))
(let [[!out resolve] (..promise [])]
- (exec (|> (do io.Monad<IO>
+ (exec (|> (do io.monad
[value computation]
(resolve value))
(process.schedule millis-delay)
diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux
index 46762ecf3..ddc73b300 100644
--- a/stdlib/source/lux/control/concurrency/semaphore.lux
+++ b/stdlib/source/lux/control/concurrency/semaphore.lux
@@ -29,7 +29,7 @@
(io.run
(loop [signal (: (Promise Any)
(promise.promise #.None))]
- (do io.Monad<IO>
+ (do io.monad
[state (atom.read semaphore)
#let [[ready? state'] (: [Bit State]
(case (get@ #open-positions state)
@@ -50,7 +50,7 @@
(let [semaphore (:representation semaphore)]
(promise.future
(loop [_ []]
- (do io.Monad<IO>
+ (do io.monad
[state (atom.read semaphore)
#let [[?signal state'] (: [(Maybe (Promise Any)) State]
(case (get@ #waiting-list state)
@@ -91,7 +91,7 @@
(def: #export (synchronize mutex procedure)
(All [a] (-> Mutex (IO (Promise a)) (Promise a)))
- (do promise.Monad<Promise>
+ (do promise.monad
[_ (acquire mutex)
output (io.run procedure)
_ (release mutex)]
@@ -120,15 +120,15 @@
(-> Nat Semaphore (Promise Any))
(loop [step 0]
(if (n/< times step)
- (do promise.Monad<Promise>
+ (do promise.monad
[_ (signal turnstile)]
(recur (inc step)))
- (:: promise.Monad<Promise> wrap []))))
+ (:: promise.monad wrap []))))
(do-template [<phase> <update> <goal> <turnstile>]
[(def: (<phase> (^:representation barrier))
(-> Barrier (Promise Any))
- (do promise.Monad<Promise>
+ (do promise.monad
[#let [limit (refinement.un-refine (get@ #limit barrier))
goal <goal>
count (io.run (atom.update <update> (get@ #count barrier)))]
@@ -143,7 +143,7 @@
(def: #export (block barrier)
(-> Barrier (Promise Any))
- (do promise.Monad<Promise>
+ (do promise.monad
[_ (start barrier)]
(end barrier)))
)
diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux
index 34122abd4..5bb537025 100644
--- a/stdlib/source/lux/control/concurrency/stm.lux
+++ b/stdlib/source/lux/control/concurrency/stm.lux
@@ -15,7 +15,7 @@
[//
["." atom (#+ Atom atom)]
["." promise (#+ Promise Resolver)]
- ["." frp ("frp/." Functor<Channel>)]])
+ ["." frp ("frp/." functor)]])
(type: #export (Observer a)
(-> a (IO Any)))
@@ -39,11 +39,11 @@
(All [a] (-> (Var a) (IO a)))
(|> var
atom.read
- (:: io.Functor<IO> map product.left)))
+ (:: io.functor map product.left)))
(def: (write! new-value (^:representation var))
(All [a] (-> a (Var a) (IO Any)))
- (do io.Monad<IO>
+ (do io.monad
[(^@ old [_value _observers]) (atom.read var)
succeeded? (atom.compare-and-swap old [new-value _observers] var)]
(if succeeded?
@@ -55,7 +55,7 @@
(def: #export (follow target)
{#.doc "Creates a channel that will receive all changes to the value of the given var."}
(All [a] (-> (Var a) (IO (frp.Channel a))))
- (do io.Monad<IO>
+ (do io.monad
[#let [[channel source] (frp.channel [])
target (:representation target)]
_ (atom.update (function (_ [value observers])
@@ -82,8 +82,8 @@
(list.find (function (_ [_var _original _current])
(is? (:coerce (Var Any) var)
(:coerce (Var Any) _var))))
- (:: maybe.Monad<Maybe> map (function (_ [_var _original _current])
- _current))
+ (:: maybe.monad map (function (_ [_var _original _current])
+ _current))
(:assume)
))
@@ -137,8 +137,8 @@
(let [[tx' a] (fa tx)]
[tx' (f a)]))))
-(structure: #export _ (Apply STM)
- (def: functor Functor<STM>)
+(structure: #export apply (Apply STM)
+ (def: &functor ..functor)
(def: (apply ff fa)
(function (_ tx)
@@ -146,8 +146,8 @@
[tx'' a] (fa tx')]
[tx'' (f a)]))))
-(structure: #export _ (Monad STM)
- (def: functor Functor<STM>)
+(structure: #export monad (Monad STM)
+ (def: &functor ..functor)
(def: (wrap a)
(function (_ tx) [tx a]))
@@ -160,7 +160,7 @@
(def: #export (update f var)
{#.doc "Will update a Var's value, and return a tuple with the old and the new values."}
(All [a] (-> (-> a a) (Var a) (STM [a a])))
- (do Monad<STM>
+ (do ..monad
[a (read var)
#let [a' (f a)]
_ (write a' var)]
@@ -198,12 +198,12 @@
(def: (issue-commit commit)
(All [a] (-> (Commit a) (IO Any)))
(let [entry [commit (promise.promise [])]]
- (do io.Monad<IO>
+ (do io.monad
[|commits|&resolve (atom.read pending-commits)]
(loop [[|commits| resolve] |commits|&resolve]
(case (promise.poll |commits|)
#.None
- (do io.Monad<IO>
+ (do io.monad
[resolved? (resolve entry)]
(if resolved?
(atom.write (product.right entry) pending-commits)
@@ -217,14 +217,14 @@
(let [[stm-proc output resolve] commit
[finished-tx value] (stm-proc fresh-tx)]
(if (can-commit? finished-tx)
- (do io.Monad<IO>
+ (do io.monad
[_ (monad.map @ commit-var! finished-tx)]
(resolve value))
(issue-commit commit))))
(def: init-processor!
(IO Any)
- (do io.Monad<IO>
+ (do io.monad
[flag (atom.read commit-processor-flag)]
(if flag
(wrap [])
@@ -247,7 +247,7 @@
"For this reason, it's important to note that transactions must be free from side-effects, such as I/O.")}
(All [a] (-> (STM a) (Promise a)))
(let [[output resolver] (promise.promise [])]
- (exec (io.run (do io.Monad<IO>
+ (exec (io.run (do io.monad
[_ init-processor!]
(issue-commit [stm-proc output resolver])))
output)))
diff --git a/stdlib/source/lux/control/concurrency/task.lux b/stdlib/source/lux/control/concurrency/task.lux
index a5bf17819..1f16da8ca 100644
--- a/stdlib/source/lux/control/concurrency/task.lux
+++ b/stdlib/source/lux/control/concurrency/task.lux
@@ -29,16 +29,15 @@
(def: #export (throw exception message)
(All [e a] (-> (Exception e) e (Task a)))
- (:: promise.Monad<Promise> wrap
- (ex.throw exception message)))
+ (:: promise.monad wrap (ex.throw exception message)))
(def: #export (try computation)
(All [a] (-> (Task a) (Task (Error a))))
- (:: promise.Functor<Promise> map (|>> #error.Success) computation))
+ (:: promise.functor map (|>> #error.Success) computation))
-(structure: #export _ (Functor Task)
+(structure: #export functor (Functor Task)
(def: (map f fa)
- (:: promise.Functor<Promise> map
+ (:: promise.functor map
(function (_ fa')
(case fa'
(#error.Failure error)
@@ -48,25 +47,25 @@
(#error.Success (f a))))
fa)))
-(structure: #export _ (Apply Task)
- (def: functor Functor<Task>)
+(structure: #export apply (Apply Task)
+ (def: &functor ..functor)
(def: (apply ff fa)
- (do promise.Monad<Promise>
+ (do promise.monad
[ff' ff
fa' fa]
- (wrap (do error.Monad<Error>
+ (wrap (do error.monad
[f ff'
a fa']
(wrap (f a)))))))
-(structure: #export _ (Monad Task)
- (def: functor Functor<Task>)
+(structure: #export monad (Monad Task)
+ (def: &functor ..functor)
(def: wrap return)
(def: (join mma)
- (do promise.Monad<Promise>
+ (do promise.monad
[mma' mma]
(case mma'
(#error.Failure error)
@@ -81,4 +80,4 @@
(def: #export (from-promise promise)
(All [a] (-> (Promise a) (Task a)))
- (:: promise.Functor<Promise> map (|>> #error.Success) promise))
+ (:: promise.functor map (|>> #error.Success) promise))