aboutsummaryrefslogtreecommitdiff
path: root/lib/Server/Subscribe.hs
blob: fdc092bbc184cca0ff704cbf96cd94ea1eceefdc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
module Server.Subscribe where
import           Conduit                (MonadIO (..))
import           Control.Concurrent.STM (atomically, newTQueue, readTQueue,
                                         readTVar, writeTVar)
import           Control.Exception      (handle)
import           Control.Monad.Extra    (forever, whenJust)
import qualified Data.Aeson             as A
import qualified Data.ByteString.Char8  as C8
import           Data.Functor           ((<&>))
import           Data.Map               (Map)
import qualified Data.Map               as M
import           Data.Pool
import           Data.UUID              (UUID)
import           Database.Persist       (Entity (entityKey), SelectOpt (Desc),
                                         entityVal, selectFirst, selectList,
                                         (<-.), (==.), (||.))
import           Database.Persist.Sql   (SqlBackend)
import qualified Network.WebSockets     as WS
import           Persist
import           Server.Base            (ServerState)
import           Server.Util            (ServiceM)


handleSubscribe
  :: Pool SqlBackend
  -> ServerState
  -> UUID
  -> WS.Connection
  -> ServiceM ()
handleSubscribe dbpool subscribers (ticketId :: UUID) conn = liftIO $ WS.withPingThread conn 30 (pure ()) $ do
  queue <- atomically $ do
    queue <- newTQueue
    qs <- readTVar subscribers
    writeTVar subscribers
      $ M.insertWith (<>) ticketId [queue] qs
    pure queue
  -- send most recent ping, if any (so we won't have to wait for movement)
  lastPing <- runSqlWithoutLog dbpool $ do
    trackers <- getTicketTrackers ticketId
      <&> fmap entityKey
    selectFirst [TrainPingToken <-. trackers] [Desc TrainPingTimestamp]
      <&> fmap entityVal
  whenJust lastPing $ \ping ->
    WS.sendTextData conn (A.encode lastPing)
  handle (\(e :: WS.ConnectionException) -> removeSubscriber queue) $ forever $ do
    res <- atomically $ readTQueue queue
    case res of
      Just ping -> WS.sendTextData conn (A.encode ping)
      Nothing -> do
        removeSubscriber queue
        WS.sendClose conn (C8.pack "train ended")
    where removeSubscriber queue = atomically $ do
            qs <- readTVar subscribers
            writeTVar subscribers
             $ M.adjust (filter (/= queue)) ticketId qs

-- getTicketTrackers :: (MonadLogger (t (ResourceT IO)), MonadIO (t (ResourceT IO)))
--   => UUID -> ReaderT SqlBackend (t (ResourceT IO)) [Entity Tracker]
getTicketTrackers ticketId = do
  joins <- selectList [TrackerTicketTicket ==. TicketKey ticketId] []
           <&> fmap (trackerTicketTracker . entityVal)
  selectList ([TrackerId <-. joins] ||. [TrackerCurrentTicket ==. Just (TicketKey ticketId)]) []