From ad8a09cafa519a15a22cafbfd2fa289538edc73d Mon Sep 17 00:00:00 2001 From: stuebinm Date: Wed, 8 May 2024 22:42:35 +0200 Subject: restructure: split up the server module --- lib/Server/Subscribe.hs | 63 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 lib/Server/Subscribe.hs (limited to 'lib/Server/Subscribe.hs') diff --git a/lib/Server/Subscribe.hs b/lib/Server/Subscribe.hs new file mode 100644 index 0000000..fdc092b --- /dev/null +++ b/lib/Server/Subscribe.hs @@ -0,0 +1,63 @@ + +module Server.Subscribe where +import Conduit (MonadIO (..)) +import Control.Concurrent.STM (atomically, newTQueue, readTQueue, + readTVar, writeTVar) +import Control.Exception (handle) +import Control.Monad.Extra (forever, whenJust) +import qualified Data.Aeson as A +import qualified Data.ByteString.Char8 as C8 +import Data.Functor ((<&>)) +import Data.Map (Map) +import qualified Data.Map as M +import Data.Pool +import Data.UUID (UUID) +import Database.Persist (Entity (entityKey), SelectOpt (Desc), + entityVal, selectFirst, selectList, + (<-.), (==.), (||.)) +import Database.Persist.Sql (SqlBackend) +import qualified Network.WebSockets as WS +import Persist +import Server.Base (ServerState) +import Server.Util (ServiceM) + + +handleSubscribe + :: Pool SqlBackend + -> ServerState + -> UUID + -> WS.Connection + -> ServiceM () +handleSubscribe dbpool subscribers (ticketId :: UUID) conn = liftIO $ WS.withPingThread conn 30 (pure ()) $ do + queue <- atomically $ do + queue <- newTQueue + qs <- readTVar subscribers + writeTVar subscribers + $ M.insertWith (<>) ticketId [queue] qs + pure queue + -- send most recent ping, if any (so we won't have to wait for movement) + lastPing <- runSqlWithoutLog dbpool $ do + trackers <- getTicketTrackers ticketId + <&> fmap entityKey + selectFirst [TrainPingToken <-. trackers] [Desc TrainPingTimestamp] + <&> fmap entityVal + whenJust lastPing $ \ping -> + WS.sendTextData conn (A.encode lastPing) + handle (\(e :: WS.ConnectionException) -> removeSubscriber queue) $ forever $ do + res <- atomically $ readTQueue queue + case res of + Just ping -> WS.sendTextData conn (A.encode ping) + Nothing -> do + removeSubscriber queue + WS.sendClose conn (C8.pack "train ended") + where removeSubscriber queue = atomically $ do + qs <- readTVar subscribers + writeTVar subscribers + $ M.adjust (filter (/= queue)) ticketId qs + +-- getTicketTrackers :: (MonadLogger (t (ResourceT IO)), MonadIO (t (ResourceT IO))) +-- => UUID -> ReaderT SqlBackend (t (ResourceT IO)) [Entity Tracker] +getTicketTrackers ticketId = do + joins <- selectList [TrackerTicketTicket ==. TicketKey ticketId] [] + <&> fmap (trackerTicketTracker . entityVal) + selectList ([TrackerId <-. joins] ||. [TrackerCurrentTicket ==. Just (TicketKey ticketId)]) [] -- cgit v1.2.3