aboutsummaryrefslogtreecommitdiff
path: root/lib/Server/Subscribe.hs
diff options
context:
space:
mode:
authorstuebinm2024-05-08 22:42:35 +0200
committerstuebinm2024-05-08 22:43:05 +0200
commitad8a09cafa519a15a22cafbfd2fa289538edc73d (patch)
tree81f49d19669d5895115a1e8d39bd3557fc0c03d8 /lib/Server/Subscribe.hs
parent0febc9cd99e0d8b80b1385593e25e7670d5c842b (diff)
restructure: split up the server module
Diffstat (limited to 'lib/Server/Subscribe.hs')
-rw-r--r--lib/Server/Subscribe.hs63
1 files changed, 63 insertions, 0 deletions
diff --git a/lib/Server/Subscribe.hs b/lib/Server/Subscribe.hs
new file mode 100644
index 0000000..fdc092b
--- /dev/null
+++ b/lib/Server/Subscribe.hs
@@ -0,0 +1,63 @@
+
+module Server.Subscribe where
+import Conduit (MonadIO (..))
+import Control.Concurrent.STM (atomically, newTQueue, readTQueue,
+ readTVar, writeTVar)
+import Control.Exception (handle)
+import Control.Monad.Extra (forever, whenJust)
+import qualified Data.Aeson as A
+import qualified Data.ByteString.Char8 as C8
+import Data.Functor ((<&>))
+import Data.Map (Map)
+import qualified Data.Map as M
+import Data.Pool
+import Data.UUID (UUID)
+import Database.Persist (Entity (entityKey), SelectOpt (Desc),
+ entityVal, selectFirst, selectList,
+ (<-.), (==.), (||.))
+import Database.Persist.Sql (SqlBackend)
+import qualified Network.WebSockets as WS
+import Persist
+import Server.Base (ServerState)
+import Server.Util (ServiceM)
+
+
+handleSubscribe
+ :: Pool SqlBackend
+ -> ServerState
+ -> UUID
+ -> WS.Connection
+ -> ServiceM ()
+handleSubscribe dbpool subscribers (ticketId :: UUID) conn = liftIO $ WS.withPingThread conn 30 (pure ()) $ do
+ queue <- atomically $ do
+ queue <- newTQueue
+ qs <- readTVar subscribers
+ writeTVar subscribers
+ $ M.insertWith (<>) ticketId [queue] qs
+ pure queue
+ -- send most recent ping, if any (so we won't have to wait for movement)
+ lastPing <- runSqlWithoutLog dbpool $ do
+ trackers <- getTicketTrackers ticketId
+ <&> fmap entityKey
+ selectFirst [TrainPingToken <-. trackers] [Desc TrainPingTimestamp]
+ <&> fmap entityVal
+ whenJust lastPing $ \ping ->
+ WS.sendTextData conn (A.encode lastPing)
+ handle (\(e :: WS.ConnectionException) -> removeSubscriber queue) $ forever $ do
+ res <- atomically $ readTQueue queue
+ case res of
+ Just ping -> WS.sendTextData conn (A.encode ping)
+ Nothing -> do
+ removeSubscriber queue
+ WS.sendClose conn (C8.pack "train ended")
+ where removeSubscriber queue = atomically $ do
+ qs <- readTVar subscribers
+ writeTVar subscribers
+ $ M.adjust (filter (/= queue)) ticketId qs
+
+-- getTicketTrackers :: (MonadLogger (t (ResourceT IO)), MonadIO (t (ResourceT IO)))
+-- => UUID -> ReaderT SqlBackend (t (ResourceT IO)) [Entity Tracker]
+getTicketTrackers ticketId = do
+ joins <- selectList [TrackerTicketTicket ==. TicketKey ticketId] []
+ <&> fmap (trackerTicketTracker . entityVal)
+ selectList ([TrackerId <-. joins] ||. [TrackerCurrentTicket ==. Just (TicketKey ticketId)]) []