diff options
author | stuebinm | 2024-05-15 01:12:09 +0200 |
---|---|---|
committer | stuebinm | 2024-05-15 01:12:09 +0200 |
commit | 0c9b3a6dba6850ce526d1d397f35aa6ad76beb50 (patch) | |
tree | f7b307548904ed0c0bb1459b9d09da63ed0a4243 | |
parent | 59670bdb6f0a3bba898274eadf47707e93bea195 (diff) |
Server.Ingest: new handling of unassigned trackers
now takes all potential stops along each trip into account when guessing
tickets; also checks if a ticket is still likely in case the tracker
switched its direction. This should solve many cases where a tracker is
accidentally turned off or falls asleep halfway before the last station
of one trip, then wakes up in the middle of the next.
-rw-r--r-- | lib/API.hs | 18 | ||||
-rw-r--r-- | lib/Extrapolation.hs | 24 | ||||
-rw-r--r-- | lib/Persist.hs | 6 | ||||
-rw-r--r-- | lib/Server/Ingest.hs | 206 | ||||
-rw-r--r-- | todo.org | 4 |
5 files changed, 170 insertions, 88 deletions
@@ -1,10 +1,11 @@ {-# LANGUAGE DataKinds #-} +{-# LANGUAGE DeriveAnyClass #-} {-# LANGUAGE ExplicitNamespaces #-} {-# LANGUAGE UndecidableInstances #-} -- | The sole authorative definition of this server's API, given as a Servant-style -- Haskell type. All other descriptions of the API are generated from this one. -module API (API, CompleteAPI, GtfsRealtimeAPI, RegisterJson(..), Metrics(..)) where +module API (API, CompleteAPI, GtfsRealtimeAPI, RegisterJson(..), Metrics(..), SentPing(..)) where import Data.Map (Map) import Data.Proxy (Proxy (..)) @@ -51,12 +52,22 @@ import Prometheus import Proto.GtfsRealtime (FeedMessage) import Servant.API.ContentTypes (Accept (..)) +-- | a bare ping as sent by a tracker device +data SentPing = SentPing + { sentPingToken :: TrackerId + , sentPingGeopos :: Geopos + , sentPingTimestamp :: UTCTime + } deriving (Generic) + +instance FromJSON SentPing where + parseJSON = genericParseJSON (aesonOptions "sentPing") + -- | tracktrain's API type API = -- ingress API (put this behind BasicAuth?) -- TODO: perhaps require a first ping for registration? "tracker" :> "register" :> ReqBody '[JSON] RegisterJson :> Post '[JSON] Token - :<|> "tracker" :> "ping" :> ReqBody '[JSON] TrainPing :> Post '[JSON] (Maybe TrainAnchor) + :<|> "tracker" :> "ping" :> ReqBody '[JSON] SentPing :> Post '[JSON] (Maybe TrainAnchor) :<|> "tracker" :> "ping" :> "ws" :> WebSocket :<|> "ticket" :> "subscribe" :> Capture "Ticket Id" UUID :> WebSocket :<|> "debug" :> "pings" :> Get '[JSON] (Map Token [TrainPing]) @@ -94,6 +105,8 @@ instance ToSchema RegisterJson where instance ToSchema Value where declareNamedSchema _ = pure $ NamedSchema (Just "json") $ mempty & type_ ?~ SwaggerObject +instance ToSchema SentPing where + declareNamedSchema = genericDeclareNamedSchema (GTFS.swaggerOptions "trainPing") @@ -124,3 +137,4 @@ instance Message msg => MimeRender Proto msg where instance {-# OVERLAPPABLE #-} Message msg => ToSchema msg where declareNamedSchema proxy = pure (NamedSchema (Just (messageName proxy)) mempty) + diff --git a/lib/Extrapolation.hs b/lib/Extrapolation.hs index 389d047..759b31e 100644 --- a/lib/Extrapolation.hs +++ b/lib/Extrapolation.hs @@ -18,6 +18,7 @@ import qualified Data.Vector as V import GHC.Float (int2Double) import GHC.IO (unsafePerformIO) +import API (SentPing (..)) import Conduit (MonadIO (liftIO)) import Data.List (sortBy, sortOn) import Data.Ord (Down (..)) @@ -29,8 +30,7 @@ import Persist (Geopos (..), Station (..), Stop (..), Ticket (..), TicketId, Token (..), Tracker (..), - TrainAnchor (..), - TrainPing (..)) + TrainAnchor (..)) import Server.Util (utcToSeconds) -- | Determines how to extrapolate delays (and potentially other things) from the real-time @@ -44,7 +44,7 @@ class Extrapolator strategy where -> Ticket -> V.Vector (Stop, Station, TimeZoneSeries) -> V.Vector ShapePoint - -> TrainPing + -> SentPing -> TrainAnchor -- | extrapolate status at some time (i.e. "how much delay does the train have *now*?") @@ -65,14 +65,14 @@ instance Extrapolator LinearExtrapolator where -- (in case the train just stands still for a while, take the most recent update) extrapolateAtPosition _ history positionNow = fmap (minimumBy (compare `on` difference)) - $ NE.nonEmpty $ sortOn (Down . trainAnchorWhen) + $ NE.nonEmpty $ sortOn (Down . trainAnchorCreated) $ NE.filter (\a -> trainAnchorSequence a < positionNow) history where difference status = positionNow - trainAnchorSequence status - extrapolateAnchorFromPing _ ticketId Ticket{..} stops shape ping@TrainPing{..} = TrainAnchor - { trainAnchorCreated = trainPingTimestamp + extrapolateAnchorFromPing _ ticketId Ticket{..} stops shape ping@SentPing{..} = TrainAnchor + { trainAnchorCreated = sentPingTimestamp , trainAnchorTicket = ticketId - , trainAnchorWhen = utcToSeconds trainPingTimestamp ticketDay + , trainAnchorWhen = utcToSeconds sentPingTimestamp ticketDay , trainAnchorSequence , trainAnchorDelay , trainAnchorMsg = Nothing @@ -81,8 +81,8 @@ instance Extrapolator LinearExtrapolator where (trainAnchorDelay, trainAnchorSequence) = linearDelay stops shape ping ticketDay tzseries = undefined -linearDelay :: V.Vector (Stop, Station, TimeZoneSeries) -> V.Vector ShapePoint -> TrainPing -> Day -> (Seconds, Double) -linearDelay tripStops shape TrainPing{..} runningDay = (observedDelay, observedSequence) +linearDelay :: V.Vector (Stop, Station, TimeZoneSeries) -> V.Vector ShapePoint -> SentPing -> Day -> (Seconds, Double) +linearDelay tripStops shape SentPing{..} runningDay = (observedDelay, observedSequence) where -- at which (fractional) sequence number is the ping? observedSequence = int2Double (stopSequence lastStop) + observedProgress * int2Double (stopSequence nextStop - stopSequence lastStop) @@ -93,7 +93,7 @@ linearDelay tripStops shape TrainPing{..} runningDay = (observedDelay, observedS + if expectedProgress == 1 -- if the hypothetical on-time train is already at (or past) the next station, -- just add the time distance we're behind - then seconds2Double (utcToSeconds trainPingTimestamp runningDay - nextSeconds) + then seconds2Double (utcToSeconds sentPingTimestamp runningDay - nextSeconds) -- otherwise the above is sufficient else 0 @@ -107,14 +107,14 @@ linearDelay tripStops shape TrainPing{..} runningDay = (observedDelay, observedS | p < 0 -> 0 | p > 1 -> 1 | otherwise -> p - where p = seconds2Double (utcToSeconds trainPingTimestamp runningDay - lastSeconds) + where p = seconds2Double (utcToSeconds sentPingTimestamp runningDay - lastSeconds) / seconds2Double expectedTravelTime -- scheduled duration between last and next stops expectedTravelTime = nextSeconds - lastSeconds -- closest point on the shape; this is where we assume the train to be - closestPoint = minimumBy (compare `on` euclid trainPingGeopos) line + closestPoint = minimumBy (compare `on` euclid sentPingGeopos) line -- scheduled departure at last & arrival at next stop lastSeconds = toSeconds (stopDeparture lastStop) lastTzSeries runningDay diff --git a/lib/Persist.hs b/lib/Persist.hs index 46e5ef4..e268455 100644 --- a/lib/Persist.hs +++ b/lib/Persist.hs @@ -142,6 +142,8 @@ TrainPing json sql=tt_trip_ping token TrackerId geopos Geopos timestamp UTCTime + sequence Double + ticket TicketId deriving Show Generic Eq -- status of a train somewhen in time (may be in the future), @@ -177,6 +179,8 @@ instance ToSchema TrainAnchor where instance ToSchema Announcement where declareNamedSchema = genericDeclareNamedSchema (GTFS.swaggerOptions "announcement") +type InSql a = ReaderT SqlBackend (LoggingT (ResourceT IO)) a + runSqlWithoutLog :: MonadIO m => Pool SqlBackend -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) a @@ -188,7 +192,7 @@ runSqlWithoutLog pool = liftIO . flip runSqlPersistMPool pool -- no better way than to nest logger monads … runSql :: (MonadLogger m, MonadIO m, MonadReader LoggingConfig m) => Pool SqlBackend - -> ReaderT SqlBackend (LoggingT (ResourceT IO)) a + -> InSql a -> m a runSql pool x = do conf <- ask diff --git a/lib/Server/Ingest.hs b/lib/Server/Ingest.hs index d774017..6a8383f 100644 --- a/lib/Server/Ingest.hs +++ b/lib/Server/Ingest.hs @@ -3,12 +3,15 @@ module Server.Ingest (handleTrackerRegister, handleTrainPing, handleWS) where import API (Metrics (..), - RegisterJson (..)) + RegisterJson (..), + SentPing (..)) import Control.Concurrent.STM (atomically, readTVar, writeTQueue) -import Control.Monad (forever, void, when) +import Control.Monad (forM, forever, unless, + void, when) import Control.Monad.Catch (handle) -import Control.Monad.Extra (ifM, mapMaybeM, whenJust) +import Control.Monad.Extra (ifM, mapMaybeM, whenJust, + whenJustM) import Control.Monad.IO.Class (MonadIO (liftIO)) import Control.Monad.Logger (LoggingT, logInfoN, logWarnN) @@ -23,6 +26,7 @@ import Data.Text (Text) import Data.Text.Encoding (decodeASCII, decodeUtf8) import Data.Time (NominalDiffTime, UTCTime (..), addUTCTime, + diffUTCTime, getCurrentTime, nominalDay) import qualified Data.Vector as V @@ -38,10 +42,12 @@ import Server.Util (ServiceM, getTzseries, utcToSeconds) import Config (LoggingConfig, - ServerConfig) + ServerConfig (..)) +import Control.Exception (throw) +import Control.Monad.Logger.CallStack (logErrorN) import Data.ByteString (ByteString) import Data.ByteString.Lazy (toStrict) -import Data.Foldable (minimumBy) +import Data.Foldable (find, minimumBy) import Data.Function (on, (&)) import qualified Data.Text as T import Data.Time.LocalTime.TimeZone.Series (TimeZoneSeries) @@ -49,12 +55,12 @@ import qualified Data.UUID as UUID import Extrapolation (Extrapolator (..), LinearExtrapolator (..), euclid) +import GHC.Generics (Generic) import GTFS (seconds2Double) import Prometheus (decGauge, incGauge) import Server.Base (ServerState) - handleTrackerRegister :: Pool SqlBackend -> RegisterJson @@ -74,78 +80,96 @@ handleTrainPing -> ServerState -> ServerConfig -> LoggingT (ReaderT LoggingConfig Handler) a - -> TrainPing + -> SentPing -> LoggingT (ReaderT LoggingConfig Handler) (Maybe TrainAnchor) -handleTrainPing dbpool subscribers cfg onError ping@TrainPing{..} = - isTokenValid dbpool trainPingToken >>= \case +handleTrainPing dbpool subscribers cfg onError ping@SentPing{..} = + isTokenValid dbpool sentPingToken >>= \case Nothing -> onError >> pure Nothing Just tracker@Tracker{..} -> do - -- if the tracker is not associated with a ticket, it is probably - -- just starting out on a new trip, or has finished an old one. - maybeTicketId <- case trackerCurrentTicket of - Just ticketId -> pure (Just ticketId) - Nothing -> runSql dbpool $ do - now <- liftIO getCurrentTime - tickets <- selectList [ TicketDay ==. utctDay now, TicketCompleted ==. False ] [] - ticketsWithFirstStation <- flip mapMaybeM tickets - (\ticket@(Entity ticketId _) -> do - selectFirst [StopTicket ==. ticketId] [Asc StopSequence] >>= \case - Nothing -> pure Nothing - Just (Entity _ stop) -> do - station <- getJust (stopStation stop) - tzseries <- liftIO $ getTzseries cfg (GTFS.tzname (stopDeparture stop)) - pure (Just (ticket, station, stop, tzseries))) - - if null ticketsWithFirstStation then pure Nothing else do - let (closestTicket, _, _, _) = minimumBy - -- (compare `on` euclid trainPingGeopos . stationGeopos . snd) - (compare `on` - (\(Entity _ ticket, station, stop, tzseries) -> - let - runningDay = ticketDay ticket - spaceDistance = euclid trainPingGeopos (stationGeopos station) - timeDiff = - GTFS.toSeconds (stopDeparture stop) tzseries runningDay - - utcToSeconds now runningDay - in - euclid trainPingGeopos (stationGeopos station) - + abs (seconds2Double timeDiff / 3600))) - ticketsWithFirstStation - logInfoN - $ "Tracker "+|UUID.toString (coerce trainPingToken)|+ - " is now handling ticket "+|UUID.toString (coerce (entityKey closestTicket))|+ - " (trip "+|ticketTripName (entityVal closestTicket)|+")." - - update trainPingToken - [TrackerCurrentTicket =. Just (entityKey closestTicket)] - - pure (Just (entityKey closestTicket)) - - ticketId <- case maybeTicketId of + unless (serverConfigDebugMode cfg) $ do + now <- liftIO getCurrentTime + let timeDiff = sentPingTimestamp `diffUTCTime` now + when (utctDay sentPingTimestamp /= utctDay now) $ do + logErrorN "received ping for wrong day" + throw err400 + when (timeDiff < 10) $ do + logWarnN "received ping more than 10 seconds out of date" + throw err400 + when (timeDiff > 10) $ do + logWarnN "received ping from more than 10 seconds in the future" + throw err400 + + ticketId <- case trackerCurrentTicket of Just ticketId -> pure ticketId - Nothing -> do - logWarnN $ "Tracker "+|UUID.toString (coerce trainPingToken)|+ - " sent a ping, but no trips are running today." - throwError err400 + -- if the tracker is not associated with a ticket, it is probably new + -- & should be auto-associated with the most fitting current ticket + Nothing -> runSql dbpool (guessTicketFromPing cfg ping) >>= \case + Just ticketId -> pure ticketId + Nothing -> do + logWarnN $ "Tracker "+|UUID.toString (coerce sentPingToken)|+ + " sent a ping, but no trips are running today." + throwError err400 + + + runSql dbpool $ insertSentPing subscribers cfg ping tracker ticketId + +insertSentPing + :: ServerState + -> ServerConfig + -> SentPing + -> Tracker + -> TicketId + -> InSql (Maybe TrainAnchor) +insertSentPing subscribers cfg ping@SentPing{..} tracker@Tracker{..} ticketId = do + ticket@Ticket{..} <- getJust ticketId + + stations <- selectList [ StopTicket ==. ticketId ] [Asc StopArrival] + >>= mapM (\stop -> do + station <- getJust (stopStation (entityVal stop)) + tzseries <- liftIO $ getTzseries cfg (GTFS.tzname (stopArrival (entityVal stop))) + pure (entityVal stop, station, tzseries)) + <&> V.fromList + + shapePoints <- selectList [ShapePointShape ==. ticketShape] [Asc ShapePointIndex] + <&> (V.fromList . fmap entityVal) + + + let anchor = extrapolateAnchorFromPing LinearExtrapolator + ticketId ticket stations shapePoints ping - runSql dbpool $ do - ticket@Ticket{..} <- getJust ticketId - stations <- selectList [ StopTicket ==. ticketId ] [Asc StopArrival] - >>= mapM (\stop -> do - station <- getJust (stopStation (entityVal stop)) - tzseries <- liftIO $ getTzseries cfg (GTFS.tzname (stopArrival (entityVal stop))) - pure (entityVal stop, station, tzseries)) - <&> V.fromList + maybeReassign <- selectFirst + [ TrainPingTicket ==. ticketId ] + [ Desc TrainPingTimestamp ] + <&> find (\ping -> trainPingSequence (entityVal ping) > trainAnchorSequence anchor) + >> guessTicketFromPing cfg ping + <&> find (/= ticketId) - shapePoints <- selectList [ShapePointShape ==. ticketShape] [Asc ShapePointIndex] - <&> (V.fromList . fmap entityVal) - let anchor = extrapolateAnchorFromPing LinearExtrapolator - ticketId ticket stations shapePoints ping + -- mapM (\newTicketId -> if ticketId /= newTicketId then Just newTicketId else Nothing)) + -- >>= (\ping -> guessTicketFromPing cfg ping >>= \case + -- Just newTicketId | ticketId /= newTicketId -> pure (Just newTicketId) + -- _ -> pure Nothing) - insert ping + case maybeReassign of + Just newTicketId -> do + update sentPingToken + [TrackerCurrentTicket =. Just newTicketId ] + logInfoN $ "tracker "+|UUID.toText (coerce sentPingToken)|+ + "has switched direction, and was reassigned to ticket " + +|UUID.toText (coerce newTicketId)|+"." + insertSentPing subscribers cfg ping tracker newTicketId + Nothing -> do + let trackedPing = TrainPing + { trainPingToken = sentPingToken + , trainPingGeopos = sentPingGeopos + , trainPingTimestamp = sentPingTimestamp + , trainPingSequence = trainAnchorSequence anchor + , trainPingTicket = ticketId + } + + insert trackedPing last <- selectFirst [TrainAnchorTicket ==. ticketId] [Desc TrainAnchorWhen] -- only insert new estimates if they've actually changed anything @@ -158,18 +182,18 @@ handleTrainPing dbpool subscribers cfg onError ping@TrainPing{..} = & (\(stop, _, _) -> stopSequence stop) & fromIntegral when (trainAnchorSequence anchor + 0.1 >= maxSequence) $ do - update trainPingToken + update sentPingToken [TrackerCurrentTicket =. Nothing] update ticketId [TicketCompleted =. True] - logInfoN $ "Tracker "+|UUID.toString (coerce trainPingToken)|+ + logInfoN $ "Tracker "+|UUID.toString (coerce sentPingToken)|+ " has completed ticket "+|UUID.toString (coerce ticketId)|+ " (trip "+|ticketTripName|+")" queues <- liftIO $ atomically $ do queues <- readTVar subscribers <&> M.lookup (coerce ticketId) whenJust queues $ - mapM_ (\q -> writeTQueue q (Just ping)) + mapM_ (\q -> writeTQueue q (Just trackedPing)) pure queues pure (Just anchor) @@ -196,6 +220,46 @@ handleWS dbpool subscribers cfg Metrics{..} conn = do Just anchor -> liftIO $ WS.sendTextData conn (A.encode anchor) Nothing -> pure () + +guessTicketFromPing :: ServerConfig -> SentPing -> InSql (Maybe (Key Ticket)) +guessTicketFromPing cfg SentPing{..} = do + tickets <- selectList [ TicketDay ==. utctDay sentPingTimestamp, TicketCompleted ==. False ] [] + + ticketsWithStation <- forM tickets (\ticket@(Entity ticketId _) -> do + stops <- selectList [StopTicket ==. ticketId] [Asc StopSequence] >>= mapM (\(Entity _ stop) -> do + station <- getJust (stopStation stop) + tzseries <- liftIO $ getTzseries cfg (GTFS.tzname (stopDeparture stop)) + pure (station, stop, tzseries)) + pure (ticket, stops)) + + if null ticketsWithStation then pure Nothing else do + let (closestTicket, _) = ticketsWithStation + & minimumBy (compare `on` (\(Entity _ ticket, stations) -> + let + runningDay = ticketDay ticket + smallestDistance = stations + <&> (\(station, stop, tzseries) -> spaceAndTimeDiff + (sentPingGeopos, utcToSeconds sentPingTimestamp runningDay) + (stationGeopos station, GTFS.toSeconds (stopDeparture stop) tzseries runningDay)) + & minimum + in smallestDistance)) + + logInfoN + $ "Tracker "+|UUID.toString (coerce sentPingToken)|+ + " is now handling ticket "+|UUID.toString (coerce (entityKey closestTicket))|+ + " (trip "+|ticketTripName (entityVal closestTicket)|+")." + + update sentPingToken + [TrackerCurrentTicket =. Just (entityKey closestTicket)] + + pure (Just (entityKey closestTicket)) + +spaceAndTimeDiff :: (Geopos, GTFS.Seconds) -> (Geopos, GTFS.Seconds) -> Double +spaceAndTimeDiff (pos1, time1) (pos2, time2) = + spaceDistance + abs (seconds2Double timeDiff / 3600) + where spaceDistance = euclid pos1 pos2 + timeDiff = time1 - time2 + -- TODO: proper debug logging for expired tokens isTokenValid :: Pool SqlBackend -> TrackerId -> ServiceM (Maybe Tracker) isTokenValid dbpool token = runSql dbpool $ get token >>= \case @@ -4,8 +4,6 @@ ** TODO auto-reconnect of /tracker websocket fails after android has gone to sleep ** DONE /tracker should remember its token, not constantly open a new one either via a cookie or url parameter & redirect -** TODO matching of tokens to trip ought not to assume trips are at their start position -this produces horrible results if the tracker is started towards a trip's end ** TODO train anchors are sorted wrong probably based on sequence number, not based on time. results in "stuck" nonsensical delay information if we had a mistaken geolocation in the middle @@ -18,6 +16,8 @@ that time, since we assume linear movement. ** DONE do not give tripupdates after tickets are completed or outdated ** TODO tickets are not reliably marked completed ** TODO any kind of check against unrealistically fast travel? +** DONE matching of tokens to trip ought not to assume trips are at their start position +this produces horrible results if the tracker is started towards a trip's end * TODO implement GTFS realtime ** TODO get google to accept trip updates ** DONE get google to accept service alerts |