aboutsummaryrefslogtreecommitdiff
path: root/lib/Server/Ingest.hs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--lib/Server/Ingest.hs206
1 files changed, 135 insertions, 71 deletions
diff --git a/lib/Server/Ingest.hs b/lib/Server/Ingest.hs
index d774017..6a8383f 100644
--- a/lib/Server/Ingest.hs
+++ b/lib/Server/Ingest.hs
@@ -3,12 +3,15 @@
module Server.Ingest (handleTrackerRegister, handleTrainPing, handleWS) where
import API (Metrics (..),
- RegisterJson (..))
+ RegisterJson (..),
+ SentPing (..))
import Control.Concurrent.STM (atomically, readTVar,
writeTQueue)
-import Control.Monad (forever, void, when)
+import Control.Monad (forM, forever, unless,
+ void, when)
import Control.Monad.Catch (handle)
-import Control.Monad.Extra (ifM, mapMaybeM, whenJust)
+import Control.Monad.Extra (ifM, mapMaybeM, whenJust,
+ whenJustM)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Logger (LoggingT, logInfoN,
logWarnN)
@@ -23,6 +26,7 @@ import Data.Text (Text)
import Data.Text.Encoding (decodeASCII, decodeUtf8)
import Data.Time (NominalDiffTime,
UTCTime (..), addUTCTime,
+ diffUTCTime,
getCurrentTime,
nominalDay)
import qualified Data.Vector as V
@@ -38,10 +42,12 @@ import Server.Util (ServiceM, getTzseries,
utcToSeconds)
import Config (LoggingConfig,
- ServerConfig)
+ ServerConfig (..))
+import Control.Exception (throw)
+import Control.Monad.Logger.CallStack (logErrorN)
import Data.ByteString (ByteString)
import Data.ByteString.Lazy (toStrict)
-import Data.Foldable (minimumBy)
+import Data.Foldable (find, minimumBy)
import Data.Function (on, (&))
import qualified Data.Text as T
import Data.Time.LocalTime.TimeZone.Series (TimeZoneSeries)
@@ -49,12 +55,12 @@ import qualified Data.UUID as UUID
import Extrapolation (Extrapolator (..),
LinearExtrapolator (..),
euclid)
+import GHC.Generics (Generic)
import GTFS (seconds2Double)
import Prometheus (decGauge, incGauge)
import Server.Base (ServerState)
-
handleTrackerRegister
:: Pool SqlBackend
-> RegisterJson
@@ -74,78 +80,96 @@ handleTrainPing
-> ServerState
-> ServerConfig
-> LoggingT (ReaderT LoggingConfig Handler) a
- -> TrainPing
+ -> SentPing
-> LoggingT (ReaderT LoggingConfig Handler) (Maybe TrainAnchor)
-handleTrainPing dbpool subscribers cfg onError ping@TrainPing{..} =
- isTokenValid dbpool trainPingToken >>= \case
+handleTrainPing dbpool subscribers cfg onError ping@SentPing{..} =
+ isTokenValid dbpool sentPingToken >>= \case
Nothing -> onError >> pure Nothing
Just tracker@Tracker{..} -> do
- -- if the tracker is not associated with a ticket, it is probably
- -- just starting out on a new trip, or has finished an old one.
- maybeTicketId <- case trackerCurrentTicket of
- Just ticketId -> pure (Just ticketId)
- Nothing -> runSql dbpool $ do
- now <- liftIO getCurrentTime
- tickets <- selectList [ TicketDay ==. utctDay now, TicketCompleted ==. False ] []
- ticketsWithFirstStation <- flip mapMaybeM tickets
- (\ticket@(Entity ticketId _) -> do
- selectFirst [StopTicket ==. ticketId] [Asc StopSequence] >>= \case
- Nothing -> pure Nothing
- Just (Entity _ stop) -> do
- station <- getJust (stopStation stop)
- tzseries <- liftIO $ getTzseries cfg (GTFS.tzname (stopDeparture stop))
- pure (Just (ticket, station, stop, tzseries)))
-
- if null ticketsWithFirstStation then pure Nothing else do
- let (closestTicket, _, _, _) = minimumBy
- -- (compare `on` euclid trainPingGeopos . stationGeopos . snd)
- (compare `on`
- (\(Entity _ ticket, station, stop, tzseries) ->
- let
- runningDay = ticketDay ticket
- spaceDistance = euclid trainPingGeopos (stationGeopos station)
- timeDiff =
- GTFS.toSeconds (stopDeparture stop) tzseries runningDay
- - utcToSeconds now runningDay
- in
- euclid trainPingGeopos (stationGeopos station)
- + abs (seconds2Double timeDiff / 3600)))
- ticketsWithFirstStation
- logInfoN
- $ "Tracker "+|UUID.toString (coerce trainPingToken)|+
- " is now handling ticket "+|UUID.toString (coerce (entityKey closestTicket))|+
- " (trip "+|ticketTripName (entityVal closestTicket)|+")."
-
- update trainPingToken
- [TrackerCurrentTicket =. Just (entityKey closestTicket)]
-
- pure (Just (entityKey closestTicket))
-
- ticketId <- case maybeTicketId of
+ unless (serverConfigDebugMode cfg) $ do
+ now <- liftIO getCurrentTime
+ let timeDiff = sentPingTimestamp `diffUTCTime` now
+ when (utctDay sentPingTimestamp /= utctDay now) $ do
+ logErrorN "received ping for wrong day"
+ throw err400
+ when (timeDiff < 10) $ do
+ logWarnN "received ping more than 10 seconds out of date"
+ throw err400
+ when (timeDiff > 10) $ do
+ logWarnN "received ping from more than 10 seconds in the future"
+ throw err400
+
+ ticketId <- case trackerCurrentTicket of
Just ticketId -> pure ticketId
- Nothing -> do
- logWarnN $ "Tracker "+|UUID.toString (coerce trainPingToken)|+
- " sent a ping, but no trips are running today."
- throwError err400
+ -- if the tracker is not associated with a ticket, it is probably new
+ -- & should be auto-associated with the most fitting current ticket
+ Nothing -> runSql dbpool (guessTicketFromPing cfg ping) >>= \case
+ Just ticketId -> pure ticketId
+ Nothing -> do
+ logWarnN $ "Tracker "+|UUID.toString (coerce sentPingToken)|+
+ " sent a ping, but no trips are running today."
+ throwError err400
+
+
+ runSql dbpool $ insertSentPing subscribers cfg ping tracker ticketId
+
+insertSentPing
+ :: ServerState
+ -> ServerConfig
+ -> SentPing
+ -> Tracker
+ -> TicketId
+ -> InSql (Maybe TrainAnchor)
+insertSentPing subscribers cfg ping@SentPing{..} tracker@Tracker{..} ticketId = do
+ ticket@Ticket{..} <- getJust ticketId
+
+ stations <- selectList [ StopTicket ==. ticketId ] [Asc StopArrival]
+ >>= mapM (\stop -> do
+ station <- getJust (stopStation (entityVal stop))
+ tzseries <- liftIO $ getTzseries cfg (GTFS.tzname (stopArrival (entityVal stop)))
+ pure (entityVal stop, station, tzseries))
+ <&> V.fromList
+
+ shapePoints <- selectList [ShapePointShape ==. ticketShape] [Asc ShapePointIndex]
+ <&> (V.fromList . fmap entityVal)
+
+
+ let anchor = extrapolateAnchorFromPing LinearExtrapolator
+ ticketId ticket stations shapePoints ping
- runSql dbpool $ do
- ticket@Ticket{..} <- getJust ticketId
- stations <- selectList [ StopTicket ==. ticketId ] [Asc StopArrival]
- >>= mapM (\stop -> do
- station <- getJust (stopStation (entityVal stop))
- tzseries <- liftIO $ getTzseries cfg (GTFS.tzname (stopArrival (entityVal stop)))
- pure (entityVal stop, station, tzseries))
- <&> V.fromList
+ maybeReassign <- selectFirst
+ [ TrainPingTicket ==. ticketId ]
+ [ Desc TrainPingTimestamp ]
+ <&> find (\ping -> trainPingSequence (entityVal ping) > trainAnchorSequence anchor)
+ >> guessTicketFromPing cfg ping
+ <&> find (/= ticketId)
- shapePoints <- selectList [ShapePointShape ==. ticketShape] [Asc ShapePointIndex]
- <&> (V.fromList . fmap entityVal)
- let anchor = extrapolateAnchorFromPing LinearExtrapolator
- ticketId ticket stations shapePoints ping
+ -- mapM (\newTicketId -> if ticketId /= newTicketId then Just newTicketId else Nothing))
+ -- >>= (\ping -> guessTicketFromPing cfg ping >>= \case
+ -- Just newTicketId | ticketId /= newTicketId -> pure (Just newTicketId)
+ -- _ -> pure Nothing)
- insert ping
+ case maybeReassign of
+ Just newTicketId -> do
+ update sentPingToken
+ [TrackerCurrentTicket =. Just newTicketId ]
+ logInfoN $ "tracker "+|UUID.toText (coerce sentPingToken)|+
+ "has switched direction, and was reassigned to ticket "
+ +|UUID.toText (coerce newTicketId)|+"."
+ insertSentPing subscribers cfg ping tracker newTicketId
+ Nothing -> do
+ let trackedPing = TrainPing
+ { trainPingToken = sentPingToken
+ , trainPingGeopos = sentPingGeopos
+ , trainPingTimestamp = sentPingTimestamp
+ , trainPingSequence = trainAnchorSequence anchor
+ , trainPingTicket = ticketId
+ }
+
+ insert trackedPing
last <- selectFirst [TrainAnchorTicket ==. ticketId] [Desc TrainAnchorWhen]
-- only insert new estimates if they've actually changed anything
@@ -158,18 +182,18 @@ handleTrainPing dbpool subscribers cfg onError ping@TrainPing{..} =
& (\(stop, _, _) -> stopSequence stop)
& fromIntegral
when (trainAnchorSequence anchor + 0.1 >= maxSequence) $ do
- update trainPingToken
+ update sentPingToken
[TrackerCurrentTicket =. Nothing]
update ticketId
[TicketCompleted =. True]
- logInfoN $ "Tracker "+|UUID.toString (coerce trainPingToken)|+
+ logInfoN $ "Tracker "+|UUID.toString (coerce sentPingToken)|+
" has completed ticket "+|UUID.toString (coerce ticketId)|+
" (trip "+|ticketTripName|+")"
queues <- liftIO $ atomically $ do
queues <- readTVar subscribers <&> M.lookup (coerce ticketId)
whenJust queues $
- mapM_ (\q -> writeTQueue q (Just ping))
+ mapM_ (\q -> writeTQueue q (Just trackedPing))
pure queues
pure (Just anchor)
@@ -196,6 +220,46 @@ handleWS dbpool subscribers cfg Metrics{..} conn = do
Just anchor -> liftIO $ WS.sendTextData conn (A.encode anchor)
Nothing -> pure ()
+
+guessTicketFromPing :: ServerConfig -> SentPing -> InSql (Maybe (Key Ticket))
+guessTicketFromPing cfg SentPing{..} = do
+ tickets <- selectList [ TicketDay ==. utctDay sentPingTimestamp, TicketCompleted ==. False ] []
+
+ ticketsWithStation <- forM tickets (\ticket@(Entity ticketId _) -> do
+ stops <- selectList [StopTicket ==. ticketId] [Asc StopSequence] >>= mapM (\(Entity _ stop) -> do
+ station <- getJust (stopStation stop)
+ tzseries <- liftIO $ getTzseries cfg (GTFS.tzname (stopDeparture stop))
+ pure (station, stop, tzseries))
+ pure (ticket, stops))
+
+ if null ticketsWithStation then pure Nothing else do
+ let (closestTicket, _) = ticketsWithStation
+ & minimumBy (compare `on` (\(Entity _ ticket, stations) ->
+ let
+ runningDay = ticketDay ticket
+ smallestDistance = stations
+ <&> (\(station, stop, tzseries) -> spaceAndTimeDiff
+ (sentPingGeopos, utcToSeconds sentPingTimestamp runningDay)
+ (stationGeopos station, GTFS.toSeconds (stopDeparture stop) tzseries runningDay))
+ & minimum
+ in smallestDistance))
+
+ logInfoN
+ $ "Tracker "+|UUID.toString (coerce sentPingToken)|+
+ " is now handling ticket "+|UUID.toString (coerce (entityKey closestTicket))|+
+ " (trip "+|ticketTripName (entityVal closestTicket)|+")."
+
+ update sentPingToken
+ [TrackerCurrentTicket =. Just (entityKey closestTicket)]
+
+ pure (Just (entityKey closestTicket))
+
+spaceAndTimeDiff :: (Geopos, GTFS.Seconds) -> (Geopos, GTFS.Seconds) -> Double
+spaceAndTimeDiff (pos1, time1) (pos2, time2) =
+ spaceDistance + abs (seconds2Double timeDiff / 3600)
+ where spaceDistance = euclid pos1 pos2
+ timeDiff = time1 - time2
+
-- TODO: proper debug logging for expired tokens
isTokenValid :: Pool SqlBackend -> TrackerId -> ServiceM (Maybe Tracker)
isTokenValid dbpool token = runSql dbpool $ get token >>= \case