aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorstuebinm2024-05-01 03:07:06 +0200
committerstuebinm2024-05-02 00:18:16 +0200
commit80984549172d7de83564757de80996487ca2fb15 (patch)
tree1e4bfe43fa9fc96fa5642fe34f502005775f257f
parentb26a3d617e90c9693a4ee8b7cc8bbba506cd4746 (diff)
restructure: get the tracker to work again
This should hopefully be the final (major) part of the restructuring: a tracker no longer has to know which trip it is on (and indeed it has no idea for now), instead the server keeps state about which trips are currently running and will insert incoming pings in a hopefully reasonable manner, based on their geoposition & time. There's lots of associated TODO items here (especially there should be manual overrides for all this logic in the web ui), but that's work for a future me. (incidentally, this also adds support for sending all log messages out via ntfy-sh)
-rw-r--r--lib/API.hs54
-rw-r--r--lib/Config.hs20
-rw-r--r--lib/Extrapolation.hs9
-rw-r--r--lib/Persist.hs38
-rw-r--r--lib/Server.hs179
-rw-r--r--lib/Server/ControlRoom.hs39
-rw-r--r--lib/Server/GTFS_RT.hs15
-rw-r--r--lib/Server/Util.hs49
-rw-r--r--messages/en.msg1
-rw-r--r--site/tracker.hamlet137
-rw-r--r--tracktrain.cabal1
11 files changed, 407 insertions, 135 deletions
diff --git a/lib/API.hs b/lib/API.hs
index 2c8123a..b2635c1 100644
--- a/lib/API.hs
+++ b/lib/API.hs
@@ -39,7 +39,8 @@ import Data.Aeson (FromJSON (..), Value,
import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy as LB
import Data.HashMap.Strict.InsOrd (singleton)
-import Data.ProtoLens (Message, encodeMessage)
+import Data.ProtoLens (Message (messageName),
+ encodeMessage)
import GHC.Generics (Generic)
import GTFS (Depth (Deep), GTFSFile (..),
StationID, Trip, TripId,
@@ -50,45 +51,24 @@ import Prometheus
import Proto.GtfsRealtime (FeedMessage)
import Servant.API.ContentTypes (Accept (..))
-newtype RegisterJson = RegisterJson
- { registerAgent :: Text }
- deriving (Show, Generic)
-
-instance FromJSON RegisterJson where
- parseJSON = genericParseJSON (aesonOptions "register")
-instance ToSchema RegisterJson where
- declareNamedSchema = genericDeclareNamedSchema (swaggerOptions "register")
-instance ToSchema Value where
- declareNamedSchema _ = pure $ NamedSchema (Just "json") $ mempty
- & type_ ?~ SwaggerObject
-
--- | The server's API (as it is actually intended).
-type API = "timetable" :> Capture "Station Id" StationID :> QueryParam "day" Day :> Get '[JSON] (Map TripId (Trip Deep Deep))
- :<|> "timetable" :> "stops" :> Capture "Date" Day :> Get '[JSON] Value
- :<|> "trip" :> Capture "Trip Id" TripId :> Get '[JSON] (Trip Deep Deep)
+-- | tracktrain's API
+type API =
-- ingress API (put this behind BasicAuth?)
-- TODO: perhaps require a first ping for registration?
- :<|> "train" :> "register" :> Capture "Ticket Id" UUID :> ReqBody '[JSON] RegisterJson :> Post '[JSON] Token
- -- TODO: perhaps a websocket instead?
- :<|> "train" :> "ping" :> ReqBody '[JSON] TrainPing :> Post '[JSON] (Maybe TrainAnchor)
- :<|> "train" :> "ping" :> "ws" :> WebSocket
- :<|> "train" :> "subscribe" :> Capture "Ticket Id" UUID :> WebSocket
- -- debug things
+ "tracker" :> "register" :> ReqBody '[JSON] RegisterJson :> Post '[JSON] Token
+ :<|> "tracker" :> "ping" :> ReqBody '[JSON] TrainPing :> Post '[JSON] (Maybe TrainAnchor)
+ :<|> "tracker" :> "ping" :> "ws" :> WebSocket
+ :<|> "ticket" :> "subscribe" :> Capture "Ticket Id" UUID :> WebSocket
:<|> "debug" :> "pings" :> Get '[JSON] (Map Token [TrainPing])
:<|> "debug" :> "pings" :> Capture "Ticket Id" UUID :> Get '[JSON] [TrainPing]
- :<|> "debug" :> "register" :> Capture "Ticket Id" UUID :> Post '[JSON] Token
:<|> "gtfs.zip" :> Get '[OctetStream] GTFSFile
:<|> "gtfs" :> GtfsRealtimeAPI
--- | The API used for publishing gtfs realtime updates
type GtfsRealtimeAPI = "servicealerts" :> Get '[Proto] FeedMessage
:<|> "tripupdates" :> Get '[Proto] FeedMessage
:<|> "vehiclepositions" :> Get '[Proto] FeedMessage
--- | The server's API with an additional debug route for accessing the specification
--- itself. Split from API to prevent the API documenting the format in which it is
--- documented, which would be silly and way to verbose.
type CompleteAPI =
"api" :> "openapi" :> Get '[JSON] Swagger
:<|> "api" :> API
@@ -103,6 +83,18 @@ data Metrics = Metrics
instance MimeRender OctetStream GTFSFile where
mimeRender p (GTFSFile bytes) = mimeRender p bytes
+newtype RegisterJson = RegisterJson
+ { registerAgent :: Text }
+ deriving (Show, Generic)
+
+instance FromJSON RegisterJson where
+ parseJSON = genericParseJSON (aesonOptions "register")
+instance ToSchema RegisterJson where
+ declareNamedSchema = genericDeclareNamedSchema (swaggerOptions "register")
+instance ToSchema Value where
+ declareNamedSchema _ = pure $ NamedSchema (Just "json") $ mempty
+ & type_ ?~ SwaggerObject
+
-- TODO write something useful here! (and if it's just "hey this is some websocket thingie")
@@ -128,7 +120,7 @@ instance Accept Proto where
instance Message msg => MimeRender Proto msg where
mimeRender _ = LB.fromStrict . encodeMessage
--- TODO: this instance is horrible; ideally it should at least include
--- the name of the message type (if at all possible)
+-- | Not an ideal instance, hides fields of the protobuf message
instance {-# OVERLAPPABLE #-} Message msg => ToSchema msg where
- declareNamedSchema _ = declareNamedSchema (Proxy @String)
+ declareNamedSchema proxy =
+ pure (NamedSchema (Just (messageName proxy)) mempty)
diff --git a/lib/Config.hs b/lib/Config.hs
index 2349e66..94fdd28 100644
--- a/lib/Config.hs
+++ b/lib/Config.hs
@@ -1,10 +1,11 @@
{-# LANGUAGE RecordWildCards #-}
-module Config where
+module Config (UffdConfig(..), ServerConfig(..), LoggingConfig(..)) where
import Conferer (DefaultConfig (configDef))
import Conferer.FromConfig
import Conferer.FromConfig.Warp ()
import Data.ByteString (ByteString)
+import Data.Functor ((<&>))
import Data.Text (Text)
import GHC.Generics (Generic)
import Network.Wai.Handler.Warp (Settings)
@@ -24,6 +25,13 @@ data ServerConfig = ServerConfig
, serverConfigAssets :: FilePath
, serverConfigZoneinfoPath :: FilePath
, serverConfigLogin :: UffdConfig
+ , serverConfigLogging :: LoggingConfig
+ } deriving (Generic)
+
+data LoggingConfig = LoggingConfig
+ { loggingConfigNtfyToken :: Maybe Text
+ , loggingConfigNtfyTopic :: Text
+ , loggingConfigHostname :: Text
} deriving (Generic)
instance FromConfig ServerConfig
@@ -36,6 +44,7 @@ instance DefaultConfig ServerConfig where
, serverConfigAssets = "./assets"
, serverConfigZoneinfoPath = "/etc/zoneinfo/"
, serverConfigLogin = configDef
+ , serverConfigLogging = configDef
}
instance DefaultConfig UffdConfig where
@@ -50,3 +59,12 @@ instance FromConfig UffdConfig where
uffdConfigClientSecret <- fetchFromConfig (key /. "clientSecret") config
uffdConfigEnable <- fetchFromConfig (key /. "enable") config
pure UffdConfig {..}
+
+instance FromConfig LoggingConfig where
+ fromConfig key config = LoggingConfig
+ <$> fetchFromConfig (key /. "ntfyToken") config
+ <*> fetchFromConfig (key /. "ntfyTopic") config
+ <*> fetchFromConfig (key /. "name") config
+
+instance DefaultConfig LoggingConfig where
+ configDef = LoggingConfig Nothing "tracktrain" ""
diff --git a/lib/Extrapolation.hs b/lib/Extrapolation.hs
index 01e5f6f..389d047 100644
--- a/lib/Extrapolation.hs
+++ b/lib/Extrapolation.hs
@@ -27,8 +27,8 @@ import GTFS (Seconds (..),
import Persist (Geopos (..),
ShapePoint (shapePointGeopos),
Station (..), Stop (..),
- Ticket (..), Token (..),
- Tracker (..),
+ Ticket (..), TicketId,
+ Token (..), Tracker (..),
TrainAnchor (..),
TrainPing (..))
import Server.Util (utcToSeconds)
@@ -40,6 +40,7 @@ class Extrapolator strategy where
-- | here's a position ping, guess things from that!
extrapolateAnchorFromPing
:: strategy
+ -> TicketId
-> Ticket
-> V.Vector (Stop, Station, TimeZoneSeries)
-> V.Vector ShapePoint
@@ -68,9 +69,9 @@ instance Extrapolator LinearExtrapolator where
$ NE.filter (\a -> trainAnchorSequence a < positionNow) history
where difference status = positionNow - trainAnchorSequence status
- extrapolateAnchorFromPing _ Ticket{..} stops shape ping@TrainPing{..} = TrainAnchor
+ extrapolateAnchorFromPing _ ticketId Ticket{..} stops shape ping@TrainPing{..} = TrainAnchor
{ trainAnchorCreated = trainPingTimestamp
- , trainAnchorTicket = trainPingTicket
+ , trainAnchorTicket = ticketId
, trainAnchorWhen = utcToSeconds trainPingTimestamp ticketDay
, trainAnchorSequence
, trainAnchorDelay
diff --git a/lib/Persist.hs b/lib/Persist.hs
index 7613fd9..46e5ef4 100644
--- a/lib/Persist.hs
+++ b/lib/Persist.hs
@@ -21,13 +21,19 @@ import Database.Persist.Sql (PersistFieldSql (..),
import Database.Persist.TH
import qualified GTFS
import PersistOrphans
-import Servant (FromHttpApiData (..),
+import Servant (FromHttpApiData (..), Handler,
ToHttpApiData)
-import Conduit (ResourceT)
+import Conduit (MonadTrans (lift), MonadUnliftIO,
+ ResourceT, runResourceT)
+import Config (LoggingConfig)
import Control.Monad.IO.Class (MonadIO (liftIO))
-import Control.Monad.Logger (NoLoggingT)
-import Control.Monad.Reader (ReaderT)
+import Control.Monad.Logger (LoggingT, MonadLogger, NoLoggingT,
+ runNoLoggingT, runStderrLoggingT)
+import Control.Monad.Reader (MonadReader (ask),
+ ReaderT (runReaderT), runReader)
+import Control.Monad.Trans.Control (MonadBaseControl (liftBaseWith),
+ MonadTransControl (liftWith, restoreT))
import Data.Data (Proxy (..))
import Data.Map (Map)
import Data.Pool (Pool)
@@ -37,10 +43,11 @@ import Data.Time (NominalDiffTime, TimeOfDay,
getCurrentTime, nominalDay)
import Data.Time.Calendar (Day, DayOfWeek (..))
import Data.Vector (Vector)
-import Database.Persist.Postgresql (SqlBackend)
+import Database.Persist.Postgresql (SqlBackend, runSqlPool)
import Fmt
import GHC.Generics (Generic)
import MultiLangText (MultiLangText)
+import Server.Util (runLogging)
import Web.PathPieces (PathPiece)
import Yesod (Lang)
@@ -120,6 +127,7 @@ Tracker sql=tt_tracker_token
expires UTCTime
blocked Bool
agent Text
+ currentTicket TicketId Maybe
deriving Eq Show Generic
TrackerTicket
@@ -130,7 +138,7 @@ TrackerTicket
-- raw frames as received from OBUs
TrainPing json sql=tt_trip_ping
- ticket TicketId
+ -- ticket TicketId
token TrackerId
geopos Geopos
timestamp UTCTime
@@ -169,5 +177,19 @@ instance ToSchema TrainAnchor where
instance ToSchema Announcement where
declareNamedSchema = genericDeclareNamedSchema (GTFS.swaggerOptions "announcement")
-runSql :: MonadIO m => Pool SqlBackend -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) a -> m a
-runSql pool = liftIO . flip runSqlPersistMPool pool
+runSqlWithoutLog :: MonadIO m
+ => Pool SqlBackend
+ -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) a
+ -> m a
+runSqlWithoutLog pool = liftIO . flip runSqlPersistMPool pool
+
+-- It's a bit unfortunate that we have an extra reader here for just the
+-- logging config, but since Handler is not MonadUnliftIO there seems to be (?)
+-- no better way than to nest logger monads …
+runSql :: (MonadLogger m, MonadIO m, MonadReader LoggingConfig m)
+ => Pool SqlBackend
+ -> ReaderT SqlBackend (LoggingT (ResourceT IO)) a
+ -> m a
+runSql pool x = do
+ conf <- ask
+ liftIO $ runResourceT $ runLogging conf $ runSqlPool x pool
diff --git a/lib/Server.hs b/lib/Server.hs
index 3922a7b..73c55cb 100644
--- a/lib/Server.hs
+++ b/lib/Server.hs
@@ -16,12 +16,14 @@ import Control.Concurrent.STM (TQueue, TVar, atomically,
import Control.Monad (forever, unless, void,
when)
import Control.Monad.Catch (handle)
-import Control.Monad.Extra (ifM, maybeM, unlessM,
- whenJust, whenM)
+import Control.Monad.Extra (ifM, mapMaybeM, maybeM,
+ unlessM, whenJust, whenM)
import Control.Monad.IO.Class (MonadIO (liftIO))
-import Control.Monad.Logger (LoggingT, NoLoggingT,
+import Control.Monad.Logger (LoggingT, MonadLogger,
+ NoLoggingT, logInfoN,
logWarnN)
-import Control.Monad.Reader (ReaderT, forM)
+import Control.Monad.Reader (MonadReader, ReaderT,
+ forM)
import Control.Monad.Trans (lift)
import Data.Aeson ((.=))
import qualified Data.Aeson as A
@@ -33,7 +35,7 @@ import Data.Pool (Pool)
import Data.Proxy (Proxy (Proxy))
import Data.Swagger (toSchema)
import Data.Text (Text)
-import Data.Text.Encoding (decodeUtf8)
+import Data.Text.Encoding (decodeASCII, decodeUtf8)
import Data.Time (NominalDiffTime,
UTCTime (utctDay),
addUTCTime, diffUTCTime,
@@ -48,7 +50,8 @@ import Fmt ((+|), (|+))
import qualified Network.WebSockets as WS
import Servant (Application,
ServerError (errBody),
- err401, err404, serve,
+ err400, err401, err404,
+ serve,
serveDirectoryFileServer,
throwError)
import Servant.API (NoContent (..),
@@ -62,24 +65,33 @@ import Persist
import Server.ControlRoom
import Server.GTFS_RT (gtfsRealtimeServer)
import Server.Util (Service, ServiceM,
- runService, sendErrorMsg)
+ runService, sendErrorMsg,
+ utcToSeconds)
import Yesod (toWaiAppPlain)
-import Extrapolation (Extrapolator (..),
- LinearExtrapolator (..))
-import System.IO.Unsafe
-
import Conduit (ResourceT)
-import Config (ServerConfig (serverConfigAssets, serverConfigZoneinfoPath))
+import Config (LoggingConfig,
+ ServerConfig (..))
+import Control.Exception (throw)
import Data.ByteString (ByteString)
import Data.ByteString.Lazy (toStrict)
+import Data.Foldable (minimumBy)
+import Data.Function (on, (&))
+import Data.Maybe (fromMaybe)
import qualified Data.Text as T
import Data.Time.LocalTime.TimeZone.Olson (getTimeZoneSeriesFromOlsonFile)
import Data.Time.LocalTime.TimeZone.Series (TimeZoneSeries)
import Data.UUID (UUID)
+import qualified Data.UUID as UUID
+import Extrapolation (Extrapolator (..),
+ LinearExtrapolator (..),
+ euclid)
+import GTFS (Seconds (unSeconds),
+ seconds2Double)
import Prometheus
import Prometheus.Metric.GHC
import System.FilePath ((</>))
+import System.IO.Unsafe
application :: GTFS.GTFS -> Pool SqlBackend -> ServerConfig -> IO Application
application gtfs dbpool settings = do
@@ -94,65 +106,84 @@ application gtfs dbpool settings = do
(serverConfigZoneinfoPath settings </> T.unpack tzname)
subscribers <- newTVarIO mempty
- pure $ serve (Proxy @CompleteAPI) $ hoistServer (Proxy @CompleteAPI) runService
+ pure $ serve (Proxy @CompleteAPI)
+ $ hoistServer (Proxy @CompleteAPI) (runService (serverConfigLogging settings))
$ server gtfs getTzseries metrics subscribers dbpool settings
-- databaseMigration :: ConnectionString -> IO ()
-doMigration pool = runSql pool $ runMigration $ do
+doMigration pool = runSqlWithoutLog pool $ runMigration $ do
migrateEnableExtension "uuid-ossp"
migrateAll
server :: GTFS.GTFS -> (Text -> IO TimeZoneSeries) -> Metrics -> TVar (M.Map UUID [TQueue (Maybe TrainPing)]) -> Pool SqlBackend -> ServerConfig -> Service CompleteAPI
server gtfs getTzseries Metrics{..} subscribers dbpool settings = handleDebugAPI
- :<|> (handleTimetable :<|> handleTimetableStops :<|> handleTrip
- :<|> handleRegister :<|> handleTrainPing (throwError err401) :<|> handleWS
+ :<|> (handleTrackerRegister :<|> handleTrainPing (throwError err401) :<|> handleWS
:<|> handleSubscribe :<|> handleDebugState :<|> handleDebugTrain
- :<|> handleDebugRegister :<|> pure (GTFS.gtfsFile gtfs) :<|> gtfsRealtimeServer gtfs dbpool)
+ :<|> pure (GTFS.gtfsFile gtfs) :<|> gtfsRealtimeServer gtfs dbpool)
:<|> metrics
:<|> serveDirectoryFileServer (serverConfigAssets settings)
:<|> pure (unsafePerformIO (toWaiAppPlain (ControlRoom gtfs dbpool settings)))
- where handleTimetable station maybeDay =
- M.filter isLastStop . GTFS.tripsOnDay gtfs <$> liftIO day
- where isLastStop = (==) station . GTFS.stationId . GTFS.stopStation . V.last . GTFS.tripStops
- day = maybeM (getCurrentTime <&> utctDay) pure (pure maybeDay)
- handleTimetableStops day =
- pure . A.toJSON . fmap mkJson . M.elems $ GTFS.tripsOnDay gtfs day
- where mkJson :: GTFS.Trip GTFS.Deep GTFS.Deep -> A.Value
- mkJson GTFS.Trip {..} = A.object
- [ "trip" .= tripTripId
- , "sequencelength" .= (GTFS.stopSequence . V.last) tripStops
- , "stops" .= fmap (\GTFS.Stop{..} -> A.object
- [ "departure" .= GTFS.toUTC stopDeparture (GTFS.tzseries gtfs) day
- , "arrival" .= GTFS.toUTC stopArrival (GTFS.tzseries gtfs) day
- , "station" .= GTFS.stationId stopStation
- , "lat" .= GTFS.stationLat stopStation
- , "lon" .= GTFS.stationLon stopStation
- ]) tripStops
- ]
- handleTrip trip = case M.lookup trip (GTFS.trips gtfs) of
- Just res -> pure res
- Nothing -> throwError err404
- handleRegister (ticketId :: UUID) RegisterJson{..} = do
+ where handleTrackerRegister RegisterJson{..} = do
today <- liftIO getCurrentTime <&> utctDay
expires <- liftIO $ getCurrentTime <&> addUTCTime validityPeriod
runSql dbpool $ do
- TrackerKey tracker <- insert (Tracker expires False registerAgent)
- insert (TrackerTicket (TicketKey ticketId) (TrackerKey tracker))
- pure tracker
- handleDebugRegister (ticketId :: UUID) = do
- expires <- liftIO $ getCurrentTime <&> addUTCTime validityPeriod
- runSql dbpool $ do
- TrackerKey tracker <- insert (Tracker expires False "debug key")
- insert (TrackerTicket (TicketKey ticketId) (TrackerKey tracker))
+ TrackerKey tracker <- insert (Tracker expires False registerAgent Nothing)
pure tracker
handleTrainPing onError ping@TrainPing{..} =
- let ticketId = trainPingTicket in
- isTokenValid dbpool trainPingToken ticketId >>= \case
- Nothing -> do
- onError
- pure Nothing
- Just (tracker@Tracker{..}, ticket@Ticket{..}) -> do
+ isTokenValid dbpool trainPingToken >>= \case
+ Nothing -> onError >> pure Nothing
+ Just tracker@Tracker{..} -> do
+
+ -- if the tracker is not associated with a ticket, it is probably
+ -- just starting out on a new trip, or has finished an old one.
+ maybeTicketId <- case trackerCurrentTicket of
+ Just ticketId -> pure (Just ticketId)
+ Nothing -> runSql dbpool $ do
+ now <- liftIO getCurrentTime
+ tickets <- selectList [ TicketDay ==. utctDay now, TicketCompleted ==. False ] []
+ ticketsWithFirstStation <- flip mapMaybeM tickets
+ (\ticket@(Entity ticketId _) -> do
+ selectFirst [StopTicket ==. ticketId] [Asc StopSequence] >>= \case
+ Nothing -> pure Nothing
+ Just (Entity _ stop) -> do
+ station <- getJust (stopStation stop)
+ tzseries <- liftIO $ getTzseries (GTFS.tzname (stopDeparture stop))
+ pure (Just (ticket, station, stop, tzseries)))
+
+ if null ticketsWithFirstStation then pure Nothing else do
+ let (closestTicket, _, _, _) = minimumBy
+ -- (compare `on` euclid trainPingGeopos . stationGeopos . snd)
+ (compare `on`
+ (\(Entity _ ticket, station, stop, tzseries) ->
+ let
+ runningDay = ticketDay ticket
+ spaceDistance = euclid trainPingGeopos (stationGeopos station)
+ timeDiff =
+ GTFS.toSeconds (stopDeparture stop) tzseries runningDay
+ - utcToSeconds now runningDay
+ in
+ euclid trainPingGeopos (stationGeopos station)
+ + abs (seconds2Double timeDiff / 3600)))
+ ticketsWithFirstStation
+ logInfoN
+ $ "Tracker "+|UUID.toString (coerce trainPingToken)|+
+ " is now handling ticket "+|UUID.toString (coerce (entityKey closestTicket))|+
+ " (trip "+|ticketTripName (entityVal closestTicket)|+")."
+
+ update (coerce trainPingToken)
+ [TrackerCurrentTicket =. Just (entityKey closestTicket)]
+
+ pure (Just (entityKey closestTicket))
+
+ ticketId <- case maybeTicketId of
+ Just ticketId -> pure ticketId
+ Nothing -> do
+ logWarnN $ "Tracker "+|UUID.toString (coerce trainPingToken)|+
+ " sent a ping, but no trips are running today."
+ throwError err400
+
runSql dbpool $ do
+ ticket@Ticket{..} <- getJust ticketId
stations <- selectList [ StopTicket ==. ticketId ] [Asc StopArrival]
>>= mapM (\stop -> do
@@ -165,17 +196,31 @@ server gtfs getTzseries Metrics{..} subscribers dbpool settings = handleDebugAPI
<&> (V.fromList . fmap entityVal)
let anchor = extrapolateAnchorFromPing LinearExtrapolator
- ticket stations shapePoints ping
+ ticketId ticket stations shapePoints ping
insert ping
- last <- selectFirst [TrainAnchorTicket ==. trainPingTicket] [Desc TrainAnchorWhen]
+ last <- selectFirst [TrainAnchorTicket ==. ticketId] [Desc TrainAnchorWhen]
-- only insert new estimates if they've actually changed anything
when (fmap (trainAnchorDelay . entityVal) last /= Just (trainAnchorDelay anchor))
$ void $ insert anchor
+ -- are we at the final stop? if so, mark this ticket as done
+ -- & the tracker as free
+ let maxSequence = V.last stations
+ & (\(stop, _, _) -> stopSequence stop)
+ & fromIntegral
+ when (trainAnchorSequence anchor + 0.1 >= maxSequence) $ do
+ update (coerce trainPingToken)
+ [TrackerCurrentTicket =. Nothing]
+ update ticketId
+ [TicketCompleted =. True]
+ logInfoN $ "Tracker "+|UUID.toString (coerce trainPingToken)|+
+ " has completed ticket "+|UUID.toString (coerce ticketId)|+
+ " (trip "+|ticketTripName|+")"
+
queues <- liftIO $ atomically $ do
- queues <- readTVar subscribers <&> M.lookup (coerce trainPingTicket)
+ queues <- readTVar subscribers <&> M.lookup (coerce ticketId)
whenJust queues $
mapM_ (\q -> writeTQueue q (Just ping))
pure queues
@@ -187,14 +232,14 @@ server gtfs getTzseries Metrics{..} subscribers dbpool settings = handleDebugAPI
msg <- liftIO $ WS.receiveData conn
case A.eitherDecode msg of
Left err -> do
- logWarnN ("stray websocket message: "+|show msg|+" (could not decode: "+|err|+")")
+ logWarnN ("stray websocket message: "+|decodeASCII (toStrict msg)|+" (could not decode: "+|err|+")")
liftIO $ WS.sendClose conn (C8.pack err)
-- TODO: send a close msg (Nothing) to the subscribed queues? decGauge metricsWSGauge
- Right ping ->
+ Right ping -> do
-- if invalid token, send a "polite" close request. Note that the client may
-- ignore this and continue sending messages, which will continue to be handled.
- liftIO $ handleTrainPing (WS.sendClose conn ("" :: ByteString)) ping >>= \case
- Just anchor -> WS.sendTextData conn (A.encode anchor)
+ handleTrainPing (liftIO $ WS.sendClose conn ("" :: ByteString)) ping >>= \case
+ Just anchor -> liftIO $ WS.sendTextData conn (A.encode anchor)
Nothing -> pure ()
handleSubscribe (ticketId :: UUID) conn = liftIO $ WS.withPingThread conn 30 (pure ()) $ do
queue <- atomically $ do
@@ -204,7 +249,7 @@ server gtfs getTzseries Metrics{..} subscribers dbpool settings = handleDebugAPI
$ M.insertWith (<>) ticketId [queue] qs
pure queue
-- send most recent ping, if any (so we won't have to wait for movement)
- lastPing <- runSql dbpool $ do
+ lastPing <- runSqlWithoutLog dbpool $ do
trackers <- getTicketTrackers ticketId
<&> fmap entityKey
selectFirst [TrainPingToken <-. trackers] [Desc TrainPingTimestamp]
@@ -239,21 +284,21 @@ server gtfs getTzseries Metrics{..} subscribers dbpool settings = handleDebugAPI
handleDebugAPI = pure $ toSwagger (Proxy @API)
metrics = exportMetricsAsText <&> (decodeUtf8 . toStrict)
-getTicketTrackers :: UUID -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) [Entity Tracker]
+getTicketTrackers :: (MonadLogger (t (ResourceT IO)), MonadIO (t (ResourceT IO)))
+ => UUID -> ReaderT SqlBackend (t (ResourceT IO)) [Entity Tracker]
getTicketTrackers ticketId = do
joins <- selectList [TrackerTicketTicket ==. TicketKey ticketId] []
<&> fmap (trackerTicketTracker . entityVal)
- selectList [TrackerId <-. joins] []
+ selectList ([TrackerId <-. joins] ||. [TrackerCurrentTicket ==. Just (TicketKey ticketId)]) []
-- TODO: proper debug logging for expired tokens
-isTokenValid :: MonadIO m => Pool SqlBackend -> TrackerId -> TicketId -> m (Maybe (Tracker, Ticket))
-isTokenValid dbpool token ticketId = runSql dbpool $ get token >>= \case
+isTokenValid :: Pool SqlBackend -> TrackerId -> ServiceM (Maybe Tracker)
+isTokenValid dbpool token = runSql dbpool $ get token >>= \case
Just tracker | not (trackerBlocked tracker) -> do
ifM (hasExpired (trackerExpires tracker))
(pure Nothing)
- $ runSql dbpool $ get ticketId
- <&> (\case { Nothing -> Nothing; Just ticket -> Just (tracker, ticket) })
+ (pure (Just tracker))
_ -> pure Nothing
hasExpired :: MonadIO m => UTCTime -> m Bool
diff --git a/lib/Server/ControlRoom.hs b/lib/Server/ControlRoom.hs
index 9d15bcf..5292620 100644
--- a/lib/Server/ControlRoom.hs
+++ b/lib/Server/ControlRoom.hs
@@ -17,6 +17,7 @@ import Control.Monad.IO.Class (MonadIO (liftIO))
import qualified Data.Aeson as A
import qualified Data.ByteString.Char8 as C8
import qualified Data.ByteString.Lazy as LB
+import Data.Coerce (coerce)
import Data.Function (on, (&))
import Data.Functor ((<&>))
import Data.List (lookup, nubBy)
@@ -86,6 +87,7 @@ mkYesod "ControlRoom" [parseRoutes|
/obu OnboardUnitMenuR GET
/obu/#UUID OnboardUnitR GET
+/tracker OnboardTrackerR GET
|]
emptyMarkup :: MarkupM a -> Bool
@@ -96,6 +98,7 @@ instance Yesod ControlRoom where
authRoute _ = Just $ AuthR LoginR
isAuthorized OnboardUnitMenuR _ = pure Authorized
isAuthorized (OnboardUnitR _) _ = pure Authorized
+ isAuthorized OnboardTrackerR _ = pure Authorized
isAuthorized (AuthR _) _ = pure Authorized
isAuthorized _ _ = do
UffdConfig{..} <- getYesod <&> serverConfigLogin . getSettings
@@ -200,7 +203,7 @@ getTicketsR = do
gtfs <- getYesod <&> getGtfs
-- TODO: tickets should have all trip information saved
- tickets <- runDB $ selectList [ TicketDay ==. day ] [] >>= mapM (\ticket -> do
+ tickets <- runDB $ selectList [ TicketDay ==. day ] [ Asc TicketTripName ] >>= mapM (\ticket -> do
stops <- selectList [ StopTicket ==. entityKey ticket ] []
startStation <- getJust (stopStation $ entityVal $ head stops)
pure (ticket, startStation, fmap entityVal stops))
@@ -317,9 +320,11 @@ getTicketViewR ticketId = do
pure (entityVal stop, station))
anns <- runDB $ selectList [ AnnouncementTicket ==. ticketKey ] []
- trackerIds <- runDB $ selectList [ TrackerTicketTicket ==. ticketKey ] []
+ joins <- runDB $ selectList [ TrackerTicketTicket ==. ticketKey ] []
<&> fmap (trackerTicketTracker . entityVal)
- trackers <- runDB $ selectList [ TrackerId <-. trackerIds ] [Asc TrackerExpires]
+ trackers <- runDB $ selectList
+ ([ TrackerId <-. joins ] ||. [ TrackerCurrentTicket ==. Just ticketKey ])
+ [Asc TrackerExpires]
lastPing <- runDB $ selectFirst [ TrainPingToken <-. fmap entityKey trackers ] [Desc TrainPingTimestamp]
anchors <- runDB $ selectList [ TrainAnchorTicket ==. ticketKey ] []
<&> nonEmpty . fmap entityVal
@@ -511,7 +516,9 @@ getTokenBlock token = do
case maybe of
Just r@Tracker{..} -> do
liftIO $ print r
- redirect RootR
+ redirect $ case trackerCurrentTicket of
+ Just ticket -> TicketViewR (coerce ticket)
+ Nothing -> RootR
Nothing -> notFound
getOnboardUnitMenuR :: Handler Html
@@ -525,14 +532,16 @@ getOnboardUnitMenuR = do
defaultLayout $ do
[whamlet|
-<h1>_{MsgOBU}
-<section>
- _{MsgChooseTrain}
- $forall (Entity (TicketKey ticketId) Ticket{..}, firstStop) <- tickets
- <hr>
- <a href="@{OnboardUnitR ticketId}">
- #{ticketTripName}: #{ticketHeadsign} #{stopDeparture firstStop}
-|]
+ <h1>_{MsgOBU}
+ <section>
+ _{MsgChooseTrain}
+ $forall (Entity (TicketKey ticketId) Ticket{..}, firstStop) <- tickets
+ <hr>
+ <a href="@{OnboardUnitR ticketId}">
+ #{ticketTripName}: #{ticketHeadsign} #{stopDeparture firstStop}
+ <section>
+ <a href="@{OnboardTrackerR}">_{MsgStartTracking}
+ |]
getOnboardUnitR :: UUID -> Handler Html
getOnboardUnitR ticketId = do
@@ -541,6 +550,12 @@ getOnboardUnitR ticketId = do
Just ticket -> pure ticket
defaultLayout $(whamletFile "site/obu.hamlet")
+getOnboardTrackerR :: Handler Html
+getOnboardTrackerR = do
+ defaultLayout
+ $( whamletFile "site/tracker.hamlet")
+
+
announceForm :: UUID -> Html -> MForm Handler (FormResult Announcement, Widget)
announceForm ticketId = renderDivs $ Announcement
<$> pure (TicketKey ticketId)
diff --git a/lib/Server/GTFS_RT.hs b/lib/Server/GTFS_RT.hs
index 48a84db..d2e53a1 100644
--- a/lib/Server/GTFS_RT.hs
+++ b/lib/Server/GTFS_RT.hs
@@ -155,13 +155,14 @@ gtfsRealtimeServer gtfs@GTFS{..} dbpool =
ticket <- selectList [TicketCompleted ==. False] []
- positions <- forM ticket $ \(Entity key ticket) -> do
- selectFirst [TrainPingTicket ==. key] [Desc TrainPingTimestamp] >>= \case
- Nothing -> pure Nothing
- Just lastPing ->
- pure (Just $ mkPosition (lastPing, ticket))
-
- defFeedMessage (catMaybes positions)
+ -- TODO: reimplement this (since trainpings no longer reference tickets it's gone for now)
+ -- positions <- forM ticket $ \(Entity key ticket) -> do
+ -- selectFirst [TrainPingTicket ==. key] [Desc TrainPingTimestamp] >>= \case
+ -- Nothing -> pure Nothing
+ -- Just lastPing ->
+ -- pure (Just $ mkPosition (lastPing, ticket))
+
+ defFeedMessage [] -- (catMaybes positions)
where
mkPosition :: (Entity TrainPing, Ticket) -> RT.FeedEntity
diff --git a/lib/Server/Util.hs b/lib/Server/Util.hs
index 5ffb829..0106428 100644
--- a/lib/Server/Util.hs
+++ b/lib/Server/Util.hs
@@ -1,28 +1,67 @@
+{-# LANGUAGE BlockArguments #-}
+{-# LANGUAGE RecordWildCards #-}
-- | mostly the monad the service runs in
-module Server.Util (Service, ServiceM, runService, sendErrorMsg, secondsNow, utcToSeconds) where
+module Server.Util (Service, ServiceM, runService, sendErrorMsg, secondsNow, utcToSeconds, runLogging) where
+import Config (LoggingConfig (..))
+import Control.Exception (handle, try)
+import Control.Monad.Extra (void, whenJust)
import Control.Monad.IO.Class (MonadIO (liftIO))
-import Control.Monad.Logger (LoggingT, runStderrLoggingT)
+import Control.Monad.Logger (Loc, LogLevel (..), LogSource, LogStr,
+ LoggingT (..), defaultOutput,
+ fromLogStr, runStderrLoggingT)
+import Control.Monad.Reader (ReaderT (..))
import qualified Data.Aeson as A
import Data.ByteString (ByteString)
+import qualified Data.ByteString as C8
import Data.Text (Text)
+import qualified Data.Text as T
+import Data.Text.Encoding (decodeUtf8Lenient)
import Data.Time (Day, UTCTime (..), diffUTCTime,
getCurrentTime,
nominalDiffTimeToSeconds)
+import Fmt ((+|), (|+))
+import GHC.IO.Exception (IOException (IOError))
import GTFS (Seconds (..))
import Prometheus (MonadMonitor (doIO))
import Servant (Handler, ServerError, ServerT, err404,
errBody, errHeaders, throwError)
+import System.IO (stderr)
+import System.Process.Extra (callProcess)
-type ServiceM = LoggingT Handler
+type ServiceM = LoggingT (ReaderT LoggingConfig Handler)
type Service api = ServerT api ServiceM
-runService :: ServiceM a -> Handler a
-runService = runStderrLoggingT
+runService :: LoggingConfig -> ServiceM a -> Handler a
+runService conf m = runReaderT (runLogging conf m) conf
instance MonadMonitor ServiceM where
doIO = liftIO
+runLogging :: MonadIO m => LoggingConfig -> LoggingT m a -> m a
+runLogging LoggingConfig{..} logging = runLoggingT logging printLogMsg
+ where printLogMsg loc source level msg = do
+ -- this is what runStderrLoggingT does
+ defaultOutput stderr loc source level msg
+
+ whenJust loggingConfigNtfyToken \token -> handle ntfyFailed do
+ callProcess "ntfy"
+ [ "send"
+ , "--token=" <> T.unpack token
+ , "--title="+|loggingConfigHostname|+"/"+|"tracktrain"
+ , "--priority="+|show (ntfyPriority level)|+""
+ , T.unpack loggingConfigNtfyTopic
+ , T.unpack (decodeUtf8Lenient (fromLogStr msg)) ]
+
+ ntfyFailed (e :: IOError) =
+ putStrLn ("calling ntfy failed:"+|show e|+".")
+ ntfyPriority level = case level of
+ LevelDebug -> 2
+ LevelInfo -> 3
+ LevelWarn -> 4
+ LevelError -> 5
+ LevelOther _ -> 0
+
sendErrorMsg :: Text -> ServiceM a
sendErrorMsg msg = throwError err404
diff --git a/messages/en.msg b/messages/en.msg
index 41ced10..a2ef189 100644
--- a/messages/en.msg
+++ b/messages/en.msg
@@ -40,6 +40,7 @@ Tickets: Tickets
ImportTrips: import selected trips
delete: delete
AccordingToGtfs: Additional Trips contained in the Gtfs
+StartTracking: Start Tracking
OBU: Onboard-Unit
ChooseTrain: Choose a Train
diff --git a/site/tracker.hamlet b/site/tracker.hamlet
new file mode 100644
index 0000000..5877c5d
--- /dev/null
+++ b/site/tracker.hamlet
@@ -0,0 +1,137 @@
+<h1>_{MsgOBU}
+
+<section>
+ <h2>Blub
+ <strong>Token:</strong> <span id="token">
+<section>
+ <h2>Status
+ <p id="status">_{MsgNone}
+ <p id>_{MsgError}: <span id="error">
+<section>
+ <h2>_{MsgLive}
+ <p><strong>Position: </strong><span id="lat"></span>, <span id="long"></span>
+ <p><strong>Accuracy: </strong><span id="acc">
+<section>
+ <h2>_{MsgEstimated}
+ <p><strong>_{MsgDelay}</strong>: <span id="delay">
+ <p><strong>_{MsgSequence}</strong>: <span id="sequence">
+
+
+<script>
+ var token = null;
+
+ let euclid = (a,b) => {
+ let x = a[0]-b[0];
+ let y = a[1]-b[1];
+ return x*x+y*y;
+ }
+
+ let minimalDist = (point, list, proj, norm) => {
+ return list.reduce (
+ (min, x) => {
+ let dist = norm(point, proj(x));
+ return dist < min[0] ? [dist,x] : min
+ },
+ [norm(point, proj(list[0])), list[0]]
+ )[1]
+ }
+
+ let counter = 0;
+ let ws;
+ let id;
+
+ function setStatus(msg) {
+ document.getElementById("status").innerText = msg
+ }
+
+ async function geoError(error) {
+ setStatus("error");
+ alert(`_{MsgPermissionFailed}: \n${error.message}`);
+ console.error(error);
+ main();
+ }
+
+ async function wsError(error) {
+ // alert(`_{MsgWebsocketError}: \n${error.message === undefined ? error.reason : error.message}`);
+ console.log(error);
+ navigator.geolocation.clearWatch(id);
+ }
+
+ async function wsClose(error) {
+ console.log(error);
+ document.getElementById("error").innerText = `websocket closed (reason: ${error.reason}). reconnecting …`;
+ navigator.geolocation.clearWatch(id);
+ setTimeout(openWebsocket, 1000);
+ }
+
+ function wsMsg(msg) {
+ let json = JSON.parse(msg.data);
+ console.log(json);
+ document.getElementById("delay").innerText =
+ `${json.delay}s (${Math.floor(json.delay / 60)}min)`;
+ document.getElementById("sequence").innerText = json.sequence;
+ }
+
+
+ function initGeopos() {
+ document.getElementById("error").innerText = "";
+ id = navigator.geolocation.watchPosition(
+ geoPing,
+ geoError,
+ {enableHighAccuracy: true}
+ );
+ }
+
+
+ function openWebsocket () {
+ ws = new WebSocket((location.protocol == "http:" ? "ws" : "wss") + "://" + location.host + "/api/tracker/ping/ws");
+ ws.onerror = wsError;
+ ws.onclose = wsClose;
+ ws.onmessage = wsMsg;
+ ws.onopen = (event) => {
+ setStatus("connected");
+ };
+ }
+
+ async function geoPing(geoloc) {
+ console.log("got position update " + counter);
+ document.getElementById("lat").innerText = geoloc.coords.latitude;
+ document.getElementById("long").innerText = geoloc.coords.longitude;
+ document.getElementById("acc").innerText = geoloc.coords.accuracy;
+
+ if (ws !== undefined && ws.readyState == 1) {
+ ws.send(JSON.stringify({
+ token: token,
+ geopos: [ geoloc.coords.latitude, geoloc.coords.longitude ],
+ timestamp: (new Date()).toISOString()
+ }));
+ counter += 1;
+ setStatus(`sent ${counter} pings`);
+ } else {
+ setStatus(`websocket readystate ${ws.readyState}`);
+ }
+ }
+
+
+ async function main() {
+ initGeopos();
+
+ token = await (await fetch("/api/tracker/register/", {
+ method: "POST",
+ body: JSON.stringify({agent: "tracktrain-website"}),
+ headers: {"Content-Type": "application/json"}
+ })).json();
+
+ if (token.error) {
+ alert("could not obtain token: \n" + token.msg);
+ setStatus("_{MsgTokenFailed}");
+ } else {
+ console.log("got token");
+
+ document.getElementById("token").innerText = token;
+
+ openWebsocket();
+ }
+ }
+
+ main()
diff --git a/tracktrain.cabal b/tracktrain.cabal
index ad1624f..a8e42a4 100644
--- a/tracktrain.cabal
+++ b/tracktrain.cabal
@@ -97,6 +97,7 @@ library
, proto-lens
, http-media
, filepath
+ , monad-control
hs-source-dirs: lib
exposed-modules: GTFS
, Server