aboutsummaryrefslogtreecommitdiff
path: root/lib/Server.hs
diff options
context:
space:
mode:
authorstuebinm2024-04-24 21:52:45 +0200
committerstuebinm2024-04-24 21:52:45 +0200
commitd4f4208fe66d3813b65312dac0bf895c4cdc53d6 (patch)
tree698592178936900ae76985f5e1b3cdf72123afb4 /lib/Server.hs
parent607b9486a81ed6cb65d30227aeecea3412bd1ccd (diff)
restructure: save a ticket's stop in the database
now mostly independent of the gtfs, but still no live-reloading of it.
Diffstat (limited to 'lib/Server.hs')
-rw-r--r--lib/Server.hs207
1 files changed, 119 insertions, 88 deletions
diff --git a/lib/Server.hs b/lib/Server.hs
index c6d2d94..3922a7b 100644
--- a/lib/Server.hs
+++ b/lib/Server.hs
@@ -8,113 +8,128 @@
-- Implementation of the API. This module is the main point of the program.
module Server (application) where
-import Control.Concurrent.STM (TQueue, TVar, atomically,
- newTQueue, newTVar, newTVarIO,
- readTQueue, readTVar, writeTQueue,
- writeTVar)
-import Control.Monad (forever, unless, void, when)
-import Control.Monad.Catch (handle)
-import Control.Monad.Extra (ifM, maybeM, unlessM, whenJust,
- whenM)
-import Control.Monad.IO.Class (MonadIO (liftIO))
-import Control.Monad.Logger (LoggingT, NoLoggingT, logWarnN)
-import Control.Monad.Reader (ReaderT, forM)
-import Control.Monad.Trans (lift)
-import Data.Aeson ((.=))
-import qualified Data.Aeson as A
-import qualified Data.ByteString.Char8 as C8
-import Data.Coerce (coerce)
-import Data.Functor ((<&>))
-import qualified Data.Map as M
-import Data.Pool (Pool)
-import Data.Proxy (Proxy (Proxy))
-import Data.Swagger (toSchema)
-import Data.Text (Text)
-import Data.Text.Encoding (decodeUtf8)
-import Data.Time (NominalDiffTime,
- UTCTime (utctDay), addUTCTime,
- diffUTCTime, getCurrentTime,
- nominalDay)
-import qualified Data.Vector as V
+import Control.Concurrent.STM (TQueue, TVar, atomically,
+ newTQueue, newTVar,
+ newTVarIO, readTQueue,
+ readTVar, writeTQueue,
+ writeTVar)
+import Control.Monad (forever, unless, void,
+ when)
+import Control.Monad.Catch (handle)
+import Control.Monad.Extra (ifM, maybeM, unlessM,
+ whenJust, whenM)
+import Control.Monad.IO.Class (MonadIO (liftIO))
+import Control.Monad.Logger (LoggingT, NoLoggingT,
+ logWarnN)
+import Control.Monad.Reader (ReaderT, forM)
+import Control.Monad.Trans (lift)
+import Data.Aeson ((.=))
+import qualified Data.Aeson as A
+import qualified Data.ByteString.Char8 as C8
+import Data.Coerce (coerce)
+import Data.Functor ((<&>))
+import qualified Data.Map as M
+import Data.Pool (Pool)
+import Data.Proxy (Proxy (Proxy))
+import Data.Swagger (toSchema)
+import Data.Text (Text)
+import Data.Text.Encoding (decodeUtf8)
+import Data.Time (NominalDiffTime,
+ UTCTime (utctDay),
+ addUTCTime, diffUTCTime,
+ getCurrentTime,
+ nominalDay)
+import qualified Data.Vector as V
import Database.Persist
-import Database.Persist.Postgresql (SqlBackend, runMigration)
-import Fmt ((+|), (|+))
-import qualified Network.WebSockets as WS
-import Servant (Application,
- ServerError (errBody), err401,
- err404, serve,
- serveDirectoryFileServer,
- throwError)
-import Servant.API (NoContent (..), (:<|>) (..))
-import Servant.Server (Handler, hoistServer)
-import Servant.Swagger (toSwagger)
+import Database.Persist.Postgresql (SqlBackend,
+ migrateEnableExtension,
+ runMigration)
+import Fmt ((+|), (|+))
+import qualified Network.WebSockets as WS
+import Servant (Application,
+ ServerError (errBody),
+ err401, err404, serve,
+ serveDirectoryFileServer,
+ throwError)
+import Servant.API (NoContent (..),
+ (:<|>) (..))
+import Servant.Server (Handler, hoistServer)
+import Servant.Swagger (toSwagger)
import API
-import GTFS
+import qualified GTFS
import Persist
import Server.ControlRoom
-import Server.GTFS_RT (gtfsRealtimeServer)
-import Server.Util (Service, ServiceM, runService,
- sendErrorMsg)
-import Yesod (toWaiAppPlain)
+import Server.GTFS_RT (gtfsRealtimeServer)
+import Server.Util (Service, ServiceM,
+ runService, sendErrorMsg)
+import Yesod (toWaiAppPlain)
-import Extrapolation (Extrapolator (..),
- LinearExtrapolator (..))
+import Extrapolation (Extrapolator (..),
+ LinearExtrapolator (..))
import System.IO.Unsafe
-import Conduit (ResourceT)
-import Config (ServerConfig (serverConfigAssets))
-import Data.ByteString (ByteString)
-import Data.ByteString.Lazy (toStrict)
-import Data.UUID (UUID)
+import Conduit (ResourceT)
+import Config (ServerConfig (serverConfigAssets, serverConfigZoneinfoPath))
+import Data.ByteString (ByteString)
+import Data.ByteString.Lazy (toStrict)
+import qualified Data.Text as T
+import Data.Time.LocalTime.TimeZone.Olson (getTimeZoneSeriesFromOlsonFile)
+import Data.Time.LocalTime.TimeZone.Series (TimeZoneSeries)
+import Data.UUID (UUID)
import Prometheus
import Prometheus.Metric.GHC
+import System.FilePath ((</>))
-application :: GTFS -> Pool SqlBackend -> ServerConfig -> IO Application
+application :: GTFS.GTFS -> Pool SqlBackend -> ServerConfig -> IO Application
application gtfs dbpool settings = do
doMigration dbpool
metrics <- Metrics
<$> register (gauge (Info "ws_connections" "Number of WS Connections"))
register ghcMetrics
+
+ -- TODO: maybe cache these in a TVar, we're not likely to ever need
+ -- more than one of these
+ let getTzseries tzname = getTimeZoneSeriesFromOlsonFile
+ (serverConfigZoneinfoPath settings </> T.unpack tzname)
+
subscribers <- newTVarIO mempty
- pure $ serve (Proxy @CompleteAPI) $ hoistServer (Proxy @CompleteAPI) runService $ server gtfs metrics subscribers dbpool settings
+ pure $ serve (Proxy @CompleteAPI) $ hoistServer (Proxy @CompleteAPI) runService
+ $ server gtfs getTzseries metrics subscribers dbpool settings
-- databaseMigration :: ConnectionString -> IO ()
-doMigration pool = runSql pool $
- -- TODO: before that, check if the uuid module is enabled
- -- in sql: check if SELECT * FROM pg_extension WHERE extname = 'uuid-ossp';
- -- returns an empty list
- runMigration migrateAll
-
-server :: GTFS -> Metrics -> TVar (M.Map UUID [TQueue (Maybe TrainPing)]) -> Pool SqlBackend -> ServerConfig -> Service CompleteAPI
-server gtfs@GTFS{..} Metrics{..} subscribers dbpool settings = handleDebugAPI
- :<|> (handleStations :<|> handleTimetable :<|> handleTimetableStops :<|> handleTrip
+doMigration pool = runSql pool $ runMigration $ do
+ migrateEnableExtension "uuid-ossp"
+ migrateAll
+
+server :: GTFS.GTFS -> (Text -> IO TimeZoneSeries) -> Metrics -> TVar (M.Map UUID [TQueue (Maybe TrainPing)]) -> Pool SqlBackend -> ServerConfig -> Service CompleteAPI
+server gtfs getTzseries Metrics{..} subscribers dbpool settings = handleDebugAPI
+ :<|> (handleTimetable :<|> handleTimetableStops :<|> handleTrip
:<|> handleRegister :<|> handleTrainPing (throwError err401) :<|> handleWS
:<|> handleSubscribe :<|> handleDebugState :<|> handleDebugTrain
- :<|> handleDebugRegister :<|> pure gtfsFile :<|> gtfsRealtimeServer gtfs dbpool)
+ :<|> handleDebugRegister :<|> pure (GTFS.gtfsFile gtfs) :<|> gtfsRealtimeServer gtfs dbpool)
:<|> metrics
:<|> serveDirectoryFileServer (serverConfigAssets settings)
:<|> pure (unsafePerformIO (toWaiAppPlain (ControlRoom gtfs dbpool settings)))
- where handleStations = pure stations
- handleTimetable station maybeDay =
- M.filter isLastStop . tripsOnDay gtfs <$> liftIO day
- where isLastStop = (==) station . stationId . stopStation . V.last . tripStops
+ where handleTimetable station maybeDay =
+ M.filter isLastStop . GTFS.tripsOnDay gtfs <$> liftIO day
+ where isLastStop = (==) station . GTFS.stationId . GTFS.stopStation . V.last . GTFS.tripStops
day = maybeM (getCurrentTime <&> utctDay) pure (pure maybeDay)
handleTimetableStops day =
- pure . A.toJSON . fmap mkJson . M.elems $ tripsOnDay gtfs day
- where mkJson :: Trip Deep Deep -> A.Value
- mkJson Trip {..} = A.object
+ pure . A.toJSON . fmap mkJson . M.elems $ GTFS.tripsOnDay gtfs day
+ where mkJson :: GTFS.Trip GTFS.Deep GTFS.Deep -> A.Value
+ mkJson GTFS.Trip {..} = A.object
[ "trip" .= tripTripId
- , "sequencelength" .= (stopSequence . V.last) tripStops
- , "stops" .= fmap (\Stop{..} -> A.object
- [ "departure" .= toUTC stopDeparture tzseries day
- , "arrival" .= toUTC stopArrival tzseries day
- , "station" .= stationId stopStation
- , "lat" .= stationLat stopStation
- , "lon" .= stationLon stopStation
+ , "sequencelength" .= (GTFS.stopSequence . V.last) tripStops
+ , "stops" .= fmap (\GTFS.Stop{..} -> A.object
+ [ "departure" .= GTFS.toUTC stopDeparture (GTFS.tzseries gtfs) day
+ , "arrival" .= GTFS.toUTC stopArrival (GTFS.tzseries gtfs) day
+ , "station" .= GTFS.stationId stopStation
+ , "lat" .= GTFS.stationLat stopStation
+ , "lon" .= GTFS.stationLon stopStation
]) tripStops
]
- handleTrip trip = case M.lookup trip trips of
+ handleTrip trip = case M.lookup trip (GTFS.trips gtfs) of
Just res -> pure res
Nothing -> throwError err404
handleRegister (ticketId :: UUID) RegisterJson{..} = do
@@ -130,26 +145,41 @@ server gtfs@GTFS{..} Metrics{..} subscribers dbpool settings = handleDebugAPI
TrackerKey tracker <- insert (Tracker expires False "debug key")
insert (TrackerTicket (TicketKey ticketId) (TrackerKey tracker))
pure tracker
- handleTrainPing onError ping@TrainPing{..} = isTokenValid dbpool trainPingToken trainPingTicket
- >>= \case
+ handleTrainPing onError ping@TrainPing{..} =
+ let ticketId = trainPingTicket in
+ isTokenValid dbpool trainPingToken ticketId >>= \case
Nothing -> do
onError
pure Nothing
Just (tracker@Tracker{..}, ticket@Ticket{..}) -> do
- let anchor = extrapolateAnchorFromPing LinearExtrapolator gtfs ticket ping
- -- TODO: are these always inserted in order?
runSql dbpool $ do
+
+ stations <- selectList [ StopTicket ==. ticketId ] [Asc StopArrival]
+ >>= mapM (\stop -> do
+ station <- getJust (stopStation (entityVal stop))
+ tzseries <- liftIO $ getTzseries (GTFS.tzname (stopArrival (entityVal stop)))
+ pure (entityVal stop, station, tzseries))
+ <&> V.fromList
+
+ shapePoints <- selectList [ShapePointShape ==. ticketShape] [Asc ShapePointIndex]
+ <&> (V.fromList . fmap entityVal)
+
+ let anchor = extrapolateAnchorFromPing LinearExtrapolator
+ ticket stations shapePoints ping
+
insert ping
+
last <- selectFirst [TrainAnchorTicket ==. trainPingTicket] [Desc TrainAnchorWhen]
-- only insert new estimates if they've actually changed anything
when (fmap (trainAnchorDelay . entityVal) last /= Just (trainAnchorDelay anchor))
$ void $ insert anchor
- queues <- liftIO $ atomically $ do
- queues <- readTVar subscribers <&> M.lookup (coerce trainPingTicket)
- whenJust queues $
- mapM_ (\q -> writeTQueue q (Just ping))
- pure queues
- pure (Just anchor)
+
+ queues <- liftIO $ atomically $ do
+ queues <- readTVar subscribers <&> M.lookup (coerce trainPingTicket)
+ whenJust queues $
+ mapM_ (\q -> writeTQueue q (Just ping))
+ pure queues
+ pure (Just anchor)
handleWS conn = do
liftIO $ WS.forkPingThread conn 30
incGauge metricsWSGauge
@@ -233,3 +263,4 @@ hasExpired limit = do
validityPeriod :: NominalDiffTime
validityPeriod = nominalDay
+