From d4f4208fe66d3813b65312dac0bf895c4cdc53d6 Mon Sep 17 00:00:00 2001 From: stuebinm Date: Wed, 24 Apr 2024 21:52:45 +0200 Subject: restructure: save a ticket's stop in the database now mostly independent of the gtfs, but still no live-reloading of it. --- lib/Server.hs | 207 +++++++++++++++++++++++++++++++++------------------------- 1 file changed, 119 insertions(+), 88 deletions(-) (limited to 'lib/Server.hs') diff --git a/lib/Server.hs b/lib/Server.hs index c6d2d94..3922a7b 100644 --- a/lib/Server.hs +++ b/lib/Server.hs @@ -8,113 +8,128 @@ -- Implementation of the API. This module is the main point of the program. module Server (application) where -import Control.Concurrent.STM (TQueue, TVar, atomically, - newTQueue, newTVar, newTVarIO, - readTQueue, readTVar, writeTQueue, - writeTVar) -import Control.Monad (forever, unless, void, when) -import Control.Monad.Catch (handle) -import Control.Monad.Extra (ifM, maybeM, unlessM, whenJust, - whenM) -import Control.Monad.IO.Class (MonadIO (liftIO)) -import Control.Monad.Logger (LoggingT, NoLoggingT, logWarnN) -import Control.Monad.Reader (ReaderT, forM) -import Control.Monad.Trans (lift) -import Data.Aeson ((.=)) -import qualified Data.Aeson as A -import qualified Data.ByteString.Char8 as C8 -import Data.Coerce (coerce) -import Data.Functor ((<&>)) -import qualified Data.Map as M -import Data.Pool (Pool) -import Data.Proxy (Proxy (Proxy)) -import Data.Swagger (toSchema) -import Data.Text (Text) -import Data.Text.Encoding (decodeUtf8) -import Data.Time (NominalDiffTime, - UTCTime (utctDay), addUTCTime, - diffUTCTime, getCurrentTime, - nominalDay) -import qualified Data.Vector as V +import Control.Concurrent.STM (TQueue, TVar, atomically, + newTQueue, newTVar, + newTVarIO, readTQueue, + readTVar, writeTQueue, + writeTVar) +import Control.Monad (forever, unless, void, + when) +import Control.Monad.Catch (handle) +import Control.Monad.Extra (ifM, maybeM, unlessM, + whenJust, whenM) +import Control.Monad.IO.Class (MonadIO (liftIO)) +import Control.Monad.Logger (LoggingT, NoLoggingT, + logWarnN) +import Control.Monad.Reader (ReaderT, forM) +import Control.Monad.Trans (lift) +import Data.Aeson ((.=)) +import qualified Data.Aeson as A +import qualified Data.ByteString.Char8 as C8 +import Data.Coerce (coerce) +import Data.Functor ((<&>)) +import qualified Data.Map as M +import Data.Pool (Pool) +import Data.Proxy (Proxy (Proxy)) +import Data.Swagger (toSchema) +import Data.Text (Text) +import Data.Text.Encoding (decodeUtf8) +import Data.Time (NominalDiffTime, + UTCTime (utctDay), + addUTCTime, diffUTCTime, + getCurrentTime, + nominalDay) +import qualified Data.Vector as V import Database.Persist -import Database.Persist.Postgresql (SqlBackend, runMigration) -import Fmt ((+|), (|+)) -import qualified Network.WebSockets as WS -import Servant (Application, - ServerError (errBody), err401, - err404, serve, - serveDirectoryFileServer, - throwError) -import Servant.API (NoContent (..), (:<|>) (..)) -import Servant.Server (Handler, hoistServer) -import Servant.Swagger (toSwagger) +import Database.Persist.Postgresql (SqlBackend, + migrateEnableExtension, + runMigration) +import Fmt ((+|), (|+)) +import qualified Network.WebSockets as WS +import Servant (Application, + ServerError (errBody), + err401, err404, serve, + serveDirectoryFileServer, + throwError) +import Servant.API (NoContent (..), + (:<|>) (..)) +import Servant.Server (Handler, hoistServer) +import Servant.Swagger (toSwagger) import API -import GTFS +import qualified GTFS import Persist import Server.ControlRoom -import Server.GTFS_RT (gtfsRealtimeServer) -import Server.Util (Service, ServiceM, runService, - sendErrorMsg) -import Yesod (toWaiAppPlain) +import Server.GTFS_RT (gtfsRealtimeServer) +import Server.Util (Service, ServiceM, + runService, sendErrorMsg) +import Yesod (toWaiAppPlain) -import Extrapolation (Extrapolator (..), - LinearExtrapolator (..)) +import Extrapolation (Extrapolator (..), + LinearExtrapolator (..)) import System.IO.Unsafe -import Conduit (ResourceT) -import Config (ServerConfig (serverConfigAssets)) -import Data.ByteString (ByteString) -import Data.ByteString.Lazy (toStrict) -import Data.UUID (UUID) +import Conduit (ResourceT) +import Config (ServerConfig (serverConfigAssets, serverConfigZoneinfoPath)) +import Data.ByteString (ByteString) +import Data.ByteString.Lazy (toStrict) +import qualified Data.Text as T +import Data.Time.LocalTime.TimeZone.Olson (getTimeZoneSeriesFromOlsonFile) +import Data.Time.LocalTime.TimeZone.Series (TimeZoneSeries) +import Data.UUID (UUID) import Prometheus import Prometheus.Metric.GHC +import System.FilePath (()) -application :: GTFS -> Pool SqlBackend -> ServerConfig -> IO Application +application :: GTFS.GTFS -> Pool SqlBackend -> ServerConfig -> IO Application application gtfs dbpool settings = do doMigration dbpool metrics <- Metrics <$> register (gauge (Info "ws_connections" "Number of WS Connections")) register ghcMetrics + + -- TODO: maybe cache these in a TVar, we're not likely to ever need + -- more than one of these + let getTzseries tzname = getTimeZoneSeriesFromOlsonFile + (serverConfigZoneinfoPath settings T.unpack tzname) + subscribers <- newTVarIO mempty - pure $ serve (Proxy @CompleteAPI) $ hoistServer (Proxy @CompleteAPI) runService $ server gtfs metrics subscribers dbpool settings + pure $ serve (Proxy @CompleteAPI) $ hoistServer (Proxy @CompleteAPI) runService + $ server gtfs getTzseries metrics subscribers dbpool settings -- databaseMigration :: ConnectionString -> IO () -doMigration pool = runSql pool $ - -- TODO: before that, check if the uuid module is enabled - -- in sql: check if SELECT * FROM pg_extension WHERE extname = 'uuid-ossp'; - -- returns an empty list - runMigration migrateAll - -server :: GTFS -> Metrics -> TVar (M.Map UUID [TQueue (Maybe TrainPing)]) -> Pool SqlBackend -> ServerConfig -> Service CompleteAPI -server gtfs@GTFS{..} Metrics{..} subscribers dbpool settings = handleDebugAPI - :<|> (handleStations :<|> handleTimetable :<|> handleTimetableStops :<|> handleTrip +doMigration pool = runSql pool $ runMigration $ do + migrateEnableExtension "uuid-ossp" + migrateAll + +server :: GTFS.GTFS -> (Text -> IO TimeZoneSeries) -> Metrics -> TVar (M.Map UUID [TQueue (Maybe TrainPing)]) -> Pool SqlBackend -> ServerConfig -> Service CompleteAPI +server gtfs getTzseries Metrics{..} subscribers dbpool settings = handleDebugAPI + :<|> (handleTimetable :<|> handleTimetableStops :<|> handleTrip :<|> handleRegister :<|> handleTrainPing (throwError err401) :<|> handleWS :<|> handleSubscribe :<|> handleDebugState :<|> handleDebugTrain - :<|> handleDebugRegister :<|> pure gtfsFile :<|> gtfsRealtimeServer gtfs dbpool) + :<|> handleDebugRegister :<|> pure (GTFS.gtfsFile gtfs) :<|> gtfsRealtimeServer gtfs dbpool) :<|> metrics :<|> serveDirectoryFileServer (serverConfigAssets settings) :<|> pure (unsafePerformIO (toWaiAppPlain (ControlRoom gtfs dbpool settings))) - where handleStations = pure stations - handleTimetable station maybeDay = - M.filter isLastStop . tripsOnDay gtfs <$> liftIO day - where isLastStop = (==) station . stationId . stopStation . V.last . tripStops + where handleTimetable station maybeDay = + M.filter isLastStop . GTFS.tripsOnDay gtfs <$> liftIO day + where isLastStop = (==) station . GTFS.stationId . GTFS.stopStation . V.last . GTFS.tripStops day = maybeM (getCurrentTime <&> utctDay) pure (pure maybeDay) handleTimetableStops day = - pure . A.toJSON . fmap mkJson . M.elems $ tripsOnDay gtfs day - where mkJson :: Trip Deep Deep -> A.Value - mkJson Trip {..} = A.object + pure . A.toJSON . fmap mkJson . M.elems $ GTFS.tripsOnDay gtfs day + where mkJson :: GTFS.Trip GTFS.Deep GTFS.Deep -> A.Value + mkJson GTFS.Trip {..} = A.object [ "trip" .= tripTripId - , "sequencelength" .= (stopSequence . V.last) tripStops - , "stops" .= fmap (\Stop{..} -> A.object - [ "departure" .= toUTC stopDeparture tzseries day - , "arrival" .= toUTC stopArrival tzseries day - , "station" .= stationId stopStation - , "lat" .= stationLat stopStation - , "lon" .= stationLon stopStation + , "sequencelength" .= (GTFS.stopSequence . V.last) tripStops + , "stops" .= fmap (\GTFS.Stop{..} -> A.object + [ "departure" .= GTFS.toUTC stopDeparture (GTFS.tzseries gtfs) day + , "arrival" .= GTFS.toUTC stopArrival (GTFS.tzseries gtfs) day + , "station" .= GTFS.stationId stopStation + , "lat" .= GTFS.stationLat stopStation + , "lon" .= GTFS.stationLon stopStation ]) tripStops ] - handleTrip trip = case M.lookup trip trips of + handleTrip trip = case M.lookup trip (GTFS.trips gtfs) of Just res -> pure res Nothing -> throwError err404 handleRegister (ticketId :: UUID) RegisterJson{..} = do @@ -130,26 +145,41 @@ server gtfs@GTFS{..} Metrics{..} subscribers dbpool settings = handleDebugAPI TrackerKey tracker <- insert (Tracker expires False "debug key") insert (TrackerTicket (TicketKey ticketId) (TrackerKey tracker)) pure tracker - handleTrainPing onError ping@TrainPing{..} = isTokenValid dbpool trainPingToken trainPingTicket - >>= \case + handleTrainPing onError ping@TrainPing{..} = + let ticketId = trainPingTicket in + isTokenValid dbpool trainPingToken ticketId >>= \case Nothing -> do onError pure Nothing Just (tracker@Tracker{..}, ticket@Ticket{..}) -> do - let anchor = extrapolateAnchorFromPing LinearExtrapolator gtfs ticket ping - -- TODO: are these always inserted in order? runSql dbpool $ do + + stations <- selectList [ StopTicket ==. ticketId ] [Asc StopArrival] + >>= mapM (\stop -> do + station <- getJust (stopStation (entityVal stop)) + tzseries <- liftIO $ getTzseries (GTFS.tzname (stopArrival (entityVal stop))) + pure (entityVal stop, station, tzseries)) + <&> V.fromList + + shapePoints <- selectList [ShapePointShape ==. ticketShape] [Asc ShapePointIndex] + <&> (V.fromList . fmap entityVal) + + let anchor = extrapolateAnchorFromPing LinearExtrapolator + ticket stations shapePoints ping + insert ping + last <- selectFirst [TrainAnchorTicket ==. trainPingTicket] [Desc TrainAnchorWhen] -- only insert new estimates if they've actually changed anything when (fmap (trainAnchorDelay . entityVal) last /= Just (trainAnchorDelay anchor)) $ void $ insert anchor - queues <- liftIO $ atomically $ do - queues <- readTVar subscribers <&> M.lookup (coerce trainPingTicket) - whenJust queues $ - mapM_ (\q -> writeTQueue q (Just ping)) - pure queues - pure (Just anchor) + + queues <- liftIO $ atomically $ do + queues <- readTVar subscribers <&> M.lookup (coerce trainPingTicket) + whenJust queues $ + mapM_ (\q -> writeTQueue q (Just ping)) + pure queues + pure (Just anchor) handleWS conn = do liftIO $ WS.forkPingThread conn 30 incGauge metricsWSGauge @@ -233,3 +263,4 @@ hasExpired limit = do validityPeriod :: NominalDiffTime validityPeriod = nominalDay + -- cgit v1.2.3