diff options
Diffstat (limited to 'stdlib/source/lux/control/concurrency')
| -rw-r--r-- | stdlib/source/lux/control/concurrency/actor.lux | 178 | ||||
| -rw-r--r-- | stdlib/source/lux/control/concurrency/atom.lux | 14 | ||||
| -rw-r--r-- | stdlib/source/lux/control/concurrency/frp.lux | 36 | ||||
| -rw-r--r-- | stdlib/source/lux/control/concurrency/promise.lux | 22 | ||||
| -rw-r--r-- | stdlib/source/lux/control/concurrency/semaphore.lux | 76 | ||||
| -rw-r--r-- | stdlib/source/lux/control/concurrency/stm.lux | 78 | ||||
| -rw-r--r-- | stdlib/source/lux/control/concurrency/thread.lux | 40 | 
7 files changed, 222 insertions, 222 deletions
diff --git a/stdlib/source/lux/control/concurrency/actor.lux b/stdlib/source/lux/control/concurrency/actor.lux index 66ea24cd8..6355a43b7 100644 --- a/stdlib/source/lux/control/concurrency/actor.lux +++ b/stdlib/source/lux/control/concurrency/actor.lux @@ -25,7 +25,7 @@        ["csr" reader]        ["csw" writer]        ["|.|" export]]]] -   ["." meta (#+ with-gensyms monad) +   ["." meta (#+ with_gensyms monad)      ["." annotation]]     [type (#+ :share)      ["." abstract (#+ abstract: :representation :abstraction)]]] @@ -37,10 +37,10 @@  (exception: #export poisoned)  (exception: #export dead) -(with-expansions -  [<Mail> (as-is (-> s (Actor s) (Promise (Try s)))) -   <Obituary> (as-is [Text s (List <Mail>)]) -   <Mailbox> (as-is (Rec Mailbox +(with_expansions +  [<Mail> (as_is (-> s (Actor s) (Promise (Try s)))) +   <Obituary> (as_is [Text s (List <Mail>)]) +   <Mailbox> (as_is (Rec Mailbox                        [(Promise [<Mail> Mailbox])                         (Resolver [<Mail> Mailbox])]))] @@ -73,29 +73,29 @@      (type: #export (Behavior o s)        {#.doc "An actor's behavior when mail is received and when a fatal error occurs."} -      {#on-init (-> o s) -       #on-mail (-> (Mail s) s (Actor s) (Promise (Try s))) -       #on-stop (-> Text s (Promise Any))}) +      {#on_init (-> o s) +       #on_mail (-> (Mail s) s (Actor s) (Promise (Try s))) +       #on_stop (-> Text s (Promise Any))})      (def: #export (spawn! behavior init)        {#.doc "Given a behavior and initial state, spawns an actor and returns it."}        (All [o s] (-> (Behavior o s) o (IO (Actor s)))) -      (io (let [[on-init on-mail on-stop] behavior +      (io (let [[on_init on_mail on_stop] behavior                  self (:share [o s]                               {(Behavior o s)                                behavior}                               {(Actor s)                                (:abstraction {#obituary (promise.promise [])                                               #mailbox (atom (promise.promise []))})}) -                process (loop [state (on-init init) +                process (loop [state (on_init init)                                 [|mailbox| _] (io.run (atom.read (get@ #mailbox (:representation self))))]                            (do {! promise.monad}                              [[head tail] |mailbox| -                             ?state' (on-mail head state self)] +                             ?state' (on_mail head state self)]                              (case ?state'                                (#try.Failure error)                                (do ! -                                [_ (on-stop error state)] +                                [_ (on_stop error state)]                                  (let [[_ resolve] (get@ #obituary (:representation self))]                                    (exec (io.run                                           (do io.monad @@ -195,19 +195,19 @@      )    ) -(def: (default-on-mail mail state self) +(def: (default_on_mail mail state self)    (All [s] (-> (Mail s) s (Actor s) (Promise (Try s))))    (mail state self)) -(def: (default-on-stop cause state) +(def: (default_on_stop cause state)    (All [s] (-> Text s (Promise Any)))    (promise\wrap []))  (def: #export default    (All [s] (Behavior s s)) -  {#on-init function.identity -   #on-mail ..default-on-mail -   #on-stop ..default-on-stop}) +  {#on_init function.identity +   #on_mail ..default_on_mail +   #on_stop ..default_on_stop})  (def: #export (poison! actor)    {#.doc (doc "Kills the actor by sending mail that will kill it upon processing," @@ -217,64 +217,64 @@               (promise.resolved (exception.throw ..poisoned [])))             actor)) -(def: actor-decl^ +(def: actor_decl^    (Parser [Text (List Text)]) -  (<>.either (<c>.form (<>.and <c>.local-identifier (<>.some <c>.local-identifier))) -             (<>.and <c>.local-identifier (\ <>.monad wrap (list))))) +  (<>.either (<c>.form (<>.and <c>.local_identifier (<>.some <c>.local_identifier))) +             (<>.and <c>.local_identifier (\ <>.monad wrap (list))))) -(type: On-MailC +(type: On_MailC    [[Text Text Text] Code]) -(type: On-StopC +(type: On_StopC    [[Text Text] Code])  (type: BehaviorC -  [(Maybe On-MailC) (Maybe On-StopC) (List Code)]) +  [(Maybe On_MailC) (Maybe On_StopC) (List Code)])  (def: argument    (Parser Text) -  <c>.local-identifier) +  <c>.local_identifier)  (def: behavior^    (Parser BehaviorC) -  (let [on-mail-args ($_ <>.and ..argument ..argument ..argument) -        on-stop-args ($_ <>.and ..argument ..argument)] +  (let [on_mail_args ($_ <>.and ..argument ..argument ..argument) +        on_stop_args ($_ <>.and ..argument ..argument)]      ($_ <>.and -        (<>.maybe (<c>.form (<>.and (<c>.form (<>.after (<c>.this! (' on-mail)) on-mail-args)) +        (<>.maybe (<c>.form (<>.and (<c>.form (<>.after (<c>.this! (' on_mail)) on_mail_args))                                      <c>.any))) -        (<>.maybe (<c>.form (<>.and (<c>.form (<>.after (<c>.this! (' on-stop)) on-stop-args)) +        (<>.maybe (<c>.form (<>.and (<c>.form (<>.after (<c>.this! (' on_stop)) on_stop_args))                                      <c>.any)))          (<>.some <c>.any)))) -(def: (on-mail g!_ ?on-mail) -  (-> Code (Maybe On-MailC) Code) -  (case ?on-mail +(def: (on_mail g!_ ?on_mail) +  (-> Code (Maybe On_MailC) Code) +  (case ?on_mail      #.None -    (` (~! ..default-on-mail)) +    (` (~! ..default_on_mail))      (#.Some [[mailN stateN selfN] bodyC])      (` (function ((~ g!_) -                  (~ (code.local-identifier mailN)) -                  (~ (code.local-identifier stateN)) -                  (~ (code.local-identifier selfN))) +                  (~ (code.local_identifier mailN)) +                  (~ (code.local_identifier stateN)) +                  (~ (code.local_identifier selfN)))           (~ bodyC))))) -(def: (on-stop g!_ ?on-stop) -  (-> Code (Maybe On-StopC) Code) -  (case ?on-stop +(def: (on_stop g!_ ?on_stop) +  (-> Code (Maybe On_StopC) Code) +  (case ?on_stop      #.None -    (` (~! ..default-on-stop)) +    (` (~! ..default_on_stop))      (#.Some [[causeN stateN] bodyC])      (` (function ((~ g!_) -                  (~ (code.local-identifier causeN)) -                  (~ (code.local-identifier stateN))) +                  (~ (code.local_identifier causeN)) +                  (~ (code.local_identifier stateN)))           (~ bodyC))))) -(with-expansions [<examples> (as-is (actor: #export (Stack a) +(with_expansions [<examples> (as_is (actor: #export (Stack a)                                        (List a) -                                      ((on-mail mail state self) +                                      ((on_mail mail state self)                                         (do (try.with promise.monad)                                           [#let [_ (log! "BEFORE")]                                            output (mail state self) @@ -288,7 +288,7 @@                                      (actor: #export Counter                                        Nat -                                      ((on-stop cause state) +                                      ((on_stop cause state)                                         (\ promise.monad wrap                                            (log! (if (exception.match? ..poisoned cause)                                                    (format "Counter was poisoned: " (%.nat state)) @@ -302,45 +302,45 @@                                          (promise.resolved (#try.Success [state state])))))]    (syntax: #export (actor:                       {export |export|.parser} -                     {[name vars] actor-decl^} -                     {annotations (<>.default cs.empty-annotations csr.annotations)} -                     state-type -                     {[?on-mail ?on-stop messages] behavior^}) +                     {[name vars] actor_decl^} +                     {annotations (<>.default cs.empty_annotations csr.annotations)} +                     state_type +                     {[?on_mail ?on_stop messages] behavior^})      {#.doc (doc "Defines an actor, with its behavior and internal state." -                "Messages for the actor must be defined after the on-mail and on-stop handlers." +                "Messages for the actor must be defined after the on_mail and on_stop handlers."                  <examples>)} -    (with-gensyms [g!_] +    (with_gensyms [g!_]        (do meta.monad -        [g!type (meta.gensym (format name "-abstract-type")) -         #let [g!actor (code.local-identifier name) -               g!vars (list\map code.local-identifier vars)]] +        [g!type (meta.gensym (format name "_abstract_type")) +         #let [g!actor (code.local_identifier name) +               g!vars (list\map code.local_identifier vars)]]          (wrap (list (` ((~! abstract:) (~+ (|export|.write export)) ((~ g!type) (~+ g!vars)) -                        (~ state-type) +                        (~ state_type)                          (def: (~+ (|export|.write export)) (~ g!actor)                            (All [(~+ g!vars)] -                            (..Behavior (~ state-type) ((~ g!type) (~+ g!vars)))) -                          {#..on-init (|>> ((~! abstract.:abstraction) (~ g!type))) -                           #..on-mail (~ (..on-mail g!_ ?on-mail)) -                           #..on-stop (~ (..on-stop g!_ ?on-stop))}) +                            (..Behavior (~ state_type) ((~ g!type) (~+ g!vars)))) +                          {#..on_init (|>> ((~! abstract.:abstraction) (~ g!type))) +                           #..on_mail (~ (..on_mail g!_ ?on_mail)) +                           #..on_stop (~ (..on_stop g!_ ?on_stop))})                          (~+ messages)))))))) -  (syntax: #export (actor {[state-type init] (<c>.record (<>.and <c>.any <c>.any))} -                          {[?on-mail ?on-stop messages] behavior^}) -    (with-gensyms [g!_] -      (wrap (list (` (: ((~! io.IO) (..Actor (~ state-type))) -                        (..spawn! (: (..Behavior (~ state-type) (~ state-type)) -                                     {#..on-init (|>>) -                                      #..on-mail (~ (..on-mail g!_ ?on-mail)) -                                      #..on-stop (~ (..on-stop g!_ ?on-stop))}) -                                  (: (~ state-type) +  (syntax: #export (actor {[state_type init] (<c>.record (<>.and <c>.any <c>.any))} +                          {[?on_mail ?on_stop messages] behavior^}) +    (with_gensyms [g!_] +      (wrap (list (` (: ((~! io.IO) (..Actor (~ state_type))) +                        (..spawn! (: (..Behavior (~ state_type) (~ state_type)) +                                     {#..on_init (|>>) +                                      #..on_mail (~ (..on_mail g!_ ?on_mail)) +                                      #..on_stop (~ (..on_stop g!_ ?on_stop))}) +                                  (: (~ state_type)                                       (~ init)))))))))    (type: Signature      {#vars (List Text)       #name Text -     #inputs (List cs.Typed-Input) +     #inputs (List cs.Typed_Input)       #state Text       #self Text       #output Code}) @@ -348,22 +348,22 @@    (def: signature^      (Parser Signature)      (<c>.form ($_ <>.and -                  (<>.default (list) (<c>.tuple (<>.some <c>.local-identifier))) -                  <c>.local-identifier -                  (<>.some csr.typed-input) -                  <c>.local-identifier -                  <c>.local-identifier +                  (<>.default (list) (<c>.tuple (<>.some <c>.local_identifier))) +                  <c>.local_identifier +                  (<>.some csr.typed_input) +                  <c>.local_identifier +                  <c>.local_identifier                    <c>.any)))    (def: reference^      (Parser [Name (List Text)]) -    (<>.either (<c>.form (<>.and <c>.identifier (<>.some <c>.local-identifier))) +    (<>.either (<c>.form (<>.and <c>.identifier (<>.some <c>.local_identifier)))                 (<>.and <c>.identifier (\ <>.monad wrap (list)))))    (syntax: #export (message:                       {export |export|.parser}                       {signature signature^} -                     {annotations (<>.default cs.empty-annotations csr.annotations)} +                     {annotations (<>.default cs.empty_annotations csr.annotations)}                       body)      {#.doc (doc "A message can access the actor's state through the state parameter."                  "A message can also access the actor itself through the self parameter." @@ -371,30 +371,30 @@                  "A message may succeed or fail (in case of failure, the actor dies)."                  <examples>)} -    (with-gensyms [g!_ g!return] +    (with_gensyms [g!_ g!return]        (do meta.monad -        [actor-scope abstract.current -         #let [g!type (code.local-identifier (get@ #abstract.name actor-scope)) -               g!message (code.local-identifier (get@ #name signature)) -               g!actor-vars (get@ #abstract.type-vars actor-scope) -               g!all-vars (|> (get@ #vars signature) (list\map code.local-identifier) (list\compose g!actor-vars)) +        [actor_scope abstract.current +         #let [g!type (code.local_identifier (get@ #abstract.name actor_scope)) +               g!message (code.local_identifier (get@ #name signature)) +               g!actor_vars (get@ #abstract.type_vars actor_scope) +               g!all_vars (|> (get@ #vars signature) (list\map code.local_identifier) (list\compose g!actor_vars))                 g!inputsC (|> (get@ #inputs signature) (list\map product.left))                 g!inputsT (|> (get@ #inputs signature) (list\map product.right)) -               g!state (|> signature (get@ #state) code.local-identifier) -               g!self (|> signature (get@ #self) code.local-identifier)]] +               g!state (|> signature (get@ #state) code.local_identifier) +               g!self (|> signature (get@ #self) code.local_identifier)]]          (wrap (list (` (def: (~+ (|export|.write export)) ((~ g!message) (~+ g!inputsC))                           (~ (csw.annotations annotations)) -                         (All [(~+ g!all-vars)] +                         (All [(~+ g!all_vars)]                             (-> (~+ g!inputsT) -                               (..Message (~ (get@ #abstract.abstraction actor-scope)) +                               (..Message (~ (get@ #abstract.abstraction actor_scope))                                            (~ (get@ #output signature)))))                           (function ((~ g!_) (~ g!state) (~ g!self)) -                           (let [(~ g!state) (:coerce (~ (get@ #abstract.representation actor-scope)) +                           (let [(~ g!state) (:coerce (~ (get@ #abstract.representation actor_scope))                                                        (~ g!state))]                               (|> (~ body) -                                 (: ((~! promise.Promise) ((~! try.Try) [(~ (get@ #abstract.representation actor-scope)) +                                 (: ((~! promise.Promise) ((~! try.Try) [(~ (get@ #abstract.representation actor_scope))                                                                           (~ (get@ #output signature))]))) -                                 (:coerce ((~! promise.Promise) ((~! try.Try) [(~ (get@ #abstract.abstraction actor-scope)) +                                 (:coerce ((~! promise.Promise) ((~! try.Try) [(~ (get@ #abstract.abstraction actor_scope))                                                                                 (~ (get@ #output signature))]))))))))                      )))))) @@ -416,6 +416,6 @@                         (if continue?                           (do !                             [outcome (..mail! (action event stop) actor)] -                           (wrap (try.to-maybe outcome))) +                           (wrap (try.to_maybe outcome)))                           (wrap #.None))))                     channel))) diff --git a/stdlib/source/lux/control/concurrency/atom.lux b/stdlib/source/lux/control/concurrency/atom.lux index 04517cc3e..3920c0214 100644 --- a/stdlib/source/lux/control/concurrency/atom.lux +++ b/stdlib/source/lux/control/concurrency/atom.lux @@ -13,15 +13,15 @@     [type      abstract]]) -(with-expansions [<jvm> (as-is (host.import: (java/util/concurrent/atomic/AtomicReference a) +(with_expansions [<jvm> (as_is (host.import: (java/util/concurrent/atomic/AtomicReference a)                                   ["#::."                                    (new [a])                                    (get [] a)                                    (compareAndSet [a a] boolean)]))] -  (for {@.old <jvm> -        @.jvm <jvm>} -        -       (as-is))) +                 (for {@.old <jvm> +                       @.jvm <jvm>} +                       +                      (as_is)))  (abstract: #export (Atom a)    (for {@.old @@ -60,7 +60,7 @@                ("js array read" 0 (:representation atom))                }))) -  (def: #export (compare-and-swap current new atom) +  (def: #export (compare_and_swap current new atom)      {#.doc (doc "Only mutates an atom if you can present its current value."                  "That guarantees that atom was not updated since you last read from it.")}      (All [a] (-> a a (Atom a) (IO Bit))) @@ -87,7 +87,7 @@      (do io.monad        [old (read atom)         #let [new (f old)] -       swapped? (compare-and-swap old new atom)] +       swapped? (compare_and_swap old new atom)]        (if swapped?          (wrap new)          (recur []))))) diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux index aea0b082a..0c5303f46 100644 --- a/stdlib/source/lux/control/concurrency/frp.lux +++ b/stdlib/source/lux/control/concurrency/frp.lux @@ -22,7 +22,7 @@    {#.doc "An asynchronous channel to distribute values."}    (Promise (Maybe [a (Channel a)]))) -(exception: #export channel-is-already-closed) +(exception: #export channel_is_already_closed)  (signature: #export (Sink a)    (: (IO (Try Any)) @@ -49,7 +49,7 @@                 [latter (atom.read sink)]                 (if (is? current latter)                   ## Someone else closed the sink. -                 (wrap (exception.throw ..channel-is-already-closed [])) +                 (wrap (exception.throw ..channel_is_already_closed []))                   ## Someone else fed the sink while I was closing it.                   (recur []))))))) @@ -57,7 +57,7 @@         (loop [_ []]           (do {! io.monad}             [current (atom.read sink) -            #let [[next resolve-next] (:share [a] +            #let [[next resolve_next] (:share [a]                                                {(promise.Resolver (Maybe [a (Channel a)]))                                                 current}                                                {[(Promise (Maybe [a (Channel a)])) @@ -67,14 +67,14 @@             (if fed?               ## I fed the sink.               (do ! -               [_ (atom.compare-and-swap current resolve-next sink)] +               [_ (atom.compare_and_swap current resolve_next sink)]                 (wrap (exception.return [])))               ## Someone else interacted with the sink.               (do !                 [latter (atom.read sink)]                 (if (is? current latter)                   ## Someone else closed the sink while I was feeding it. -                 (wrap (exception.throw ..channel-is-already-closed [])) +                 (wrap (exception.throw ..channel_is_already_closed []))                   ## Someone else fed the sink.                   (recur [])))))))))) @@ -99,11 +99,11 @@    (def: (apply ff fa)      (do promise.monad -      [cons-f ff -       cons-a fa] -      (case [cons-f cons-a] -        [(#.Some [head-f tail-f]) (#.Some [head-a tail-a])] -        (wrap (#.Some [(head-f head-a) (apply tail-f tail-a)])) +      [cons_f ff +       cons_a fa] +      (case [cons_f cons_a] +        [(#.Some [head_f tail_f]) (#.Some [head_a tail_a])] +        (wrap (#.Some [(head_f head_a) (apply tail_f tail_a)]))          _          (wrap #.None))))) @@ -181,7 +181,7 @@        #.None        (wrap #.None)))) -(def: #export (from-promise promise) +(def: #export (from_promise promise)    (All [a] (-> (Promise a) (Channel a)))    (promise\map (function (_ value)                   (#.Some [value ..empty])) @@ -219,7 +219,7 @@          [init' (f head init)]          (wrap (#.Some [init (folds f init' tail)])))))) -(def: #export (poll milli-seconds action) +(def: #export (poll milli_seconds action)    (All [a]      (-> Nat (IO a) [(Channel a) (Sink a)]))    (let [[output sink] (channel [])] @@ -227,12 +227,12 @@                      (do io.monad                        [value action                         _ (\ sink feed value)] -                      (promise.await recur (promise.wait milli-seconds))))) +                      (promise.await recur (promise.wait milli_seconds)))))        [output sink]))) -(def: #export (periodic milli-seconds) +(def: #export (periodic milli_seconds)    (-> Nat [(Channel Any) (Sink Any)]) -  (..poll milli-seconds (io []))) +  (..poll milli_seconds (io [])))  (def: #export (iterate f init)    (All [s o] (-> (-> s (Promise (Maybe [s o]))) s (Channel o))) @@ -282,7 +282,7 @@        #.None        (wrap #.Nil)))) -(def: #export (sequential milli-seconds values) +(def: #export (sequential milli_seconds values)    (All [a] (-> Nat (List a) (Channel a)))    (case values      #.Nil @@ -290,5 +290,5 @@      (#.Cons head tail)      (promise.resolved (#.Some [head (do promise.monad -                                      [_ (promise.wait milli-seconds)] -                                      (sequential milli-seconds tail))])))) +                                      [_ (promise.wait milli_seconds)] +                                      (sequential milli_seconds tail))])))) diff --git a/stdlib/source/lux/control/concurrency/promise.lux b/stdlib/source/lux/control/concurrency/promise.lux index e4835b8d8..96822700d 100644 --- a/stdlib/source/lux/control/concurrency/promise.lux +++ b/stdlib/source/lux/control/concurrency/promise.lux @@ -38,7 +38,7 @@              #.None              (do !                [#let [new [(#.Some value) #.None]] -               succeeded? (atom.compare-and-swap old new promise)] +               succeeded? (atom.compare_and_swap old new promise)]                (if succeeded?                  (do !                    [_ (monad.map ! (function (_ f) (f value)) @@ -72,7 +72,7 @@          #.None          (let [new [_value (#.Cons f _observers)]] -          (if (io.run (atom.compare-and-swap old new promise)) +          (if (io.run (atom.compare_and_swap old new promise))              (io.io [])              (await f (:abstraction promise)))))))    ) @@ -134,7 +134,7 @@    {#.doc "Heterogeneous alternative combinator."}    (All [a b] (-> (Promise a) (Promise b) (Promise (| a b))))    (let [[a|b resolve] (..promise [])] -    (with-expansions +    (with_expansions        [<sides> (template [<promise> <tag>]                   [(io.run (await (|>> <tag> resolve) <promise>))] @@ -155,7 +155,7 @@                      [right]))            left||right)))) -(def: #export (schedule millis-delay computation) +(def: #export (schedule millis_delay computation)    {#.doc (doc "Runs an I/O computation on its own thread (after a specified delay)."                "Returns a Promise that will eventually host its result.")}    (All [a] (-> Nat (IO a) (Promise a))) @@ -163,7 +163,7 @@      (exec (|> (do io.monad                  [value computation]                  (resolve value)) -              (thread.schedule millis-delay) +              (thread.schedule millis_delay)                io.run)        !out))) @@ -173,17 +173,17 @@    (All [a] (-> (IO a) (Promise a)))    (schedule 0)) -(def: #export (delay time-millis value) +(def: #export (delay time_millis value)    {#.doc "Delivers a value after a certain period has passed."}    (All [a] (-> Nat a (Promise a))) -  (schedule time-millis (io value))) +  (schedule time_millis (io value))) -(def: #export (wait time-millis) +(def: #export (wait time_millis)    {#.doc "Returns a promise that will be resolved after the specified amount of milliseconds."}    (-> Nat (Promise Any)) -  (delay time-millis [])) +  (delay time_millis [])) -(def: #export (time-out time-millis promise) +(def: #export (time_out time_millis promise)    {#.doc "Wait for a promise to be resolved within the specified amount of milliseconds."}    (All [a] (-> Nat (Promise a) (Promise (Maybe a)))) -  (..or (wait time-millis) promise)) +  (..or (wait time_millis) promise)) diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux index a405b7b3e..9e6ff9b29 100644 --- a/stdlib/source/lux/control/concurrency/semaphore.lux +++ b/stdlib/source/lux/control/concurrency/semaphore.lux @@ -22,25 +22,25 @@     ["." promise (#+ Promise Resolver)]])  (type: State -  {#max-positions Nat -   #open-positions Int -   #waiting-list (Queue (Resolver Any))}) +  {#max_positions Nat +   #open_positions Int +   #waiting_list (Queue (Resolver Any))})  (abstract: #export Semaphore    (Atom State)    {#.doc "A tool for controlling access to resources by multiple concurrent processes."} -  (def: most-positions-possible +  (def: most_positions_possible      (.nat (\ i.interval top))) -  (def: #export (semaphore initial-open-positions) +  (def: #export (semaphore initial_open_positions)      (-> Nat Semaphore) -    (let [max-positions (n.min initial-open-positions -                               ..most-positions-possible)] -      (:abstraction (atom.atom {#max-positions max-positions -                                #open-positions (.int max-positions) -                                #waiting-list queue.empty})))) +    (let [max_positions (n.min initial_open_positions +                               ..most_positions_possible)] +      (:abstraction (atom.atom {#max_positions max_positions +                                #open_positions (.int max_positions) +                                #waiting_list queue.empty}))))    (def: #export (wait semaphore)      (Ex [k] (-> Semaphore (Promise Any))) @@ -52,13 +52,13 @@                 (do io.monad                   [state (atom.read semaphore)                    #let [[ready? state'] (: [Bit State] -                                           (if (i.> +0 (get@ #open-positions state)) +                                           (if (i.> +0 (get@ #open_positions state))                                               [true (|> state -                                                       (update@ #open-positions dec))] +                                                       (update@ #open_positions dec))]                                               [false (|> state -                                                        (update@ #open-positions dec) -                                                        (update@ #waiting-list (queue.push sink)))]))] -                  success? (atom.compare-and-swap state state' semaphore)] +                                                        (update@ #open_positions dec) +                                                        (update@ #waiting_list (queue.push sink)))]))] +                  success? (atom.compare_and_swap state state' semaphore)]                   (if success?                     (if ready?                       (sink []) @@ -66,9 +66,9 @@                     (recur [])))))          signal))) -  (exception: #export (semaphore-is-maxed-out {max-positions Nat}) +  (exception: #export (semaphore_is_maxed_out {max_positions Nat})      (exception.report -     ["Max Positions" (%.nat max-positions)])) +     ["Max Positions" (%.nat max_positions)]))    (def: #export (signal semaphore)      (Ex [k] (-> Semaphore (Promise (Try Int)))) @@ -77,29 +77,29 @@         (loop [_ []]           (do {! io.monad}             [state (atom.read semaphore) -            #let [[?sink state' maxed-out?] (: [(Maybe (Resolver Any)) State Bit] -                                               (case (queue.peek (get@ #waiting-list state)) +            #let [[?sink state' maxed_out?] (: [(Maybe (Resolver Any)) State Bit] +                                               (case (queue.peek (get@ #waiting_list state))                                                   #.None -                                                 (if (n.= (get@ #max-positions state) -                                                          (.nat (get@ #open-positions state))) +                                                 (if (n.= (get@ #max_positions state) +                                                          (.nat (get@ #open_positions state)))                                                     [#.None                                                      state                                                      true]                                                     [#.None -                                                    (update@ #open-positions inc state) +                                                    (update@ #open_positions inc state)                                                      false])                                                   (#.Some head)                                                   [(#.Some head)                                                    (|> state -                                                      (update@ #open-positions inc) -                                                      (update@ #waiting-list queue.pop)) +                                                      (update@ #open_positions inc) +                                                      (update@ #waiting_list queue.pop))                                                    false]))]] -           (if maxed-out? -             (wrap (exception.throw ..semaphore-is-maxed-out [(get@ #max-positions state)])) +           (if maxed_out? +             (wrap (exception.throw ..semaphore_is_maxed_out [(get@ #max_positions state)]))               (do ! -               [#let [open-positions (get@ #open-positions state')] -                success? (atom.compare-and-swap state state' semaphore)] +               [#let [open_positions (get@ #open_positions state')] +                success? (atom.compare_and_swap state state' semaphore)]                 (if success?                   (do !                     [_ (case ?sink @@ -108,7 +108,7 @@                          (#.Some sink)                          (sink []))] -                   (wrap (#try.Success open-positions))) +                   (wrap (#try.Success open_positions)))                   (recur [])))))))))    ) @@ -144,8 +144,8 @@  (abstract: #export Barrier    {#limit Limit     #count (Atom Nat) -   #start-turnstile Semaphore -   #end-turnstile Semaphore} +   #start_turnstile Semaphore +   #end_turnstile Semaphore}    {#.doc "A barrier that blocks all processes from proceeding until a given number of processes are parked at the barrier."} @@ -153,10 +153,10 @@      (-> Limit Barrier)      (:abstraction {#limit limit                     #count (atom.atom 0) -                   #start-turnstile (semaphore 0) -                   #end-turnstile (semaphore 0)})) +                   #start_turnstile (semaphore 0) +                   #end_turnstile (semaphore 0)})) -  (def: (un-block times turnstile) +  (def: (un_block times turnstile)      (-> Nat Semaphore (Promise Any))      (loop [step 0]        (if (n.< times step) @@ -169,16 +169,16 @@      [(def: (<phase> (^:representation barrier))         (-> Barrier (Promise Any))         (do promise.monad -         [#let [limit (refinement.un-refine (get@ #limit barrier)) +         [#let [limit (refinement.un_refine (get@ #limit barrier))                  goal <goal>                  count (io.run (atom.update <update> (get@ #count barrier)))                  reached? (n.= goal count)]]           (if reached? -           (un-block limit (get@ <turnstile> barrier)) +           (un_block limit (get@ <turnstile> barrier))             (wait (get@ <turnstile> barrier)))))] -    [start inc limit #start-turnstile] -    [end   dec 0     #end-turnstile] +    [start inc limit #start_turnstile] +    [end   dec 0     #end_turnstile]      )    (def: #export (block barrier) diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux index 523aa5567..7fd916fdb 100644 --- a/stdlib/source/lux/control/concurrency/stm.lux +++ b/stdlib/source/lux/control/concurrency/stm.lux @@ -36,7 +36,7 @@      (All [a] (-> (Var a) a))      (|>> :representation atom.read io.run product.left)) -  (def: (un-follow sink var) +  (def: (un_follow sink var)      (All [a] (-> (Sink a) (Var a) (IO Any)))      (do io.monad        [_ (atom.update (function (_ [value observers]) @@ -44,26 +44,26 @@                        (:representation var))]        (wrap []))) -  (def: (write! new-value var) +  (def: (write! new_value var)      (All [a] (-> a (Var a) (IO Any)))      (do {! io.monad}        [#let [var' (:representation var)] -       (^@ old [old-value observers]) (atom.read var') -       succeeded? (atom.compare-and-swap old [new-value observers] var')] +       (^@ old [old_value observers]) (atom.read var') +       succeeded? (atom.compare_and_swap old [new_value observers] var')]        (if succeeded?          (do !            [_ (monad.map ! (function (_ sink)                              (do ! -                              [result (\ sink feed new-value)] +                              [result (\ sink feed new_value)]                                (case result                                  (#try.Success _)                                  (wrap [])                                  (#try.Failure _) -                                (un-follow sink var)))) +                                (un_follow sink var))))                          observers)]            (wrap [])) -        (write! new-value var)))) +        (write! new_value var))))    (def: #export (follow target)      {#.doc "Creates a channel that will receive all changes to the value of the given var."} @@ -76,19 +76,19 @@        (wrap [channel sink])))    ) -(type: (Tx-Frame a) +(type: (Tx_Frame a)    {#var (Var a)     #original a     #current a})  (type: Tx -  (List (Ex [a] (Tx-Frame a)))) +  (List (Ex [a] (Tx_Frame a))))  (type: #export (STM a)    {#.doc "A computation which updates a transaction and produces a value."}    (-> Tx [Tx a])) -(def: (find-var-value var tx) +(def: (find_var_value var tx)    (All [a] (-> (Var a) Tx (Maybe a)))    (|> tx        (list.find (function (_ [_var _original _current]) @@ -102,7 +102,7 @@  (def: #export (read var)    (All [a] (-> (Var a) (STM a)))    (function (_ tx) -    (case (find-var-value var tx) +    (case (find_var_value var tx)        (#.Some value)        [tx value] @@ -111,7 +111,7 @@          [(#.Cons [var value value] tx)           value])))) -(def: (update-tx-value var value tx) +(def: (update_tx_value var value tx)    (All [a] (-> (Var a) a Tx Tx))    (case tx      #.Nil @@ -127,15 +127,15 @@        (#.Cons {#var _var                 #original _original                 #current _current} -              (update-tx-value var value tx'))))) +              (update_tx_value var value tx')))))  (def: #export (write value var)    {#.doc "Writes value to var."}    (All [a] (-> a (Var a) (STM Any)))    (function (_ tx) -    (case (find-var-value var tx) +    (case (find_var_value var tx)        (#.Some _) -      [(update-tx-value var value tx) +      [(update_tx_value var value tx)         []]        #.None @@ -184,40 +184,40 @@       _ (..write a' var)]      (wrap [a a']))) -(def: (can-commit? tx) +(def: (can_commit? tx)    (-> Tx Bit)    (list.every? (function (_ [_var _original _current])                   (is? _original (..read! _var)))                 tx)) -(def: (commit-var! [_var _original _current]) -  (-> (Ex [a] (Tx-Frame a)) (IO Any)) +(def: (commit_var! [_var _original _current]) +  (-> (Ex [a] (Tx_Frame a)) (IO Any))    (if (is? _original _current)      (io [])      (..write! _current _var))) -(def: fresh-tx Tx (list)) +(def: fresh_tx Tx (list))  (type: (Commit a)    [(STM a)     (Promise a)     (Resolver a)]) -(def: pending-commits +(def: pending_commits    (Atom (Rec Commits            [(Promise [(Ex [a] (Commit a)) Commits])             (Resolver [(Ex [a] (Commit a)) Commits])]))    (atom (promise.promise []))) -(def: commit-processor-flag +(def: commit_processor_flag    (Atom Bit)    (atom #0)) -(def: (issue-commit commit) +(def: (issue_commit commit)    (All [a] (-> (Commit a) (IO Any)))    (let [entry [commit (promise.promise [])]]      (do {! io.monad} -      [|commits|&resolve (atom.read pending-commits)] +      [|commits|&resolve (atom.read pending_commits)]        (loop [[|commits| resolve] |commits|&resolve]          (do !            [|commits| (promise.poll |commits|)] @@ -226,48 +226,48 @@              (do io.monad                [resolved? (resolve entry)]                (if resolved? -                (atom.write (product.right entry) pending-commits) +                (atom.write (product.right entry) pending_commits)                  (recur |commits|&resolve)))              (#.Some [head tail])              (recur tail))))))) -(def: (process-commit commit) +(def: (process_commit commit)    (All [a] (-> (Commit a) (IO Any))) -  (let [[stm-proc output resolve] commit -        [finished-tx value] (stm-proc fresh-tx)] -    (if (can-commit? finished-tx) +  (let [[stm_proc output resolve] commit +        [finished_tx value] (stm_proc fresh_tx)] +    (if (can_commit? finished_tx)        (do {! io.monad} -        [_ (monad.map ! commit-var! finished-tx)] +        [_ (monad.map ! commit_var! finished_tx)]          (resolve value)) -      (issue-commit commit)))) +      (issue_commit commit)))) -(def: init-processor! +(def: init_processor!    (IO Any)    (do {! io.monad} -    [flag (atom.read commit-processor-flag)] +    [flag (atom.read commit_processor_flag)]      (if flag        (wrap [])        (do ! -        [was-first? (atom.compare-and-swap flag #1 commit-processor-flag)] -        (if was-first? +        [was_first? (atom.compare_and_swap flag #1 commit_processor_flag)] +        (if was_first?            (do ! -            [[promise resolve] (atom.read pending-commits)] +            [[promise resolve] (atom.read pending_commits)]              (promise.await (function (recur [head [tail _resolve]])                               (do ! -                               [_ (process-commit head)] +                               [_ (process_commit head)]                                 (promise.await recur tail)))                             promise))            (wrap [])))        ))) -(def: #export (commit stm-proc) +(def: #export (commit stm_proc)    {#.doc (doc "Commits a transaction and returns its result (asynchronously)."                "Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first."                "For this reason, it's important to note that transactions must be free from side-effects, such as I/O.")}    (All [a] (-> (STM a) (Promise a)))    (let [[output resolver] (promise.promise [])]      (exec (io.run (do io.monad -                    [_ init-processor!] -                    (issue-commit [stm-proc output resolver]))) +                    [_ init_processor!] +                    (issue_commit [stm_proc output resolver])))        output))) diff --git a/stdlib/source/lux/control/concurrency/thread.lux b/stdlib/source/lux/control/concurrency/thread.lux index 10ec17815..8bdd2b9c9 100644 --- a/stdlib/source/lux/control/concurrency/thread.lux +++ b/stdlib/source/lux/control/concurrency/thread.lux @@ -15,7 +15,7 @@    [//     ["." atom (#+ Atom)]]) -(with-expansions [<jvm> (as-is (host.import: java/lang/Object) +(with_expansions [<jvm> (as_is (host.import: java/lang/Object)                                 (host.import: java/lang/Runtime                                   ["#::." @@ -38,11 +38,11 @@                                   ["#::."                                    (new [int])                                    (schedule [java/lang/Runnable long java/util/concurrent/TimeUnit] #io (java/util/concurrent/ScheduledFuture java/lang/Object))]))] -  (for {@.old (as-is <jvm>) -        @.jvm (as-is <jvm>) +  (for {@.old (as_is <jvm>) +        @.jvm (as_is <jvm>)          @.js -        (as-is (host.import: (setTimeout [host.Function host.Number] #io Any)))} +        (as_is (host.import: (setTimeout [host.Function host.Number] #io Any)))}         ## Default         (type: Thread @@ -53,7 +53,7 @@  (def: #export parallelism    Nat -  (with-expansions [<jvm> (|> (java/lang/Runtime::getRuntime) +  (with_expansions [<jvm> (|> (java/lang/Runtime::getRuntime)                                (java/lang/Runtime::availableProcessors)                                .nat)]      (for {@.old <jvm> @@ -62,30 +62,30 @@           ## Default           1))) -(with-expansions [<jvm> (as-is (def: runner +(with_expansions [<jvm> (as_is (def: runner                                   java/util/concurrent/ScheduledThreadPoolExecutor                                   (java/util/concurrent/ScheduledThreadPoolExecutor::new (.int ..parallelism))))]    (for {@.old <jvm>          @.jvm <jvm>          @.js -        (as-is)} +        (as_is)}         ## Default         (def: runner           (Atom (List Thread))           (atom.atom (list))))) -(def: #export (schedule milli-seconds action) +(def: #export (schedule milli_seconds action)    (-> Nat (IO Any) (IO Any))    (for {@.old          (let [runnable (host.object [] [java/lang/Runnable]                           []                           (java/lang/Runnable [] (run self) void                                               (io.run action)))] -          (case milli-seconds +          (case milli_seconds              0 (java/util/concurrent/Executor::execute runnable runner) -            _ (java/util/concurrent/ScheduledThreadPoolExecutor::schedule runnable (.int milli-seconds) java/util/concurrent/TimeUnit::MILLISECONDS +            _ (java/util/concurrent/ScheduledThreadPoolExecutor::schedule runnable (.int milli_seconds) java/util/concurrent/TimeUnit::MILLISECONDS                                                                            runner)))          @.jvm @@ -93,34 +93,34 @@                           []                           (java/lang/Runnable [] (run self) void                                               (io.run action)))] -          (case milli-seconds +          (case milli_seconds              0 (java/util/concurrent/Executor::execute runnable runner) -            _ (java/util/concurrent/ScheduledThreadPoolExecutor::schedule runnable (.int milli-seconds) java/util/concurrent/TimeUnit::MILLISECONDS +            _ (java/util/concurrent/ScheduledThreadPoolExecutor::schedule runnable (.int milli_seconds) java/util/concurrent/TimeUnit::MILLISECONDS                                                                            runner)))          @.js          (..setTimeout [(host.closure [] (io.run action)) -                       (n.frac milli-seconds)])} +                       (n.frac milli_seconds)])}         ## Default         (do io.monad           [_ (atom.update (|>> (#.Cons {#creation (.nat ("lux io current-time")) -                                       #delay milli-seconds +                                       #delay milli_seconds                                         #action action}))                           ..runner)]           (wrap []))))  (for {@.old -      (as-is) +      (as_is)        @.jvm -      (as-is) +      (as_is)        @.js -      (as-is)} +      (as_is)}       ## Default -     (as-is (exception: #export cannot-continue-running-threads) +     (as_is (exception: #export cannot_continue_running_threads)              (def: #export (run! _)                (-> Any (IO Any)) @@ -139,11 +139,11 @@                                                                   (n.+ (get@ #delay thread))                                                                   (n.<= now)))                                                             threads)] -                     swapped? (atom.compare-and-swap threads pending ..runner)] +                     swapped? (atom.compare_and_swap threads pending ..runner)]                      (if swapped?                        (do !                          [_ (monad.map ! (get@ #action) ready)]                          (run! [])) -                      (error! (ex.construct ..cannot-continue-running-threads [])))) +                      (error! (ex.construct ..cannot_continue_running_threads []))))                    )))              ))  | 
