aboutsummaryrefslogtreecommitdiff
path: root/lib/Server/Subscribe.hs
blob: 86b67a6b6e488191aa515a666fb208a0b34280af (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
{-# LANGUAGE BlockArguments #-}

module Server.Subscribe where
import           Conduit                         (MonadIO (..))
import           Control.Concurrent.STM          (atomically, newTQueue,
                                                  readTQueue, readTVar,
                                                  writeTVar)
import           Control.Exception               (handle)
import           Control.Monad.Extra             (forever, whenJust)
import qualified Data.Aeson                      as A
import qualified Data.ByteString.Char8           as C8
import           Data.Coerce                     (coerce)
import           Data.Functor                    ((<&>))
import           Data.Map                        (Map)
import qualified Data.Map                        as M
import           Data.Pool
import           Data.UUID                       (UUID)
import           Database.Esqueleto.Experimental hiding ((<&>))
import           Database.Persist.Sql            (SqlBackend)
import qualified Network.WebSockets              as WS
import           Persist
import           Server.Base                     (ServerState)
import           Server.Util                     (ServiceM)

handleSubscribe
  :: Pool SqlBackend
  -> ServerState
  -> UUID
  -> WS.Connection
  -> ServiceM ()
handleSubscribe dbpool subscribers (ticketId :: UUID) conn = liftIO $ WS.withPingThread conn 30 (pure ()) $ do
  queue <- atomically $ do
    queue <- newTQueue
    qs <- readTVar subscribers
    writeTVar subscribers
      $ M.insertWith (<>) ticketId [queue] qs
    pure queue

  -- send most recent ping, if any (so we won't have to wait for movement)
  runSqlWithoutLog dbpool (selectOne do
        ping <- from (table @Ping)
        where_ (ping ^. PingTicket ==. val (Just (coerce ticketId)))
        orderBy [desc (ping ^. PingTimestamp)]
        pure ping)
    <&> fmap entityVal
    >>= flip whenJust (WS.sendTextData conn . A.encode)

  handle (\(e :: WS.ConnectionException) -> removeSubscriber queue) $ forever $ do
    res <- atomically $ readTQueue queue
    case res of
      Just ping -> WS.sendTextData conn (A.encode ping)
      Nothing -> do
        removeSubscriber queue
        WS.sendClose conn (C8.pack "train ended")
    where removeSubscriber queue = atomically $ do
            qs <- readTVar subscribers
            writeTVar subscribers
             $ M.adjust (filter (/= queue)) ticketId qs

-- getTicketTrackers :: (MonadLogger (t (ResourceT IO)), MonadIO (t (ResourceT IO)))
--   => UUID -> ReaderT SqlBackend (t (ResourceT IO)) [Entity Tracker]
getTicketTrackers ticketId = select do
    (tracker :& trackerticket) <- from $
      table @Tracker
      `innerJoin`
      table @TrackerTicket
      `on` \(tr :& ti) -> tr ^. TrackerId ==. ti ^. TrackerTicketTracker

    where_ $
      tracker ^. TrackerCurrentTicket ==. val (Just (TicketKey ticketId))
      ||. trackerticket ^. TrackerTicketTicket ==. val (TicketKey ticketId)

    pure tracker

  -- joins <- selectList [TrackerTicketTicket ==. TicketKey ticketId] []
  --          <&> fmap (trackerTicketTracker . entityVal)
  -- selectList ([TrackerId <-. joins] ||. [TrackerCurrentTicket ==. Just (TicketKey ticketId)]) []