module Server.Subscribe where import Conduit (MonadIO (..)) import Control.Concurrent.STM (atomically, newTQueue, readTQueue, readTVar, writeTVar) import Control.Exception (handle) import Control.Monad.Extra (forever, whenJust) import qualified Data.Aeson as A import qualified Data.ByteString.Char8 as C8 import Data.Functor ((<&>)) import Data.Map (Map) import qualified Data.Map as M import Data.Pool import Data.UUID (UUID) import Database.Persist (Entity (entityKey), SelectOpt (Desc), entityVal, selectFirst, selectList, (<-.), (==.), (||.)) import Database.Persist.Sql (SqlBackend) import qualified Network.WebSockets as WS import Persist import Server.Base (ServerState) import Server.Util (ServiceM) handleSubscribe :: Pool SqlBackend -> ServerState -> UUID -> WS.Connection -> ServiceM () handleSubscribe dbpool subscribers (ticketId :: UUID) conn = liftIO $ WS.withPingThread conn 30 (pure ()) $ do queue <- atomically $ do queue <- newTQueue qs <- readTVar subscribers writeTVar subscribers $ M.insertWith (<>) ticketId [queue] qs pure queue -- send most recent ping, if any (so we won't have to wait for movement) lastPing <- runSqlWithoutLog dbpool $ do trackers <- getTicketTrackers ticketId <&> fmap entityKey selectFirst [TrainPingToken <-. trackers] [Desc TrainPingTimestamp] <&> fmap entityVal whenJust lastPing $ \ping -> WS.sendTextData conn (A.encode lastPing) handle (\(e :: WS.ConnectionException) -> removeSubscriber queue) $ forever $ do res <- atomically $ readTQueue queue case res of Just ping -> WS.sendTextData conn (A.encode ping) Nothing -> do removeSubscriber queue WS.sendClose conn (C8.pack "train ended") where removeSubscriber queue = atomically $ do qs <- readTVar subscribers writeTVar subscribers $ M.adjust (filter (/= queue)) ticketId qs -- getTicketTrackers :: (MonadLogger (t (ResourceT IO)), MonadIO (t (ResourceT IO))) -- => UUID -> ReaderT SqlBackend (t (ResourceT IO)) [Entity Tracker] getTicketTrackers ticketId = do joins <- selectList [TrackerTicketTicket ==. TicketKey ticketId] [] <&> fmap (trackerTicketTracker . entityVal) selectList ([TrackerId <-. joins] ||. [TrackerCurrentTicket ==. Just (TicketKey ticketId)]) []