aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/concurrency/process.lux
blob: e63aba14d4c468a44dd40224a692ae50aab6edfe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
(.module:
  [lux #*
   [control
    ["." monad (#+ do)]
    ["ex" exception (#+ exception:)]]
   [data
    [collection
     ["." list]]]
   [compiler
    ["." host]]
   ["." io (#+ IO io)]
   [host (#+ import: object)]]
  [//
   ["." atom (#+ Atom)]])

## Platform-specific declarations.
## On the JVM, the Java classes used below for scheduling are imported.
## On any other host, a Process record is declared instead; it is what
## gets stored in the atom-backed runner queue.
(`` (for {(~~ (static host.jvm))
          (as-is (import: java/lang/Object)

                 ## Used by ..parallelism to query the number of processors.
                 (import: java/lang/Runtime
                   (#static getRuntime [] Runtime)
                   (availableProcessors [] int))

                 (import: java/lang/Runnable)

                 ## Only the MILLISECONDS unit is needed, since delays are given in milli-seconds.
                 (import: java/util/concurrent/TimeUnit
                   (#enum MILLISECONDS))
                 
                 ## Used for submitting actions with no delay.
                 (import: java/util/concurrent/Executor
                   (execute [Runnable] #io void))

                 (import: (java/util/concurrent/ScheduledFuture a))

                 ## Used for submitting actions with a delay.
                 (import: java/util/concurrent/ScheduledThreadPoolExecutor
                   (new [int])
                   (schedule [Runnable long TimeUnit] #io (ScheduledFuture Object))))}

         ## A scheduled unit of work (non-JVM hosts only).
         ## #creation: time-stamp taken from "lux io current-time" when the process was scheduled.
         ## #delay: the milli-seconds given to 'schedule'.
         ## #action: the IO action to run once creation + delay has been reached.
         (type: Process
           {#creation Nat
            #delay Nat
            #action (IO Any)})
         ))

## How many processes can be run at the same time.
## On the JVM, this is whatever the Runtime reports as available processors.
## On every other host, it is fixed at 1.
(def: #export parallelism
  Nat
  (`` (for {(~~ (static host.jvm))
            (.nat (Runtime::availableProcessors (Runtime::getRuntime)))}
           1)))

## The back-end that scheduled actions are handed to.
## JVM: a ScheduledThreadPoolExecutor sized according to ..parallelism.
## Other hosts: an atom holding the list of Process records (initially empty).
(def: runner
  (`` (for {(~~ (static host.jvm))
            (|> ..parallelism .int ScheduledThreadPoolExecutor::new)}

           (: (Atom (List Process))
              (atom.atom #.Nil)))))

## Builds an IO computation that registers 'action' to be run after
## 'milli-seconds' have passed.
##
## JVM: the action is wrapped in a Runnable; a delay of 0 is handed straight
## to the executor, any other delay goes through the executor's 'schedule'.
##
## Other hosts: a Process record is pushed onto the front of the runner's
## list; nothing is executed until the queue is processed (see 'run!').
(def: #export (schedule milli-seconds action)
  (-> Nat (IO Any) (IO Any))
  (`` (for {(~~ (static host.jvm))
            (let [runnable (object [] [Runnable]
                             []
                             (Runnable [] (run) void
                                       (io.run action)))]
              (case milli-seconds
                0 (Executor::execute runnable runner)
                _ (ScheduledThreadPoolExecutor::schedule runnable (.int milli-seconds) TimeUnit::MILLISECONDS
                                                         runner)))}
           ## The #creation field is a Nat, so the current-time value must be
           ## converted before being stored in the record.
           (atom.update (|>> (#.Cons {#creation (.nat ("lux io current-time"))
                                      #delay milli-seconds
                                      #action action}))
                        runner))))

## Queue processing for hosts without a native executor.
## On the JVM nothing is defined here, since the thread-pool runs the actions.
##
## On other hosts:
## * 'cannot-continue-running-processes' is raised when the runner's list
##   could not be swapped for the list of still-pending processes.
## * 'run!' repeatedly reads the list of scheduled processes, runs those
##   whose creation + delay has been reached, and keeps going until the
##   list is empty.
(`` (for {(~~ (static host.jvm))
          (as-is)}
         (as-is (exception: #export (cannot-continue-running-processes) "")
                
                (def: #export run!
                  (IO Any)
                  (loop [_ []]
                    (do io.Monad<IO>
                      [processes (atom.read runner)]
                      (case processes
                        ## And... we're done!
                        #.Nil
                        (wrap [])

                        _
                        (do @
                          [#let [## Converted to Nat so it can be compared against the Nat fields of Process.
                                 now (.nat ("lux io current-time"))
                                 [ready pending] (list.partition (function (_ process)
                                                                   (|> (get@ #creation process)
                                                                       (n/+ (get@ #delay process))
                                                                       (n/<= now)))
                                                                 processes)]
                           ## Only the pending processes are kept in the runner.
                           swapped? (atom.compare-and-swap! processes pending runner)]
                          (if swapped?
                            (do @
                              [## 'ready' holds Process records, so the IO action must be
                               ## extracted from each one before it can be run.
                               _ (monad.map @ (get@ #action) ready)]
                              ## Go around again: pending processes (and any scheduled by
                              ## the actions that just ran) still need their turn.
                              (recur []))
                            (error! (ex.construct cannot-continue-running-processes []))))
                        ))))
                )))