aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/API.hs54
-rw-r--r--lib/Config.hs20
-rw-r--r--lib/Extrapolation.hs9
-rw-r--r--lib/Persist.hs38
-rw-r--r--lib/Server.hs179
-rw-r--r--lib/Server/ControlRoom.hs39
-rw-r--r--lib/Server/GTFS_RT.hs15
-rw-r--r--lib/Server/Util.hs49
-rw-r--r--messages/en.msg1
-rw-r--r--site/tracker.hamlet137
-rw-r--r--tracktrain.cabal1
11 files changed, 407 insertions, 135 deletions
diff --git a/lib/API.hs b/lib/API.hs
index 2c8123a..b2635c1 100644
--- a/lib/API.hs
+++ b/lib/API.hs
@@ -39,7 +39,8 @@ import Data.Aeson (FromJSON (..), Value,
import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy as LB
import Data.HashMap.Strict.InsOrd (singleton)
-import Data.ProtoLens (Message, encodeMessage)
+import Data.ProtoLens (Message (messageName),
+ encodeMessage)
import GHC.Generics (Generic)
import GTFS (Depth (Deep), GTFSFile (..),
StationID, Trip, TripId,
@@ -50,45 +51,24 @@ import Prometheus
import Proto.GtfsRealtime (FeedMessage)
import Servant.API.ContentTypes (Accept (..))
-newtype RegisterJson = RegisterJson
- { registerAgent :: Text }
- deriving (Show, Generic)
-
-instance FromJSON RegisterJson where
- parseJSON = genericParseJSON (aesonOptions "register")
-instance ToSchema RegisterJson where
- declareNamedSchema = genericDeclareNamedSchema (swaggerOptions "register")
-instance ToSchema Value where
- declareNamedSchema _ = pure $ NamedSchema (Just "json") $ mempty
- & type_ ?~ SwaggerObject
-
--- | The server's API (as it is actually intended).
-type API = "timetable" :> Capture "Station Id" StationID :> QueryParam "day" Day :> Get '[JSON] (Map TripId (Trip Deep Deep))
- :<|> "timetable" :> "stops" :> Capture "Date" Day :> Get '[JSON] Value
- :<|> "trip" :> Capture "Trip Id" TripId :> Get '[JSON] (Trip Deep Deep)
+-- | tracktrain's API
+type API =
-- ingress API (put this behind BasicAuth?)
-- TODO: perhaps require a first ping for registration?
- :<|> "train" :> "register" :> Capture "Ticket Id" UUID :> ReqBody '[JSON] RegisterJson :> Post '[JSON] Token
- -- TODO: perhaps a websocket instead?
- :<|> "train" :> "ping" :> ReqBody '[JSON] TrainPing :> Post '[JSON] (Maybe TrainAnchor)
- :<|> "train" :> "ping" :> "ws" :> WebSocket
- :<|> "train" :> "subscribe" :> Capture "Ticket Id" UUID :> WebSocket
- -- debug things
+ "tracker" :> "register" :> ReqBody '[JSON] RegisterJson :> Post '[JSON] Token
+ :<|> "tracker" :> "ping" :> ReqBody '[JSON] TrainPing :> Post '[JSON] (Maybe TrainAnchor)
+ :<|> "tracker" :> "ping" :> "ws" :> WebSocket
+ :<|> "ticket" :> "subscribe" :> Capture "Ticket Id" UUID :> WebSocket
:<|> "debug" :> "pings" :> Get '[JSON] (Map Token [TrainPing])
:<|> "debug" :> "pings" :> Capture "Ticket Id" UUID :> Get '[JSON] [TrainPing]
- :<|> "debug" :> "register" :> Capture "Ticket Id" UUID :> Post '[JSON] Token
:<|> "gtfs.zip" :> Get '[OctetStream] GTFSFile
:<|> "gtfs" :> GtfsRealtimeAPI
--- | The API used for publishing gtfs realtime updates
type GtfsRealtimeAPI = "servicealerts" :> Get '[Proto] FeedMessage
:<|> "tripupdates" :> Get '[Proto] FeedMessage
:<|> "vehiclepositions" :> Get '[Proto] FeedMessage
--- | The server's API with an additional debug route for accessing the specification
--- itself. Split from API to prevent the API documenting the format in which it is
--- documented, which would be silly and way to verbose.
type CompleteAPI =
"api" :> "openapi" :> Get '[JSON] Swagger
:<|> "api" :> API
@@ -103,6 +83,18 @@ data Metrics = Metrics
instance MimeRender OctetStream GTFSFile where
mimeRender p (GTFSFile bytes) = mimeRender p bytes
+newtype RegisterJson = RegisterJson
+ { registerAgent :: Text }
+ deriving (Show, Generic)
+
+instance FromJSON RegisterJson where
+ parseJSON = genericParseJSON (aesonOptions "register")
+instance ToSchema RegisterJson where
+ declareNamedSchema = genericDeclareNamedSchema (swaggerOptions "register")
+instance ToSchema Value where
+ declareNamedSchema _ = pure $ NamedSchema (Just "json") $ mempty
+ & type_ ?~ SwaggerObject
+
-- TODO write something useful here! (and if it's just "hey this is some websocket thingie")
@@ -128,7 +120,7 @@ instance Accept Proto where
instance Message msg => MimeRender Proto msg where
mimeRender _ = LB.fromStrict . encodeMessage
--- TODO: this instance is horrible; ideally it should at least include
--- the name of the message type (if at all possible)
+-- | Not an ideal instance, hides fields of the protobuf message
instance {-# OVERLAPPABLE #-} Message msg => ToSchema msg where
- declareNamedSchema _ = declareNamedSchema (Proxy @String)
+ declareNamedSchema proxy =
+ pure (NamedSchema (Just (messageName proxy)) mempty)
diff --git a/lib/Config.hs b/lib/Config.hs
index 2349e66..94fdd28 100644
--- a/lib/Config.hs
+++ b/lib/Config.hs
@@ -1,10 +1,11 @@
{-# LANGUAGE RecordWildCards #-}
-module Config where
+module Config (UffdConfig(..), ServerConfig(..), LoggingConfig(..)) where
import Conferer (DefaultConfig (configDef))
import Conferer.FromConfig
import Conferer.FromConfig.Warp ()
import Data.ByteString (ByteString)
+import Data.Functor ((<&>))
import Data.Text (Text)
import GHC.Generics (Generic)
import Network.Wai.Handler.Warp (Settings)
@@ -24,6 +25,13 @@ data ServerConfig = ServerConfig
, serverConfigAssets :: FilePath
, serverConfigZoneinfoPath :: FilePath
, serverConfigLogin :: UffdConfig
+ , serverConfigLogging :: LoggingConfig
+ } deriving (Generic)
+
+data LoggingConfig = LoggingConfig
+ { loggingConfigNtfyToken :: Maybe Text
+ , loggingConfigNtfyTopic :: Text
+ , loggingConfigHostname :: Text
} deriving (Generic)
instance FromConfig ServerConfig
@@ -36,6 +44,7 @@ instance DefaultConfig ServerConfig where
, serverConfigAssets = "./assets"
, serverConfigZoneinfoPath = "/etc/zoneinfo/"
, serverConfigLogin = configDef
+ , serverConfigLogging = configDef
}
instance DefaultConfig UffdConfig where
@@ -50,3 +59,12 @@ instance FromConfig UffdConfig where
uffdConfigClientSecret <- fetchFromConfig (key /. "clientSecret") config
uffdConfigEnable <- fetchFromConfig (key /. "enable") config
pure UffdConfig {..}
+
+instance FromConfig LoggingConfig where
+ fromConfig key config = LoggingConfig
+ <$> fetchFromConfig (key /. "ntfyToken") config
+ <*> fetchFromConfig (key /. "ntfyTopic") config
+ <*> fetchFromConfig (key /. "name") config
+
+instance DefaultConfig LoggingConfig where
+ configDef = LoggingConfig Nothing "tracktrain" ""
diff --git a/lib/Extrapolation.hs b/lib/Extrapolation.hs
index 01e5f6f..389d047 100644
--- a/lib/Extrapolation.hs
+++ b/lib/Extrapolation.hs
@@ -27,8 +27,8 @@ import GTFS (Seconds (..),
import Persist (Geopos (..),
ShapePoint (shapePointGeopos),
Station (..), Stop (..),
- Ticket (..), Token (..),
- Tracker (..),
+ Ticket (..), TicketId,
+ Token (..), Tracker (..),
TrainAnchor (..),
TrainPing (..))
import Server.Util (utcToSeconds)
@@ -40,6 +40,7 @@ class Extrapolator strategy where
-- | here's a position ping, guess things from that!
extrapolateAnchorFromPing
:: strategy
+ -> TicketId
-> Ticket
-> V.Vector (Stop, Station, TimeZoneSeries)
-> V.Vector ShapePoint
@@ -68,9 +69,9 @@ instance Extrapolator LinearExtrapolator where
$ NE.filter (\a -> trainAnchorSequence a < positionNow) history
where difference status = positionNow - trainAnchorSequence status
- extrapolateAnchorFromPing _ Ticket{..} stops shape ping@TrainPing{..} = TrainAnchor
+ extrapolateAnchorFromPing _ ticketId Ticket{..} stops shape ping@TrainPing{..} = TrainAnchor
{ trainAnchorCreated = trainPingTimestamp
- , trainAnchorTicket = trainPingTicket
+ , trainAnchorTicket = ticketId
, trainAnchorWhen = utcToSeconds trainPingTimestamp ticketDay
, trainAnchorSequence
, trainAnchorDelay
diff --git a/lib/Persist.hs b/lib/Persist.hs
index 7613fd9..46e5ef4 100644
--- a/lib/Persist.hs
+++ b/lib/Persist.hs
@@ -21,13 +21,19 @@ import Database.Persist.Sql (PersistFieldSql (..),
import Database.Persist.TH
import qualified GTFS
import PersistOrphans
-import Servant (FromHttpApiData (..),
+import Servant (FromHttpApiData (..), Handler,
ToHttpApiData)
-import Conduit (ResourceT)
+import Conduit (MonadTrans (lift), MonadUnliftIO,
+ ResourceT, runResourceT)
+import Config (LoggingConfig)
import Control.Monad.IO.Class (MonadIO (liftIO))
-import Control.Monad.Logger (NoLoggingT)
-import Control.Monad.Reader (ReaderT)
+import Control.Monad.Logger (LoggingT, MonadLogger, NoLoggingT,
+ runNoLoggingT, runStderrLoggingT)
+import Control.Monad.Reader (MonadReader (ask),
+ ReaderT (runReaderT), runReader)
+import Control.Monad.Trans.Control (MonadBaseControl (liftBaseWith),
+ MonadTransControl (liftWith, restoreT))
import Data.Data (Proxy (..))
import Data.Map (Map)
import Data.Pool (Pool)
@@ -37,10 +43,11 @@ import Data.Time (NominalDiffTime, TimeOfDay,
getCurrentTime, nominalDay)
import Data.Time.Calendar (Day, DayOfWeek (..))
import Data.Vector (Vector)
-import Database.Persist.Postgresql (SqlBackend)
+import Database.Persist.Postgresql (SqlBackend, runSqlPool)
import Fmt
import GHC.Generics (Generic)
import MultiLangText (MultiLangText)
+import Server.Util (runLogging)
import Web.PathPieces (PathPiece)
import Yesod (Lang)
@@ -120,6 +127,7 @@ Tracker sql=tt_tracker_token
expires UTCTime
blocked Bool
agent Text
+ currentTicket TicketId Maybe
deriving Eq Show Generic
TrackerTicket
@@ -130,7 +138,7 @@ TrackerTicket
-- raw frames as received from OBUs
TrainPing json sql=tt_trip_ping
- ticket TicketId
+ -- ticket TicketId
token TrackerId
geopos Geopos
timestamp UTCTime
@@ -169,5 +177,19 @@ instance ToSchema TrainAnchor where
instance ToSchema Announcement where
declareNamedSchema = genericDeclareNamedSchema (GTFS.swaggerOptions "announcement")
-runSql :: MonadIO m => Pool SqlBackend -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) a -> m a
-runSql pool = liftIO . flip runSqlPersistMPool pool
+runSqlWithoutLog :: MonadIO m
+ => Pool SqlBackend
+ -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) a
+ -> m a
+runSqlWithoutLog pool = liftIO . flip runSqlPersistMPool pool
+
+-- It's a bit unfortunate that we have an extra reader here for just the
+-- logging config, but since Handler is not MonadUnliftIO there seems to be (?)
+-- no better way than to nest logger monads …
+runSql :: (MonadLogger m, MonadIO m, MonadReader LoggingConfig m)
+ => Pool SqlBackend
+ -> ReaderT SqlBackend (LoggingT (ResourceT IO)) a
+ -> m a
+runSql pool x = do
+ conf <- ask
+ liftIO $ runResourceT $ runLogging conf $ runSqlPool x pool
diff --git a/lib/Server.hs b/lib/Server.hs
index 3922a7b..73c55cb 100644
--- a/lib/Server.hs
+++ b/lib/Server.hs
@@ -16,12 +16,14 @@ import Control.Concurrent.STM (TQueue, TVar, atomically,
import Control.Monad (forever, unless, void,
when)
import Control.Monad.Catch (handle)
-import Control.Monad.Extra (ifM, maybeM, unlessM,
- whenJust, whenM)
+import Control.Monad.Extra (ifM, mapMaybeM, maybeM,
+ unlessM, whenJust, whenM)
import Control.Monad.IO.Class (MonadIO (liftIO))
-import Control.Monad.Logger (LoggingT, NoLoggingT,
+import Control.Monad.Logger (LoggingT, MonadLogger,
+ NoLoggingT, logInfoN,
logWarnN)
-import Control.Monad.Reader (ReaderT, forM)
+import Control.Monad.Reader (MonadReader, ReaderT,
+ forM)
import Control.Monad.Trans (lift)
import Data.Aeson ((.=))
import qualified Data.Aeson as A
@@ -33,7 +35,7 @@ import Data.Pool (Pool)
import Data.Proxy (Proxy (Proxy))
import Data.Swagger (toSchema)
import Data.Text (Text)
-import Data.Text.Encoding (decodeUtf8)
+import Data.Text.Encoding (decodeASCII, decodeUtf8)
import Data.Time (NominalDiffTime,
UTCTime (utctDay),
addUTCTime, diffUTCTime,
@@ -48,7 +50,8 @@ import Fmt ((+|), (|+))
import qualified Network.WebSockets as WS
import Servant (Application,
ServerError (errBody),
- err401, err404, serve,
+ err400, err401, err404,
+ serve,
serveDirectoryFileServer,
throwError)
import Servant.API (NoContent (..),
@@ -62,24 +65,33 @@ import Persist
import Server.ControlRoom
import Server.GTFS_RT (gtfsRealtimeServer)
import Server.Util (Service, ServiceM,
- runService, sendErrorMsg)
+ runService, sendErrorMsg,
+ utcToSeconds)
import Yesod (toWaiAppPlain)
-import Extrapolation (Extrapolator (..),
- LinearExtrapolator (..))
-import System.IO.Unsafe
-
import Conduit (ResourceT)
-import Config (ServerConfig (serverConfigAssets, serverConfigZoneinfoPath))
+import Config (LoggingConfig,
+ ServerConfig (..))
+import Control.Exception (throw)
import Data.ByteString (ByteString)
import Data.ByteString.Lazy (toStrict)
+import Data.Foldable (minimumBy)
+import Data.Function (on, (&))
+import Data.Maybe (fromMaybe)
import qualified Data.Text as T
import Data.Time.LocalTime.TimeZone.Olson (getTimeZoneSeriesFromOlsonFile)
import Data.Time.LocalTime.TimeZone.Series (TimeZoneSeries)
import Data.UUID (UUID)
+import qualified Data.UUID as UUID
+import Extrapolation (Extrapolator (..),
+ LinearExtrapolator (..),
+ euclid)
+import GTFS (Seconds (unSeconds),
+ seconds2Double)
import Prometheus
import Prometheus.Metric.GHC
import System.FilePath ((</>))
+import System.IO.Unsafe
application :: GTFS.GTFS -> Pool SqlBackend -> ServerConfig -> IO Application
application gtfs dbpool settings = do
@@ -94,65 +106,84 @@ application gtfs dbpool settings = do
(serverConfigZoneinfoPath settings </> T.unpack tzname)
subscribers <- newTVarIO mempty
- pure $ serve (Proxy @CompleteAPI) $ hoistServer (Proxy @CompleteAPI) runService
+ pure $ serve (Proxy @CompleteAPI)
+ $ hoistServer (Proxy @CompleteAPI) (runService (serverConfigLogging settings))
$ server gtfs getTzseries metrics subscribers dbpool settings
-- databaseMigration :: ConnectionString -> IO ()
-doMigration pool = runSql pool $ runMigration $ do
+doMigration pool = runSqlWithoutLog pool $ runMigration $ do
migrateEnableExtension "uuid-ossp"
migrateAll
server :: GTFS.GTFS -> (Text -> IO TimeZoneSeries) -> Metrics -> TVar (M.Map UUID [TQueue (Maybe TrainPing)]) -> Pool SqlBackend -> ServerConfig -> Service CompleteAPI
server gtfs getTzseries Metrics{..} subscribers dbpool settings = handleDebugAPI
- :<|> (handleTimetable :<|> handleTimetableStops :<|> handleTrip
- :<|> handleRegister :<|> handleTrainPing (throwError err401) :<|> handleWS
+ :<|> (handleTrackerRegister :<|> handleTrainPing (throwError err401) :<|> handleWS
:<|> handleSubscribe :<|> handleDebugState :<|> handleDebugTrain
- :<|> handleDebugRegister :<|> pure (GTFS.gtfsFile gtfs) :<|> gtfsRealtimeServer gtfs dbpool)
+ :<|> pure (GTFS.gtfsFile gtfs) :<|> gtfsRealtimeServer gtfs dbpool)
:<|> metrics
:<|> serveDirectoryFileServer (serverConfigAssets settings)
:<|> pure (unsafePerformIO (toWaiAppPlain (ControlRoom gtfs dbpool settings)))
- where handleTimetable station maybeDay =
- M.filter isLastStop . GTFS.tripsOnDay gtfs <$> liftIO day
- where isLastStop = (==) station . GTFS.stationId . GTFS.stopStation . V.last . GTFS.tripStops
- day = maybeM (getCurrentTime <&> utctDay) pure (pure maybeDay)
- handleTimetableStops day =
- pure . A.toJSON . fmap mkJson . M.elems $ GTFS.tripsOnDay gtfs day
- where mkJson :: GTFS.Trip GTFS.Deep GTFS.Deep -> A.Value
- mkJson GTFS.Trip {..} = A.object
- [ "trip" .= tripTripId
- , "sequencelength" .= (GTFS.stopSequence . V.last) tripStops
- , "stops" .= fmap (\GTFS.Stop{..} -> A.object
- [ "departure" .= GTFS.toUTC stopDeparture (GTFS.tzseries gtfs) day
- , "arrival" .= GTFS.toUTC stopArrival (GTFS.tzseries gtfs) day
- , "station" .= GTFS.stationId stopStation
- , "lat" .= GTFS.stationLat stopStation
- , "lon" .= GTFS.stationLon stopStation
- ]) tripStops
- ]
- handleTrip trip = case M.lookup trip (GTFS.trips gtfs) of
- Just res -> pure res
- Nothing -> throwError err404
- handleRegister (ticketId :: UUID) RegisterJson{..} = do
+ where handleTrackerRegister RegisterJson{..} = do
today <- liftIO getCurrentTime <&> utctDay
expires <- liftIO $ getCurrentTime <&> addUTCTime validityPeriod
runSql dbpool $ do
- TrackerKey tracker <- insert (Tracker expires False registerAgent)
- insert (TrackerTicket (TicketKey ticketId) (TrackerKey tracker))
- pure tracker
- handleDebugRegister (ticketId :: UUID) = do
- expires <- liftIO $ getCurrentTime <&> addUTCTime validityPeriod
- runSql dbpool $ do
- TrackerKey tracker <- insert (Tracker expires False "debug key")
- insert (TrackerTicket (TicketKey ticketId) (TrackerKey tracker))
+ TrackerKey tracker <- insert (Tracker expires False registerAgent Nothing)
pure tracker
handleTrainPing onError ping@TrainPing{..} =
- let ticketId = trainPingTicket in
- isTokenValid dbpool trainPingToken ticketId >>= \case
- Nothing -> do
- onError
- pure Nothing
- Just (tracker@Tracker{..}, ticket@Ticket{..}) -> do
+ isTokenValid dbpool trainPingToken >>= \case
+ Nothing -> onError >> pure Nothing
+ Just tracker@Tracker{..} -> do
+
+ -- if the tracker is not associated with a ticket, it is probably
+ -- just starting out on a new trip, or has finished an old one.
+ maybeTicketId <- case trackerCurrentTicket of
+ Just ticketId -> pure (Just ticketId)
+ Nothing -> runSql dbpool $ do
+ now <- liftIO getCurrentTime
+ tickets <- selectList [ TicketDay ==. utctDay now, TicketCompleted ==. False ] []
+ ticketsWithFirstStation <- flip mapMaybeM tickets
+ (\ticket@(Entity ticketId _) -> do
+ selectFirst [StopTicket ==. ticketId] [Asc StopSequence] >>= \case
+ Nothing -> pure Nothing
+ Just (Entity _ stop) -> do
+ station <- getJust (stopStation stop)
+ tzseries <- liftIO $ getTzseries (GTFS.tzname (stopDeparture stop))
+ pure (Just (ticket, station, stop, tzseries)))
+
+ if null ticketsWithFirstStation then pure Nothing else do
+ let (closestTicket, _, _, _) = minimumBy
+ -- (compare `on` euclid trainPingGeopos . stationGeopos . snd)
+ (compare `on`
+ (\(Entity _ ticket, station, stop, tzseries) ->
+ let
+ runningDay = ticketDay ticket
+ spaceDistance = euclid trainPingGeopos (stationGeopos station)
+ timeDiff =
+ GTFS.toSeconds (stopDeparture stop) tzseries runningDay
+ - utcToSeconds now runningDay
+ in
+ euclid trainPingGeopos (stationGeopos station)
+ + abs (seconds2Double timeDiff / 3600)))
+ ticketsWithFirstStation
+ logInfoN
+ $ "Tracker "+|UUID.toString (coerce trainPingToken)|+
+ " is now handling ticket "+|UUID.toString (coerce (entityKey closestTicket))|+
+ " (trip "+|ticketTripName (entityVal closestTicket)|+")."
+
+ update (coerce trainPingToken)
+ [TrackerCurrentTicket =. Just (entityKey closestTicket)]
+
+ pure (Just (entityKey closestTicket))
+
+ ticketId <- case maybeTicketId of
+ Just ticketId -> pure ticketId
+ Nothing -> do
+ logWarnN $ "Tracker "+|UUID.toString (coerce trainPingToken)|+
+ " sent a ping, but no trips are running today."
+ throwError err400
+
runSql dbpool $ do
+ ticket@Ticket{..} <- getJust ticketId
stations <- selectList [ StopTicket ==. ticketId ] [Asc StopArrival]
>>= mapM (\stop -> do
@@ -165,17 +196,31 @@ server gtfs getTzseries Metrics{..} subscribers dbpool settings = handleDebugAPI
<&> (V.fromList . fmap entityVal)
let anchor = extrapolateAnchorFromPing LinearExtrapolator
- ticket stations shapePoints ping
+ ticketId ticket stations shapePoints ping
insert ping
- last <- selectFirst [TrainAnchorTicket ==. trainPingTicket] [Desc TrainAnchorWhen]
+ last <- selectFirst [TrainAnchorTicket ==. ticketId] [Desc TrainAnchorWhen]
-- only insert new estimates if they've actually changed anything
when (fmap (trainAnchorDelay . entityVal) last /= Just (trainAnchorDelay anchor))
$ void $ insert anchor
+ -- are we at the final stop? if so, mark this ticket as done
+ -- & the tracker as free
+ let maxSequence = V.last stations
+ & (\(stop, _, _) -> stopSequence stop)
+ & fromIntegral
+ when (trainAnchorSequence anchor + 0.1 >= maxSequence) $ do
+ update (coerce trainPingToken)
+ [TrackerCurrentTicket =. Nothing]
+ update ticketId
+ [TicketCompleted =. True]
+ logInfoN $ "Tracker "+|UUID.toString (coerce trainPingToken)|+
+ " has completed ticket "+|UUID.toString (coerce ticketId)|+
+ " (trip "+|ticketTripName|+")"
+
queues <- liftIO $ atomically $ do
- queues <- readTVar subscribers <&> M.lookup (coerce trainPingTicket)
+ queues <- readTVar subscribers <&> M.lookup (coerce ticketId)
whenJust queues $
mapM_ (\q -> writeTQueue q (Just ping))
pure queues
@@ -187,14 +232,14 @@ server gtfs getTzseries Metrics{..} subscribers dbpool settings = handleDebugAPI
msg <- liftIO $ WS.receiveData conn
case A.eitherDecode msg of
Left err -> do
- logWarnN ("stray websocket message: "+|show msg|+" (could not decode: "+|err|+")")
+ logWarnN ("stray websocket message: "+|decodeASCII (toStrict msg)|+" (could not decode: "+|err|+")")
liftIO $ WS.sendClose conn (C8.pack err)
-- TODO: send a close msg (Nothing) to the subscribed queues? decGauge metricsWSGauge
- Right ping ->
+ Right ping -> do
-- if invalid token, send a "polite" close request. Note that the client may
-- ignore this and continue sending messages, which will continue to be handled.
- liftIO $ handleTrainPing (WS.sendClose conn ("" :: ByteString)) ping >>= \case
- Just anchor -> WS.sendTextData conn (A.encode anchor)
+ handleTrainPing (liftIO $ WS.sendClose conn ("" :: ByteString)) ping >>= \case
+ Just anchor -> liftIO $ WS.sendTextData conn (A.encode anchor)
Nothing -> pure ()
handleSubscribe (ticketId :: UUID) conn = liftIO $ WS.withPingThread conn 30 (pure ()) $ do
queue <- atomically $ do
@@ -204,7 +249,7 @@ server gtfs getTzseries Metrics{..} subscribers dbpool settings = handleDebugAPI
$ M.insertWith (<>) ticketId [queue] qs
pure queue
-- send most recent ping, if any (so we won't have to wait for movement)
- lastPing <- runSql dbpool $ do
+ lastPing <- runSqlWithoutLog dbpool $ do
trackers <- getTicketTrackers ticketId
<&> fmap entityKey
selectFirst [TrainPingToken <-. trackers] [Desc TrainPingTimestamp]
@@ -239,21 +284,21 @@ server gtfs getTzseries Metrics{..} subscribers dbpool settings = handleDebugAPI
handleDebugAPI = pure $ toSwagger (Proxy @API)
metrics = exportMetricsAsText <&> (decodeUtf8 . toStrict)
-getTicketTrackers :: UUID -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) [Entity Tracker]
+getTicketTrackers :: (MonadLogger (t (ResourceT IO)), MonadIO (t (ResourceT IO)))
+ => UUID -> ReaderT SqlBackend (t (ResourceT IO)) [Entity Tracker]
getTicketTrackers ticketId = do
joins <- selectList [TrackerTicketTicket ==. TicketKey ticketId] []
<&> fmap (trackerTicketTracker . entityVal)
- selectList [TrackerId <-. joins] []
+ selectList ([TrackerId <-. joins] ||. [TrackerCurrentTicket ==. Just (TicketKey ticketId)]) []
-- TODO: proper debug logging for expired tokens
-isTokenValid :: MonadIO m => Pool SqlBackend -> TrackerId -> TicketId -> m (Maybe (Tracker, Ticket))
-isTokenValid dbpool token ticketId = runSql dbpool $ get token >>= \case
+isTokenValid :: Pool SqlBackend -> TrackerId -> ServiceM (Maybe Tracker)
+isTokenValid dbpool token = runSql dbpool $ get token >>= \case
Just tracker | not (trackerBlocked tracker) -> do
ifM (hasExpired (trackerExpires tracker))
(pure Nothing)
- $ runSql dbpool $ get ticketId
- <&> (\case { Nothing -> Nothing; Just ticket -> Just (tracker, ticket) })
+ (pure (Just tracker))
_ -> pure Nothing
hasExpired :: MonadIO m => UTCTime -> m Bool
diff --git a/lib/Server/ControlRoom.hs b/lib/Server/ControlRoom.hs
index 9d15bcf..5292620 100644
--- a/lib/Server/ControlRoom.hs
+++ b/lib/Server/ControlRoom.hs
@@ -17,6 +17,7 @@ import Control.Monad.IO.Class (MonadIO (liftIO))
import qualified Data.Aeson as A
import qualified Data.ByteString.Char8 as C8
import qualified Data.ByteString.Lazy as LB
+import Data.Coerce (coerce)
import Data.Function (on, (&))
import Data.Functor ((<&>))
import Data.List (lookup, nubBy)
@@ -86,6 +87,7 @@ mkYesod "ControlRoom" [parseRoutes|
/obu OnboardUnitMenuR GET
/obu/#UUID OnboardUnitR GET
+/tracker OnboardTrackerR GET
|]
emptyMarkup :: MarkupM a -> Bool
@@ -96,6 +98,7 @@ instance Yesod ControlRoom where
authRoute _ = Just $ AuthR LoginR
isAuthorized OnboardUnitMenuR _ = pure Authorized
isAuthorized (OnboardUnitR _) _ = pure Authorized
+ isAuthorized OnboardTrackerR _ = pure Authorized
isAuthorized (AuthR _) _ = pure Authorized
isAuthorized _ _ = do
UffdConfig{..} <- getYesod <&> serverConfigLogin . getSettings
@@ -200,7 +203,7 @@ getTicketsR = do
gtfs <- getYesod <&> getGtfs
-- TODO: tickets should have all trip information saved
- tickets <- runDB $ selectList [ TicketDay ==. day ] [] >>= mapM (\ticket -> do
+ tickets <- runDB $ selectList [ TicketDay ==. day ] [ Asc TicketTripName ] >>= mapM (\ticket -> do
stops <- selectList [ StopTicket ==. entityKey ticket ] []
startStation <- getJust (stopStation $ entityVal $ head stops)
pure (ticket, startStation, fmap entityVal stops))
@@ -317,9 +320,11 @@ getTicketViewR ticketId = do
pure (entityVal stop, station))
anns <- runDB $ selectList [ AnnouncementTicket ==. ticketKey ] []
- trackerIds <- runDB $ selectList [ TrackerTicketTicket ==. ticketKey ] []
+ joins <- runDB $ selectList [ TrackerTicketTicket ==. ticketKey ] []
<&> fmap (trackerTicketTracker . entityVal)
- trackers <- runDB $ selectList [ TrackerId <-. trackerIds ] [Asc TrackerExpires]
+ trackers <- runDB $ selectList
+ ([ TrackerId <-. joins ] ||. [ TrackerCurrentTicket ==. Just ticketKey ])
+ [Asc TrackerExpires]
lastPing <- runDB $ selectFirst [ TrainPingToken <-. fmap entityKey trackers ] [Desc TrainPingTimestamp]
anchors <- runDB $ selectList [ TrainAnchorTicket ==. ticketKey ] []
<&> nonEmpty . fmap entityVal
@@ -511,7 +516,9 @@ getTokenBlock token = do
case maybe of
Just r@Tracker{..} -> do
liftIO $ print r
- redirect RootR
+ redirect $ case trackerCurrentTicket of
+ Just ticket -> TicketViewR (coerce ticket)
+ Nothing -> RootR
Nothing -> notFound
getOnboardUnitMenuR :: Handler Html
@@ -525,14 +532,16 @@ getOnboardUnitMenuR = do
defaultLayout $ do
[whamlet|
-<h1>_{MsgOBU}
-<section>
- _{MsgChooseTrain}
- $forall (Entity (TicketKey ticketId) Ticket{..}, firstStop) <- tickets
- <hr>
- <a href="@{OnboardUnitR ticketId}">
- #{ticketTripName}: #{ticketHeadsign} #{stopDeparture firstStop}
-|]
+ <h1>_{MsgOBU}
+ <section>
+ _{MsgChooseTrain}
+ $forall (Entity (TicketKey ticketId) Ticket{..}, firstStop) <- tickets
+ <hr>
+ <a href="@{OnboardUnitR ticketId}">
+ #{ticketTripName}: #{ticketHeadsign} #{stopDeparture firstStop}
+ <section>
+ <a href="@{OnboardTrackerR}">_{MsgStartTracking}
+ |]
getOnboardUnitR :: UUID -> Handler Html
getOnboardUnitR ticketId = do
@@ -541,6 +550,12 @@ getOnboardUnitR ticketId = do
Just ticket -> pure ticket
defaultLayout $(whamletFile "site/obu.hamlet")
+getOnboardTrackerR :: Handler Html
+getOnboardTrackerR = do
+ defaultLayout
+ $( whamletFile "site/tracker.hamlet")
+
+
announceForm :: UUID -> Html -> MForm Handler (FormResult Announcement, Widget)
announceForm ticketId = renderDivs $ Announcement
<$> pure (TicketKey ticketId)
diff --git a/lib/Server/GTFS_RT.hs b/lib/Server/GTFS_RT.hs
index 48a84db..d2e53a1 100644
--- a/lib/Server/GTFS_RT.hs
+++ b/lib/Server/GTFS_RT.hs
@@ -155,13 +155,14 @@ gtfsRealtimeServer gtfs@GTFS{..} dbpool =
ticket <- selectList [TicketCompleted ==. False] []
- positions <- forM ticket $ \(Entity key ticket) -> do
- selectFirst [TrainPingTicket ==. key] [Desc TrainPingTimestamp] >>= \case
- Nothing -> pure Nothing
- Just lastPing ->
- pure (Just $ mkPosition (lastPing, ticket))
-
- defFeedMessage (catMaybes positions)
+ -- TODO: reimplement this (since trainpings no longer reference tickets it's gone for now)
+ -- positions <- forM ticket $ \(Entity key ticket) -> do
+ -- selectFirst [TrainPingTicket ==. key] [Desc TrainPingTimestamp] >>= \case
+ -- Nothing -> pure Nothing
+ -- Just lastPing ->
+ -- pure (Just $ mkPosition (lastPing, ticket))
+
+ defFeedMessage [] -- (catMaybes positions)
where
mkPosition :: (Entity TrainPing, Ticket) -> RT.FeedEntity
diff --git a/lib/Server/Util.hs b/lib/Server/Util.hs
index 5ffb829..0106428 100644
--- a/lib/Server/Util.hs
+++ b/lib/Server/Util.hs
@@ -1,28 +1,67 @@
+{-# LANGUAGE BlockArguments #-}
+{-# LANGUAGE RecordWildCards #-}
-- | mostly the monad the service runs in
-module Server.Util (Service, ServiceM, runService, sendErrorMsg, secondsNow, utcToSeconds) where
+module Server.Util (Service, ServiceM, runService, sendErrorMsg, secondsNow, utcToSeconds, runLogging) where
+import Config (LoggingConfig (..))
+import Control.Exception (handle, try)
+import Control.Monad.Extra (void, whenJust)
import Control.Monad.IO.Class (MonadIO (liftIO))
-import Control.Monad.Logger (LoggingT, runStderrLoggingT)
+import Control.Monad.Logger (Loc, LogLevel (..), LogSource, LogStr,
+ LoggingT (..), defaultOutput,
+ fromLogStr, runStderrLoggingT)
+import Control.Monad.Reader (ReaderT (..))
import qualified Data.Aeson as A
import Data.ByteString (ByteString)
+import qualified Data.ByteString as C8
import Data.Text (Text)
+import qualified Data.Text as T
+import Data.Text.Encoding (decodeUtf8Lenient)
import Data.Time (Day, UTCTime (..), diffUTCTime,
getCurrentTime,
nominalDiffTimeToSeconds)
+import Fmt ((+|), (|+))
+import GHC.IO.Exception (IOException (IOError))
import GTFS (Seconds (..))
import Prometheus (MonadMonitor (doIO))
import Servant (Handler, ServerError, ServerT, err404,
errBody, errHeaders, throwError)
+import System.IO (stderr)
+import System.Process.Extra (callProcess)
-type ServiceM = LoggingT Handler
+type ServiceM = LoggingT (ReaderT LoggingConfig Handler)
type Service api = ServerT api ServiceM
-runService :: ServiceM a -> Handler a
-runService = runStderrLoggingT
+runService :: LoggingConfig -> ServiceM a -> Handler a
+runService conf m = runReaderT (runLogging conf m) conf
instance MonadMonitor ServiceM where
doIO = liftIO
+runLogging :: MonadIO m => LoggingConfig -> LoggingT m a -> m a
+runLogging LoggingConfig{..} logging = runLoggingT logging printLogMsg
+ where printLogMsg loc source level msg = do
+ -- this is what runStderrLoggingT does
+ defaultOutput stderr loc source level msg
+
+ whenJust loggingConfigNtfyToken \token -> handle ntfyFailed do
+ callProcess "ntfy"
+ [ "send"
+ , "--token=" <> T.unpack token
+ , "--title="+|loggingConfigHostname|+"/"+|"tracktrain"
+ , "--priority="+|show (ntfyPriority level)|+""
+ , T.unpack loggingConfigNtfyTopic
+ , T.unpack (decodeUtf8Lenient (fromLogStr msg)) ]
+
+ ntfyFailed (e :: IOError) =
+ putStrLn ("calling ntfy failed:"+|show e|+".")
+ ntfyPriority level = case level of
+ LevelDebug -> 2
+ LevelInfo -> 3
+ LevelWarn -> 4
+ LevelError -> 5
+ LevelOther _ -> 0
+
sendErrorMsg :: Text -> ServiceM a
sendErrorMsg msg = throwError err404
diff --git a/messages/en.msg b/messages/en.msg
index 41ced10..a2ef189 100644
--- a/messages/en.msg
+++ b/messages/en.msg
@@ -40,6 +40,7 @@ Tickets: Tickets
ImportTrips: import selected trips
delete: delete
AccordingToGtfs: Additional Trips contained in the Gtfs
+StartTracking: Start Tracking
OBU: Onboard-Unit
ChooseTrain: Choose a Train
diff --git a/site/tracker.hamlet b/site/tracker.hamlet
new file mode 100644
index 0000000..5877c5d
--- /dev/null
+++ b/site/tracker.hamlet
@@ -0,0 +1,137 @@
+<h1>_{MsgOBU}
+
+<section>
+ <h2>Blub
+ <strong>Token:</strong> <span id="token">
+<section>
+ <h2>Status
+ <p id="status">_{MsgNone}
+ <p id>_{MsgError}: <span id="error">
+<section>
+ <h2>_{MsgLive}
+ <p><strong>Position: </strong><span id="lat"></span>, <span id="long"></span>
+ <p><strong>Accuracy: </strong><span id="acc">
+<section>
+ <h2>_{MsgEstimated}
+ <p><strong>_{MsgDelay}</strong>: <span id="delay">
+ <p><strong>_{MsgSequence}</strong>: <span id="sequence">
+
+
+<script>
+ var token = null;
+
+ let euclid = (a,b) => {
+ let x = a[0]-b[0];
+ let y = a[1]-b[1];
+ return x*x+y*y;
+ }
+
+ let minimalDist = (point, list, proj, norm) => {
+ return list.reduce (
+ (min, x) => {
+ let dist = norm(point, proj(x));
+ return dist < min[0] ? [dist,x] : min
+ },
+ [norm(point, proj(list[0])), list[0]]
+ )[1]
+ }
+
+ let counter = 0;
+ let ws;
+ let id;
+
+ function setStatus(msg) {
+ document.getElementById("status").innerText = msg
+ }
+
+ async function geoError(error) {
+ setStatus("error");
+ alert(`_{MsgPermissionFailed}: \n${error.message}`);
+ console.error(error);
+ main();
+ }
+
+ async function wsError(error) {
+ // alert(`_{MsgWebsocketError}: \n${error.message === undefined ? error.reason : error.message}`);
+ console.log(error);
+ navigator.geolocation.clearWatch(id);
+ }
+
+ async function wsClose(error) {
+ console.log(error);
+ document.getElementById("error").innerText = `websocket closed (reason: ${error.reason}). reconnecting …`;
+ navigator.geolocation.clearWatch(id);
+ setTimeout(openWebsocket, 1000);
+ }
+
+ function wsMsg(msg) {
+ let json = JSON.parse(msg.data);
+ console.log(json);
+ document.getElementById("delay").innerText =
+ `${json.delay}s (${Math.floor(json.delay / 60)}min)`;
+ document.getElementById("sequence").innerText = json.sequence;
+ }
+
+
+ function initGeopos() {
+ document.getElementById("error").innerText = "";
+ id = navigator.geolocation.watchPosition(
+ geoPing,
+ geoError,
+ {enableHighAccuracy: true}
+ );
+ }
+
+
+ function openWebsocket () {
+ ws = new WebSocket((location.protocol == "http:" ? "ws" : "wss") + "://" + location.host + "/api/tracker/ping/ws");
+ ws.onerror = wsError;
+ ws.onclose = wsClose;
+ ws.onmessage = wsMsg;
+ ws.onopen = (event) => {
+ setStatus("connected");
+ };
+ }
+
+ async function geoPing(geoloc) {
+ console.log("got position update " + counter);
+ document.getElementById("lat").innerText = geoloc.coords.latitude;
+ document.getElementById("long").innerText = geoloc.coords.longitude;
+ document.getElementById("acc").innerText = geoloc.coords.accuracy;
+
+ if (ws !== undefined && ws.readyState == 1) {
+ ws.send(JSON.stringify({
+ token: token,
+ geopos: [ geoloc.coords.latitude, geoloc.coords.longitude ],
+ timestamp: (new Date()).toISOString()
+ }));
+ counter += 1;
+ setStatus(`sent ${counter} pings`);
+ } else {
+ setStatus(`websocket readystate ${ws.readyState}`);
+ }
+ }
+
+
+ async function main() {
+ initGeopos();
+
+ token = await (await fetch("/api/tracker/register/", {
+ method: "POST",
+ body: JSON.stringify({agent: "tracktrain-website"}),
+ headers: {"Content-Type": "application/json"}
+ })).json();
+
+ if (token.error) {
+ alert("could not obtain token: \n" + token.msg);
+ setStatus("_{MsgTokenFailed}");
+ } else {
+ console.log("got token");
+
+ document.getElementById("token").innerText = token;
+
+ openWebsocket();
+ }
+ }
+
+ main()
diff --git a/tracktrain.cabal b/tracktrain.cabal
index ad1624f..a8e42a4 100644
--- a/tracktrain.cabal
+++ b/tracktrain.cabal
@@ -97,6 +97,7 @@ library
, proto-lens
, http-media
, filepath
+ , monad-control
hs-source-dirs: lib
exposed-modules: GTFS
, Server