diff options
Diffstat (limited to '')
-rw-r--r-- | lib/API.hs | 54 | ||||
-rw-r--r-- | lib/Config.hs | 20 | ||||
-rw-r--r-- | lib/Extrapolation.hs | 9 | ||||
-rw-r--r-- | lib/Persist.hs | 38 | ||||
-rw-r--r-- | lib/Server.hs | 179 | ||||
-rw-r--r-- | lib/Server/ControlRoom.hs | 39 | ||||
-rw-r--r-- | lib/Server/GTFS_RT.hs | 15 | ||||
-rw-r--r-- | lib/Server/Util.hs | 49 | ||||
-rw-r--r-- | messages/en.msg | 1 | ||||
-rw-r--r-- | site/tracker.hamlet | 137 | ||||
-rw-r--r-- | tracktrain.cabal | 1 |
11 files changed, 407 insertions, 135 deletions
@@ -39,7 +39,8 @@ import Data.Aeson (FromJSON (..), Value, import Data.ByteString (ByteString) import qualified Data.ByteString.Lazy as LB import Data.HashMap.Strict.InsOrd (singleton) -import Data.ProtoLens (Message, encodeMessage) +import Data.ProtoLens (Message (messageName), + encodeMessage) import GHC.Generics (Generic) import GTFS (Depth (Deep), GTFSFile (..), StationID, Trip, TripId, @@ -50,45 +51,24 @@ import Prometheus import Proto.GtfsRealtime (FeedMessage) import Servant.API.ContentTypes (Accept (..)) -newtype RegisterJson = RegisterJson - { registerAgent :: Text } - deriving (Show, Generic) - -instance FromJSON RegisterJson where - parseJSON = genericParseJSON (aesonOptions "register") -instance ToSchema RegisterJson where - declareNamedSchema = genericDeclareNamedSchema (swaggerOptions "register") -instance ToSchema Value where - declareNamedSchema _ = pure $ NamedSchema (Just "json") $ mempty - & type_ ?~ SwaggerObject - --- | The server's API (as it is actually intended). -type API = "timetable" :> Capture "Station Id" StationID :> QueryParam "day" Day :> Get '[JSON] (Map TripId (Trip Deep Deep)) - :<|> "timetable" :> "stops" :> Capture "Date" Day :> Get '[JSON] Value - :<|> "trip" :> Capture "Trip Id" TripId :> Get '[JSON] (Trip Deep Deep) +-- | tracktrain's API +type API = -- ingress API (put this behind BasicAuth?) -- TODO: perhaps require a first ping for registration? - :<|> "train" :> "register" :> Capture "Ticket Id" UUID :> ReqBody '[JSON] RegisterJson :> Post '[JSON] Token - -- TODO: perhaps a websocket instead? - :<|> "train" :> "ping" :> ReqBody '[JSON] TrainPing :> Post '[JSON] (Maybe TrainAnchor) - :<|> "train" :> "ping" :> "ws" :> WebSocket - :<|> "train" :> "subscribe" :> Capture "Ticket Id" UUID :> WebSocket - -- debug things + "tracker" :> "register" :> ReqBody '[JSON] RegisterJson :> Post '[JSON] Token + :<|> "tracker" :> "ping" :> ReqBody '[JSON] TrainPing :> Post '[JSON] (Maybe TrainAnchor) + :<|> "tracker" :> "ping" :> "ws" :> WebSocket + :<|> "ticket" :> "subscribe" :> Capture "Ticket Id" UUID :> WebSocket :<|> "debug" :> "pings" :> Get '[JSON] (Map Token [TrainPing]) :<|> "debug" :> "pings" :> Capture "Ticket Id" UUID :> Get '[JSON] [TrainPing] - :<|> "debug" :> "register" :> Capture "Ticket Id" UUID :> Post '[JSON] Token :<|> "gtfs.zip" :> Get '[OctetStream] GTFSFile :<|> "gtfs" :> GtfsRealtimeAPI --- | The API used for publishing gtfs realtime updates type GtfsRealtimeAPI = "servicealerts" :> Get '[Proto] FeedMessage :<|> "tripupdates" :> Get '[Proto] FeedMessage :<|> "vehiclepositions" :> Get '[Proto] FeedMessage --- | The server's API with an additional debug route for accessing the specification --- itself. Split from API to prevent the API documenting the format in which it is --- documented, which would be silly and way to verbose. type CompleteAPI = "api" :> "openapi" :> Get '[JSON] Swagger :<|> "api" :> API @@ -103,6 +83,18 @@ data Metrics = Metrics instance MimeRender OctetStream GTFSFile where mimeRender p (GTFSFile bytes) = mimeRender p bytes +newtype RegisterJson = RegisterJson + { registerAgent :: Text } + deriving (Show, Generic) + +instance FromJSON RegisterJson where + parseJSON = genericParseJSON (aesonOptions "register") +instance ToSchema RegisterJson where + declareNamedSchema = genericDeclareNamedSchema (swaggerOptions "register") +instance ToSchema Value where + declareNamedSchema _ = pure $ NamedSchema (Just "json") $ mempty + & type_ ?~ SwaggerObject + -- TODO write something useful here! (and if it's just "hey this is some websocket thingie") @@ -128,7 +120,7 @@ instance Accept Proto where instance Message msg => MimeRender Proto msg where mimeRender _ = LB.fromStrict . encodeMessage --- TODO: this instance is horrible; ideally it should at least include --- the name of the message type (if at all possible) +-- | Not an ideal instance, hides fields of the protobuf message instance {-# OVERLAPPABLE #-} Message msg => ToSchema msg where - declareNamedSchema _ = declareNamedSchema (Proxy @String) + declareNamedSchema proxy = + pure (NamedSchema (Just (messageName proxy)) mempty) diff --git a/lib/Config.hs b/lib/Config.hs index 2349e66..94fdd28 100644 --- a/lib/Config.hs +++ b/lib/Config.hs @@ -1,10 +1,11 @@ {-# LANGUAGE RecordWildCards #-} -module Config where +module Config (UffdConfig(..), ServerConfig(..), LoggingConfig(..)) where import Conferer (DefaultConfig (configDef)) import Conferer.FromConfig import Conferer.FromConfig.Warp () import Data.ByteString (ByteString) +import Data.Functor ((<&>)) import Data.Text (Text) import GHC.Generics (Generic) import Network.Wai.Handler.Warp (Settings) @@ -24,6 +25,13 @@ data ServerConfig = ServerConfig , serverConfigAssets :: FilePath , serverConfigZoneinfoPath :: FilePath , serverConfigLogin :: UffdConfig + , serverConfigLogging :: LoggingConfig + } deriving (Generic) + +data LoggingConfig = LoggingConfig + { loggingConfigNtfyToken :: Maybe Text + , loggingConfigNtfyTopic :: Text + , loggingConfigHostname :: Text } deriving (Generic) instance FromConfig ServerConfig @@ -36,6 +44,7 @@ instance DefaultConfig ServerConfig where , serverConfigAssets = "./assets" , serverConfigZoneinfoPath = "/etc/zoneinfo/" , serverConfigLogin = configDef + , serverConfigLogging = configDef } instance DefaultConfig UffdConfig where @@ -50,3 +59,12 @@ instance FromConfig UffdConfig where uffdConfigClientSecret <- fetchFromConfig (key /. "clientSecret") config uffdConfigEnable <- fetchFromConfig (key /. "enable") config pure UffdConfig {..} + +instance FromConfig LoggingConfig where + fromConfig key config = LoggingConfig + <$> fetchFromConfig (key /. "ntfyToken") config + <*> fetchFromConfig (key /. "ntfyTopic") config + <*> fetchFromConfig (key /. "name") config + +instance DefaultConfig LoggingConfig where + configDef = LoggingConfig Nothing "tracktrain" "" diff --git a/lib/Extrapolation.hs b/lib/Extrapolation.hs index 01e5f6f..389d047 100644 --- a/lib/Extrapolation.hs +++ b/lib/Extrapolation.hs @@ -27,8 +27,8 @@ import GTFS (Seconds (..), import Persist (Geopos (..), ShapePoint (shapePointGeopos), Station (..), Stop (..), - Ticket (..), Token (..), - Tracker (..), + Ticket (..), TicketId, + Token (..), Tracker (..), TrainAnchor (..), TrainPing (..)) import Server.Util (utcToSeconds) @@ -40,6 +40,7 @@ class Extrapolator strategy where -- | here's a position ping, guess things from that! extrapolateAnchorFromPing :: strategy + -> TicketId -> Ticket -> V.Vector (Stop, Station, TimeZoneSeries) -> V.Vector ShapePoint @@ -68,9 +69,9 @@ instance Extrapolator LinearExtrapolator where $ NE.filter (\a -> trainAnchorSequence a < positionNow) history where difference status = positionNow - trainAnchorSequence status - extrapolateAnchorFromPing _ Ticket{..} stops shape ping@TrainPing{..} = TrainAnchor + extrapolateAnchorFromPing _ ticketId Ticket{..} stops shape ping@TrainPing{..} = TrainAnchor { trainAnchorCreated = trainPingTimestamp - , trainAnchorTicket = trainPingTicket + , trainAnchorTicket = ticketId , trainAnchorWhen = utcToSeconds trainPingTimestamp ticketDay , trainAnchorSequence , trainAnchorDelay diff --git a/lib/Persist.hs b/lib/Persist.hs index 7613fd9..46e5ef4 100644 --- a/lib/Persist.hs +++ b/lib/Persist.hs @@ -21,13 +21,19 @@ import Database.Persist.Sql (PersistFieldSql (..), import Database.Persist.TH import qualified GTFS import PersistOrphans -import Servant (FromHttpApiData (..), +import Servant (FromHttpApiData (..), Handler, ToHttpApiData) -import Conduit (ResourceT) +import Conduit (MonadTrans (lift), MonadUnliftIO, + ResourceT, runResourceT) +import Config (LoggingConfig) import Control.Monad.IO.Class (MonadIO (liftIO)) -import Control.Monad.Logger (NoLoggingT) -import Control.Monad.Reader (ReaderT) +import Control.Monad.Logger (LoggingT, MonadLogger, NoLoggingT, + runNoLoggingT, runStderrLoggingT) +import Control.Monad.Reader (MonadReader (ask), + ReaderT (runReaderT), runReader) +import Control.Monad.Trans.Control (MonadBaseControl (liftBaseWith), + MonadTransControl (liftWith, restoreT)) import Data.Data (Proxy (..)) import Data.Map (Map) import Data.Pool (Pool) @@ -37,10 +43,11 @@ import Data.Time (NominalDiffTime, TimeOfDay, getCurrentTime, nominalDay) import Data.Time.Calendar (Day, DayOfWeek (..)) import Data.Vector (Vector) -import Database.Persist.Postgresql (SqlBackend) +import Database.Persist.Postgresql (SqlBackend, runSqlPool) import Fmt import GHC.Generics (Generic) import MultiLangText (MultiLangText) +import Server.Util (runLogging) import Web.PathPieces (PathPiece) import Yesod (Lang) @@ -120,6 +127,7 @@ Tracker sql=tt_tracker_token expires UTCTime blocked Bool agent Text + currentTicket TicketId Maybe deriving Eq Show Generic TrackerTicket @@ -130,7 +138,7 @@ TrackerTicket -- raw frames as received from OBUs TrainPing json sql=tt_trip_ping - ticket TicketId + -- ticket TicketId token TrackerId geopos Geopos timestamp UTCTime @@ -169,5 +177,19 @@ instance ToSchema TrainAnchor where instance ToSchema Announcement where declareNamedSchema = genericDeclareNamedSchema (GTFS.swaggerOptions "announcement") -runSql :: MonadIO m => Pool SqlBackend -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) a -> m a -runSql pool = liftIO . flip runSqlPersistMPool pool +runSqlWithoutLog :: MonadIO m + => Pool SqlBackend + -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) a + -> m a +runSqlWithoutLog pool = liftIO . flip runSqlPersistMPool pool + +-- It's a bit unfortunate that we have an extra reader here for just the +-- logging config, but since Handler is not MonadUnliftIO there seems to be (?) +-- no better way than to nest logger monads … +runSql :: (MonadLogger m, MonadIO m, MonadReader LoggingConfig m) + => Pool SqlBackend + -> ReaderT SqlBackend (LoggingT (ResourceT IO)) a + -> m a +runSql pool x = do + conf <- ask + liftIO $ runResourceT $ runLogging conf $ runSqlPool x pool diff --git a/lib/Server.hs b/lib/Server.hs index 3922a7b..73c55cb 100644 --- a/lib/Server.hs +++ b/lib/Server.hs @@ -16,12 +16,14 @@ import Control.Concurrent.STM (TQueue, TVar, atomically, import Control.Monad (forever, unless, void, when) import Control.Monad.Catch (handle) -import Control.Monad.Extra (ifM, maybeM, unlessM, - whenJust, whenM) +import Control.Monad.Extra (ifM, mapMaybeM, maybeM, + unlessM, whenJust, whenM) import Control.Monad.IO.Class (MonadIO (liftIO)) -import Control.Monad.Logger (LoggingT, NoLoggingT, +import Control.Monad.Logger (LoggingT, MonadLogger, + NoLoggingT, logInfoN, logWarnN) -import Control.Monad.Reader (ReaderT, forM) +import Control.Monad.Reader (MonadReader, ReaderT, + forM) import Control.Monad.Trans (lift) import Data.Aeson ((.=)) import qualified Data.Aeson as A @@ -33,7 +35,7 @@ import Data.Pool (Pool) import Data.Proxy (Proxy (Proxy)) import Data.Swagger (toSchema) import Data.Text (Text) -import Data.Text.Encoding (decodeUtf8) +import Data.Text.Encoding (decodeASCII, decodeUtf8) import Data.Time (NominalDiffTime, UTCTime (utctDay), addUTCTime, diffUTCTime, @@ -48,7 +50,8 @@ import Fmt ((+|), (|+)) import qualified Network.WebSockets as WS import Servant (Application, ServerError (errBody), - err401, err404, serve, + err400, err401, err404, + serve, serveDirectoryFileServer, throwError) import Servant.API (NoContent (..), @@ -62,24 +65,33 @@ import Persist import Server.ControlRoom import Server.GTFS_RT (gtfsRealtimeServer) import Server.Util (Service, ServiceM, - runService, sendErrorMsg) + runService, sendErrorMsg, + utcToSeconds) import Yesod (toWaiAppPlain) -import Extrapolation (Extrapolator (..), - LinearExtrapolator (..)) -import System.IO.Unsafe - import Conduit (ResourceT) -import Config (ServerConfig (serverConfigAssets, serverConfigZoneinfoPath)) +import Config (LoggingConfig, + ServerConfig (..)) +import Control.Exception (throw) import Data.ByteString (ByteString) import Data.ByteString.Lazy (toStrict) +import Data.Foldable (minimumBy) +import Data.Function (on, (&)) +import Data.Maybe (fromMaybe) import qualified Data.Text as T import Data.Time.LocalTime.TimeZone.Olson (getTimeZoneSeriesFromOlsonFile) import Data.Time.LocalTime.TimeZone.Series (TimeZoneSeries) import Data.UUID (UUID) +import qualified Data.UUID as UUID +import Extrapolation (Extrapolator (..), + LinearExtrapolator (..), + euclid) +import GTFS (Seconds (unSeconds), + seconds2Double) import Prometheus import Prometheus.Metric.GHC import System.FilePath ((</>)) +import System.IO.Unsafe application :: GTFS.GTFS -> Pool SqlBackend -> ServerConfig -> IO Application application gtfs dbpool settings = do @@ -94,65 +106,84 @@ application gtfs dbpool settings = do (serverConfigZoneinfoPath settings </> T.unpack tzname) subscribers <- newTVarIO mempty - pure $ serve (Proxy @CompleteAPI) $ hoistServer (Proxy @CompleteAPI) runService + pure $ serve (Proxy @CompleteAPI) + $ hoistServer (Proxy @CompleteAPI) (runService (serverConfigLogging settings)) $ server gtfs getTzseries metrics subscribers dbpool settings -- databaseMigration :: ConnectionString -> IO () -doMigration pool = runSql pool $ runMigration $ do +doMigration pool = runSqlWithoutLog pool $ runMigration $ do migrateEnableExtension "uuid-ossp" migrateAll server :: GTFS.GTFS -> (Text -> IO TimeZoneSeries) -> Metrics -> TVar (M.Map UUID [TQueue (Maybe TrainPing)]) -> Pool SqlBackend -> ServerConfig -> Service CompleteAPI server gtfs getTzseries Metrics{..} subscribers dbpool settings = handleDebugAPI - :<|> (handleTimetable :<|> handleTimetableStops :<|> handleTrip - :<|> handleRegister :<|> handleTrainPing (throwError err401) :<|> handleWS + :<|> (handleTrackerRegister :<|> handleTrainPing (throwError err401) :<|> handleWS :<|> handleSubscribe :<|> handleDebugState :<|> handleDebugTrain - :<|> handleDebugRegister :<|> pure (GTFS.gtfsFile gtfs) :<|> gtfsRealtimeServer gtfs dbpool) + :<|> pure (GTFS.gtfsFile gtfs) :<|> gtfsRealtimeServer gtfs dbpool) :<|> metrics :<|> serveDirectoryFileServer (serverConfigAssets settings) :<|> pure (unsafePerformIO (toWaiAppPlain (ControlRoom gtfs dbpool settings))) - where handleTimetable station maybeDay = - M.filter isLastStop . GTFS.tripsOnDay gtfs <$> liftIO day - where isLastStop = (==) station . GTFS.stationId . GTFS.stopStation . V.last . GTFS.tripStops - day = maybeM (getCurrentTime <&> utctDay) pure (pure maybeDay) - handleTimetableStops day = - pure . A.toJSON . fmap mkJson . M.elems $ GTFS.tripsOnDay gtfs day - where mkJson :: GTFS.Trip GTFS.Deep GTFS.Deep -> A.Value - mkJson GTFS.Trip {..} = A.object - [ "trip" .= tripTripId - , "sequencelength" .= (GTFS.stopSequence . V.last) tripStops - , "stops" .= fmap (\GTFS.Stop{..} -> A.object - [ "departure" .= GTFS.toUTC stopDeparture (GTFS.tzseries gtfs) day - , "arrival" .= GTFS.toUTC stopArrival (GTFS.tzseries gtfs) day - , "station" .= GTFS.stationId stopStation - , "lat" .= GTFS.stationLat stopStation - , "lon" .= GTFS.stationLon stopStation - ]) tripStops - ] - handleTrip trip = case M.lookup trip (GTFS.trips gtfs) of - Just res -> pure res - Nothing -> throwError err404 - handleRegister (ticketId :: UUID) RegisterJson{..} = do + where handleTrackerRegister RegisterJson{..} = do today <- liftIO getCurrentTime <&> utctDay expires <- liftIO $ getCurrentTime <&> addUTCTime validityPeriod runSql dbpool $ do - TrackerKey tracker <- insert (Tracker expires False registerAgent) - insert (TrackerTicket (TicketKey ticketId) (TrackerKey tracker)) - pure tracker - handleDebugRegister (ticketId :: UUID) = do - expires <- liftIO $ getCurrentTime <&> addUTCTime validityPeriod - runSql dbpool $ do - TrackerKey tracker <- insert (Tracker expires False "debug key") - insert (TrackerTicket (TicketKey ticketId) (TrackerKey tracker)) + TrackerKey tracker <- insert (Tracker expires False registerAgent Nothing) pure tracker handleTrainPing onError ping@TrainPing{..} = - let ticketId = trainPingTicket in - isTokenValid dbpool trainPingToken ticketId >>= \case - Nothing -> do - onError - pure Nothing - Just (tracker@Tracker{..}, ticket@Ticket{..}) -> do + isTokenValid dbpool trainPingToken >>= \case + Nothing -> onError >> pure Nothing + Just tracker@Tracker{..} -> do + + -- if the tracker is not associated with a ticket, it is probably + -- just starting out on a new trip, or has finished an old one. + maybeTicketId <- case trackerCurrentTicket of + Just ticketId -> pure (Just ticketId) + Nothing -> runSql dbpool $ do + now <- liftIO getCurrentTime + tickets <- selectList [ TicketDay ==. utctDay now, TicketCompleted ==. False ] [] + ticketsWithFirstStation <- flip mapMaybeM tickets + (\ticket@(Entity ticketId _) -> do + selectFirst [StopTicket ==. ticketId] [Asc StopSequence] >>= \case + Nothing -> pure Nothing + Just (Entity _ stop) -> do + station <- getJust (stopStation stop) + tzseries <- liftIO $ getTzseries (GTFS.tzname (stopDeparture stop)) + pure (Just (ticket, station, stop, tzseries))) + + if null ticketsWithFirstStation then pure Nothing else do + let (closestTicket, _, _, _) = minimumBy + -- (compare `on` euclid trainPingGeopos . stationGeopos . snd) + (compare `on` + (\(Entity _ ticket, station, stop, tzseries) -> + let + runningDay = ticketDay ticket + spaceDistance = euclid trainPingGeopos (stationGeopos station) + timeDiff = + GTFS.toSeconds (stopDeparture stop) tzseries runningDay + - utcToSeconds now runningDay + in + euclid trainPingGeopos (stationGeopos station) + + abs (seconds2Double timeDiff / 3600))) + ticketsWithFirstStation + logInfoN + $ "Tracker "+|UUID.toString (coerce trainPingToken)|+ + " is now handling ticket "+|UUID.toString (coerce (entityKey closestTicket))|+ + " (trip "+|ticketTripName (entityVal closestTicket)|+")." + + update (coerce trainPingToken) + [TrackerCurrentTicket =. Just (entityKey closestTicket)] + + pure (Just (entityKey closestTicket)) + + ticketId <- case maybeTicketId of + Just ticketId -> pure ticketId + Nothing -> do + logWarnN $ "Tracker "+|UUID.toString (coerce trainPingToken)|+ + " sent a ping, but no trips are running today." + throwError err400 + runSql dbpool $ do + ticket@Ticket{..} <- getJust ticketId stations <- selectList [ StopTicket ==. ticketId ] [Asc StopArrival] >>= mapM (\stop -> do @@ -165,17 +196,31 @@ server gtfs getTzseries Metrics{..} subscribers dbpool settings = handleDebugAPI <&> (V.fromList . fmap entityVal) let anchor = extrapolateAnchorFromPing LinearExtrapolator - ticket stations shapePoints ping + ticketId ticket stations shapePoints ping insert ping - last <- selectFirst [TrainAnchorTicket ==. trainPingTicket] [Desc TrainAnchorWhen] + last <- selectFirst [TrainAnchorTicket ==. ticketId] [Desc TrainAnchorWhen] -- only insert new estimates if they've actually changed anything when (fmap (trainAnchorDelay . entityVal) last /= Just (trainAnchorDelay anchor)) $ void $ insert anchor + -- are we at the final stop? if so, mark this ticket as done + -- & the tracker as free + let maxSequence = V.last stations + & (\(stop, _, _) -> stopSequence stop) + & fromIntegral + when (trainAnchorSequence anchor + 0.1 >= maxSequence) $ do + update (coerce trainPingToken) + [TrackerCurrentTicket =. Nothing] + update ticketId + [TicketCompleted =. True] + logInfoN $ "Tracker "+|UUID.toString (coerce trainPingToken)|+ + " has completed ticket "+|UUID.toString (coerce ticketId)|+ + " (trip "+|ticketTripName|+")" + queues <- liftIO $ atomically $ do - queues <- readTVar subscribers <&> M.lookup (coerce trainPingTicket) + queues <- readTVar subscribers <&> M.lookup (coerce ticketId) whenJust queues $ mapM_ (\q -> writeTQueue q (Just ping)) pure queues @@ -187,14 +232,14 @@ server gtfs getTzseries Metrics{..} subscribers dbpool settings = handleDebugAPI msg <- liftIO $ WS.receiveData conn case A.eitherDecode msg of Left err -> do - logWarnN ("stray websocket message: "+|show msg|+" (could not decode: "+|err|+")") + logWarnN ("stray websocket message: "+|decodeASCII (toStrict msg)|+" (could not decode: "+|err|+")") liftIO $ WS.sendClose conn (C8.pack err) -- TODO: send a close msg (Nothing) to the subscribed queues? decGauge metricsWSGauge - Right ping -> + Right ping -> do -- if invalid token, send a "polite" close request. Note that the client may -- ignore this and continue sending messages, which will continue to be handled. - liftIO $ handleTrainPing (WS.sendClose conn ("" :: ByteString)) ping >>= \case - Just anchor -> WS.sendTextData conn (A.encode anchor) + handleTrainPing (liftIO $ WS.sendClose conn ("" :: ByteString)) ping >>= \case + Just anchor -> liftIO $ WS.sendTextData conn (A.encode anchor) Nothing -> pure () handleSubscribe (ticketId :: UUID) conn = liftIO $ WS.withPingThread conn 30 (pure ()) $ do queue <- atomically $ do @@ -204,7 +249,7 @@ server gtfs getTzseries Metrics{..} subscribers dbpool settings = handleDebugAPI $ M.insertWith (<>) ticketId [queue] qs pure queue -- send most recent ping, if any (so we won't have to wait for movement) - lastPing <- runSql dbpool $ do + lastPing <- runSqlWithoutLog dbpool $ do trackers <- getTicketTrackers ticketId <&> fmap entityKey selectFirst [TrainPingToken <-. trackers] [Desc TrainPingTimestamp] @@ -239,21 +284,21 @@ server gtfs getTzseries Metrics{..} subscribers dbpool settings = handleDebugAPI handleDebugAPI = pure $ toSwagger (Proxy @API) metrics = exportMetricsAsText <&> (decodeUtf8 . toStrict) -getTicketTrackers :: UUID -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) [Entity Tracker] +getTicketTrackers :: (MonadLogger (t (ResourceT IO)), MonadIO (t (ResourceT IO))) + => UUID -> ReaderT SqlBackend (t (ResourceT IO)) [Entity Tracker] getTicketTrackers ticketId = do joins <- selectList [TrackerTicketTicket ==. TicketKey ticketId] [] <&> fmap (trackerTicketTracker . entityVal) - selectList [TrackerId <-. joins] [] + selectList ([TrackerId <-. joins] ||. [TrackerCurrentTicket ==. Just (TicketKey ticketId)]) [] -- TODO: proper debug logging for expired tokens -isTokenValid :: MonadIO m => Pool SqlBackend -> TrackerId -> TicketId -> m (Maybe (Tracker, Ticket)) -isTokenValid dbpool token ticketId = runSql dbpool $ get token >>= \case +isTokenValid :: Pool SqlBackend -> TrackerId -> ServiceM (Maybe Tracker) +isTokenValid dbpool token = runSql dbpool $ get token >>= \case Just tracker | not (trackerBlocked tracker) -> do ifM (hasExpired (trackerExpires tracker)) (pure Nothing) - $ runSql dbpool $ get ticketId - <&> (\case { Nothing -> Nothing; Just ticket -> Just (tracker, ticket) }) + (pure (Just tracker)) _ -> pure Nothing hasExpired :: MonadIO m => UTCTime -> m Bool diff --git a/lib/Server/ControlRoom.hs b/lib/Server/ControlRoom.hs index 9d15bcf..5292620 100644 --- a/lib/Server/ControlRoom.hs +++ b/lib/Server/ControlRoom.hs @@ -17,6 +17,7 @@ import Control.Monad.IO.Class (MonadIO (liftIO)) import qualified Data.Aeson as A import qualified Data.ByteString.Char8 as C8 import qualified Data.ByteString.Lazy as LB +import Data.Coerce (coerce) import Data.Function (on, (&)) import Data.Functor ((<&>)) import Data.List (lookup, nubBy) @@ -86,6 +87,7 @@ mkYesod "ControlRoom" [parseRoutes| /obu OnboardUnitMenuR GET /obu/#UUID OnboardUnitR GET +/tracker OnboardTrackerR GET |] emptyMarkup :: MarkupM a -> Bool @@ -96,6 +98,7 @@ instance Yesod ControlRoom where authRoute _ = Just $ AuthR LoginR isAuthorized OnboardUnitMenuR _ = pure Authorized isAuthorized (OnboardUnitR _) _ = pure Authorized + isAuthorized OnboardTrackerR _ = pure Authorized isAuthorized (AuthR _) _ = pure Authorized isAuthorized _ _ = do UffdConfig{..} <- getYesod <&> serverConfigLogin . getSettings @@ -200,7 +203,7 @@ getTicketsR = do gtfs <- getYesod <&> getGtfs -- TODO: tickets should have all trip information saved - tickets <- runDB $ selectList [ TicketDay ==. day ] [] >>= mapM (\ticket -> do + tickets <- runDB $ selectList [ TicketDay ==. day ] [ Asc TicketTripName ] >>= mapM (\ticket -> do stops <- selectList [ StopTicket ==. entityKey ticket ] [] startStation <- getJust (stopStation $ entityVal $ head stops) pure (ticket, startStation, fmap entityVal stops)) @@ -317,9 +320,11 @@ getTicketViewR ticketId = do pure (entityVal stop, station)) anns <- runDB $ selectList [ AnnouncementTicket ==. ticketKey ] [] - trackerIds <- runDB $ selectList [ TrackerTicketTicket ==. ticketKey ] [] + joins <- runDB $ selectList [ TrackerTicketTicket ==. ticketKey ] [] <&> fmap (trackerTicketTracker . entityVal) - trackers <- runDB $ selectList [ TrackerId <-. trackerIds ] [Asc TrackerExpires] + trackers <- runDB $ selectList + ([ TrackerId <-. joins ] ||. [ TrackerCurrentTicket ==. Just ticketKey ]) + [Asc TrackerExpires] lastPing <- runDB $ selectFirst [ TrainPingToken <-. fmap entityKey trackers ] [Desc TrainPingTimestamp] anchors <- runDB $ selectList [ TrainAnchorTicket ==. ticketKey ] [] <&> nonEmpty . fmap entityVal @@ -511,7 +516,9 @@ getTokenBlock token = do case maybe of Just r@Tracker{..} -> do liftIO $ print r - redirect RootR + redirect $ case trackerCurrentTicket of + Just ticket -> TicketViewR (coerce ticket) + Nothing -> RootR Nothing -> notFound getOnboardUnitMenuR :: Handler Html @@ -525,14 +532,16 @@ getOnboardUnitMenuR = do defaultLayout $ do [whamlet| -<h1>_{MsgOBU} -<section> - _{MsgChooseTrain} - $forall (Entity (TicketKey ticketId) Ticket{..}, firstStop) <- tickets - <hr> - <a href="@{OnboardUnitR ticketId}"> - #{ticketTripName}: #{ticketHeadsign} #{stopDeparture firstStop} -|] + <h1>_{MsgOBU} + <section> + _{MsgChooseTrain} + $forall (Entity (TicketKey ticketId) Ticket{..}, firstStop) <- tickets + <hr> + <a href="@{OnboardUnitR ticketId}"> + #{ticketTripName}: #{ticketHeadsign} #{stopDeparture firstStop} + <section> + <a href="@{OnboardTrackerR}">_{MsgStartTracking} + |] getOnboardUnitR :: UUID -> Handler Html getOnboardUnitR ticketId = do @@ -541,6 +550,12 @@ getOnboardUnitR ticketId = do Just ticket -> pure ticket defaultLayout $(whamletFile "site/obu.hamlet") +getOnboardTrackerR :: Handler Html +getOnboardTrackerR = do + defaultLayout + $( whamletFile "site/tracker.hamlet") + + announceForm :: UUID -> Html -> MForm Handler (FormResult Announcement, Widget) announceForm ticketId = renderDivs $ Announcement <$> pure (TicketKey ticketId) diff --git a/lib/Server/GTFS_RT.hs b/lib/Server/GTFS_RT.hs index 48a84db..d2e53a1 100644 --- a/lib/Server/GTFS_RT.hs +++ b/lib/Server/GTFS_RT.hs @@ -155,13 +155,14 @@ gtfsRealtimeServer gtfs@GTFS{..} dbpool = ticket <- selectList [TicketCompleted ==. False] [] - positions <- forM ticket $ \(Entity key ticket) -> do - selectFirst [TrainPingTicket ==. key] [Desc TrainPingTimestamp] >>= \case - Nothing -> pure Nothing - Just lastPing -> - pure (Just $ mkPosition (lastPing, ticket)) - - defFeedMessage (catMaybes positions) + -- TODO: reimplement this (since trainpings no longer reference tickets it's gone for now) + -- positions <- forM ticket $ \(Entity key ticket) -> do + -- selectFirst [TrainPingTicket ==. key] [Desc TrainPingTimestamp] >>= \case + -- Nothing -> pure Nothing + -- Just lastPing -> + -- pure (Just $ mkPosition (lastPing, ticket)) + + defFeedMessage [] -- (catMaybes positions) where mkPosition :: (Entity TrainPing, Ticket) -> RT.FeedEntity diff --git a/lib/Server/Util.hs b/lib/Server/Util.hs index 5ffb829..0106428 100644 --- a/lib/Server/Util.hs +++ b/lib/Server/Util.hs @@ -1,28 +1,67 @@ +{-# LANGUAGE BlockArguments #-} +{-# LANGUAGE RecordWildCards #-} -- | mostly the monad the service runs in -module Server.Util (Service, ServiceM, runService, sendErrorMsg, secondsNow, utcToSeconds) where +module Server.Util (Service, ServiceM, runService, sendErrorMsg, secondsNow, utcToSeconds, runLogging) where +import Config (LoggingConfig (..)) +import Control.Exception (handle, try) +import Control.Monad.Extra (void, whenJust) import Control.Monad.IO.Class (MonadIO (liftIO)) -import Control.Monad.Logger (LoggingT, runStderrLoggingT) +import Control.Monad.Logger (Loc, LogLevel (..), LogSource, LogStr, + LoggingT (..), defaultOutput, + fromLogStr, runStderrLoggingT) +import Control.Monad.Reader (ReaderT (..)) import qualified Data.Aeson as A import Data.ByteString (ByteString) +import qualified Data.ByteString as C8 import Data.Text (Text) +import qualified Data.Text as T +import Data.Text.Encoding (decodeUtf8Lenient) import Data.Time (Day, UTCTime (..), diffUTCTime, getCurrentTime, nominalDiffTimeToSeconds) +import Fmt ((+|), (|+)) +import GHC.IO.Exception (IOException (IOError)) import GTFS (Seconds (..)) import Prometheus (MonadMonitor (doIO)) import Servant (Handler, ServerError, ServerT, err404, errBody, errHeaders, throwError) +import System.IO (stderr) +import System.Process.Extra (callProcess) -type ServiceM = LoggingT Handler +type ServiceM = LoggingT (ReaderT LoggingConfig Handler) type Service api = ServerT api ServiceM -runService :: ServiceM a -> Handler a -runService = runStderrLoggingT +runService :: LoggingConfig -> ServiceM a -> Handler a +runService conf m = runReaderT (runLogging conf m) conf instance MonadMonitor ServiceM where doIO = liftIO +runLogging :: MonadIO m => LoggingConfig -> LoggingT m a -> m a +runLogging LoggingConfig{..} logging = runLoggingT logging printLogMsg + where printLogMsg loc source level msg = do + -- this is what runStderrLoggingT does + defaultOutput stderr loc source level msg + + whenJust loggingConfigNtfyToken \token -> handle ntfyFailed do + callProcess "ntfy" + [ "send" + , "--token=" <> T.unpack token + , "--title="+|loggingConfigHostname|+"/"+|"tracktrain" + , "--priority="+|show (ntfyPriority level)|+"" + , T.unpack loggingConfigNtfyTopic + , T.unpack (decodeUtf8Lenient (fromLogStr msg)) ] + + ntfyFailed (e :: IOError) = + putStrLn ("calling ntfy failed:"+|show e|+".") + ntfyPriority level = case level of + LevelDebug -> 2 + LevelInfo -> 3 + LevelWarn -> 4 + LevelError -> 5 + LevelOther _ -> 0 + sendErrorMsg :: Text -> ServiceM a sendErrorMsg msg = throwError err404 diff --git a/messages/en.msg b/messages/en.msg index 41ced10..a2ef189 100644 --- a/messages/en.msg +++ b/messages/en.msg @@ -40,6 +40,7 @@ Tickets: Tickets ImportTrips: import selected trips delete: delete AccordingToGtfs: Additional Trips contained in the Gtfs +StartTracking: Start Tracking OBU: Onboard-Unit ChooseTrain: Choose a Train diff --git a/site/tracker.hamlet b/site/tracker.hamlet new file mode 100644 index 0000000..5877c5d --- /dev/null +++ b/site/tracker.hamlet @@ -0,0 +1,137 @@ +<h1>_{MsgOBU} + +<section> + <h2>Blub + <strong>Token:</strong> <span id="token"> +<section> + <h2>Status + <p id="status">_{MsgNone} + <p id>_{MsgError}: <span id="error"> +<section> + <h2>_{MsgLive} + <p><strong>Position: </strong><span id="lat"></span>, <span id="long"></span> + <p><strong>Accuracy: </strong><span id="acc"> +<section> + <h2>_{MsgEstimated} + <p><strong>_{MsgDelay}</strong>: <span id="delay"> + <p><strong>_{MsgSequence}</strong>: <span id="sequence"> + + +<script> + var token = null; + + let euclid = (a,b) => { + let x = a[0]-b[0]; + let y = a[1]-b[1]; + return x*x+y*y; + } + + let minimalDist = (point, list, proj, norm) => { + return list.reduce ( + (min, x) => { + let dist = norm(point, proj(x)); + return dist < min[0] ? [dist,x] : min + }, + [norm(point, proj(list[0])), list[0]] + )[1] + } + + let counter = 0; + let ws; + let id; + + function setStatus(msg) { + document.getElementById("status").innerText = msg + } + + async function geoError(error) { + setStatus("error"); + alert(`_{MsgPermissionFailed}: \n${error.message}`); + console.error(error); + main(); + } + + async function wsError(error) { + // alert(`_{MsgWebsocketError}: \n${error.message === undefined ? error.reason : error.message}`); + console.log(error); + navigator.geolocation.clearWatch(id); + } + + async function wsClose(error) { + console.log(error); + document.getElementById("error").innerText = `websocket closed (reason: ${error.reason}). reconnecting …`; + navigator.geolocation.clearWatch(id); + setTimeout(openWebsocket, 1000); + } + + function wsMsg(msg) { + let json = JSON.parse(msg.data); + console.log(json); + document.getElementById("delay").innerText = + `${json.delay}s (${Math.floor(json.delay / 60)}min)`; + document.getElementById("sequence").innerText = json.sequence; + } + + + function initGeopos() { + document.getElementById("error").innerText = ""; + id = navigator.geolocation.watchPosition( + geoPing, + geoError, + {enableHighAccuracy: true} + ); + } + + + function openWebsocket () { + ws = new WebSocket((location.protocol == "http:" ? "ws" : "wss") + "://" + location.host + "/api/tracker/ping/ws"); + ws.onerror = wsError; + ws.onclose = wsClose; + ws.onmessage = wsMsg; + ws.onopen = (event) => { + setStatus("connected"); + }; + } + + async function geoPing(geoloc) { + console.log("got position update " + counter); + document.getElementById("lat").innerText = geoloc.coords.latitude; + document.getElementById("long").innerText = geoloc.coords.longitude; + document.getElementById("acc").innerText = geoloc.coords.accuracy; + + if (ws !== undefined && ws.readyState == 1) { + ws.send(JSON.stringify({ + token: token, + geopos: [ geoloc.coords.latitude, geoloc.coords.longitude ], + timestamp: (new Date()).toISOString() + })); + counter += 1; + setStatus(`sent ${counter} pings`); + } else { + setStatus(`websocket readystate ${ws.readyState}`); + } + } + + + async function main() { + initGeopos(); + + token = await (await fetch("/api/tracker/register/", { + method: "POST", + body: JSON.stringify({agent: "tracktrain-website"}), + headers: {"Content-Type": "application/json"} + })).json(); + + if (token.error) { + alert("could not obtain token: \n" + token.msg); + setStatus("_{MsgTokenFailed}"); + } else { + console.log("got token"); + + document.getElementById("token").innerText = token; + + openWebsocket(); + } + } + + main() diff --git a/tracktrain.cabal b/tracktrain.cabal index ad1624f..a8e42a4 100644 --- a/tracktrain.cabal +++ b/tracktrain.cabal @@ -97,6 +97,7 @@ library , proto-lens , http-media , filepath + , monad-control hs-source-dirs: lib exposed-modules: GTFS , Server |