aboutsummaryrefslogtreecommitdiff
path: root/lib/Server/GTFS_RT.hs
diff options
context:
space:
mode:
Diffstat (limited to 'lib/Server/GTFS_RT.hs')
-rw-r--r--lib/Server/GTFS_RT.hs446
1 files changed, 172 insertions, 274 deletions
diff --git a/lib/Server/GTFS_RT.hs b/lib/Server/GTFS_RT.hs
index 70f3c63..cfb02ce 100644
--- a/lib/Server/GTFS_RT.hs
+++ b/lib/Server/GTFS_RT.hs
@@ -1,113 +1,60 @@
-{-# LANGUAGE DataKinds #-}
-{-# LANGUAGE NamedFieldPuns #-}
-{-# LANGUAGE OverloadedLists #-}
-{-# LANGUAGE OverloadedStrings #-}
-{-# LANGUAGE RecordWildCards #-}
-{-# LANGUAGE TupleSections #-}
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE OverloadedLists #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE PartialTypeSignatures #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE TupleSections #-}
module Server.GTFS_RT (gtfsRealtimeServer) where
-import Prelude hiding
- (id)
-
-import API (GtfsRealtimeAPI)
-import Control.Monad (forM)
-import Control.Monad.IO.Class (MonadIO (..))
-import qualified Data.ByteString.Char8 as C8
-import Data.ByteString.Lazy (fromStrict)
-import Data.Functor ((<&>))
-import qualified Data.Map as M
-import Data.Maybe (catMaybes,
- mapMaybe)
-import Data.Pool (Pool)
-import Data.Sequence (Seq)
-import qualified Data.Sequence as Seq
-import Data.Text (Text)
-import qualified Data.Text as T
-import Data.Text.Encoding (encodeUtf8)
-import Data.Time.Calendar (Day,
- toGregorian)
-import Data.Time.Clock (UTCTime (utctDay),
- addUTCTime,
- getCurrentTime)
-import Data.Time.Clock.POSIX (getPOSIXTime)
-import Data.Time.Clock.System (SystemTime (systemSeconds),
- getSystemTime,
- utcToSystemTime)
-import qualified Data.UUID as UUID
-import Data.Word (Word64)
-import Database.Persist (Entity (..),
- PersistQueryRead (selectFirst),
- selectList,
- (==.))
-import Database.Persist.Postgresql (SqlBackend)
-import GHC.Float (double2Float,
- int2Double)
-import GTFS (Depth (..),
- GTFS (..),
- Seconds (..),
- Stop (..),
- Trip (..),
- TripID,
- showTimeWithSeconds,
- stationId,
- toSeconds,
- toUTC,
- tripsOnDay)
-import GTFS.Realtime.Alert as AL (Alert (..))
-import GTFS.Realtime.Alert.SeverityLevel (SeverityLevel (WARNING))
-import GTFS.Realtime.EntitySelector as ES (EntitySelector (..))
-import GTFS.Realtime.FeedEntity as FE (FeedEntity (..))
-import GTFS.Realtime.FeedHeader (FeedHeader (FeedHeader))
-import GTFS.Realtime.FeedHeader.Incrementality (Incrementality (FULL_DATASET))
-import GTFS.Realtime.FeedMessage as FM (FeedMessage (..))
-import GTFS.Realtime.Position as POS (Position (..))
-import GTFS.Realtime.TimeRange (TimeRange (TimeRange))
-import GTFS.Realtime.TranslatedString (TranslatedString (TranslatedString))
-import GTFS.Realtime.TranslatedString.Translation (Translation (Translation))
-import GTFS.Realtime.TripDescriptor as TD (TripDescriptor (..))
-import qualified GTFS.Realtime.TripDescriptor.ScheduleRelationship as TSR
-import GTFS.Realtime.TripUpdate as TU (TripUpdate (..))
-import GTFS.Realtime.TripUpdate.StopTimeEvent as STE (StopTimeEvent (..))
-import GTFS.Realtime.TripUpdate.StopTimeUpdate as STU (StopTimeUpdate (..))
-import qualified GTFS.Realtime.TripUpdate.StopTimeUpdate.ScheduleRelationship as SR
-import GTFS.Realtime.VehicleDescriptor as VD (VehicleDescriptor (..))
-import GTFS.Realtime.VehiclePosition as VP (VehiclePosition (..))
-import Persist (Announcement (..),
- EntityField (..),
- Key (..),
- Running (..),
- Token (..),
- TrainAnchor (..),
- TrainPing (..),
- runSql)
-import Servant.API ((:<|>) (..))
-import Text.ProtocolBuffers (Utf8 (Utf8),
- defaultValue)
-
-import Control.Monad.Extra (mapMaybeM)
-import Data.List.NonEmpty (nonEmpty)
-import Data.Time.Format.ISO8601 (ISO8601 (iso8601Format),
- iso8601Show)
-import Data.UUID (toASCIIBytes,
- toLazyASCIIBytes)
-import qualified Data.Vector as V
-import Extrapolation (Extrapolator (extrapolateAtPosition, extrapolateAtSeconds),
- LinearExtrapolator (..))
-import GTFS.Realtime.TripUpdate (TripUpdate (TripUpdate))
-import Server.Util (Service,
- secondsNow)
-
-uuidUtf8 :: UUID.UUID -> Utf8
-uuidUtf8 = Utf8 . fromStrict . UUID.toASCIIBytes
-
-toUtf8 :: Text -> Utf8
-toUtf8 = Utf8 . fromStrict . encodeUtf8
+import API (GtfsRealtimeAPI)
+import Control.Lens ((&), (.~))
+import Control.Monad (forM)
+import Control.Monad.Extra (mapMaybeM)
+import Control.Monad.IO.Class (MonadIO (..))
+import Data.Functor ((<&>))
+import Data.List.NonEmpty (NonEmpty, nonEmpty)
+import qualified Data.Map as M
+import Data.Maybe (catMaybes, mapMaybe)
+import Data.Pool (Pool)
+import Data.ProtoLens (defMessage)
+import Data.Text (Text)
+import qualified Data.Text as T
+import Data.Time.Calendar (Day, toGregorian)
+import Data.Time.Clock (UTCTime (utctDay), addUTCTime,
+ getCurrentTime)
+import Data.Time.Clock.System (SystemTime (systemSeconds),
+ getSystemTime, utcToSystemTime)
+import Data.Time.Format.ISO8601 (iso8601Show)
+import Data.UUID (toASCIIBytes, toLazyASCIIBytes)
+import qualified Data.UUID as UUID
+import qualified Data.Vector as V
+import Database.Persist (Entity (..),
+ PersistQueryRead (selectFirst),
+ selectList, (==.))
+import Database.Persist.Postgresql (SqlBackend)
+import Extrapolation (Extrapolator (extrapolateAtPosition, extrapolateAtSeconds),
+ LinearExtrapolator (..))
+import GHC.Float (double2Float, int2Double)
+import GTFS (Depth (..), GTFS (..),
+ Seconds (..), Stop (..),
+ Trip (..), TripID,
+ showTimeWithSeconds, stationId,
+ toSeconds, toUTC, tripsOnDay)
+import Persist (Announcement (..),
+ EntityField (..), Key (..),
+ Running (..), Token (..),
+ TrainAnchor (..), TrainPing (..),
+ runSql)
+import qualified Proto.GtfsRealtime as RT
+import qualified Proto.GtfsRealtime_Fields as RT
+import Servant.API ((:<|>) (..))
+import Server.Util (Service, secondsNow)
-- | formats a day in the "stupid" format used by gtfs realtime
-toStupidDate :: Day -> Utf8
-toStupidDate date = toUtf8
- $ pad 4 year <> pad 2 month <> pad 2 day
+toStupidDate :: Day -> Text
+toStupidDate date =
+ pad 4 year <> pad 2 month <> pad 2 day
where (year, month, day) = toGregorian date
pad len num = T.pack $ if ndigits < len
then replicate (len - ndigits) '0' <> show num
@@ -119,173 +66,124 @@ toStupidTime :: Num i => UTCTime -> i
toStupidTime = fromIntegral . systemSeconds . utcToSystemTime
gtfsRealtimeServer :: GTFS -> Pool SqlBackend -> Service GtfsRealtimeAPI
-gtfsRealtimeServer gtfs@GTFS{..} dbpool = handleServiceAlerts :<|> handleTripUpdates :<|> handleVehiclePositions
- where handleServiceAlerts = runSql dbpool $ do
- -- TODO filter: only select current & future days
- announcements <- selectList [] []
- dFeedMessage $ Seq.fromList $ fmap mkAlert announcements
- where mkAlert (Entity (AnnouncementKey uuid) Announcement{..}) =
- (dFeedEntity (uuidUtf8 uuid))
- { alert = Just $ Alert
- { active_period = [TimeRange Nothing Nothing defaultValue]
- -- TODO: is this time range reasonable, needed, etc.?
- , informed_entity =
- [dEntitySelector
- { ES.trip =
- Just (dTripDescriptor announcementTrip (Just announcementDay) Nothing)
- }
- ]
- , cause = Nothing
- , effect = Nothing
- , url = fmap (lang "de" . toUtf8) announcementUrl
- , header_text = Just $ lang "de" (toUtf8 announcementHeader)
- , description_text = Just $ lang "de" (toUtf8 announcementMessage)
- , tts_header_text = Nothing
- , tts_description_text = Nothing
- , severity_level = Nothing
- , image = Nothing
- , image_alternative_text = Nothing
- , AL.ext'field = defaultValue
- }
- }
-
-
- handleTripUpdates = runSql dbpool $ do
- today <- liftIO $ getCurrentTime <&> utctDay
- nowSeconds <- secondsNow today
- let running = M.toList $ tripsOnDay gtfs today
- anchors <- flip mapMaybeM running $ \(tripId, trip@Trip{..}) -> do
- entities <- selectList [TrainAnchorTrip ==. tripId, TrainAnchorDay ==. today] []
- case nonEmpty (fmap entityVal entities) of
- Nothing -> pure Nothing
- Just anchors ->
- pure $ Just (tripId, trip, anchors)
-
- dFeedMessage $ Seq.fromList $ mapMaybe (mkTripUpdate today nowSeconds) anchors
- where mkTripUpdate today nowSeconds (tripId :: Text, Trip{..} :: Trip Deep Deep, anchors) =
- let lastCall = extrapolateAtSeconds LinearExtrapolator anchors nowSeconds
- stations = tripStops
- <&> (\stop@Stop{..} -> (, stop) <$> extrapolateAtPosition LinearExtrapolator anchors (int2Double stopSequence))
- (lastAnchor, lastStop) = V.last (V.catMaybes stations)
- stillRunning = trainAnchorDelay lastAnchor + toSeconds (stopArrival lastStop) tzseries today
- < nowSeconds + 5 * 60
- -- note: these IDs should be stable across iterations, so just do tripId + runningday. TODO: breaks in case of cross-midnight?
- in if not stillRunning then Nothing else Just
- (dFeedEntity (Utf8 $ fromStrict (encodeUtf8 tripId <> "-" <> C8.pack (iso8601Show today))))
- { FE.trip_update = Just $ TripUpdate
- { TU.trip = dTripDescriptor tripId (Just today) (Just $ toUtf8 $ T.pack $ showTimeWithSeconds $ stopDeparture $ V.head tripStops) -- TODO will break if cross-midnight train
- , TU.vehicle = Nothing
- , TU.stop_time_update = Seq.fromList
- $ fmap (\(TrainAnchor{..}, Stop{..}) -> StopTimeUpdate
- { STU.stop_sequence = Just (fromIntegral stopSequence)
- , STU.stop_id = Just (toUtf8 $ stationId stopStation)
- , STU.arrival = Just (
- defaultValue
- { STE.delay = Just $ fromIntegral $ unSeconds trainAnchorDelay
- , STE.time = Just $ toStupidTime (addUTCTime (fromIntegral $ unSeconds trainAnchorDelay) (toUTC stopArrival tzseries today))
- , STE.uncertainty = Just 60 })
- , STU.departure = Just (
- defaultValue
- { STE.delay = Just $ fromIntegral $ unSeconds trainAnchorDelay
- , STE.time = Just $ toStupidTime (addUTCTime (fromIntegral $ unSeconds trainAnchorDelay) (toUTC stopDeparture tzseries today))
- , STE.uncertainty = Just 60 })
- , STU.departure_occupancy_status = Nothing
- , STU.schedule_relationship = Just SR.SCHEDULED
- , STU.stop_time_properties = Nothing
- , STU.ext'field = defaultValue
- })
- $ catMaybes $ V.toList stations
- , TU.delay = Nothing -- lastCall <&> (fromIntegral . unSeconds . trainAnchorDelay)
- , TU.timestamp = lastCall <&> (toStupidTime . trainAnchorCreated)
- , TU.trip_properties = Nothing
- , TU.ext'field = defaultValue
- }
- }
-
- handleVehiclePositions = runSql dbpool $ do
- (running :: [Entity Running]) <- selectList [] []
- pings <- forM running $ \(Entity key entity) -> do
- selectFirst [TrainPingToken ==. key] [] <&> fmap (, entity)
- dFeedMessage $ Seq.fromList $ mkPosition <$> catMaybes pings
- where mkPosition (Entity (TrainPingKey key) TrainPing{..}, Running{..}) =
- (dFeedEntity (toUtf8 . T.pack . show $ key))
- { FE.vehicle = Just $ VehiclePosition
- { trip = Just (dTripDescriptor runningTrip Nothing Nothing)
- , VP.vehicle = case runningVehicle of
- Nothing -> Nothing
- Just trainset -> Just $ VehicleDescriptor
- { VD.id = Nothing
- , VD.label = Just (toUtf8 trainset)
- , VD.license_plate = Nothing
- , VD.ext'field = defaultValue
- }
- , position = Just $ Position
- { latitude = double2Float trainPingLat
- , longitude = double2Float trainPingLong
- , odometer = Nothing
- , speed = Nothing
- , bearing = Nothing
- , POS.ext'field = defaultValue
- }
- -- TODO: at least one of these should probably be given
- , current_stop_sequence = Nothing
- , stop_id = Nothing
- , current_status = Nothing
- , timestamp = Just (toStupidTime trainPingTimestamp)
- , congestion_level = Nothing
- , occupancy_status = Nothing
- , occupancy_percentage = Nothing
- , multi_carriage_details = []
- , VP.ext'field = defaultValue
- }
- }
-
-
-lang :: Utf8 -> Utf8 -> TranslatedString
-lang code msg = TranslatedString [Translation msg (Just code) defaultValue] defaultValue
-
--- | a default FeedMessage, issued at the current system time
--- TODO: do we ever need incremental updates?
--- TODO: maybe instead use last update time?
-dFeedMessage :: MonadIO m => Seq FeedEntity -> m FeedMessage
-dFeedMessage entities = do
+gtfsRealtimeServer gtfs@GTFS{..} dbpool =
+ handleServiceAlerts :<|> handleTripUpdates :<|> handleVehiclePositions
+ where
+ handleServiceAlerts = runSql dbpool $ do
+ announcements <- selectList [] []
+ defFeedMessage (fmap mkAlert announcements)
+
+ where
+ mkAlert :: Entity Announcement -> RT.FeedEntity
+ mkAlert (Entity (AnnouncementKey uuid) Announcement{..}) =
+ defMessage
+ & RT.id .~ UUID.toText uuid
+ & RT.alert .~ (defMessage
+ & RT.activePeriod .~ [ defMessage :: RT.TimeRange ]
+ & RT.informedEntity .~ [ defMessage
+ & RT.trip .~ defTripDescriptor announcementTrip (Just announcementDay) Nothing
+ ]
+ & RT.maybe'url .~ fmap (monolingual "de") announcementUrl
+ & RT.headerText .~ monolingual "de" announcementHeader
+ & RT.descriptionText .~ monolingual "de" announcementMessage
+ )
+
+ handleTripUpdates = runSql dbpool $ do
+ today <- liftIO $ getCurrentTime <&> utctDay
+ nowSeconds <- secondsNow today
+ let running = M.toList (tripsOnDay gtfs today)
+ anchors <- flip mapMaybeM running $ \(tripId, trip@Trip{..}) -> do
+ entities <- selectList [TrainAnchorTrip ==. tripId, TrainAnchorDay ==. today] []
+ case nonEmpty (fmap entityVal entities) of
+ Nothing -> pure Nothing
+ Just anchors -> pure $ Just (tripId, trip, anchors)
+
+ defFeedMessage (mapMaybe (mkTripUpdate today nowSeconds) anchors)
+ where
+ mkTripUpdate :: Day -> Seconds -> (Text, Trip 'Deep 'Deep, NonEmpty TrainAnchor) -> Maybe RT.FeedEntity
+ mkTripUpdate today nowSeconds (tripId :: Text, Trip{..} :: Trip Deep Deep, anchors) =
+ let lastCall = extrapolateAtSeconds LinearExtrapolator anchors nowSeconds
+ stations = tripStops
+ <&> (\stop@Stop{..} -> (, stop)
+ <$> extrapolateAtPosition LinearExtrapolator anchors (int2Double stopSequence))
+ (lastAnchor, lastStop) = V.last (V.catMaybes stations)
+ stillRunning = trainAnchorDelay lastAnchor + toSeconds (stopArrival lastStop) tzseries today
+ < nowSeconds + 5 * 60
+ in if not stillRunning then Nothing else Just $ defMessage
+ & RT.id .~ (tripId <> "-" <> T.pack (iso8601Show today))
+ & RT.tripUpdate .~ (defMessage
+ & RT.trip .~ defTripDescriptor tripId (Just today) (Just $ T.pack (showTimeWithSeconds $ stopDeparture $ V.head tripStops))
+ & RT.stopTimeUpdate .~ fmap mkStopTimeUpdate (catMaybes $ V.toList stations)
+ & RT.maybe'delay .~ Nothing -- lastCall <&> (fromIntegral . unSeconds . trainAnchorDelay)
+ & RT.maybe'timestamp .~ fmap (toStupidTime . trainAnchorCreated) lastCall
+ )
+ where
+ mkStopTimeUpdate :: (TrainAnchor, Stop Deep) -> RT.TripUpdate'StopTimeUpdate
+ mkStopTimeUpdate (TrainAnchor{..}, Stop{..}) = defMessage
+ & RT.stopSequence .~ fromIntegral stopSequence
+ & RT.stopId .~ stationId stopStation
+ & RT.arrival .~ (defMessage
+ & RT.delay .~ fromIntegral (unSeconds trainAnchorDelay)
+ & RT.time .~ toStupidTime (addUTCTime
+ (fromIntegral $ unSeconds trainAnchorDelay)
+ (toUTC stopArrival tzseries today))
+ & RT.uncertainty .~ 60
+ )
+ & RT.departure .~ (defMessage
+ & RT.delay .~ fromIntegral (unSeconds trainAnchorDelay)
+ & RT.time .~ toStupidTime (addUTCTime
+ (fromIntegral $ unSeconds trainAnchorDelay)
+ (toUTC stopDeparture tzseries today))
+ & RT.uncertainty .~ 60
+ )
+ & RT.scheduleRelationship .~ RT.TripUpdate'StopTimeUpdate'SCHEDULED
+
+ handleVehiclePositions = runSql dbpool $ do
+ (running :: [Entity Running]) <- selectList [] []
+ pings <- forM running $ \(Entity key entity) -> do
+ selectFirst [TrainPingToken ==. key] [] <&> fmap (, entity)
+ defFeedMessage (mkPosition <$> catMaybes pings)
+
+ where
+ mkPosition :: (Entity TrainPing, Running) -> RT.FeedEntity
+ mkPosition (Entity (TrainPingKey key) TrainPing{..}, Running{..}) = defMessage
+ & RT.id .~ T.pack (show key)
+ & RT.vehicle .~ (defMessage
+ & RT.trip .~ defTripDescriptor runningTrip Nothing Nothing
+ & RT.maybe'vehicle .~ case runningVehicle of
+ Nothing -> Nothing
+ Just trainset -> Just $ defMessage
+ & RT.label .~ trainset
+ & RT.position .~ (defMessage
+ & RT.latitude .~ double2Float trainPingLat
+ & RT.longitude .~ double2Float trainPingLong
+ )
+ -- TODO: should probably give currentStopSequence/stopId here as well
+ & RT.timestamp .~ toStupidTime trainPingTimestamp
+ )
+
+
+monolingual :: Text -> Text -> RT.TranslatedString
+monolingual code msg = defMessage & RT.translation .~ [
+ defMessage
+ & RT.text .~ msg
+ & RT.language .~ code
+ ]
+
+defFeedMessage :: MonadIO m => [RT.FeedEntity] -> m RT.FeedMessage
+defFeedMessage entities = do
now <- liftIO getSystemTime <&> systemSeconds
- pure $ FeedMessage
- { header = FeedHeader "2.0" (Just FULL_DATASET) (Just $ fromIntegral now) defaultValue
- , entity = entities
- , FM.ext'field = defaultValue
- }
-
--- | a dummy FeedEntity (use record updates to add meaningful values to this)
-dFeedEntity :: Utf8 -> FeedEntity
-dFeedEntity id = FeedEntity
- { id
- , is_deleted = Nothing
- , trip_update = Nothing
- , vehicle = Nothing
- , alert = Nothing
- , shape = Nothing
- , FE.ext'field = defaultValue
- }
-
-dEntitySelector :: EntitySelector
-dEntitySelector = EntitySelector
- { agency_id = Nothing
- , route_id = Nothing
- , route_type = Nothing
- , trip = Nothing
- , stop_id = Nothing
- , direction_id = Nothing
- , ES.ext'field = defaultValue
- }
-
-dTripDescriptor :: TripID -> Maybe Day -> Maybe Utf8 -> TripDescriptor
-dTripDescriptor tripID day starttime = TripDescriptor
- { trip_id = Just (toUtf8 tripID)
- , route_id = Nothing
- , direction_id = Nothing
- , start_time = starttime
- , start_date = fmap toStupidDate day
- , schedule_relationship = Just TSR.SCHEDULED
- , TD.ext'field = defaultValue
- }
+ pure $ defMessage
+ & RT.header .~ (defMessage
+ & RT.gtfsRealtimeVersion .~ "2.0"
+ & RT.incrementality .~ RT.FeedHeader'FULL_DATASET
+ & RT.timestamp .~ fromIntegral now
+ )
+ & RT.entity .~ entities
+
+defTripDescriptor :: TripID -> Maybe Day -> Maybe Text -> RT.TripDescriptor
+defTripDescriptor tripId day starttime = defMessage
+ & RT.tripId .~ tripId
+ & RT.scheduleRelationship .~ RT.TripDescriptor'SCHEDULED
+ & RT.maybe'startTime .~ starttime
+ & RT.maybe'startDate .~ fmap toStupidDate day