{-# LANGUAGE BlockArguments #-} module Server.Subscribe where import Conduit (MonadIO (..)) import Control.Concurrent.STM (atomically, newTQueue, readTQueue, readTVar, writeTVar) import Control.Exception (handle) import Control.Monad.Extra (forever, whenJust) import qualified Data.Aeson as A import qualified Data.ByteString.Char8 as C8 import Data.Coerce (coerce) import Data.Functor ((<&>)) import Data.Map (Map) import qualified Data.Map as M import Data.Pool import Data.UUID (UUID) import Database.Esqueleto.Experimental hiding ((<&>)) import Database.Persist.Sql (SqlBackend) import qualified Network.WebSockets as WS import Persist import Server.Base (ServerState) import Server.Util (ServiceM) handleSubscribe :: Pool SqlBackend -> ServerState -> UUID -> WS.Connection -> ServiceM () handleSubscribe dbpool subscribers (ticketId :: UUID) conn = liftIO $ WS.withPingThread conn 30 (pure ()) $ do queue <- atomically $ do queue <- newTQueue qs <- readTVar subscribers writeTVar subscribers $ M.insertWith (<>) ticketId [queue] qs pure queue -- send most recent ping, if any (so we won't have to wait for movement) runSqlWithoutLog dbpool (selectOne do ping <- from (table @Ping) where_ (ping ^. PingTicket ==. val (Just (coerce ticketId))) orderBy [desc (ping ^. PingTimestamp)] pure ping) <&> fmap entityVal >>= flip whenJust (WS.sendTextData conn . A.encode) handle (\(e :: WS.ConnectionException) -> removeSubscriber queue) $ forever $ do res <- atomically $ readTQueue queue case res of Just ping -> WS.sendTextData conn (A.encode ping) Nothing -> do removeSubscriber queue WS.sendClose conn (C8.pack "train ended") where removeSubscriber queue = atomically $ do qs <- readTVar subscribers writeTVar subscribers $ M.adjust (filter (/= queue)) ticketId qs -- getTicketTrackers :: (MonadLogger (t (ResourceT IO)), MonadIO (t (ResourceT IO))) -- => UUID -> ReaderT SqlBackend (t (ResourceT IO)) [Entity Tracker] getTicketTrackers ticketId = select do (tracker :& trackerticket) <- from $ table @Tracker `innerJoin` table @TrackerTicket `on` \(tr :& ti) -> tr ^. TrackerId ==. ti ^. TrackerTicketTracker where_ $ tracker ^. TrackerCurrentTicket ==. val (Just (TicketKey ticketId)) ||. trackerticket ^. TrackerTicketTicket ==. val (TicketKey ticketId) pure tracker -- joins <- selectList [TrackerTicketTicket ==. TicketKey ticketId] [] -- <&> fmap (trackerTicketTracker . entityVal) -- selectList ([TrackerId <-. joins] ||. [TrackerCurrentTicket ==. Just (TicketKey ticketId)]) []