aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--lib/API.hs18
-rw-r--r--lib/Extrapolation.hs24
-rw-r--r--lib/Persist.hs6
-rw-r--r--lib/Server/Ingest.hs206
4 files changed, 168 insertions, 86 deletions
diff --git a/lib/API.hs b/lib/API.hs
index b2635c1..7ebfb06 100644
--- a/lib/API.hs
+++ b/lib/API.hs
@@ -1,10 +1,11 @@
{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE ExplicitNamespaces #-}
{-# LANGUAGE UndecidableInstances #-}
-- | The sole authorative definition of this server's API, given as a Servant-style
-- Haskell type. All other descriptions of the API are generated from this one.
-module API (API, CompleteAPI, GtfsRealtimeAPI, RegisterJson(..), Metrics(..)) where
+module API (API, CompleteAPI, GtfsRealtimeAPI, RegisterJson(..), Metrics(..), SentPing(..)) where
import Data.Map (Map)
import Data.Proxy (Proxy (..))
@@ -51,12 +52,22 @@ import Prometheus
import Proto.GtfsRealtime (FeedMessage)
import Servant.API.ContentTypes (Accept (..))
+-- | a bare ping as sent by a tracker device
+data SentPing = SentPing
+ { sentPingToken :: TrackerId
+ , sentPingGeopos :: Geopos
+ , sentPingTimestamp :: UTCTime
+ } deriving (Generic)
+
+instance FromJSON SentPing where
+ parseJSON = genericParseJSON (aesonOptions "sentPing")
+
-- | tracktrain's API
type API =
-- ingress API (put this behind BasicAuth?)
-- TODO: perhaps require a first ping for registration?
"tracker" :> "register" :> ReqBody '[JSON] RegisterJson :> Post '[JSON] Token
- :<|> "tracker" :> "ping" :> ReqBody '[JSON] TrainPing :> Post '[JSON] (Maybe TrainAnchor)
+ :<|> "tracker" :> "ping" :> ReqBody '[JSON] SentPing :> Post '[JSON] (Maybe TrainAnchor)
:<|> "tracker" :> "ping" :> "ws" :> WebSocket
:<|> "ticket" :> "subscribe" :> Capture "Ticket Id" UUID :> WebSocket
:<|> "debug" :> "pings" :> Get '[JSON] (Map Token [TrainPing])
@@ -94,6 +105,8 @@ instance ToSchema RegisterJson where
instance ToSchema Value where
declareNamedSchema _ = pure $ NamedSchema (Just "json") $ mempty
& type_ ?~ SwaggerObject
+instance ToSchema SentPing where
+ declareNamedSchema = genericDeclareNamedSchema (GTFS.swaggerOptions "trainPing")
@@ -124,3 +137,4 @@ instance Message msg => MimeRender Proto msg where
instance {-# OVERLAPPABLE #-} Message msg => ToSchema msg where
declareNamedSchema proxy =
pure (NamedSchema (Just (messageName proxy)) mempty)
+
diff --git a/lib/Extrapolation.hs b/lib/Extrapolation.hs
index 389d047..759b31e 100644
--- a/lib/Extrapolation.hs
+++ b/lib/Extrapolation.hs
@@ -18,6 +18,7 @@ import qualified Data.Vector as V
import GHC.Float (int2Double)
import GHC.IO (unsafePerformIO)
+import API (SentPing (..))
import Conduit (MonadIO (liftIO))
import Data.List (sortBy, sortOn)
import Data.Ord (Down (..))
@@ -29,8 +30,7 @@ import Persist (Geopos (..),
Station (..), Stop (..),
Ticket (..), TicketId,
Token (..), Tracker (..),
- TrainAnchor (..),
- TrainPing (..))
+ TrainAnchor (..))
import Server.Util (utcToSeconds)
-- | Determines how to extrapolate delays (and potentially other things) from the real-time
@@ -44,7 +44,7 @@ class Extrapolator strategy where
-> Ticket
-> V.Vector (Stop, Station, TimeZoneSeries)
-> V.Vector ShapePoint
- -> TrainPing
+ -> SentPing
-> TrainAnchor
-- | extrapolate status at some time (i.e. "how much delay does the train have *now*?")
@@ -65,14 +65,14 @@ instance Extrapolator LinearExtrapolator where
-- (in case the train just stands still for a while, take the most recent update)
extrapolateAtPosition _ history positionNow =
fmap (minimumBy (compare `on` difference))
- $ NE.nonEmpty $ sortOn (Down . trainAnchorWhen)
+ $ NE.nonEmpty $ sortOn (Down . trainAnchorCreated)
$ NE.filter (\a -> trainAnchorSequence a < positionNow) history
where difference status = positionNow - trainAnchorSequence status
- extrapolateAnchorFromPing _ ticketId Ticket{..} stops shape ping@TrainPing{..} = TrainAnchor
- { trainAnchorCreated = trainPingTimestamp
+ extrapolateAnchorFromPing _ ticketId Ticket{..} stops shape ping@SentPing{..} = TrainAnchor
+ { trainAnchorCreated = sentPingTimestamp
, trainAnchorTicket = ticketId
- , trainAnchorWhen = utcToSeconds trainPingTimestamp ticketDay
+ , trainAnchorWhen = utcToSeconds sentPingTimestamp ticketDay
, trainAnchorSequence
, trainAnchorDelay
, trainAnchorMsg = Nothing
@@ -81,8 +81,8 @@ instance Extrapolator LinearExtrapolator where
(trainAnchorDelay, trainAnchorSequence) = linearDelay stops shape ping ticketDay
tzseries = undefined
-linearDelay :: V.Vector (Stop, Station, TimeZoneSeries) -> V.Vector ShapePoint -> TrainPing -> Day -> (Seconds, Double)
-linearDelay tripStops shape TrainPing{..} runningDay = (observedDelay, observedSequence)
+linearDelay :: V.Vector (Stop, Station, TimeZoneSeries) -> V.Vector ShapePoint -> SentPing -> Day -> (Seconds, Double)
+linearDelay tripStops shape SentPing{..} runningDay = (observedDelay, observedSequence)
where -- at which (fractional) sequence number is the ping?
observedSequence = int2Double (stopSequence lastStop)
+ observedProgress * int2Double (stopSequence nextStop - stopSequence lastStop)
@@ -93,7 +93,7 @@ linearDelay tripStops shape TrainPing{..} runningDay = (observedDelay, observedS
+ if expectedProgress == 1
-- if the hypothetical on-time train is already at (or past) the next station,
-- just add the time distance we're behind
- then seconds2Double (utcToSeconds trainPingTimestamp runningDay - nextSeconds)
+ then seconds2Double (utcToSeconds sentPingTimestamp runningDay - nextSeconds)
-- otherwise the above is sufficient
else 0
@@ -107,14 +107,14 @@ linearDelay tripStops shape TrainPing{..} runningDay = (observedDelay, observedS
| p < 0 -> 0
| p > 1 -> 1
| otherwise -> p
- where p = seconds2Double (utcToSeconds trainPingTimestamp runningDay - lastSeconds)
+ where p = seconds2Double (utcToSeconds sentPingTimestamp runningDay - lastSeconds)
/ seconds2Double expectedTravelTime
-- scheduled duration between last and next stops
expectedTravelTime = nextSeconds - lastSeconds
-- closest point on the shape; this is where we assume the train to be
- closestPoint = minimumBy (compare `on` euclid trainPingGeopos) line
+ closestPoint = minimumBy (compare `on` euclid sentPingGeopos) line
-- scheduled departure at last & arrival at next stop
lastSeconds = toSeconds (stopDeparture lastStop) lastTzSeries runningDay
diff --git a/lib/Persist.hs b/lib/Persist.hs
index 46e5ef4..e268455 100644
--- a/lib/Persist.hs
+++ b/lib/Persist.hs
@@ -142,6 +142,8 @@ TrainPing json sql=tt_trip_ping
token TrackerId
geopos Geopos
timestamp UTCTime
+ sequence Double
+ ticket TicketId
deriving Show Generic Eq
-- status of a train somewhen in time (may be in the future),
@@ -177,6 +179,8 @@ instance ToSchema TrainAnchor where
instance ToSchema Announcement where
declareNamedSchema = genericDeclareNamedSchema (GTFS.swaggerOptions "announcement")
+type InSql a = ReaderT SqlBackend (LoggingT (ResourceT IO)) a
+
runSqlWithoutLog :: MonadIO m
=> Pool SqlBackend
-> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) a
@@ -188,7 +192,7 @@ runSqlWithoutLog pool = liftIO . flip runSqlPersistMPool pool
-- no better way than to nest logger monads …
runSql :: (MonadLogger m, MonadIO m, MonadReader LoggingConfig m)
=> Pool SqlBackend
- -> ReaderT SqlBackend (LoggingT (ResourceT IO)) a
+ -> InSql a
-> m a
runSql pool x = do
conf <- ask
diff --git a/lib/Server/Ingest.hs b/lib/Server/Ingest.hs
index d774017..6a8383f 100644
--- a/lib/Server/Ingest.hs
+++ b/lib/Server/Ingest.hs
@@ -3,12 +3,15 @@
module Server.Ingest (handleTrackerRegister, handleTrainPing, handleWS) where
import API (Metrics (..),
- RegisterJson (..))
+ RegisterJson (..),
+ SentPing (..))
import Control.Concurrent.STM (atomically, readTVar,
writeTQueue)
-import Control.Monad (forever, void, when)
+import Control.Monad (forM, forever, unless,
+ void, when)
import Control.Monad.Catch (handle)
-import Control.Monad.Extra (ifM, mapMaybeM, whenJust)
+import Control.Monad.Extra (ifM, mapMaybeM, whenJust,
+ whenJustM)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Logger (LoggingT, logInfoN,
logWarnN)
@@ -23,6 +26,7 @@ import Data.Text (Text)
import Data.Text.Encoding (decodeASCII, decodeUtf8)
import Data.Time (NominalDiffTime,
UTCTime (..), addUTCTime,
+ diffUTCTime,
getCurrentTime,
nominalDay)
import qualified Data.Vector as V
@@ -38,10 +42,12 @@ import Server.Util (ServiceM, getTzseries,
utcToSeconds)
import Config (LoggingConfig,
- ServerConfig)
+ ServerConfig (..))
+import Control.Exception (throw)
+import Control.Monad.Logger.CallStack (logErrorN)
import Data.ByteString (ByteString)
import Data.ByteString.Lazy (toStrict)
-import Data.Foldable (minimumBy)
+import Data.Foldable (find, minimumBy)
import Data.Function (on, (&))
import qualified Data.Text as T
import Data.Time.LocalTime.TimeZone.Series (TimeZoneSeries)
@@ -49,12 +55,12 @@ import qualified Data.UUID as UUID
import Extrapolation (Extrapolator (..),
LinearExtrapolator (..),
euclid)
+import GHC.Generics (Generic)
import GTFS (seconds2Double)
import Prometheus (decGauge, incGauge)
import Server.Base (ServerState)
-
handleTrackerRegister
:: Pool SqlBackend
-> RegisterJson
@@ -74,78 +80,96 @@ handleTrainPing
-> ServerState
-> ServerConfig
-> LoggingT (ReaderT LoggingConfig Handler) a
- -> TrainPing
+ -> SentPing
-> LoggingT (ReaderT LoggingConfig Handler) (Maybe TrainAnchor)
-handleTrainPing dbpool subscribers cfg onError ping@TrainPing{..} =
- isTokenValid dbpool trainPingToken >>= \case
+handleTrainPing dbpool subscribers cfg onError ping@SentPing{..} =
+ isTokenValid dbpool sentPingToken >>= \case
Nothing -> onError >> pure Nothing
Just tracker@Tracker{..} -> do
- -- if the tracker is not associated with a ticket, it is probably
- -- just starting out on a new trip, or has finished an old one.
- maybeTicketId <- case trackerCurrentTicket of
- Just ticketId -> pure (Just ticketId)
- Nothing -> runSql dbpool $ do
- now <- liftIO getCurrentTime
- tickets <- selectList [ TicketDay ==. utctDay now, TicketCompleted ==. False ] []
- ticketsWithFirstStation <- flip mapMaybeM tickets
- (\ticket@(Entity ticketId _) -> do
- selectFirst [StopTicket ==. ticketId] [Asc StopSequence] >>= \case
- Nothing -> pure Nothing
- Just (Entity _ stop) -> do
- station <- getJust (stopStation stop)
- tzseries <- liftIO $ getTzseries cfg (GTFS.tzname (stopDeparture stop))
- pure (Just (ticket, station, stop, tzseries)))
-
- if null ticketsWithFirstStation then pure Nothing else do
- let (closestTicket, _, _, _) = minimumBy
- -- (compare `on` euclid trainPingGeopos . stationGeopos . snd)
- (compare `on`
- (\(Entity _ ticket, station, stop, tzseries) ->
- let
- runningDay = ticketDay ticket
- spaceDistance = euclid trainPingGeopos (stationGeopos station)
- timeDiff =
- GTFS.toSeconds (stopDeparture stop) tzseries runningDay
- - utcToSeconds now runningDay
- in
- euclid trainPingGeopos (stationGeopos station)
- + abs (seconds2Double timeDiff / 3600)))
- ticketsWithFirstStation
- logInfoN
- $ "Tracker "+|UUID.toString (coerce trainPingToken)|+
- " is now handling ticket "+|UUID.toString (coerce (entityKey closestTicket))|+
- " (trip "+|ticketTripName (entityVal closestTicket)|+")."
-
- update trainPingToken
- [TrackerCurrentTicket =. Just (entityKey closestTicket)]
-
- pure (Just (entityKey closestTicket))
-
- ticketId <- case maybeTicketId of
+ unless (serverConfigDebugMode cfg) $ do
+ now <- liftIO getCurrentTime
+ let timeDiff = sentPingTimestamp `diffUTCTime` now
+ when (utctDay sentPingTimestamp /= utctDay now) $ do
+ logErrorN "received ping for wrong day"
+ throw err400
+ when (timeDiff < 10) $ do
+ logWarnN "received ping more than 10 seconds out of date"
+ throw err400
+ when (timeDiff > 10) $ do
+ logWarnN "received ping from more than 10 seconds in the future"
+ throw err400
+
+ ticketId <- case trackerCurrentTicket of
Just ticketId -> pure ticketId
- Nothing -> do
- logWarnN $ "Tracker "+|UUID.toString (coerce trainPingToken)|+
- " sent a ping, but no trips are running today."
- throwError err400
+ -- if the tracker is not associated with a ticket, it is probably new
+ -- & should be auto-associated with the most fitting current ticket
+ Nothing -> runSql dbpool (guessTicketFromPing cfg ping) >>= \case
+ Just ticketId -> pure ticketId
+ Nothing -> do
+ logWarnN $ "Tracker "+|UUID.toString (coerce sentPingToken)|+
+ " sent a ping, but no trips are running today."
+ throwError err400
+
+
+ runSql dbpool $ insertSentPing subscribers cfg ping tracker ticketId
+
+insertSentPing
+ :: ServerState
+ -> ServerConfig
+ -> SentPing
+ -> Tracker
+ -> TicketId
+ -> InSql (Maybe TrainAnchor)
+insertSentPing subscribers cfg ping@SentPing{..} tracker@Tracker{..} ticketId = do
+ ticket@Ticket{..} <- getJust ticketId
+
+ stations <- selectList [ StopTicket ==. ticketId ] [Asc StopArrival]
+ >>= mapM (\stop -> do
+ station <- getJust (stopStation (entityVal stop))
+ tzseries <- liftIO $ getTzseries cfg (GTFS.tzname (stopArrival (entityVal stop)))
+ pure (entityVal stop, station, tzseries))
+ <&> V.fromList
+
+ shapePoints <- selectList [ShapePointShape ==. ticketShape] [Asc ShapePointIndex]
+ <&> (V.fromList . fmap entityVal)
+
+
+ let anchor = extrapolateAnchorFromPing LinearExtrapolator
+ ticketId ticket stations shapePoints ping
- runSql dbpool $ do
- ticket@Ticket{..} <- getJust ticketId
- stations <- selectList [ StopTicket ==. ticketId ] [Asc StopArrival]
- >>= mapM (\stop -> do
- station <- getJust (stopStation (entityVal stop))
- tzseries <- liftIO $ getTzseries cfg (GTFS.tzname (stopArrival (entityVal stop)))
- pure (entityVal stop, station, tzseries))
- <&> V.fromList
+ maybeReassign <- selectFirst
+ [ TrainPingTicket ==. ticketId ]
+ [ Desc TrainPingTimestamp ]
+ <&> find (\ping -> trainPingSequence (entityVal ping) > trainAnchorSequence anchor)
+ >> guessTicketFromPing cfg ping
+ <&> find (/= ticketId)
- shapePoints <- selectList [ShapePointShape ==. ticketShape] [Asc ShapePointIndex]
- <&> (V.fromList . fmap entityVal)
- let anchor = extrapolateAnchorFromPing LinearExtrapolator
- ticketId ticket stations shapePoints ping
+ -- mapM (\newTicketId -> if ticketId /= newTicketId then Just newTicketId else Nothing))
+ -- >>= (\ping -> guessTicketFromPing cfg ping >>= \case
+ -- Just newTicketId | ticketId /= newTicketId -> pure (Just newTicketId)
+ -- _ -> pure Nothing)
- insert ping
+ case maybeReassign of
+ Just newTicketId -> do
+ update sentPingToken
+ [TrackerCurrentTicket =. Just newTicketId ]
+ logInfoN $ "tracker "+|UUID.toText (coerce sentPingToken)|+
+ "has switched direction, and was reassigned to ticket "
+ +|UUID.toText (coerce newTicketId)|+"."
+ insertSentPing subscribers cfg ping tracker newTicketId
+ Nothing -> do
+ let trackedPing = TrainPing
+ { trainPingToken = sentPingToken
+ , trainPingGeopos = sentPingGeopos
+ , trainPingTimestamp = sentPingTimestamp
+ , trainPingSequence = trainAnchorSequence anchor
+ , trainPingTicket = ticketId
+ }
+
+ insert trackedPing
last <- selectFirst [TrainAnchorTicket ==. ticketId] [Desc TrainAnchorWhen]
-- only insert new estimates if they've actually changed anything
@@ -158,18 +182,18 @@ handleTrainPing dbpool subscribers cfg onError ping@TrainPing{..} =
& (\(stop, _, _) -> stopSequence stop)
& fromIntegral
when (trainAnchorSequence anchor + 0.1 >= maxSequence) $ do
- update trainPingToken
+ update sentPingToken
[TrackerCurrentTicket =. Nothing]
update ticketId
[TicketCompleted =. True]
- logInfoN $ "Tracker "+|UUID.toString (coerce trainPingToken)|+
+ logInfoN $ "Tracker "+|UUID.toString (coerce sentPingToken)|+
" has completed ticket "+|UUID.toString (coerce ticketId)|+
" (trip "+|ticketTripName|+")"
queues <- liftIO $ atomically $ do
queues <- readTVar subscribers <&> M.lookup (coerce ticketId)
whenJust queues $
- mapM_ (\q -> writeTQueue q (Just ping))
+ mapM_ (\q -> writeTQueue q (Just trackedPing))
pure queues
pure (Just anchor)
@@ -196,6 +220,46 @@ handleWS dbpool subscribers cfg Metrics{..} conn = do
Just anchor -> liftIO $ WS.sendTextData conn (A.encode anchor)
Nothing -> pure ()
+
+guessTicketFromPing :: ServerConfig -> SentPing -> InSql (Maybe (Key Ticket))
+guessTicketFromPing cfg SentPing{..} = do
+ tickets <- selectList [ TicketDay ==. utctDay sentPingTimestamp, TicketCompleted ==. False ] []
+
+ ticketsWithStation <- forM tickets (\ticket@(Entity ticketId _) -> do
+ stops <- selectList [StopTicket ==. ticketId] [Asc StopSequence] >>= mapM (\(Entity _ stop) -> do
+ station <- getJust (stopStation stop)
+ tzseries <- liftIO $ getTzseries cfg (GTFS.tzname (stopDeparture stop))
+ pure (station, stop, tzseries))
+ pure (ticket, stops))
+
+ if null ticketsWithStation then pure Nothing else do
+ let (closestTicket, _) = ticketsWithStation
+ & minimumBy (compare `on` (\(Entity _ ticket, stations) ->
+ let
+ runningDay = ticketDay ticket
+ smallestDistance = stations
+ <&> (\(station, stop, tzseries) -> spaceAndTimeDiff
+ (sentPingGeopos, utcToSeconds sentPingTimestamp runningDay)
+ (stationGeopos station, GTFS.toSeconds (stopDeparture stop) tzseries runningDay))
+ & minimum
+ in smallestDistance))
+
+ logInfoN
+ $ "Tracker "+|UUID.toString (coerce sentPingToken)|+
+ " is now handling ticket "+|UUID.toString (coerce (entityKey closestTicket))|+
+ " (trip "+|ticketTripName (entityVal closestTicket)|+")."
+
+ update sentPingToken
+ [TrackerCurrentTicket =. Just (entityKey closestTicket)]
+
+ pure (Just (entityKey closestTicket))
+
+spaceAndTimeDiff :: (Geopos, GTFS.Seconds) -> (Geopos, GTFS.Seconds) -> Double
+spaceAndTimeDiff (pos1, time1) (pos2, time2) =
+ spaceDistance + abs (seconds2Double timeDiff / 3600)
+ where spaceDistance = euclid pos1 pos2
+ timeDiff = time1 - time2
+
-- TODO: proper debug logging for expired tokens
isTokenValid :: Pool SqlBackend -> TrackerId -> ServiceM (Maybe Tracker)
isTokenValid dbpool token = runSql dbpool $ get token >>= \case