1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
module Server.Subscribe where
import Conduit (MonadIO (..))
import Control.Concurrent.STM (atomically, newTQueue, readTQueue,
readTVar, writeTVar)
import Control.Exception (handle)
import Control.Monad.Extra (forever, whenJust)
import qualified Data.Aeson as A
import qualified Data.ByteString.Char8 as C8
import Data.Functor ((<&>))
import Data.Map (Map)
import qualified Data.Map as M
import Data.Pool
import Data.UUID (UUID)
import Database.Persist (Entity (entityKey), SelectOpt (Desc),
entityVal, selectFirst, selectList,
(<-.), (==.), (||.))
import Database.Persist.Sql (SqlBackend)
import qualified Network.WebSockets as WS
import Persist
import Server.Base (ServerState)
import Server.Util (ServiceM)
handleSubscribe
:: Pool SqlBackend
-> ServerState
-> UUID
-> WS.Connection
-> ServiceM ()
handleSubscribe dbpool subscribers (ticketId :: UUID) conn = liftIO $ WS.withPingThread conn 30 (pure ()) $ do
queue <- atomically $ do
queue <- newTQueue
qs <- readTVar subscribers
writeTVar subscribers
$ M.insertWith (<>) ticketId [queue] qs
pure queue
-- send most recent ping, if any (so we won't have to wait for movement)
lastPing <- runSqlWithoutLog dbpool $ do
trackers <- getTicketTrackers ticketId
<&> fmap entityKey
selectFirst [TrainPingToken <-. trackers] [Desc TrainPingTimestamp]
<&> fmap entityVal
whenJust lastPing $ \ping ->
WS.sendTextData conn (A.encode lastPing)
handle (\(e :: WS.ConnectionException) -> removeSubscriber queue) $ forever $ do
res <- atomically $ readTQueue queue
case res of
Just ping -> WS.sendTextData conn (A.encode ping)
Nothing -> do
removeSubscriber queue
WS.sendClose conn (C8.pack "train ended")
where removeSubscriber queue = atomically $ do
qs <- readTVar subscribers
writeTVar subscribers
$ M.adjust (filter (/= queue)) ticketId qs
-- getTicketTrackers :: (MonadLogger (t (ResourceT IO)), MonadIO (t (ResourceT IO)))
-- => UUID -> ReaderT SqlBackend (t (ResourceT IO)) [Entity Tracker]
getTicketTrackers ticketId = do
joins <- selectList [TrackerTicketTicket ==. TicketKey ticketId] []
<&> fmap (trackerTicketTracker . entityVal)
selectList ([TrackerId <-. joins] ||. [TrackerCurrentTicket ==. Just (TicketKey ticketId)]) []
|