From 76303ad71e0d7e63cf34a68a81548cb791798f97 Mon Sep 17 00:00:00 2001 From: stuebinm Date: Sat, 10 Sep 2022 23:45:54 +0200 Subject: gtfs realtime: add tripUpdate feed --- lib/Extrapolation.hs | 17 +++------- lib/Server.hs | 6 ++-- lib/Server/ControlRoom.hs | 4 +-- lib/Server/GTFS_RT.hs | 80 +++++++++++++++++++++++++++++++++++++++++------ lib/Server/Util.hs | 30 +++++++++++++----- 5 files changed, 104 insertions(+), 33 deletions(-) diff --git a/lib/Extrapolation.hs b/lib/Extrapolation.hs index cc77a92..f505e73 100644 --- a/lib/Extrapolation.hs +++ b/lib/Extrapolation.hs @@ -7,7 +7,7 @@ {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE RecordWildCards #-} -module Extrapolation (Extrapolator(..), LinearExtrapolator, linearDelay, secondsNow, distanceAlongLine) where +module Extrapolation (Extrapolator(..), LinearExtrapolator, linearDelay, distanceAlongLine) where import Data.Foldable (maximumBy, minimumBy) import Data.Function (on) import Data.List.NonEmpty (NonEmpty) @@ -23,10 +23,12 @@ import GHC.IO (unsafePerformIO) import Conduit (MonadIO (liftIO)) import Data.List (sortBy) import GTFS (Depth (Deep), GTFS (..), Seconds (..), - Shape (..), Stop (..), Time, Trip (..), - seconds2Double, stationGeopos, toSeconds, Station (stationName)) + Shape (..), Station (stationName), + Stop (..), Time, Trip (..), seconds2Double, + stationGeopos, toSeconds) import Persist (Running (..), TrainAnchor (..), TrainPing (..)) +import Server.Util (utcToSeconds) class Extrapolator a where -- | here's a position ping, guess things from that! @@ -123,16 +125,7 @@ distanceAlongLine line p1 p2 = along2 - along1 sumSegments line = snd $ foldl (\(p,a) p' -> (p', a + euclid p p')) (V.head line,0) $ line --- | convert utc time to seconds on a day, with wrap-around --- for trains that cross midnight. -utcToSeconds :: UTCTime -> Day -> Seconds -utcToSeconds time day = - Seconds $ round $ nominalDiffTimeToSeconds $ diffUTCTime time (UTCTime day 0) -secondsNow :: MonadIO m => Day -> m Seconds -secondsNow runningDay = do - now <- liftIO getCurrentTime - pure $ utcToSeconds now runningDay euclid :: Floating f => (f,f) -> (f,f) -> f euclid (x1,y1) (x2,y2) = sqrt (x*x + y*y) diff --git a/lib/Server.hs b/lib/Server.hs index ef5663a..db23932 100644 --- a/lib/Server.hs +++ b/lib/Server.hs @@ -12,9 +12,9 @@ -- Implementation of the API. This module is the main point of the program. module Server (application) where import Control.Monad (forever, unless, void, when) -import Control.Monad.Extra (maybeM, unlessM, whenM, ifM) +import Control.Monad.Extra (ifM, maybeM, unlessM, whenM) import Control.Monad.IO.Class (MonadIO (liftIO)) -import Control.Monad.Logger (logWarnN, LoggingT) +import Control.Monad.Logger (LoggingT, logWarnN) import Control.Monad.Reader (forM) import Control.Monad.Trans (lift) import qualified Data.Aeson as A @@ -57,7 +57,7 @@ import Extrapolation (Extrapolator (..), import System.IO.Unsafe import Config (ServerConfig) -import Data.ByteString (ByteString) +import Data.ByteString (ByteString) application :: GTFS -> Pool SqlBackend -> IO Application application gtfs dbpool = do diff --git a/lib/Server/ControlRoom.hs b/lib/Server/ControlRoom.hs index 86f8deb..e5e9b7c 100644 --- a/lib/Server/ControlRoom.hs +++ b/lib/Server/ControlRoom.hs @@ -43,7 +43,7 @@ import Database.Persist.Sql (PersistFieldSql, SqlBackend, import Fmt ((+|), (|+)) import GHC.Float (int2Double) import GHC.Generics (Generic) -import Server.Util (Service) +import Server.Util (Service, secondsNow) import Text.Blaze.Html (ToMarkup (..)) import Text.Blaze.Internal (MarkupM (Empty)) import Text.ProtocolBuffers (Default (defaultValue)) @@ -53,7 +53,7 @@ import Yesod import Yesod.Form import Extrapolation (Extrapolator (..), - LinearExtrapolator, secondsNow) + LinearExtrapolator) import GTFS import Numeric (showFFloat) import Persist diff --git a/lib/Server/GTFS_RT.hs b/lib/Server/GTFS_RT.hs index dfdd1eb..e0bbeda 100644 --- a/lib/Server/GTFS_RT.hs +++ b/lib/Server/GTFS_RT.hs @@ -1,8 +1,10 @@ +{-# LANGUAGE DataKinds #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedLists #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TupleSections #-} +{-# LANGUAGE TypeApplications #-} module Server.GTFS_RT (gtfsRealtimeServer) where @@ -13,6 +15,7 @@ import Control.Monad (forM) import Control.Monad.IO.Class (MonadIO (..)) import Data.ByteString.Lazy (fromStrict) import Data.Functor ((<&>)) +import qualified Data.Map as M import Data.Maybe (catMaybes) import Data.Pool (Pool) import Data.Sequence (Seq) @@ -21,19 +24,24 @@ import Data.Text (Text) import qualified Data.Text as T import Data.Text.Encoding (encodeUtf8) import Data.Time.Calendar (Day, toGregorian) -import Data.Time.Clock (UTCTime) +import Data.Time.Clock (UTCTime (utctDay), + getCurrentTime) import Data.Time.Clock.POSIX (getPOSIXTime) import Data.Time.Clock.System (SystemTime (systemSeconds), getSystemTime, utcToSystemTime) import qualified Data.UUID as UUID import Data.Word (Word64) -import Database.Persist (Entity (Entity), +import Database.Persist (Entity (..), PersistQueryRead (selectFirst), selectList, (==.)) import Database.Persist.Postgresql (SqlBackend) -import GHC.Float (double2Float) -import GTFS (GTFS, TripID) +import GHC.Float (double2Float, + int2Double) +import GTFS (GTFS (..), + Seconds (..), + Stop (..), + Trip (..), TripID) import GTFS.Realtime.Alert as AL (Alert (..)) import GTFS.Realtime.Alert.SeverityLevel (SeverityLevel (WARNING)) import GTFS.Realtime.EntitySelector as ES (EntitySelector (..)) @@ -46,19 +54,34 @@ import GTFS.Realtime.TimeRange (TimeRange (TimeRang import GTFS.Realtime.TranslatedString (TranslatedString (TranslatedString)) import GTFS.Realtime.TranslatedString.Translation (Translation (Translation)) import GTFS.Realtime.TripDescriptor as TD (TripDescriptor (..)) +import GTFS.Realtime.TripUpdate as TU (TripUpdate (..)) +import GTFS.Realtime.TripUpdate.StopTimeEvent as STE (StopTimeEvent (..)) +import GTFS.Realtime.TripUpdate.StopTimeUpdate as STU (StopTimeUpdate (..)) import GTFS.Realtime.VehicleDescriptor as VD (VehicleDescriptor (..)) import GTFS.Realtime.VehiclePosition as VP (VehiclePosition (..)) import Persist (Announcement (..), EntityField (..), Key (..), Running (..), + Token (..), + TrainAnchor (..), TrainPing (..), runSql) import Servant.API ((:<|>) (..)) import Text.ProtocolBuffers (Utf8 (Utf8), defaultValue) -import Server.Util (Service) +import Control.Monad.Extra (mapMaybeM) +import Data.List.NonEmpty (nonEmpty) +import Data.UUID (toASCIIBytes, + toLazyASCIIBytes) +import qualified Data.Vector as V +import Extrapolation (Extrapolator (extrapolateAtPosition, extrapolateAtSeconds), + LinearExtrapolator) +import GTFS (Depth (..)) +import GTFS.Realtime.TripUpdate (TripUpdate (TripUpdate)) +import Server.Util (Service, + secondsNow) uuidUtf8 :: UUID.UUID -> Utf8 uuidUtf8 = Utf8 . fromStrict . UUID.toASCIIBytes @@ -81,7 +104,7 @@ toStupidTime :: UTCTime -> Word64 toStupidTime = fromIntegral . systemSeconds . utcToSystemTime gtfsRealtimeServer :: GTFS -> Pool SqlBackend -> Service GtfsRealtimeAPI -gtfsRealtimeServer gtfs dbpool = handleServiceAlerts :<|> handleTripUpdates :<|> handleVehiclePositions +gtfsRealtimeServer gtfs@GTFS{..} dbpool = handleServiceAlerts :<|> handleTripUpdates :<|> handleVehiclePositions where handleServiceAlerts = runSql dbpool $ do -- TODO filter: only select current & future days announcements <- selectList [] [] @@ -110,10 +133,49 @@ gtfsRealtimeServer gtfs dbpool = handleServiceAlerts :<|> handleTripUpdates :<|> , AL.ext'field = defaultValue } } + + handleTripUpdates = runSql dbpool $ do - error "unimplemented!" - -- TODO: how to propagate delay values to next stops? - pure undefined + today <- liftIO $ getCurrentTime <&> utctDay + nowSeconds <- secondsNow today + running <- selectList [RunningDay ==. today] [] + anchors <- flip mapMaybeM running $ \r@(Entity key Running{..}) -> do + entities <- selectList [TrainAnchorTrip ==. runningTrip, TrainAnchorDay ==. today] [] + case nonEmpty (fmap entityVal entities) of + Nothing -> pure Nothing + Just anchors -> case M.lookup runningTrip trips of + Nothing -> pure Nothing + Just trip -> pure (Just (r, trip, anchors)) + + dFeedMessage $ Seq.fromList $ mkTripUpdate nowSeconds <$> anchors + where mkTripUpdate nowSeconds (Entity (RunningKey (Token uuid)) Running{..}, Trip{..} :: Trip Deep Deep, anchors) = + let lastCall = extrapolateAtSeconds @LinearExtrapolator anchors nowSeconds + stations = tripStops + <&> (\stop@Stop{..} -> fmap (, stop) $ extrapolateAtPosition @LinearExtrapolator anchors (int2Double stopSequence)) + in (dFeedEntity (Utf8 $ toLazyASCIIBytes uuid)) + { FE.trip_update = Just $ TripUpdate + { TU.trip = dTripDescriptor runningTrip (Just runningDay) + , TU.vehicle = Nothing + , TU.stop_time_update = Seq.fromList + $ fmap (\(TrainAnchor{..}, Stop{..}) -> StopTimeUpdate + { STU.stop_sequence = Just (fromIntegral stopSequence) + , STU.stop_id = Nothing + , STU.arrival = Just ( + defaultValue { STE.delay = Just $ fromIntegral $ unSeconds $ trainAnchorDelay }) + , STU.departure = Nothing + , STU.departure_occupancy_status = Nothing + , STU.schedule_relationship = Nothing + , STU.stop_time_properties = Nothing + , STU.ext'field = defaultValue + }) + $ catMaybes $ V.toList stations + , TU.delay = lastCall <&> (fromIntegral . unSeconds . trainAnchorDelay) + , TU.timestamp = lastCall <&> (toStupidTime . trainAnchorCreated) + , TU.trip_properties = Nothing + , TU.ext'field = defaultValue + } + } + handleVehiclePositions = runSql dbpool $ do (running :: [Entity Running]) <- selectList [] [] pings <- forM running $ \(Entity key entity) -> do diff --git a/lib/Server/Util.hs b/lib/Server/Util.hs index 5bfba52..4410711 100644 --- a/lib/Server/Util.hs +++ b/lib/Server/Util.hs @@ -1,14 +1,19 @@ {-# LANGUAGE FlexibleContexts #-} -- | mostly the monad the service runs in -module Server.Util (Service, ServiceM, runService, sendErrorMsg) where +module Server.Util (Service, ServiceM, runService, sendErrorMsg, secondsNow, utcToSeconds) where -import Control.Monad.Logger (LoggingT, runStderrLoggingT) -import qualified Data.Aeson as A -import Data.ByteString (ByteString) -import Data.Text (Text) -import Servant (Handler, ServerError, ServerT, err404, - errBody, errHeaders, throwError) +import Control.Monad.IO.Class (MonadIO (liftIO)) +import Control.Monad.Logger (LoggingT, runStderrLoggingT) +import qualified Data.Aeson as A +import Data.ByteString (ByteString) +import Data.Text (Text) +import Data.Time (Day, UTCTime (..), diffUTCTime, + getCurrentTime, + nominalDiffTimeToSeconds) +import GTFS (Seconds (..)) +import Servant (Handler, ServerError, ServerT, err404, + errBody, errHeaders, throwError) type ServiceM = LoggingT Handler type Service api = ServerT api ServiceM @@ -19,3 +24,14 @@ runService = runStderrLoggingT sendErrorMsg :: Text -> ServiceM a sendErrorMsg msg = throwError err404 { errBody = A.encode $ A.object ["error" A..= (404 :: Int), "msg" A..= msg] } + +secondsNow :: MonadIO m => Day -> m Seconds +secondsNow runningDay = do + now <- liftIO getCurrentTime + pure $ utcToSeconds now runningDay + +-- | convert utc time to seconds on a day, with wrap-around +-- for trains that cross midnight. +utcToSeconds :: UTCTime -> Day -> Seconds +utcToSeconds time day = + Seconds $ round $ nominalDiffTimeToSeconds $ diffUTCTime time (UTCTime day 0) -- cgit v1.2.3