diff options
Diffstat (limited to '')
| -rw-r--r-- | stdlib/source/lux/concurrency/stm.lux | 122 | 
1 files changed, 61 insertions, 61 deletions
| diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux index 4aaee3580..f7c7664f1 100644 --- a/stdlib/source/lux/concurrency/stm.lux +++ b/stdlib/source/lux/concurrency/stm.lux @@ -1,4 +1,4 @@ -(;module: +(.module:    lux    (lux (control [functor #+ Functor]                  [applicative #+ Applicative] @@ -24,7 +24,7 @@     #observers (Dict Text (-> a (IO Unit)))})  (type: #export (Var a) -  {#;doc "A mutable cell containing a value, and observers that will be alerted of any change to it."} +  {#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."}    (Atom (Var-State a)))  (type: (Tx-Frame a) @@ -36,23 +36,23 @@    (List (Ex [a] (Tx-Frame a))))  (type: #export (STM a) -  {#;doc "A computation which updates a transaction and produces a value."} +  {#.doc "A computation which updates a transaction and produces a value."}    (-> Tx [Tx a]))  (def: #export (var value) -  {#;doc "Creates a new STM var, with a default value."} +  {#.doc "Creates a new STM var, with a default value."}    (All [a] (-> a (Var a))) -  (atom;atom {#value value -              #observers (dict;new text;Hash<Text>)})) +  (atom.atom {#value value +              #observers (dict.new text.Hash<Text>)}))  (def: raw-read    (All [a] (-> (Var a) a)) -  (|>> atom;read io;run (get@ #value))) +  (|>> atom.read io.run (get@ #value)))  (def: (find-var-value var tx)    (All [a] (-> (Var a) Tx (Maybe a)))    (|> tx -      (list;find (function [[_var _original _current]] +      (list.find (function [[_var _original _current]]                     (is (:! (Var Unit) var)                         (:! (Var Unit) _var))))        (:: Monad<Maybe> map (function [[_var _original _current]] @@ -63,35 +63,35 @@    (All [a] (-> (Var a) (STM a)))    (function [tx]      (case (find-var-value var tx) -      (#;Some value) +      (#.Some value)        [tx value] -      #;None +      #.None        (let [value (raw-read var)] -        [(#;Cons [var value value] tx) +        [(#.Cons [var value value] tx)           value]))))  (def: #export (read! var) -  {#;doc "Reads var immediately, without going through a transaction."} +  {#.doc "Reads var immediately, without going through a transaction."}    (All [a] (-> (Var a) (IO a)))    (|> var -      atom;read +      atom.read        (:: Functor<IO> map (get@ #value))))  (def: (update-tx-value var value tx)    (All [a] (-> (Var a) a Tx Tx))    (case tx -    #;Nil -    #;Nil +    #.Nil +    #.Nil -    (#;Cons [_var _original _current] tx') +    (#.Cons [_var _original _current] tx')      (if (is (:! (Var ($ +0)) var)              (:! (Var ($ +0)) _var)) -      (#;Cons [(:! (Var ($ +0)) _var) +      (#.Cons [(:! (Var ($ +0)) _var)                 (:! ($ +0) _original)                 (:! ($ +0) value)]                tx') -      (#;Cons [_var _original _current] +      (#.Cons [_var _original _current]                (update-tx-value var value tx')))      )) @@ -99,59 +99,59 @@    (All [a] (-> a (Var a) (STM Unit)))    (function [tx]      (case (find-var-value var tx) -      (#;Some _) +      (#.Some _)        [(update-tx-value var value tx)         []] -      #;None -      [(#;Cons [var (raw-read var) value] tx) +      #.None +      [(#.Cons [var (raw-read var) value] tx)         []])))  (def: #export (write! new-value var) -  {#;doc "Writes value to var immediately, without going through a transaction."} +  {#.doc "Writes value to var immediately, without going through a transaction."}    (All [a] (-> a (Var a) (IO Unit)))    (do Monad<IO> -    [old (atom;read var) +    [old (atom.read var)       #let [old-value (get@ #value old)             new (set@ #value new-value old)] -     succeeded? (atom;compare-and-swap old new var)] +     succeeded? (atom.compare-and-swap old new var)]      (if succeeded?        (do @          [_ (|> old                 (get@ #observers) -               dict;values -               (monad;map @ (function [f] (f new-value))))] +               dict.values +               (monad.map @ (function [f] (f new-value))))]          (wrap []))        (write! new-value var))))  (def: #export (follow target) -  {#;doc "Creates a channel that will receive all changes to the value of the given var."} -  (All [a] (-> (Var a) (IO (frp;Channel a)))) -  (let [head (frp;channel ($ +0)) +  {#.doc "Creates a channel that will receive all changes to the value of the given var."} +  (All [a] (-> (Var a) (IO (frp.Channel a)))) +  (let [head (frp.channel ($ +0))          channel-var (var head)          observer (function [label value] -                   (case (io;run (|> channel-var raw-read (frp;write value))) -                     #;None +                   (case (io.run (|> channel-var raw-read (frp.write value))) +                     #.None                       ## By closing the output Channel, the                       ## observer becomes obsolete. -                     (atom;update (function [[value observers]] -                                    [value (dict;remove label observers)]) +                     (atom.update (function [[value observers]] +                                    [value (dict.remove label observers)])                                    target) -                     (#;Some tail') +                     (#.Some tail')                       (write! tail' channel-var)))]      (do Monad<IO> -      [_ (atom;update (function [[value observers]] +      [_ (atom.update (function [[value observers]]                          (let [label (nat/encode (list/fold (function [key base]                                                               (case (nat/decode key) -                                                               (#;Left _) +                                                               (#.Left _)                                                                 base -                                                               (#;Right key-num) +                                                               (#.Right key-num)                                                                 (n/max key-num base)))                                                             +0 -                                                           (dict;keys observers)))] -                          [value (dict;put label (observer label) observers)])) +                                                           (dict.keys observers)))] +                          [value (dict.put label (observer label) observers)]))                        target)]        (wrap head)))) @@ -182,19 +182,19 @@          (ma tx')))))  (def: #export (update! f var) -  {#;doc "Will update a Var's value, and return a tuple with the old and the new values."} +  {#.doc "Will update a Var's value, and return a tuple with the old and the new values."}    (All [a] (-> (-> a a) (Var a) (IO [a a])))    (io (loop [_ []] -        (let [(^@ state [value observers]) (io;run (atom;read var)) +        (let [(^@ state [value observers]) (io.run (atom.read var))                value' (f value)] -          (if (io;run (atom;compare-and-swap state +          (if (io.run (atom.compare-and-swap state                                               [value' observers]                                               var))              [value value']              (recur []))))))  (def: #export (update f var) -  {#;doc "Will update a Var's value, and return a tuple with the old and the new values."} +  {#.doc "Will update a Var's value, and return a tuple with the old and the new values."}    (All [a] (-> (-> a a) (Var a) (STM [a a])))    (do Monad<STM>      [a (read var) @@ -204,7 +204,7 @@  (def: (can-commit? tx)    (-> Tx Bool) -  (list;every? (function [[_var _original _current]] +  (list.every? (function [[_var _original _current]]                   (is _original (raw-read _var)))                 tx)) @@ -212,12 +212,12 @@    (-> (Ex [a] (Tx-Frame a)) Unit)    (if (is _original _current)      [] -    (io;run (write! _current _var)))) +    (io.run (write! _current _var))))  (def: fresh-tx Tx (list))  (def: pending-commits -  (Var (Ex [a] [(STM a) (P;Promise a)])) +  (Var (Ex [a] [(STM a) (P.Promise a)]))    (var (:!! [])))  (def: commit-processor-flag @@ -225,46 +225,46 @@    (atom false))  (def: (process-commit [stm-proc output]) -  (-> [(STM Unit) (P;Promise Unit)] Top) +  (-> [(STM Unit) (P.Promise Unit)] Top)    (let [[finished-tx value] (stm-proc fresh-tx)]      (if (can-commit? finished-tx)        (exec (list/map commit-var finished-tx) -        (io;run (P;resolve value output))) -      (io;run (write! [stm-proc output] pending-commits))))) +        (io.run (P.resolve value output))) +      (io.run (write! [stm-proc output] pending-commits)))))  (def: init-processor!    (IO Unit)    (do Monad<IO> -    [flag (atom;read commit-processor-flag)] +    [flag (atom.read commit-processor-flag)]      (if flag        (wrap [])        (do @ -        [was-first? (atom;compare-and-swap flag true commit-processor-flag)] +        [was-first? (atom.compare-and-swap flag true commit-processor-flag)]          (if was-first?            (do Monad<IO>              [inputs (follow pending-commits)]              (exec (|> inputs -                      (:! (frp;Channel [(STM Unit) (P;Promise Unit)])) -                      (P;await (function recur [?inputs] +                      (:! (frp.Channel [(STM Unit) (P.Promise Unit)])) +                      (P.await (function recur [?inputs]                                   (io (case ?inputs -                                       #;Nil +                                       #.Nil                                         [] -                                       (#;Cons head tail) +                                       (#.Cons head tail)                                         (exec (process-commit head) -                                         (P;await recur tail))))))) +                                         (P.await recur tail)))))))                (wrap [])))            (wrap [])))        )))  (def: #export (commit stm-proc) -  {#;doc "Commits a transaction and returns its result (asynchronously). +  {#.doc "Commits a transaction and returns its result (asynchronously).            Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first.            For this reason, it's important to note that transactions must be free from side-effects, such as I/O."} -  (All [a] (-> (STM a) (P;Promise a))) -  (let [output (P;promise ($ +0))] -    (exec (io;run init-processor!) -      (io;run (write! [stm-proc output] pending-commits)) +  (All [a] (-> (STM a) (P.Promise a))) +  (let [output (P.promise ($ +0))] +    (exec (io.run init-processor!) +      (io.run (write! [stm-proc output] pending-commits))        output))) | 
