diff options
Diffstat (limited to '')
| -rw-r--r-- | lib/Server/Ingest.hs | 134 |
1 files changed, 101 insertions, 33 deletions
diff --git a/lib/Server/Ingest.hs b/lib/Server/Ingest.hs index 959a4c6..8e122a7 100644 --- a/lib/Server/Ingest.hs +++ b/lib/Server/Ingest.hs @@ -1,7 +1,7 @@ {-# LANGUAGE LambdaCase #-} {-# LANGUAGE RecordWildCards #-} -module Server.Ingest (handleTrackerRegister, handleTrainPing, handleWS) where +module Server.Ingest (handleTrackerRegister, handlePing, handleWS, handleOwntracksMessage) where import API (Metrics (..), RegisterJson (..), SentPing (..)) @@ -13,7 +13,8 @@ import Control.Monad.Catch (handle) import Control.Monad.Extra (ifM, mapMaybeM, whenJust, whenJustM) import Control.Monad.IO.Class (MonadIO (liftIO)) -import Control.Monad.Logger (LoggingT, logInfoN, +import Control.Monad.Logger (LoggingT, logDebugN, + logErrorN, logInfoN, logWarnN) import Control.Monad.Reader (ReaderT) import qualified Data.Aeson as A @@ -36,7 +37,8 @@ import Fmt ((+|), (|+)) import qualified GTFS import qualified Network.WebSockets as WS import Persist -import Servant (err400, throwError) +import Servant (err400, err401, + throwError) import Servant.Server (Handler) import Server.Util (ServiceM, getTzseries, utcToSeconds) @@ -44,46 +46,49 @@ import Server.Util (ServiceM, getTzseries, import Config (LoggingConfig, ServerConfig (..)) import Control.Exception (throw) -import Control.Monad.Logger.CallStack (logErrorN) import Data.ByteString (ByteString) import Data.ByteString.Lazy (toStrict) import Data.Foldable (find, minimumBy) import Data.Function (on, (&)) +import Data.Maybe (fromJust) import qualified Data.Text as T import Data.Time.LocalTime.TimeZone.Series (TimeZoneSeries) import qualified Data.UUID as UUID +import Database.Esqueleto.Experimental (from, selectOne, table, + val, where_, (^.)) +import qualified Database.Esqueleto.Experimental as E import Extrapolation (Extrapolator (..), LinearExtrapolator (..), euclid) import GHC.Generics (Generic) import GTFS (seconds2Double) +import OwnTracks hiding (Ping) import Prometheus (decGauge, incGauge) import Server.Base (ServerState) - handleTrackerRegister :: Pool SqlBackend -> RegisterJson - -> ServiceM Token + -> ServiceM TrackerId handleTrackerRegister dbpool RegisterJson{..} = do today <- liftIO getCurrentTime <&> utctDay expires <- liftIO $ getCurrentTime <&> addUTCTime validityPeriod runSql dbpool $ do - TrackerKey tracker <- insert (Tracker expires False registerAgent Nothing) - pure tracker + TrackerKey tracker <- insert (Tracker "dummy" expires False registerAgent Nothing) + pure (coerce tracker) where validityPeriod :: NominalDiffTime validityPeriod = nominalDay -handleTrainPing +handlePing :: Pool SqlBackend -> ServerState -> ServerConfig -> LoggingT (ReaderT LoggingConfig Handler) a -> SentPing -> LoggingT (ReaderT LoggingConfig Handler) (Maybe TrainAnchor) -handleTrainPing dbpool subscribers cfg onError ping@SentPing{..} = - isTokenValid dbpool sentPingToken >>= \case +handlePing dbpool subscribers cfg onError ping@SentPing{..} = + isTrackerIdValid dbpool sentPingTrackerId >>= \case Nothing -> onError >> pure Nothing Just tracker@Tracker{..} -> do @@ -107,13 +112,76 @@ handleTrainPing dbpool subscribers cfg onError ping@SentPing{..} = Nothing -> runSql dbpool (guessTicketFromPing cfg ping) >>= \case Just ticketId -> pure ticketId Nothing -> do - logWarnN $ "Tracker "+|UUID.toString (coerce sentPingToken)|+ + logWarnN $ "Tracker "+|UUID.toString (coerce sentPingTrackerId)|+ " sent a ping, but no trips are running today." throwError err400 - runSql dbpool $ insertSentPing subscribers cfg ping tracker ticketId + +handleOwntracksMessage + :: Pool SqlBackend + -> ServerState + -> ServerConfig + -> Maybe Text + -> Maybe Text + -> Message + -> LoggingT (ReaderT LoggingConfig Handler) () +handleOwntracksMessage dbpool subscribers cfg maybeUser device msg = do + user <- case maybeUser of + Just user -> pure user + Nothing -> throwError err401 + + -- TODO: maybe get the basic json here, and put it into a log-msg table? + + logDebugN $ "received msg: "+|show msg|+"." + + Entity trackerId tracker@Tracker{..} <- runSql dbpool $ (selectOne do + tracker <- from (table @Tracker) + where_ (tracker ^. TrackerName E.==. val user) + pure tracker) + >>= \case + Just tracker -> pure tracker + Nothing -> throw err401 + + case msg of + MsgStatus status -> do + now <- liftIO getCurrentTime + logInfoN $ "received status msg: "+|show status|+"" + runSql dbpool $ insert_ $ TrackerStatus trackerId now status + MsgLocation Location{..} -> do + let ping = SentPing + { sentPingTrackerId = trackerId + , sentPingGeopos = Geopos (locationLatitude, locationLongitude) + , sentPingTimestamp = locationTimestamp + } + + maybeTicketId <- case trackerCurrentTicket of + -- if the tracker is not associated with a ticket, it is probably new + -- & should be auto-associated with the most fitting current ticket + Nothing -> runSql dbpool (guessTicketFromPing cfg ping) >>= \case + Just ticketId -> pure (Just ticketId) + Nothing -> do + -- unfortunately, cannot really communicate anything useful back? + logWarnN $ "Owntracks user "+|user|+ + " sent a ping, but no trips are running today." + pure Nothing + + case maybeTicketId of + Nothing -> do + runSql dbpool $ insert $ Ping + { pingTicket = Nothing + , pingTrackerId = trackerId + , pingGeopos = Geopos (locationLatitude, locationLongitude) + , pingTimestamp = locationTimestamp + , pingSequence = Nothing + } + pure () + Just ticketId -> do + runSql dbpool $ insertSentPing subscribers cfg undefined tracker ticketId + pure () + + insertSentPing :: ServerState -> ServerConfig @@ -140,9 +208,9 @@ insertSentPing subscribers cfg ping@SentPing{..} tracker@Tracker{..} ticketId = maybeReassign <- selectFirst - [ TrainPingTicket ==. ticketId ] - [ Desc TrainPingTimestamp ] - <&> find (\ping -> trainPingSequence (entityVal ping) > trainAnchorSequence anchor) + [ PingTicket ==. Just ticketId, PingSequence !=. Nothing ] + [ Desc PingTimestamp ] + <&> find (\ping -> fromJust (pingSequence (entityVal ping)) > trainAnchorSequence anchor) >> guessTicketFromPing cfg ping <&> find (/= ticketId) @@ -154,19 +222,19 @@ insertSentPing subscribers cfg ping@SentPing{..} tracker@Tracker{..} ticketId = case maybeReassign of Just newTicketId -> do - update sentPingToken + update sentPingTrackerId [TrackerCurrentTicket =. Just newTicketId ] - logInfoN $ "tracker "+|UUID.toText (coerce sentPingToken)|+ + logInfoN $ "tracker "+|UUID.toText (coerce sentPingTrackerId)|+ "has switched direction, and was reassigned to ticket " +|UUID.toText (coerce newTicketId)|+"." insertSentPing subscribers cfg ping tracker newTicketId Nothing -> do - let trackedPing = TrainPing - { trainPingToken = sentPingToken - , trainPingGeopos = sentPingGeopos - , trainPingTimestamp = sentPingTimestamp - , trainPingSequence = trainAnchorSequence anchor - , trainPingTicket = ticketId + let trackedPing = Ping + { pingTrackerId = sentPingTrackerId + , pingGeopos = sentPingGeopos + , pingTimestamp = sentPingTimestamp + , pingSequence = Just (trainAnchorSequence anchor) + , pingTicket = Just ticketId } insert trackedPing @@ -182,11 +250,11 @@ insertSentPing subscribers cfg ping@SentPing{..} tracker@Tracker{..} ticketId = & (\(stop, _, _) -> stopSequence stop) & fromIntegral when (trainAnchorSequence anchor + 0.1 >= maxSequence) $ do - update sentPingToken + update sentPingTrackerId [TrackerCurrentTicket =. Nothing] update ticketId [TicketCompleted =. True] - logInfoN $ "Tracker "+|UUID.toString (coerce sentPingToken)|+ + logInfoN $ "Tracker "+|UUID.toString (coerce sentPingTrackerId)|+ " has completed ticket "+|UUID.toString (coerce ticketId)|+ " (trip "+|ticketTripName|+")" @@ -214,9 +282,9 @@ handleWS dbpool subscribers cfg Metrics{..} conn = do liftIO $ WS.sendClose conn (C8.pack err) -- TODO: send a close msg (Nothing) to the subscribed queues? decGauge metricsWSGauge Right ping -> do - -- if invalid token, send a "polite" close request. Note that the client may + -- if invalid trackerId, send a "polite" close request. Note that the client may -- ignore this and continue sending messages, which will continue to be handled. - handleTrainPing dbpool subscribers cfg (liftIO $ WS.sendClose conn ("" :: ByteString)) ping >>= \case + handlePing dbpool subscribers cfg (liftIO $ WS.sendClose conn ("" :: ByteString)) ping >>= \case Just anchor -> liftIO $ WS.sendTextData conn (A.encode anchor) Nothing -> pure () @@ -245,11 +313,11 @@ guessTicketFromPing cfg SentPing{..} = do in smallestDistance)) logInfoN - $ "Tracker "+|UUID.toString (coerce sentPingToken)|+ + $ "Tracker "+|UUID.toString (coerce sentPingTrackerId)|+ " is now handling ticket "+|UUID.toString (coerce (entityKey closestTicket))|+ " (trip "+|ticketTripName (entityVal closestTicket)|+")." - update sentPingToken + update sentPingTrackerId [TrackerCurrentTicket =. Just (entityKey closestTicket)] pure (Just (entityKey closestTicket)) @@ -260,9 +328,9 @@ spaceAndTimeDiff (pos1, time1) (pos2, time2) = where spaceDistance = euclid pos1 pos2 timeDiff = time1 - time2 --- TODO: proper debug logging for expired tokens -isTokenValid :: Pool SqlBackend -> TrackerId -> ServiceM (Maybe Tracker) -isTokenValid dbpool token = runSql dbpool $ get token >>= \case +-- TODO: proper debug logging for expired trackerIds +isTrackerIdValid :: Pool SqlBackend -> TrackerId -> ServiceM (Maybe Tracker) +isTrackerIdValid dbpool trackerId = runSql dbpool $ get trackerId >>= \case Just tracker | not (trackerBlocked tracker) -> do ifM (hasExpired (trackerExpires tracker)) (pure Nothing) |
