diff options
Diffstat (limited to '')
-rw-r--r-- | stdlib/source/lux/control/concurrency/frp.lux | 22 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/stm.lux | 81 | ||||
-rw-r--r-- | stdlib/source/lux/tool/compiler/default/platform.lux | 113 | ||||
-rw-r--r-- | stdlib/source/lux/tool/compiler/language/lux/generation.lux | 1 | ||||
-rw-r--r-- | stdlib/source/lux/tool/compiler/meta/archive/document.lux | 2 | ||||
-rw-r--r-- | stdlib/source/lux/type.lux | 3 | ||||
-rw-r--r-- | stdlib/source/program/compositor.lux | 4 | ||||
-rw-r--r-- | stdlib/source/test/lux/abstract/apply.lux | 15 | ||||
-rw-r--r-- | stdlib/source/test/lux/control/concurrency/stm.lux | 136 |
9 files changed, 209 insertions, 168 deletions
diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux index a9beb4a0e..17ae28f41 100644 --- a/stdlib/source/lux/control/concurrency/frp.lux +++ b/stdlib/source/lux/control/concurrency/frp.lux @@ -8,7 +8,7 @@ ["." monad (#+ Monad do)]] [control ["." try (#+ Try)] - ["ex" exception (#+ exception:)] + ["." exception (#+ exception:)] ["." io (#+ IO io)]] [data ["." maybe ("#@." functor)] @@ -45,13 +45,13 @@ stopped? (current #.None)] (if stopped? ## I closed the sink. - (wrap (ex.return [])) + (wrap (exception.return [])) ## Someone else interacted with the sink. (do @ [latter (atom.read sink)] (if (is? current latter) ## Someone else closed the sink. - (wrap (ex.throw channel-is-already-closed [])) + (wrap (exception.throw ..channel-is-already-closed [])) ## Someone else fed the sink while I was closing it. (recur []))))))) @@ -70,13 +70,13 @@ ## I fed the sink. (do @ [_ (atom.compare-and-swap current resolve-next sink)] - (wrap (ex.return []))) + (wrap (exception.return []))) ## Someone else interacted with the sink. (do @ [latter (atom.read sink)] (if (is? current latter) ## Someone else closed the sink while I was feeding it. - (wrap (ex.throw channel-is-already-closed [])) + (wrap (exception.throw ..channel-is-already-closed [])) ## Someone else fed the sink. (recur [])))))))))) @@ -85,14 +85,18 @@ (let [[promise resolve] (promise.promise [])] [promise (..sink resolve)])) -(structure: #export functor (Functor Channel) +(structure: #export functor + (Functor Channel) + (def: (map f) (promise@map (maybe@map (function (_ [head tail]) [(f head) (map f tail)]))))) -(structure: #export apply (Apply Channel) +(structure: #export apply + (Apply Channel) + (def: &functor ..functor) (def: (apply ff fa) @@ -108,7 +112,9 @@ (def: empty Channel (promise.resolved #.None)) -(structure: #export monad (Monad Channel) +(structure: #export monad + (Monad Channel) + (def: &functor ..functor) (def: (wrap a) diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux index 783bc2117..3065d8033 100644 --- a/stdlib/source/lux/control/concurrency/stm.lux +++ b/stdlib/source/lux/control/concurrency/stm.lux @@ -3,9 +3,10 @@ [abstract [functor (#+ Functor)] [apply (#+ Apply)] - ["." monad (#+ do Monad)]] + ["." monad (#+ Monad do)]] [control - ["." io (#+ IO io)]] + ["." io (#+ IO io)] + ["." try]] [data ["." product] ["." maybe] @@ -16,53 +17,63 @@ [// ["." atom (#+ Atom atom)] ["." promise (#+ Promise Resolver)] - ["." frp]]) + ["." frp (#+ Channel Sink)]]) -(type: #export (Observer a) +(type: (Observer a) (-> a (IO Any))) (abstract: #export (Var a) {#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."} - (Atom [a (List (Observer a))]) + (Atom [a (List (Sink a))]) (def: #export (var value) {#.doc "Creates a new STM var, with a default value."} (All [a] (-> a (Var a))) (:abstraction (atom.atom [value (list)]))) - (def: read!! + (def: read! (All [a] (-> (Var a) a)) (|>> :representation atom.read io.run product.left)) - (def: #export (read! (^:representation var)) - {#.doc "Reads var immediately, without going through a transaction."} - (All [a] (-> (Var a) (IO a))) - (|> var - atom.read - (:: io.functor map product.left))) + (def: (un-follow sink var) + (All [a] (-> (Sink a) (Var a) (IO Any))) + (do io.monad + [_ (atom.update (function (_ [value observers]) + [value (list.filter (|>> (is? sink) not) observers)]) + (:representation var))] + (wrap []))) - (def: (write! new-value (^:representation var)) + (def: (write! new-value var) (All [a] (-> a (Var a) (IO Any))) (do io.monad - [(^@ old [_value _observers]) (atom.read var) - succeeded? (atom.compare-and-swap old [new-value _observers] var)] + [#let [var' (:representation var)] + (^@ old [old-value observers]) (atom.read var') + succeeded? (atom.compare-and-swap old [new-value observers] var')] (if succeeded? (do @ - [_ (monad.map @ (function (_ f) (f new-value)) _observers)] + [_ (monad.map @ (function (_ sink) + (do @ + [result (:: sink feed new-value)] + (case result + (#try.Success _) + (wrap []) + + (#try.Failure _) + (un-follow sink var)))) + observers)] (wrap [])) - (write! new-value (:abstraction var))))) + (write! new-value var)))) (def: #export (follow target) {#.doc "Creates a channel that will receive all changes to the value of the given var."} - (All [a] (-> (Var a) (IO (frp.Channel a)))) + (All [a] (-> (Var a) (IO [(Channel a) (Sink a)]))) (do io.monad - [#let [[channel source] (frp.channel []) - target (:representation target)] + [#let [[channel sink] (frp.channel [])] _ (atom.update (function (_ [value observers]) - [value (#.Cons (:: source feed) observers)]) - target)] - (wrap channel))) + [value (#.Cons sink observers)]) + (:representation target))] + (wrap [channel sink]))) ) (type: (Tx-Frame a) @@ -96,7 +107,7 @@ [tx value] #.None - (let [value (read!! var)] + (let [value (..read! var)] [(#.Cons [var value value] tx) value])))) @@ -129,16 +140,20 @@ []] #.None - [(#.Cons [var (read!! var) value] tx) + [(#.Cons [var (..read! var) value] tx) []]))) -(structure: #export functor (Functor STM) +(structure: #export functor + (Functor STM) + (def: (map f fa) (function (_ tx) (let [[tx' a] (fa tx)] [tx' (f a)])))) -(structure: #export apply (Apply STM) +(structure: #export apply + (Apply STM) + (def: &functor ..functor) (def: (apply ff fa) @@ -147,7 +162,9 @@ [tx'' a] (fa tx')] [tx'' (f a)])))) -(structure: #export monad (Monad STM) +(structure: #export monad + (Monad STM) + (def: &functor ..functor) (def: (wrap a) @@ -162,22 +179,22 @@ {#.doc "Will update a Var's value, and return a tuple with the old and the new values."} (All [a] (-> (-> a a) (Var a) (STM [a a]))) (do ..monad - [a (read var) + [a (..read var) #let [a' (f a)] - _ (write a' var)] + _ (..write a' var)] (wrap [a a']))) (def: (can-commit? tx) (-> Tx Bit) (list.every? (function (_ [_var _original _current]) - (is? _original (read!! _var))) + (is? _original (..read! _var))) tx)) (def: (commit-var! [_var _original _current]) (-> (Ex [a] (Tx-Frame a)) (IO Any)) (if (is? _original _current) (io []) - (write! _current _var))) + (..write! _current _var))) (def: fresh-tx Tx (list)) diff --git a/stdlib/source/lux/tool/compiler/default/platform.lux b/stdlib/source/lux/tool/compiler/default/platform.lux index f562e762a..75ef54731 100644 --- a/stdlib/source/lux/tool/compiler/default/platform.lux +++ b/stdlib/source/lux/tool/compiler/default/platform.lux @@ -243,16 +243,12 @@ (All [<type-vars>] (-> <Context> (-> <Compiler> <Importer>))) - (let [current (:share [<type-vars>] - {<Context> - initial} - {(Var <Context>) - (stm.var initial)}) + (let [current (stm.var initial) pending (:share [<type-vars>] {<Context> initial} {(Var (Dictionary Module <Pending>)) - (stm.var (dictionary.new text.hash))})] + (:assume (stm.var (dictionary.new text.hash)))})] (function (_ compile) (function (import! module) (do promise.monad @@ -262,38 +258,39 @@ {(Promise [<Return> (Maybe [<Context> archive.ID <Signal>])]) - (stm.commit - (do stm.monad - [[archive state] (stm.read current)] - (if (archive.archived? archive module) - (wrap [(promise@wrap (#try.Success [archive state])) - #.None]) - (do @ - [@pending (stm.read pending)] - (case (dictionary.get module @pending) - (#.Some [return signal]) - (wrap [return - #.None]) - - #.None - (case (archive.reserve module archive) - (#try.Success [module-id archive]) - (do @ - [_ (stm.write [archive state] current) - #let [[return signal] (:share [<type-vars>] - {<Context> - initial} - {<Pending> - (promise.promise [])})] - _ (stm.update (dictionary.put module [return signal]) pending)] - (wrap [return - (#.Some [[archive state] - module-id - signal])])) - - (#try.Failure error) - (wrap [(promise@wrap (#try.Failure error)) - #.None])))))))}) + (:assume + (stm.commit + (do stm.monad + [[archive state] (stm.read current)] + (if (archive.archived? archive module) + (wrap [(promise@wrap (#try.Success [archive state])) + #.None]) + (do @ + [@pending (stm.read pending)] + (case (dictionary.get module @pending) + (#.Some [return signal]) + (wrap [return + #.None]) + + #.None + (case (archive.reserve module archive) + (#try.Success [module-id archive]) + (do @ + [_ (stm.write [archive state] current) + #let [[return signal] (:share [<type-vars>] + {<Context> + initial} + {<Pending> + (promise.promise [])})] + _ (stm.update (dictionary.put module [return signal]) pending)] + (wrap [return + (#.Some [[archive state] + module-id + signal])])) + + (#try.Failure error) + (wrap [(promise@wrap (#try.Failure error)) + #.None]))))))))}) _ (case signal #.None (wrap []) @@ -361,7 +358,8 @@ {<Context> context} {(///.Compiler <State+> .Module Any) - ((//init.compiler expander syntax.prelude (get@ #write platform)) $.key (list))}) + (:assume + ((//init.compiler expander syntax.prelude (get@ #write platform)) $.key (list)))}) parallel-compiler (..parallel context (function (_ import! module-id [archive state] module) @@ -383,29 +381,22 @@ platform} {(-> <Context> (///.Compilation <State+> .Module Any) (List Module) (Action [Archive <State+>])) - recur})] - archive,document+ (|> new-dependencies - (list@map import!) - (monad.seq ..monad)) - #let [archive (case archive,document+ - #.Nil - archive - - archive,document+ - (|> archive,document+ - (list@map product.left) - (list@fold archive.merge archive))) - state (case archive,document+ - #.Nil - state + (:assume + recur)})] + [archive state] (case new-dependencies + #.Nil + (wrap [archive state]) - archive,document+ - (try.assume - (:share [|state|] - {|state| - state} - {(Try |state|) - (..updated-state archive state)})))]] + (#.Cons _) + (do @ + [archive,document+ (|> new-dependencies + (list@map import!) + (monad.seq ..monad)) + #let [archive (|> archive,document+ + (list@map product.left) + (list@fold archive.merge archive))]] + (wrap [archive (try.assume + (..updated-state archive state))])))] (case ((get@ #///.process compilation) ## TODO: The "///directive.set-current-module" below shouldn't be necessary. Remove it ASAP. ## TODO: The context shouldn't need to be re-set either. diff --git a/stdlib/source/lux/tool/compiler/language/lux/generation.lux b/stdlib/source/lux/tool/compiler/language/lux/generation.lux index 7196d13f1..3d2e6b3a3 100644 --- a/stdlib/source/lux/tool/compiler/language/lux/generation.lux +++ b/stdlib/source/lux/tool/compiler/language/lux/generation.lux @@ -78,6 +78,7 @@ [Phase extension.Phase] [Handler extension.Handler] [Bundle extension.Bundle] + [Extender extension.Extender] ) (def: #export (state host module) diff --git a/stdlib/source/lux/tool/compiler/meta/archive/document.lux b/stdlib/source/lux/tool/compiler/meta/archive/document.lux index 19b8576a1..825436999 100644 --- a/stdlib/source/lux/tool/compiler/meta/archive/document.lux +++ b/stdlib/source/lux/tool/compiler/meta/archive/document.lux @@ -40,7 +40,7 @@ {(Key e) key} {e - document//content})) + (:assume document//content)})) (exception.throw ..invalid-signature [(key.signature key) document//signature])))) diff --git a/stdlib/source/lux/type.lux b/stdlib/source/lux/type.lux index c3a17f34a..161cee0f7 100644 --- a/stdlib/source/lux/type.lux +++ b/stdlib/source/lux/type.lux @@ -403,8 +403,7 @@ (-> (~ (get@ #type exemplar)) (~ (get@ #type computation)))) (.function ((~ g!_) (~ g!_)) - ## TODO: this should use : instead of :assume - (:assume (~ (get@ #expression computation))))))] + (~ (get@ #expression computation)))))] (wrap (list (` ((~ shareC) (~ (get@ #expression exemplar))))))))) (syntax: #export (:by-example {type-vars type-parameters} diff --git a/stdlib/source/program/compositor.lux b/stdlib/source/program/compositor.lux index 43bc084c5..ecce5fa65 100644 --- a/stdlib/source/program/compositor.lux +++ b/stdlib/source/program/compositor.lux @@ -98,12 +98,12 @@ platform} {(Promise (Try [(directive.State+ <parameters>) Archive])) - (platform.initialize static (get@ #/cli.module configuration) expander host-analysis platform generation-bundle host-directive-bundle program extender)}) + (:assume (platform.initialize static (get@ #/cli.module configuration) expander host-analysis platform generation-bundle host-directive-bundle program extender))}) [archive state] (:share [<parameters>] {(Platform <parameters>) platform} {(Promise (Try [Archive (directive.State+ <parameters>)])) - (platform.compile static expander platform configuration [archive state])}) + (:assume (platform.compile static expander platform configuration [archive state]))}) _ (ioW.freeze (get@ #platform.&file-system platform) (get@ #/static.host static) (get@ #/static.target static) archive)] (wrap (log! "Compilation complete!")))) diff --git a/stdlib/source/test/lux/abstract/apply.lux b/stdlib/source/test/lux/abstract/apply.lux index c53283233..c9a6be500 100644 --- a/stdlib/source/test/lux/abstract/apply.lux +++ b/stdlib/source/test/lux/abstract/apply.lux @@ -63,11 +63,10 @@ (def: #export (spec injection comparison apply) (All [f] (-> (Injection f) (Comparison f) (Apply f) Test)) - (<| (_.covering /._) - (_.with-cover [/.Apply] - ($_ _.and - (..identity injection comparison apply) - (..homomorphism injection comparison apply) - (..interchange injection comparison apply) - (..composition injection comparison apply) - )))) + (_.with-cover [/.Apply] + ($_ _.and + (..identity injection comparison apply) + (..homomorphism injection comparison apply) + (..interchange injection comparison apply) + (..composition injection comparison apply) + ))) diff --git a/stdlib/source/test/lux/control/concurrency/stm.lux b/stdlib/source/test/lux/control/concurrency/stm.lux index c84663a96..07d0c946b 100644 --- a/stdlib/source/test/lux/control/concurrency/stm.lux +++ b/stdlib/source/test/lux/control/concurrency/stm.lux @@ -1,7 +1,13 @@ (.module: [lux #* ["_" test (#+ Test)] - ["M" abstract/monad (#+ do Monad)] + [abstract + ["." monad (#+ Monad do)] + {[0 #test] + [/ + ["$." functor (#+ Injection Comparison)] + ["$." apply] + ["$." monad]]}] [control ["." io (#+ IO)]] [data @@ -11,74 +17,96 @@ [collection ["." list ("#@." functor)]]] [math - ["r" random]]] + ["." random]]] {1 ["." / [// ["." atom (#+ Atom atom)] - ["." process] ["." promise] ["." frp (#+ Channel)]]]}) -(def: (read! channel) - (All [a] (-> (Channel a) (IO (Atom (List a))))) - (do io.monad - [#let [output (atom (list))] - _ (frp.listen (function (_ value) - ## TODO: Simplify when possible. - (do @ - [_ (atom.update (|>> (#.Cons value)) output)] - (wrap []))) - channel)] - (wrap output))) +(def: injection + (Injection /.STM) + (:: /.monad wrap)) -(def: iterations-per-process Nat 100) +(def: comparison + (Comparison /.STM) + (function (_ == left right) + (io.run + (do io.monad + [?left (promise.poll (/.commit left)) + ?right (promise.poll (/.commit right))] + (wrap (case [?left ?right] + [(#.Some left) + (#.Some right)] + (== left right) + + _ + false)))))) (def: #export test Test - (do r.monad - [_ (wrap [])] - (<| (_.context (%.name (name-of /.STM))) + (<| (_.covering /._) + (do random.monad + [dummy random.nat + expected random.nat + iterations-per-process (|> random.nat (:: @ map (n.% 100)))] ($_ _.and + (_.with-cover [/.functor] + ($functor.spec ..injection ..comparison /.functor)) + (_.with-cover [/.apply] + ($apply.spec ..injection ..comparison /.apply)) + (_.with-cover [/.monad] + ($monad.spec ..injection ..comparison /.monad)) + (wrap (do promise.monad - [output (/.commit (/.read (/.var 0)))] - (_.assert "Can read STM vars." - (n.= 0 output)))) + [actual (/.commit (:: /.monad wrap expected))] + (_.claim [/.commit] + (n.= expected actual)))) (wrap (do promise.monad - [#let [_var (/.var 0)] - output (/.commit (do /.monad - [_ (/.write 5 _var)] - (/.read _var)))] - (_.assert "Can write STM vars." - (n.= 5 output)))) + [actual (/.commit (/.read (/.var expected)))] + (_.claim [/.Var /.var /.read] + (n.= expected actual)))) + (wrap (do promise.monad + [actual (let [box (/.var dummy)] + (/.commit (do /.monad + [_ (/.write expected box)] + (/.read box)))) + verdict (let [box (/.var dummy)] + (/.commit (do /.monad + [_ (/.write expected box) + actual (/.read box)] + (wrap (n.= expected actual)))))] + (_.claim [/.write] + (and (n.= expected actual) + verdict)))) (wrap (do promise.monad - [#let [_var (/.var 5)] + [#let [box (/.var dummy)] output (/.commit (do /.monad - [_ (/.update (n.* 3) _var)] - (/.read _var)))] - (_.assert "Can update STM vars." - (n.= 15 output)))) + [_ (/.update (n.+ expected) box)] + (/.read box)))] + (_.claim [/.update] + (n.= (n.+ expected dummy) + output)))) (wrap (do promise.monad - [#let [_var (/.var 0) - changes (io.run (read! (io.run (/.follow _var))))] - _ (/.commit (/.write 5 _var)) - _ (/.commit (/.update (n.* 3) _var)) - changes (promise.future (atom.read changes))] - (_.assert "Can follow all the changes to STM vars." - (:: (list.equivalence n.equivalence) = - (list 5 15) - (list.reverse changes))))) - (wrap (let [_concurrency-var (/.var 0)] + [#let [box (/.var dummy) + [follower sink] (io.run (/.follow box))] + _ (/.commit (/.write expected box)) + _ (/.commit (/.update (n.* 2) box)) + _ (promise.future (:: sink close)) + _ (/.commit (/.update (n.* 3) box)) + changes (frp.consume follower)] + (_.claim [/.follow] + (:: (list.equivalence n.equivalence) = + (list expected (n.* 2 expected)) + changes)))) + (wrap (let [var (/.var 0)] (do promise.monad - [_ (|> process.parallelism - (list.n/range 1) - (list@map (function (_ _) - (|> iterations-per-process - (list.n/range 1) - (M.map @ (function (_ _) (/.commit (/.update inc _concurrency-var))))))) - (M.seq @)) - last-val (/.commit (/.read _concurrency-var))] - (_.assert "Can modify STM vars concurrently from multiple threads." - (|> process.parallelism - (n.* iterations-per-process) - (n.= last-val)))))))))) + [_ (|> (list.repeat iterations-per-process []) + (list@map (function (_ _) (/.commit (/.update inc var)))) + (monad.seq @)) + cummulative (/.commit (/.read var))] + (_.claim [/.STM] + (n.= iterations-per-process + cummulative))))) + )))) |