diff options
Diffstat (limited to 'stdlib/source/library/lux/world/file/watch.lux')
-rw-r--r-- | stdlib/source/library/lux/world/file/watch.lux | 459 |
1 files changed, 459 insertions, 0 deletions
diff --git a/stdlib/source/library/lux/world/file/watch.lux b/stdlib/source/library/lux/world/file/watch.lux new file mode 100644 index 000000000..df655ed9c --- /dev/null +++ b/stdlib/source/library/lux/world/file/watch.lux @@ -0,0 +1,459 @@ +(.module: + [library + [lux #* + ["@" target] + ["." ffi (#+ import:)] + [abstract + [predicate (#+ Predicate)] + ["." monad (#+ do)]] + [control + ["." io (#+ IO)] + ["." try (#+ Try)] + ["." exception (#+ exception:)] + [concurrency + ["." promise (#+ Promise)] + ["." stm (#+ STM Var)]]] + [data + ["." product] + ["." maybe] + ["." text + ["%" format (#+ format)]] + [collection + ["." dictionary (#+ Dictionary)] + ["." list ("#\." functor monoid fold)] + ["." set] + ["." array]]] + [math + [number + ["n" nat]]] + [time + ["." instant (#+ Instant) ("#\." equivalence)]] + [type + [abstract (#+ abstract: :representation :abstraction)]]]] + ["." //]) + +(abstract: #export Concern + {#create Bit + #modify Bit + #delete Bit} + + (def: none + Concern + (:abstraction + {#create false + #modify false + #delete false})) + + (template [<concern> <predicate> <event> <create> <modify> <delete>] + [(def: #export <concern> + Concern + (:abstraction + {#create <create> + #modify <modify> + #delete <delete>})) + + (def: #export <predicate> + (Predicate Concern) + (|>> :representation (get@ <event>)))] + + [creation creation? #create + true false false] + [modification modification? #modify + false true false] + [deletion deletion? #delete + false false true] + ) + + (def: #export (also left right) + (-> Concern Concern Concern) + (:abstraction + {#create (or (..creation? left) (..creation? right)) + #modify (or (..modification? left) (..modification? right)) + #delete (or (..deletion? left) (..deletion? right))})) + + (def: #export all + Concern + ($_ ..also + ..creation + ..modification + ..deletion + )) + ) + +(interface: #export (Watcher !) + (: (-> Concern //.Path (! (Try Any))) + start) + (: (-> //.Path (! (Try Concern))) + concern) + (: (-> //.Path (! (Try Concern))) + stop) + (: (-> [] (! (Try (List [Concern //.Path])))) + poll)) + +(template [<name>] + [(exception: #export (<name> {path //.Path}) + (exception.report + ["Path" (%.text path)]))] + + [not_being_watched] + [cannot_poll_a_non_existent_directory] + ) + +(type: File_Tracker + (Dictionary //.Path Instant)) + +(type: Directory_Tracker + (Dictionary //.Path [Concern File_Tracker])) + +(def: (update_watch! new_concern path tracker) + (-> Concern //.Path (Var Directory_Tracker) (STM Bit)) + (do {! stm.monad} + [@tracker (stm.read tracker)] + (case (dictionary.get path @tracker) + (#.Some [old_concern last_modified]) + (do ! + [_ (stm.update (dictionary.put path [new_concern last_modified]) tracker)] + (wrap true)) + + #.None + (wrap false)))) + +(def: (file_tracker fs directory) + (-> (//.System Promise) //.Path (Promise (Try File_Tracker))) + (do {! (try.with promise.monad)} + [files (\ fs directory_files directory)] + (monad.fold ! + (function (_ file tracker) + (do ! + [last_modified (\ fs last_modified file)] + (wrap (dictionary.put file last_modified tracker)))) + (: File_Tracker + (dictionary.new text.hash)) + files))) + +(def: (poll_files fs directory) + (-> (//.System Promise) //.Path (Promise (Try (List [//.Path Instant])))) + (do {! (try.with promise.monad)} + [files (\ fs directory_files directory)] + (monad.map ! (function (_ file) + (|> file + (\ fs last_modified) + (\ ! map (|>> [file])))) + files))) + +(def: (poll_directory_changes fs [directory [concern file_tracker]]) + (-> (//.System Promise) [//.Path [Concern File_Tracker]] + (Promise (Try [[//.Path [Concern File_Tracker]] + [(List [//.Path Instant]) + (List [//.Path Instant Instant]) + (List //.Path)]]))) + (do {! (try.with promise.monad)} + [current_files (..poll_files fs directory) + #let [creations (if (..creation? concern) + (list.filter (|>> product.left (dictionary.key? file_tracker) not) + current_files) + (list)) + available (|> current_files + (list\map product.left) + (set.from_list text.hash)) + deletions (if (..deletion? concern) + (|> (dictionary.entries file_tracker) + (list\map product.left) + (list.filter (|>> (set.member? available) not))) + (list)) + modifications (list.all (function (_ [path current_modification]) + (do maybe.monad + [previous_modification (dictionary.get path file_tracker)] + (wrap [path previous_modification current_modification]))) + current_files)]] + (wrap [[directory + [concern + (let [with_deletions (list\fold dictionary.remove file_tracker deletions) + with_creations (list\fold (function (_ [path last_modified] tracker) + (dictionary.put path last_modified tracker)) + with_deletions + creations) + with_modifications (list\fold (function (_ [path previous_modification current_modification] tracker) + (dictionary.put path current_modification tracker)) + with_creations + modifications)] + with_modifications)]] + [creations + modifications + deletions]]))) + +(def: #export (polling fs) + (-> (//.System Promise) (Watcher Promise)) + (let [tracker (: (Var Directory_Tracker) + (stm.var (dictionary.new text.hash)))] + (implementation + (def: (start new_concern path) + (do {! promise.monad} + [exists? (\ fs directory? path)] + (if exists? + (do ! + [updated? (stm.commit (..update_watch! new_concern path tracker))] + (if updated? + (wrap (#try.Success [])) + (do (try.with !) + [file_tracker (..file_tracker fs path)] + (do ! + [_ (stm.commit (stm.update (dictionary.put path [new_concern file_tracker]) tracker))] + (wrap (#try.Success [])))))) + (wrap (exception.throw ..cannot_poll_a_non_existent_directory [path]))))) + (def: (concern path) + (stm.commit + (do stm.monad + [@tracker (stm.read tracker)] + (wrap (case (dictionary.get path @tracker) + (#.Some [concern file_tracker]) + (#try.Success concern) + + #.None + (exception.throw ..not_being_watched [path])))))) + (def: (stop path) + (stm.commit + (do {! stm.monad} + [@tracker (stm.read tracker)] + (case (dictionary.get path @tracker) + (#.Some [concern file_tracker]) + (do ! + [_ (stm.update (dictionary.remove path) tracker)] + (wrap (#try.Success concern))) + + #.None + (wrap (exception.throw ..not_being_watched [path])))))) + (def: (poll _) + (do promise.monad + [@tracker (stm.commit (stm.read tracker))] + (do {! (try.with promise.monad)} + [changes (|> @tracker + dictionary.entries + (monad.map ! (..poll_directory_changes fs))) + _ (do promise.monad + [_ (stm.commit (stm.write (|> changes + (list\map product.left) + (dictionary.from_list text.hash)) + tracker))] + (wrap (#try.Success []))) + #let [[creations modifications deletions] + (list\fold (function (_ [_ [creations modifications deletions]] + [all_creations all_modifications all_deletions]) + [(list\compose creations all_creations) + (list\compose modifications all_modifications) + (list\compose deletions all_deletions)]) + [(list) (list) (list)] + changes)]] + (wrap ($_ list\compose + (list\map (|>> product.left [..creation]) creations) + (|> modifications + (list.filter (function (_ [path previous_modification current_modification]) + (not (instant\= previous_modification current_modification)))) + (list\map (|>> product.left [..modification]))) + (list\map (|>> [..deletion]) deletions) + ))))) + ))) + +(def: #export (mock separator) + (-> Text [(//.System Promise) (Watcher Promise)]) + (let [fs (//.mock separator)] + [fs + (..polling fs)])) + +(with_expansions [<jvm> (as_is (import: java/lang/Object) + + (import: java/lang/String) + + (import: (java/util/List a) + ["#::." + (size [] int) + (get [int] a)]) + + (def: (default_list list) + (All [a] (-> (java/util/List a) (List a))) + (let [size (.nat (java/util/List::size list))] + (loop [idx 0 + output #.Nil] + (if (n.< size idx) + (recur (inc idx) + (#.Cons (java/util/List::get (.int idx) list) + output)) + output)))) + + (import: (java/nio/file/WatchEvent$Kind a)) + + (import: (java/nio/file/WatchEvent a) + ["#::." + (kind [] (java/nio/file/WatchEvent$Kind a))]) + + (import: java/nio/file/Watchable) + + (import: java/nio/file/Path + ["#::." + (register [java/nio/file/WatchService [(java/nio/file/WatchEvent$Kind [? < java/lang/Object])]] #io #try java/nio/file/WatchKey) + (toString [] java/lang/String)]) + + (import: java/nio/file/StandardWatchEventKinds + ["#::." + (#static ENTRY_CREATE (java/nio/file/WatchEvent$Kind java/nio/file/Path)) + (#static ENTRY_MODIFY (java/nio/file/WatchEvent$Kind java/nio/file/Path)) + (#static ENTRY_DELETE (java/nio/file/WatchEvent$Kind java/nio/file/Path))]) + + (def: (default_event_concern event) + (All [a] + (-> (java/nio/file/WatchEvent a) Concern)) + (let [kind (:as (java/nio/file/WatchEvent$Kind java/nio/file/Path) + (java/nio/file/WatchEvent::kind event))] + (cond (is? (java/nio/file/StandardWatchEventKinds::ENTRY_CREATE) + kind) + ..creation + + (is? (java/nio/file/StandardWatchEventKinds::ENTRY_MODIFY) + kind) + ..modification + + (is? (java/nio/file/StandardWatchEventKinds::ENTRY_DELETE) + kind) + ..deletion + + ## else + ..none + ))) + + (import: java/nio/file/WatchKey + ["#::." + (reset [] #io boolean) + (cancel [] #io void) + (watchable [] java/nio/file/Watchable) + (pollEvents [] #io (java/util/List (java/nio/file/WatchEvent ?)))]) + + (def: default_key_concern + (-> java/nio/file/WatchKey (IO Concern)) + (|>> java/nio/file/WatchKey::pollEvents + (\ io.monad map (|>> ..default_list + (list\map default_event_concern) + (list\fold ..also ..none))))) + + (import: java/nio/file/WatchService + ["#::." + (poll [] #io #try #? java/nio/file/WatchKey)]) + + (import: java/nio/file/FileSystem + ["#::." + (newWatchService [] #io #try java/nio/file/WatchService)]) + + (import: java/nio/file/FileSystems + ["#::." + (#static getDefault [] java/nio/file/FileSystem)]) + + (import: java/io/File + ["#::." + (new [java/lang/String]) + (toPath [] java/nio/file/Path)]) + + (type: Watch_Event + (java/nio/file/WatchEvent$Kind java/lang/Object)) + + (def: (default_start watch_events watcher path) + (-> (List Watch_Event) java/nio/file/WatchService //.Path (Promise (Try java/nio/file/WatchKey))) + (let [watch_events' (list\fold (function (_ [index watch_event] watch_events') + (ffi.array_write index watch_event watch_events')) + (ffi.array (java/nio/file/WatchEvent$Kind java/lang/Object) + (list.size watch_events)) + (list.enumeration watch_events))] + (promise.future + (java/nio/file/Path::register watcher + watch_events' + (|> path java/io/File::new java/io/File::toPath))))) + + (def: (default_poll watcher) + (-> java/nio/file/WatchService (IO (Try (List [Concern //.Path])))) + (loop [output (: (List [Concern //.Path]) + (list))] + (do (try.with io.monad) + [?key (java/nio/file/WatchService::poll watcher)] + (case ?key + (#.Some key) + (do {! io.monad} + [valid? (java/nio/file/WatchKey::reset key)] + (if valid? + (do ! + [#let [path (|> key + java/nio/file/WatchKey::watchable + (:as java/nio/file/Path) + java/nio/file/Path::toString + (:as //.Path))] + concern (..default_key_concern key)] + (recur (#.Cons [concern path] + output))) + (recur output))) + + #.None + (wrap output))))) + + (def: (watch_events concern) + (-> Concern (List Watch_Event)) + ($_ list\compose + (if (..creation? concern) + (list (:as Watch_Event (java/nio/file/StandardWatchEventKinds::ENTRY_CREATE))) + (list)) + (if (..modification? concern) + (list (:as Watch_Event (java/nio/file/StandardWatchEventKinds::ENTRY_MODIFY))) + (list)) + (if (..deletion? concern) + (list (:as Watch_Event (java/nio/file/StandardWatchEventKinds::ENTRY_DELETE))) + (list)) + )) + + (def: #export default + (IO (Try (Watcher Promise))) + (do (try.with io.monad) + [watcher (java/nio/file/FileSystem::newWatchService + (java/nio/file/FileSystems::getDefault)) + #let [tracker (stm.var (: (Dictionary //.Path [Concern java/nio/file/WatchKey]) + (dictionary.new text.hash))) + + stop (: (-> //.Path (Promise (Try Concern))) + (function (_ path) + (do {! promise.monad} + [@tracker (stm.commit (stm.read tracker))] + (case (dictionary.get path @tracker) + (#.Some [concern key]) + (do ! + [_ (promise.future + (java/nio/file/WatchKey::cancel key)) + _ (stm.commit (stm.update (dictionary.remove path) tracker))] + (wrap (#try.Success concern))) + + #.None + (wrap (exception.throw ..not_being_watched [path]))))))]] + (wrap (: (Watcher Promise) + (implementation + (def: (start concern path) + (do promise.monad + [?concern (stop path)] + (do (try.with promise.monad) + [key (..default_start (..watch_events (..also (try.default ..none ?concern) + concern)) + watcher + path)] + (do promise.monad + [_ (stm.commit (stm.update (dictionary.put path [concern key]) tracker))] + (wrap (#try.Success [])))))) + (def: (concern path) + (do promise.monad + [@tracker (stm.commit (stm.read tracker))] + (case (dictionary.get path @tracker) + (#.Some [concern key]) + (wrap (#try.Success concern)) + + #.None + (wrap (exception.throw ..not_being_watched [path]))))) + (def: stop stop) + (def: (poll _) + (promise.future (..default_poll watcher))) + ))))) + )] + (for {@.old (as_is <jvm>) + @.jvm (as_is <jvm>)} + (as_is))) |