diff options
author | stuebinm | 2024-05-01 03:07:06 +0200 |
---|---|---|
committer | stuebinm | 2024-05-02 00:18:16 +0200 |
commit | 80984549172d7de83564757de80996487ca2fb15 (patch) | |
tree | 1e4bfe43fa9fc96fa5642fe34f502005775f257f | |
parent | b26a3d617e90c9693a4ee8b7cc8bbba506cd4746 (diff) |
restructure: get the tracker to work again
This should hopefully be the final (major) part of the restructuring: a
tracker no longer has to know which trip it is on (and indeed it has no
idea for now), instead the server keeps state about which trips are
currently running and will insert incoming pings in a hopefully
reasonable manner, based on their geoposition & time.
There's lots of associated TODO items here (especially there should be
manual overrides for all this logic in the web ui), but that's work for
a future me.
(incidentally, this also adds support for sending all log messages out
via ntfy-sh)
Diffstat (limited to '')
-rw-r--r-- | lib/API.hs | 54 | ||||
-rw-r--r-- | lib/Config.hs | 20 | ||||
-rw-r--r-- | lib/Extrapolation.hs | 9 | ||||
-rw-r--r-- | lib/Persist.hs | 38 | ||||
-rw-r--r-- | lib/Server.hs | 179 | ||||
-rw-r--r-- | lib/Server/ControlRoom.hs | 39 | ||||
-rw-r--r-- | lib/Server/GTFS_RT.hs | 15 | ||||
-rw-r--r-- | lib/Server/Util.hs | 49 | ||||
-rw-r--r-- | messages/en.msg | 1 | ||||
-rw-r--r-- | site/tracker.hamlet | 137 | ||||
-rw-r--r-- | tracktrain.cabal | 1 |
11 files changed, 407 insertions, 135 deletions
@@ -39,7 +39,8 @@ import Data.Aeson (FromJSON (..), Value, import Data.ByteString (ByteString) import qualified Data.ByteString.Lazy as LB import Data.HashMap.Strict.InsOrd (singleton) -import Data.ProtoLens (Message, encodeMessage) +import Data.ProtoLens (Message (messageName), + encodeMessage) import GHC.Generics (Generic) import GTFS (Depth (Deep), GTFSFile (..), StationID, Trip, TripId, @@ -50,45 +51,24 @@ import Prometheus import Proto.GtfsRealtime (FeedMessage) import Servant.API.ContentTypes (Accept (..)) -newtype RegisterJson = RegisterJson - { registerAgent :: Text } - deriving (Show, Generic) - -instance FromJSON RegisterJson where - parseJSON = genericParseJSON (aesonOptions "register") -instance ToSchema RegisterJson where - declareNamedSchema = genericDeclareNamedSchema (swaggerOptions "register") -instance ToSchema Value where - declareNamedSchema _ = pure $ NamedSchema (Just "json") $ mempty - & type_ ?~ SwaggerObject - --- | The server's API (as it is actually intended). -type API = "timetable" :> Capture "Station Id" StationID :> QueryParam "day" Day :> Get '[JSON] (Map TripId (Trip Deep Deep)) - :<|> "timetable" :> "stops" :> Capture "Date" Day :> Get '[JSON] Value - :<|> "trip" :> Capture "Trip Id" TripId :> Get '[JSON] (Trip Deep Deep) +-- | tracktrain's API +type API = -- ingress API (put this behind BasicAuth?) -- TODO: perhaps require a first ping for registration? - :<|> "train" :> "register" :> Capture "Ticket Id" UUID :> ReqBody '[JSON] RegisterJson :> Post '[JSON] Token - -- TODO: perhaps a websocket instead? - :<|> "train" :> "ping" :> ReqBody '[JSON] TrainPing :> Post '[JSON] (Maybe TrainAnchor) - :<|> "train" :> "ping" :> "ws" :> WebSocket - :<|> "train" :> "subscribe" :> Capture "Ticket Id" UUID :> WebSocket - -- debug things + "tracker" :> "register" :> ReqBody '[JSON] RegisterJson :> Post '[JSON] Token + :<|> "tracker" :> "ping" :> ReqBody '[JSON] TrainPing :> Post '[JSON] (Maybe TrainAnchor) + :<|> "tracker" :> "ping" :> "ws" :> WebSocket + :<|> "ticket" :> "subscribe" :> Capture "Ticket Id" UUID :> WebSocket :<|> "debug" :> "pings" :> Get '[JSON] (Map Token [TrainPing]) :<|> "debug" :> "pings" :> Capture "Ticket Id" UUID :> Get '[JSON] [TrainPing] - :<|> "debug" :> "register" :> Capture "Ticket Id" UUID :> Post '[JSON] Token :<|> "gtfs.zip" :> Get '[OctetStream] GTFSFile :<|> "gtfs" :> GtfsRealtimeAPI --- | The API used for publishing gtfs realtime updates type GtfsRealtimeAPI = "servicealerts" :> Get '[Proto] FeedMessage :<|> "tripupdates" :> Get '[Proto] FeedMessage :<|> "vehiclepositions" :> Get '[Proto] FeedMessage --- | The server's API with an additional debug route for accessing the specification --- itself. Split from API to prevent the API documenting the format in which it is --- documented, which would be silly and way to verbose. type CompleteAPI = "api" :> "openapi" :> Get '[JSON] Swagger :<|> "api" :> API @@ -103,6 +83,18 @@ data Metrics = Metrics instance MimeRender OctetStream GTFSFile where mimeRender p (GTFSFile bytes) = mimeRender p bytes +newtype RegisterJson = RegisterJson + { registerAgent :: Text } + deriving (Show, Generic) + +instance FromJSON RegisterJson where + parseJSON = genericParseJSON (aesonOptions "register") +instance ToSchema RegisterJson where + declareNamedSchema = genericDeclareNamedSchema (swaggerOptions "register") +instance ToSchema Value where + declareNamedSchema _ = pure $ NamedSchema (Just "json") $ mempty + & type_ ?~ SwaggerObject + -- TODO write something useful here! (and if it's just "hey this is some websocket thingie") @@ -128,7 +120,7 @@ instance Accept Proto where instance Message msg => MimeRender Proto msg where mimeRender _ = LB.fromStrict . encodeMessage --- TODO: this instance is horrible; ideally it should at least include --- the name of the message type (if at all possible) +-- | Not an ideal instance, hides fields of the protobuf message instance {-# OVERLAPPABLE #-} Message msg => ToSchema msg where - declareNamedSchema _ = declareNamedSchema (Proxy @String) + declareNamedSchema proxy = + pure (NamedSchema (Just (messageName proxy)) mempty) diff --git a/lib/Config.hs b/lib/Config.hs index 2349e66..94fdd28 100644 --- a/lib/Config.hs +++ b/lib/Config.hs @@ -1,10 +1,11 @@ {-# LANGUAGE RecordWildCards #-} -module Config where +module Config (UffdConfig(..), ServerConfig(..), LoggingConfig(..)) where import Conferer (DefaultConfig (configDef)) import Conferer.FromConfig import Conferer.FromConfig.Warp () import Data.ByteString (ByteString) +import Data.Functor ((<&>)) import Data.Text (Text) import GHC.Generics (Generic) import Network.Wai.Handler.Warp (Settings) @@ -24,6 +25,13 @@ data ServerConfig = ServerConfig , serverConfigAssets :: FilePath , serverConfigZoneinfoPath :: FilePath , serverConfigLogin :: UffdConfig + , serverConfigLogging :: LoggingConfig + } deriving (Generic) + +data LoggingConfig = LoggingConfig + { loggingConfigNtfyToken :: Maybe Text + , loggingConfigNtfyTopic :: Text + , loggingConfigHostname :: Text } deriving (Generic) instance FromConfig ServerConfig @@ -36,6 +44,7 @@ instance DefaultConfig ServerConfig where , serverConfigAssets = "./assets" , serverConfigZoneinfoPath = "/etc/zoneinfo/" , serverConfigLogin = configDef + , serverConfigLogging = configDef } instance DefaultConfig UffdConfig where @@ -50,3 +59,12 @@ instance FromConfig UffdConfig where uffdConfigClientSecret <- fetchFromConfig (key /. "clientSecret") config uffdConfigEnable <- fetchFromConfig (key /. "enable") config pure UffdConfig {..} + +instance FromConfig LoggingConfig where + fromConfig key config = LoggingConfig + <$> fetchFromConfig (key /. "ntfyToken") config + <*> fetchFromConfig (key /. "ntfyTopic") config + <*> fetchFromConfig (key /. "name") config + +instance DefaultConfig LoggingConfig where + configDef = LoggingConfig Nothing "tracktrain" "" diff --git a/lib/Extrapolation.hs b/lib/Extrapolation.hs index 01e5f6f..389d047 100644 --- a/lib/Extrapolation.hs +++ b/lib/Extrapolation.hs @@ -27,8 +27,8 @@ import GTFS (Seconds (..), import Persist (Geopos (..), ShapePoint (shapePointGeopos), Station (..), Stop (..), - Ticket (..), Token (..), - Tracker (..), + Ticket (..), TicketId, + Token (..), Tracker (..), TrainAnchor (..), TrainPing (..)) import Server.Util (utcToSeconds) @@ -40,6 +40,7 @@ class Extrapolator strategy where -- | here's a position ping, guess things from that! extrapolateAnchorFromPing :: strategy + -> TicketId -> Ticket -> V.Vector (Stop, Station, TimeZoneSeries) -> V.Vector ShapePoint @@ -68,9 +69,9 @@ instance Extrapolator LinearExtrapolator where $ NE.filter (\a -> trainAnchorSequence a < positionNow) history where difference status = positionNow - trainAnchorSequence status - extrapolateAnchorFromPing _ Ticket{..} stops shape ping@TrainPing{..} = TrainAnchor + extrapolateAnchorFromPing _ ticketId Ticket{..} stops shape ping@TrainPing{..} = TrainAnchor { trainAnchorCreated = trainPingTimestamp - , trainAnchorTicket = trainPingTicket + , trainAnchorTicket = ticketId , trainAnchorWhen = utcToSeconds trainPingTimestamp ticketDay , trainAnchorSequence , trainAnchorDelay diff --git a/lib/Persist.hs b/lib/Persist.hs index 7613fd9..46e5ef4 100644 --- a/lib/Persist.hs +++ b/lib/Persist.hs @@ -21,13 +21,19 @@ import Database.Persist.Sql (PersistFieldSql (..), import Database.Persist.TH import qualified GTFS import PersistOrphans -import Servant (FromHttpApiData (..), +import Servant (FromHttpApiData (..), Handler, ToHttpApiData) -import Conduit (ResourceT) +import Conduit (MonadTrans (lift), MonadUnliftIO, + ResourceT, runResourceT) +import Config (LoggingConfig) import Control.Monad.IO.Class (MonadIO (liftIO)) -import Control.Monad.Logger (NoLoggingT) -import Control.Monad.Reader (ReaderT) +import Control.Monad.Logger (LoggingT, MonadLogger, NoLoggingT, + runNoLoggingT, runStderrLoggingT) +import Control.Monad.Reader (MonadReader (ask), + ReaderT (runReaderT), runReader) +import Control.Monad.Trans.Control (MonadBaseControl (liftBaseWith), + MonadTransControl (liftWith, restoreT)) import Data.Data (Proxy (..)) import Data.Map (Map) import Data.Pool (Pool) @@ -37,10 +43,11 @@ import Data.Time (NominalDiffTime, TimeOfDay, getCurrentTime, nominalDay) import Data.Time.Calendar (Day, DayOfWeek (..)) import Data.Vector (Vector) -import Database.Persist.Postgresql (SqlBackend) +import Database.Persist.Postgresql (SqlBackend, runSqlPool) import Fmt import GHC.Generics (Generic) import MultiLangText (MultiLangText) +import Server.Util (runLogging) import Web.PathPieces (PathPiece) import Yesod (Lang) @@ -120,6 +127,7 @@ Tracker sql=tt_tracker_token expires UTCTime blocked Bool agent Text + currentTicket TicketId Maybe deriving Eq Show Generic TrackerTicket @@ -130,7 +138,7 @@ TrackerTicket -- raw frames as received from OBUs TrainPing json sql=tt_trip_ping - ticket TicketId + -- ticket TicketId token TrackerId geopos Geopos timestamp UTCTime @@ -169,5 +177,19 @@ instance ToSchema TrainAnchor where instance ToSchema Announcement where declareNamedSchema = genericDeclareNamedSchema (GTFS.swaggerOptions "announcement") -runSql :: MonadIO m => Pool SqlBackend -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) a -> m a -runSql pool = liftIO . flip runSqlPersistMPool pool +runSqlWithoutLog :: MonadIO m + => Pool SqlBackend + -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) a + -> m a +runSqlWithoutLog pool = liftIO . flip runSqlPersistMPool pool + +-- It's a bit unfortunate that we have an extra reader here for just the +-- logging config, but since Handler is not MonadUnliftIO there seems to be (?) +-- no better way than to nest logger monads … +runSql :: (MonadLogger m, MonadIO m, MonadReader LoggingConfig m) + => Pool SqlBackend + -> ReaderT SqlBackend (LoggingT (ResourceT IO)) a + -> m a +runSql pool x = do + conf <- ask + liftIO $ runResourceT $ runLogging conf $ runSqlPool x pool diff --git a/lib/Server.hs b/lib/Server.hs index 3922a7b..73c55cb 100644 --- a/lib/Server.hs +++ b/lib/Server.hs @@ -16,12 +16,14 @@ import Control.Concurrent.STM (TQueue, TVar, atomically, import Control.Monad (forever, unless, void, when) import Control.Monad.Catch (handle) -import Control.Monad.Extra (ifM, maybeM, unlessM, - whenJust, whenM) +import Control.Monad.Extra (ifM, mapMaybeM, maybeM, + unlessM, whenJust, whenM) import Control.Monad.IO.Class (MonadIO (liftIO)) -import Control.Monad.Logger (LoggingT, NoLoggingT, +import Control.Monad.Logger (LoggingT, MonadLogger, + NoLoggingT, logInfoN, logWarnN) -import Control.Monad.Reader (ReaderT, forM) +import Control.Monad.Reader (MonadReader, ReaderT, + forM) import Control.Monad.Trans (lift) import Data.Aeson ((.=)) import qualified Data.Aeson as A @@ -33,7 +35,7 @@ import Data.Pool (Pool) import Data.Proxy (Proxy (Proxy)) import Data.Swagger (toSchema) import Data.Text (Text) -import Data.Text.Encoding (decodeUtf8) +import Data.Text.Encoding (decodeASCII, decodeUtf8) import Data.Time (NominalDiffTime, UTCTime (utctDay), addUTCTime, diffUTCTime, @@ -48,7 +50,8 @@ import Fmt ((+|), (|+)) import qualified Network.WebSockets as WS import Servant (Application, ServerError (errBody), - err401, err404, serve, + err400, err401, err404, + serve, serveDirectoryFileServer, throwError) import Servant.API (NoContent (..), @@ -62,24 +65,33 @@ import Persist import Server.ControlRoom import Server.GTFS_RT (gtfsRealtimeServer) import Server.Util (Service, ServiceM, - runService, sendErrorMsg) + runService, sendErrorMsg, + utcToSeconds) import Yesod (toWaiAppPlain) -import Extrapolation (Extrapolator (..), - LinearExtrapolator (..)) -import System.IO.Unsafe - import Conduit (ResourceT) -import Config (ServerConfig (serverConfigAssets, serverConfigZoneinfoPath)) +import Config (LoggingConfig, + ServerConfig (..)) +import Control.Exception (throw) import Data.ByteString (ByteString) import Data.ByteString.Lazy (toStrict) +import Data.Foldable (minimumBy) +import Data.Function (on, (&)) +import Data.Maybe (fromMaybe) import qualified Data.Text as T import Data.Time.LocalTime.TimeZone.Olson (getTimeZoneSeriesFromOlsonFile) import Data.Time.LocalTime.TimeZone.Series (TimeZoneSeries) import Data.UUID (UUID) +import qualified Data.UUID as UUID +import Extrapolation (Extrapolator (..), + LinearExtrapolator (..), + euclid) +import GTFS (Seconds (unSeconds), + seconds2Double) import Prometheus import Prometheus.Metric.GHC import System.FilePath ((</>)) +import System.IO.Unsafe application :: GTFS.GTFS -> Pool SqlBackend -> ServerConfig -> IO Application application gtfs dbpool settings = do @@ -94,65 +106,84 @@ application gtfs dbpool settings = do (serverConfigZoneinfoPath settings </> T.unpack tzname) subscribers <- newTVarIO mempty - pure $ serve (Proxy @CompleteAPI) $ hoistServer (Proxy @CompleteAPI) runService + pure $ serve (Proxy @CompleteAPI) + $ hoistServer (Proxy @CompleteAPI) (runService (serverConfigLogging settings)) $ server gtfs getTzseries metrics subscribers dbpool settings -- databaseMigration :: ConnectionString -> IO () -doMigration pool = runSql pool $ runMigration $ do +doMigration pool = runSqlWithoutLog pool $ runMigration $ do migrateEnableExtension "uuid-ossp" migrateAll server :: GTFS.GTFS -> (Text -> IO TimeZoneSeries) -> Metrics -> TVar (M.Map UUID [TQueue (Maybe TrainPing)]) -> Pool SqlBackend -> ServerConfig -> Service CompleteAPI server gtfs getTzseries Metrics{..} subscribers dbpool settings = handleDebugAPI - :<|> (handleTimetable :<|> handleTimetableStops :<|> handleTrip - :<|> handleRegister :<|> handleTrainPing (throwError err401) :<|> handleWS + :<|> (handleTrackerRegister :<|> handleTrainPing (throwError err401) :<|> handleWS :<|> handleSubscribe :<|> handleDebugState :<|> handleDebugTrain - :<|> handleDebugRegister :<|> pure (GTFS.gtfsFile gtfs) :<|> gtfsRealtimeServer gtfs dbpool) + :<|> pure (GTFS.gtfsFile gtfs) :<|> gtfsRealtimeServer gtfs dbpool) :<|> metrics :<|> serveDirectoryFileServer (serverConfigAssets settings) :<|> pure (unsafePerformIO (toWaiAppPlain (ControlRoom gtfs dbpool settings))) - where handleTimetable station maybeDay = - M.filter isLastStop . GTFS.tripsOnDay gtfs <$> liftIO day - where isLastStop = (==) station . GTFS.stationId . GTFS.stopStation . V.last . GTFS.tripStops - day = maybeM (getCurrentTime <&> utctDay) pure (pure maybeDay) - handleTimetableStops day = - pure . A.toJSON . fmap mkJson . M.elems $ GTFS.tripsOnDay gtfs day - where mkJson :: GTFS.Trip GTFS.Deep GTFS.Deep -> A.Value - mkJson GTFS.Trip {..} = A.object - [ "trip" .= tripTripId - , "sequencelength" .= (GTFS.stopSequence . V.last) tripStops - , "stops" .= fmap (\GTFS.Stop{..} -> A.object - [ "departure" .= GTFS.toUTC stopDeparture (GTFS.tzseries gtfs) day - , "arrival" .= GTFS.toUTC stopArrival (GTFS.tzseries gtfs) day - , "station" .= GTFS.stationId stopStation - , "lat" .= GTFS.stationLat stopStation - , "lon" .= GTFS.stationLon stopStation - ]) tripStops - ] - handleTrip trip = case M.lookup trip (GTFS.trips gtfs) of - Just res -> pure res - Nothing -> throwError err404 - handleRegister (ticketId :: UUID) RegisterJson{..} = do + where handleTrackerRegister RegisterJson{..} = do today <- liftIO getCurrentTime <&> utctDay expires <- liftIO $ getCurrentTime <&> addUTCTime validityPeriod runSql dbpool $ do - TrackerKey tracker <- insert (Tracker expires False registerAgent) - insert (TrackerTicket (TicketKey ticketId) (TrackerKey tracker)) - pure tracker - handleDebugRegister (ticketId :: UUID) = do - expires <- liftIO $ getCurrentTime <&> addUTCTime validityPeriod - runSql dbpool $ do - TrackerKey tracker <- insert (Tracker expires False "debug key") - insert (TrackerTicket (TicketKey ticketId) (TrackerKey tracker)) + TrackerKey tracker <- insert (Tracker expires False registerAgent Nothing) pure tracker handleTrainPing onError ping@TrainPing{..} = - let ticketId = trainPingTicket in - isTokenValid dbpool trainPingToken ticketId >>= \case - Nothing -> do - onError - pure Nothing - Just (tracker@Tracker{..}, ticket@Ticket{..}) -> do + isTokenValid dbpool trainPingToken >>= \case + Nothing -> onError >> pure Nothing + Just tracker@Tracker{..} -> do + + -- if the tracker is not associated with a ticket, it is probably + -- just starting out on a new trip, or has finished an old one. + maybeTicketId <- case trackerCurrentTicket of + Just ticketId -> pure (Just ticketId) + Nothing -> runSql dbpool $ do + now <- liftIO getCurrentTime + tickets <- selectList [ TicketDay ==. utctDay now, TicketCompleted ==. False ] [] + ticketsWithFirstStation <- flip mapMaybeM tickets + (\ticket@(Entity ticketId _) -> do + selectFirst [StopTicket ==. ticketId] [Asc StopSequence] >>= \case + Nothing -> pure Nothing + Just (Entity _ stop) -> do + station <- getJust (stopStation stop) + tzseries <- liftIO $ getTzseries (GTFS.tzname (stopDeparture stop)) + pure (Just (ticket, station, stop, tzseries))) + + if null ticketsWithFirstStation then pure Nothing else do + let (closestTicket, _, _, _) = minimumBy + -- (compare `on` euclid trainPingGeopos . stationGeopos . snd) + (compare `on` + (\(Entity _ ticket, station, stop, tzseries) -> + let + runningDay = ticketDay ticket + spaceDistance = euclid trainPingGeopos (stationGeopos station) + timeDiff = + GTFS.toSeconds (stopDeparture stop) tzseries runningDay + - utcToSeconds now runningDay + in + euclid trainPingGeopos (stationGeopos station) + + abs (seconds2Double timeDiff / 3600))) + ticketsWithFirstStation + logInfoN + $ "Tracker "+|UUID.toString (coerce trainPingToken)|+ + " is now handling ticket "+|UUID.toString (coerce (entityKey closestTicket))|+ + " (trip "+|ticketTripName (entityVal closestTicket)|+")." + + update (coerce trainPingToken) + [TrackerCurrentTicket =. Just (entityKey closestTicket)] + + pure (Just (entityKey closestTicket)) + + ticketId <- case maybeTicketId of + Just ticketId -> pure ticketId + Nothing -> do + logWarnN $ "Tracker "+|UUID.toString (coerce trainPingToken)|+ + " sent a ping, but no trips are running today." + throwError err400 + runSql dbpool $ do + ticket@Ticket{..} <- getJust ticketId stations <- selectList [ StopTicket ==. ticketId ] [Asc StopArrival] >>= mapM (\stop -> do @@ -165,17 +196,31 @@ server gtfs getTzseries Metrics{..} subscribers dbpool settings = handleDebugAPI <&> (V.fromList . fmap entityVal) let anchor = extrapolateAnchorFromPing LinearExtrapolator - ticket stations shapePoints ping + ticketId ticket stations shapePoints ping insert ping - last <- selectFirst [TrainAnchorTicket ==. trainPingTicket] [Desc TrainAnchorWhen] + last <- selectFirst [TrainAnchorTicket ==. ticketId] [Desc TrainAnchorWhen] -- only insert new estimates if they've actually changed anything when (fmap (trainAnchorDelay . entityVal) last /= Just (trainAnchorDelay anchor)) $ void $ insert anchor + -- are we at the final stop? if so, mark this ticket as done + -- & the tracker as free + let maxSequence = V.last stations + & (\(stop, _, _) -> stopSequence stop) + & fromIntegral + when (trainAnchorSequence anchor + 0.1 >= maxSequence) $ do + update (coerce trainPingToken) + [TrackerCurrentTicket =. Nothing] + update ticketId + [TicketCompleted =. True] + logInfoN $ "Tracker "+|UUID.toString (coerce trainPingToken)|+ + " has completed ticket "+|UUID.toString (coerce ticketId)|+ + " (trip "+|ticketTripName|+")" + queues <- liftIO $ atomically $ do - queues <- readTVar subscribers <&> M.lookup (coerce trainPingTicket) + queues <- readTVar subscribers <&> M.lookup (coerce ticketId) whenJust queues $ mapM_ (\q -> writeTQueue q (Just ping)) pure queues @@ -187,14 +232,14 @@ server gtfs getTzseries Metrics{..} subscribers dbpool settings = handleDebugAPI msg <- liftIO $ WS.receiveData conn case A.eitherDecode msg of Left err -> do - logWarnN ("stray websocket message: "+|show msg|+" (could not decode: "+|err|+")") + logWarnN ("stray websocket message: "+|decodeASCII (toStrict msg)|+" (could not decode: "+|err|+")") liftIO $ WS.sendClose conn (C8.pack err) -- TODO: send a close msg (Nothing) to the subscribed queues? decGauge metricsWSGauge - Right ping -> + Right ping -> do -- if invalid token, send a "polite" close request. Note that the client may -- ignore this and continue sending messages, which will continue to be handled. - liftIO $ handleTrainPing (WS.sendClose conn ("" :: ByteString)) ping >>= \case - Just anchor -> WS.sendTextData conn (A.encode anchor) + handleTrainPing (liftIO $ WS.sendClose conn ("" :: ByteString)) ping >>= \case + Just anchor -> liftIO $ WS.sendTextData conn (A.encode anchor) Nothing -> pure () handleSubscribe (ticketId :: UUID) conn = liftIO $ WS.withPingThread conn 30 (pure ()) $ do queue <- atomically $ do @@ -204,7 +249,7 @@ server gtfs getTzseries Metrics{..} subscribers dbpool settings = handleDebugAPI $ M.insertWith (<>) ticketId [queue] qs pure queue -- send most recent ping, if any (so we won't have to wait for movement) - lastPing <- runSql dbpool $ do + lastPing <- runSqlWithoutLog dbpool $ do trackers <- getTicketTrackers ticketId <&> fmap entityKey selectFirst [TrainPingToken <-. trackers] [Desc TrainPingTimestamp] @@ -239,21 +284,21 @@ server gtfs getTzseries Metrics{..} subscribers dbpool settings = handleDebugAPI handleDebugAPI = pure $ toSwagger (Proxy @API) metrics = exportMetricsAsText <&> (decodeUtf8 . toStrict) -getTicketTrackers :: UUID -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) [Entity Tracker] +getTicketTrackers :: (MonadLogger (t (ResourceT IO)), MonadIO (t (ResourceT IO))) + => UUID -> ReaderT SqlBackend (t (ResourceT IO)) [Entity Tracker] getTicketTrackers ticketId = do joins <- selectList [TrackerTicketTicket ==. TicketKey ticketId] [] <&> fmap (trackerTicketTracker . entityVal) - selectList [TrackerId <-. joins] [] + selectList ([TrackerId <-. joins] ||. [TrackerCurrentTicket ==. Just (TicketKey ticketId)]) [] -- TODO: proper debug logging for expired tokens -isTokenValid :: MonadIO m => Pool SqlBackend -> TrackerId -> TicketId -> m (Maybe (Tracker, Ticket)) -isTokenValid dbpool token ticketId = runSql dbpool $ get token >>= \case +isTokenValid :: Pool SqlBackend -> TrackerId -> ServiceM (Maybe Tracker) +isTokenValid dbpool token = runSql dbpool $ get token >>= \case Just tracker | not (trackerBlocked tracker) -> do ifM (hasExpired (trackerExpires tracker)) (pure Nothing) - $ runSql dbpool $ get ticketId - <&> (\case { Nothing -> Nothing; Just ticket -> Just (tracker, ticket) }) + (pure (Just tracker)) _ -> pure Nothing hasExpired :: MonadIO m => UTCTime -> m Bool diff --git a/lib/Server/ControlRoom.hs b/lib/Server/ControlRoom.hs index 9d15bcf..5292620 100644 --- a/lib/Server/ControlRoom.hs +++ b/lib/Server/ControlRoom.hs @@ -17,6 +17,7 @@ import Control.Monad.IO.Class (MonadIO (liftIO)) import qualified Data.Aeson as A import qualified Data.ByteString.Char8 as C8 import qualified Data.ByteString.Lazy as LB +import Data.Coerce (coerce) import Data.Function (on, (&)) import Data.Functor ((<&>)) import Data.List (lookup, nubBy) @@ -86,6 +87,7 @@ mkYesod "ControlRoom" [parseRoutes| /obu OnboardUnitMenuR GET /obu/#UUID OnboardUnitR GET +/tracker OnboardTrackerR GET |] emptyMarkup :: MarkupM a -> Bool @@ -96,6 +98,7 @@ instance Yesod ControlRoom where authRoute _ = Just $ AuthR LoginR isAuthorized OnboardUnitMenuR _ = pure Authorized isAuthorized (OnboardUnitR _) _ = pure Authorized + isAuthorized OnboardTrackerR _ = pure Authorized isAuthorized (AuthR _) _ = pure Authorized isAuthorized _ _ = do UffdConfig{..} <- getYesod <&> serverConfigLogin . getSettings @@ -200,7 +203,7 @@ getTicketsR = do gtfs <- getYesod <&> getGtfs -- TODO: tickets should have all trip information saved - tickets <- runDB $ selectList [ TicketDay ==. day ] [] >>= mapM (\ticket -> do + tickets <- runDB $ selectList [ TicketDay ==. day ] [ Asc TicketTripName ] >>= mapM (\ticket -> do stops <- selectList [ StopTicket ==. entityKey ticket ] [] startStation <- getJust (stopStation $ entityVal $ head stops) pure (ticket, startStation, fmap entityVal stops)) @@ -317,9 +320,11 @@ getTicketViewR ticketId = do pure (entityVal stop, station)) anns <- runDB $ selectList [ AnnouncementTicket ==. ticketKey ] [] - trackerIds <- runDB $ selectList [ TrackerTicketTicket ==. ticketKey ] [] + joins <- runDB $ selectList [ TrackerTicketTicket ==. ticketKey ] [] <&> fmap (trackerTicketTracker . entityVal) - trackers <- runDB $ selectList [ TrackerId <-. trackerIds ] [Asc TrackerExpires] + trackers <- runDB $ selectList + ([ TrackerId <-. joins ] ||. [ TrackerCurrentTicket ==. Just ticketKey ]) + [Asc TrackerExpires] lastPing <- runDB $ selectFirst [ TrainPingToken <-. fmap entityKey trackers ] [Desc TrainPingTimestamp] anchors <- runDB $ selectList [ TrainAnchorTicket ==. ticketKey ] [] <&> nonEmpty . fmap entityVal @@ -511,7 +516,9 @@ getTokenBlock token = do case maybe of Just r@Tracker{..} -> do liftIO $ print r - redirect RootR + redirect $ case trackerCurrentTicket of + Just ticket -> TicketViewR (coerce ticket) + Nothing -> RootR Nothing -> notFound getOnboardUnitMenuR :: Handler Html @@ -525,14 +532,16 @@ getOnboardUnitMenuR = do defaultLayout $ do [whamlet| -<h1>_{MsgOBU} -<section> - _{MsgChooseTrain} - $forall (Entity (TicketKey ticketId) Ticket{..}, firstStop) <- tickets - <hr> - <a href="@{OnboardUnitR ticketId}"> - #{ticketTripName}: #{ticketHeadsign} #{stopDeparture firstStop} -|] + <h1>_{MsgOBU} + <section> + _{MsgChooseTrain} + $forall (Entity (TicketKey ticketId) Ticket{..}, firstStop) <- tickets + <hr> + <a href="@{OnboardUnitR ticketId}"> + #{ticketTripName}: #{ticketHeadsign} #{stopDeparture firstStop} + <section> + <a href="@{OnboardTrackerR}">_{MsgStartTracking} + |] getOnboardUnitR :: UUID -> Handler Html getOnboardUnitR ticketId = do @@ -541,6 +550,12 @@ getOnboardUnitR ticketId = do Just ticket -> pure ticket defaultLayout $(whamletFile "site/obu.hamlet") +getOnboardTrackerR :: Handler Html +getOnboardTrackerR = do + defaultLayout + $( whamletFile "site/tracker.hamlet") + + announceForm :: UUID -> Html -> MForm Handler (FormResult Announcement, Widget) announceForm ticketId = renderDivs $ Announcement <$> pure (TicketKey ticketId) diff --git a/lib/Server/GTFS_RT.hs b/lib/Server/GTFS_RT.hs index 48a84db..d2e53a1 100644 --- a/lib/Server/GTFS_RT.hs +++ b/lib/Server/GTFS_RT.hs @@ -155,13 +155,14 @@ gtfsRealtimeServer gtfs@GTFS{..} dbpool = ticket <- selectList [TicketCompleted ==. False] [] - positions <- forM ticket $ \(Entity key ticket) -> do - selectFirst [TrainPingTicket ==. key] [Desc TrainPingTimestamp] >>= \case - Nothing -> pure Nothing - Just lastPing -> - pure (Just $ mkPosition (lastPing, ticket)) - - defFeedMessage (catMaybes positions) + -- TODO: reimplement this (since trainpings no longer reference tickets it's gone for now) + -- positions <- forM ticket $ \(Entity key ticket) -> do + -- selectFirst [TrainPingTicket ==. key] [Desc TrainPingTimestamp] >>= \case + -- Nothing -> pure Nothing + -- Just lastPing -> + -- pure (Just $ mkPosition (lastPing, ticket)) + + defFeedMessage [] -- (catMaybes positions) where mkPosition :: (Entity TrainPing, Ticket) -> RT.FeedEntity diff --git a/lib/Server/Util.hs b/lib/Server/Util.hs index 5ffb829..0106428 100644 --- a/lib/Server/Util.hs +++ b/lib/Server/Util.hs @@ -1,28 +1,67 @@ +{-# LANGUAGE BlockArguments #-} +{-# LANGUAGE RecordWildCards #-} -- | mostly the monad the service runs in -module Server.Util (Service, ServiceM, runService, sendErrorMsg, secondsNow, utcToSeconds) where +module Server.Util (Service, ServiceM, runService, sendErrorMsg, secondsNow, utcToSeconds, runLogging) where +import Config (LoggingConfig (..)) +import Control.Exception (handle, try) +import Control.Monad.Extra (void, whenJust) import Control.Monad.IO.Class (MonadIO (liftIO)) -import Control.Monad.Logger (LoggingT, runStderrLoggingT) +import Control.Monad.Logger (Loc, LogLevel (..), LogSource, LogStr, + LoggingT (..), defaultOutput, + fromLogStr, runStderrLoggingT) +import Control.Monad.Reader (ReaderT (..)) import qualified Data.Aeson as A import Data.ByteString (ByteString) +import qualified Data.ByteString as C8 import Data.Text (Text) +import qualified Data.Text as T +import Data.Text.Encoding (decodeUtf8Lenient) import Data.Time (Day, UTCTime (..), diffUTCTime, getCurrentTime, nominalDiffTimeToSeconds) +import Fmt ((+|), (|+)) +import GHC.IO.Exception (IOException (IOError)) import GTFS (Seconds (..)) import Prometheus (MonadMonitor (doIO)) import Servant (Handler, ServerError, ServerT, err404, errBody, errHeaders, throwError) +import System.IO (stderr) +import System.Process.Extra (callProcess) -type ServiceM = LoggingT Handler +type ServiceM = LoggingT (ReaderT LoggingConfig Handler) type Service api = ServerT api ServiceM -runService :: ServiceM a -> Handler a -runService = runStderrLoggingT +runService :: LoggingConfig -> ServiceM a -> Handler a +runService conf m = runReaderT (runLogging conf m) conf instance MonadMonitor ServiceM where doIO = liftIO +runLogging :: MonadIO m => LoggingConfig -> LoggingT m a -> m a +runLogging LoggingConfig{..} logging = runLoggingT logging printLogMsg + where printLogMsg loc source level msg = do + -- this is what runStderrLoggingT does + defaultOutput stderr loc source level msg + + whenJust loggingConfigNtfyToken \token -> handle ntfyFailed do + callProcess "ntfy" + [ "send" + , "--token=" <> T.unpack token + , "--title="+|loggingConfigHostname|+"/"+|"tracktrain" + , "--priority="+|show (ntfyPriority level)|+"" + , T.unpack loggingConfigNtfyTopic + , T.unpack (decodeUtf8Lenient (fromLogStr msg)) ] + + ntfyFailed (e :: IOError) = + putStrLn ("calling ntfy failed:"+|show e|+".") + ntfyPriority level = case level of + LevelDebug -> 2 + LevelInfo -> 3 + LevelWarn -> 4 + LevelError -> 5 + LevelOther _ -> 0 + sendErrorMsg :: Text -> ServiceM a sendErrorMsg msg = throwError err404 diff --git a/messages/en.msg b/messages/en.msg index 41ced10..a2ef189 100644 --- a/messages/en.msg +++ b/messages/en.msg @@ -40,6 +40,7 @@ Tickets: Tickets ImportTrips: import selected trips delete: delete AccordingToGtfs: Additional Trips contained in the Gtfs +StartTracking: Start Tracking OBU: Onboard-Unit ChooseTrain: Choose a Train diff --git a/site/tracker.hamlet b/site/tracker.hamlet new file mode 100644 index 0000000..5877c5d --- /dev/null +++ b/site/tracker.hamlet @@ -0,0 +1,137 @@ +<h1>_{MsgOBU} + +<section> + <h2>Blub + <strong>Token:</strong> <span id="token"> +<section> + <h2>Status + <p id="status">_{MsgNone} + <p id>_{MsgError}: <span id="error"> +<section> + <h2>_{MsgLive} + <p><strong>Position: </strong><span id="lat"></span>, <span id="long"></span> + <p><strong>Accuracy: </strong><span id="acc"> +<section> + <h2>_{MsgEstimated} + <p><strong>_{MsgDelay}</strong>: <span id="delay"> + <p><strong>_{MsgSequence}</strong>: <span id="sequence"> + + +<script> + var token = null; + + let euclid = (a,b) => { + let x = a[0]-b[0]; + let y = a[1]-b[1]; + return x*x+y*y; + } + + let minimalDist = (point, list, proj, norm) => { + return list.reduce ( + (min, x) => { + let dist = norm(point, proj(x)); + return dist < min[0] ? [dist,x] : min + }, + [norm(point, proj(list[0])), list[0]] + )[1] + } + + let counter = 0; + let ws; + let id; + + function setStatus(msg) { + document.getElementById("status").innerText = msg + } + + async function geoError(error) { + setStatus("error"); + alert(`_{MsgPermissionFailed}: \n${error.message}`); + console.error(error); + main(); + } + + async function wsError(error) { + // alert(`_{MsgWebsocketError}: \n${error.message === undefined ? error.reason : error.message}`); + console.log(error); + navigator.geolocation.clearWatch(id); + } + + async function wsClose(error) { + console.log(error); + document.getElementById("error").innerText = `websocket closed (reason: ${error.reason}). reconnecting …`; + navigator.geolocation.clearWatch(id); + setTimeout(openWebsocket, 1000); + } + + function wsMsg(msg) { + let json = JSON.parse(msg.data); + console.log(json); + document.getElementById("delay").innerText = + `${json.delay}s (${Math.floor(json.delay / 60)}min)`; + document.getElementById("sequence").innerText = json.sequence; + } + + + function initGeopos() { + document.getElementById("error").innerText = ""; + id = navigator.geolocation.watchPosition( + geoPing, + geoError, + {enableHighAccuracy: true} + ); + } + + + function openWebsocket () { + ws = new WebSocket((location.protocol == "http:" ? "ws" : "wss") + "://" + location.host + "/api/tracker/ping/ws"); + ws.onerror = wsError; + ws.onclose = wsClose; + ws.onmessage = wsMsg; + ws.onopen = (event) => { + setStatus("connected"); + }; + } + + async function geoPing(geoloc) { + console.log("got position update " + counter); + document.getElementById("lat").innerText = geoloc.coords.latitude; + document.getElementById("long").innerText = geoloc.coords.longitude; + document.getElementById("acc").innerText = geoloc.coords.accuracy; + + if (ws !== undefined && ws.readyState == 1) { + ws.send(JSON.stringify({ + token: token, + geopos: [ geoloc.coords.latitude, geoloc.coords.longitude ], + timestamp: (new Date()).toISOString() + })); + counter += 1; + setStatus(`sent ${counter} pings`); + } else { + setStatus(`websocket readystate ${ws.readyState}`); + } + } + + + async function main() { + initGeopos(); + + token = await (await fetch("/api/tracker/register/", { + method: "POST", + body: JSON.stringify({agent: "tracktrain-website"}), + headers: {"Content-Type": "application/json"} + })).json(); + + if (token.error) { + alert("could not obtain token: \n" + token.msg); + setStatus("_{MsgTokenFailed}"); + } else { + console.log("got token"); + + document.getElementById("token").innerText = token; + + openWebsocket(); + } + } + + main() diff --git a/tracktrain.cabal b/tracktrain.cabal index ad1624f..a8e42a4 100644 --- a/tracktrain.cabal +++ b/tracktrain.cabal @@ -97,6 +97,7 @@ library , proto-lens , http-media , filepath + , monad-control hs-source-dirs: lib exposed-modules: GTFS , Server |