aboutsummaryrefslogtreecommitdiff
path: root/lib/Server/Ingest.hs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--lib/Server/Ingest.hs134
1 files changed, 101 insertions, 33 deletions
diff --git a/lib/Server/Ingest.hs b/lib/Server/Ingest.hs
index 959a4c6..8e122a7 100644
--- a/lib/Server/Ingest.hs
+++ b/lib/Server/Ingest.hs
@@ -1,7 +1,7 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}
-module Server.Ingest (handleTrackerRegister, handleTrainPing, handleWS) where
+module Server.Ingest (handleTrackerRegister, handlePing, handleWS, handleOwntracksMessage) where
import API (Metrics (..),
RegisterJson (..),
SentPing (..))
@@ -13,7 +13,8 @@ import Control.Monad.Catch (handle)
import Control.Monad.Extra (ifM, mapMaybeM, whenJust,
whenJustM)
import Control.Monad.IO.Class (MonadIO (liftIO))
-import Control.Monad.Logger (LoggingT, logInfoN,
+import Control.Monad.Logger (LoggingT, logDebugN,
+ logErrorN, logInfoN,
logWarnN)
import Control.Monad.Reader (ReaderT)
import qualified Data.Aeson as A
@@ -36,7 +37,8 @@ import Fmt ((+|), (|+))
import qualified GTFS
import qualified Network.WebSockets as WS
import Persist
-import Servant (err400, throwError)
+import Servant (err400, err401,
+ throwError)
import Servant.Server (Handler)
import Server.Util (ServiceM, getTzseries,
utcToSeconds)
@@ -44,46 +46,49 @@ import Server.Util (ServiceM, getTzseries,
import Config (LoggingConfig,
ServerConfig (..))
import Control.Exception (throw)
-import Control.Monad.Logger.CallStack (logErrorN)
import Data.ByteString (ByteString)
import Data.ByteString.Lazy (toStrict)
import Data.Foldable (find, minimumBy)
import Data.Function (on, (&))
+import Data.Maybe (fromJust)
import qualified Data.Text as T
import Data.Time.LocalTime.TimeZone.Series (TimeZoneSeries)
import qualified Data.UUID as UUID
+import Database.Esqueleto.Experimental (from, selectOne, table,
+ val, where_, (^.))
+import qualified Database.Esqueleto.Experimental as E
import Extrapolation (Extrapolator (..),
LinearExtrapolator (..),
euclid)
import GHC.Generics (Generic)
import GTFS (seconds2Double)
+import OwnTracks hiding (Ping)
import Prometheus (decGauge, incGauge)
import Server.Base (ServerState)
-
handleTrackerRegister
:: Pool SqlBackend
-> RegisterJson
- -> ServiceM Token
+ -> ServiceM TrackerId
handleTrackerRegister dbpool RegisterJson{..} = do
today <- liftIO getCurrentTime <&> utctDay
expires <- liftIO $ getCurrentTime <&> addUTCTime validityPeriod
runSql dbpool $ do
- TrackerKey tracker <- insert (Tracker expires False registerAgent Nothing)
- pure tracker
+ TrackerKey tracker <- insert (Tracker "dummy" expires False registerAgent Nothing)
+ pure (coerce tracker)
where
validityPeriod :: NominalDiffTime
validityPeriod = nominalDay
-handleTrainPing
+handlePing
:: Pool SqlBackend
-> ServerState
-> ServerConfig
-> LoggingT (ReaderT LoggingConfig Handler) a
-> SentPing
-> LoggingT (ReaderT LoggingConfig Handler) (Maybe TrainAnchor)
-handleTrainPing dbpool subscribers cfg onError ping@SentPing{..} =
- isTokenValid dbpool sentPingToken >>= \case
+handlePing dbpool subscribers cfg onError ping@SentPing{..} =
+ isTrackerIdValid dbpool sentPingTrackerId >>= \case
Nothing -> onError >> pure Nothing
Just tracker@Tracker{..} -> do
@@ -107,13 +112,76 @@ handleTrainPing dbpool subscribers cfg onError ping@SentPing{..} =
Nothing -> runSql dbpool (guessTicketFromPing cfg ping) >>= \case
Just ticketId -> pure ticketId
Nothing -> do
- logWarnN $ "Tracker "+|UUID.toString (coerce sentPingToken)|+
+ logWarnN $ "Tracker "+|UUID.toString (coerce sentPingTrackerId)|+
" sent a ping, but no trips are running today."
throwError err400
-
runSql dbpool $ insertSentPing subscribers cfg ping tracker ticketId
+
+handleOwntracksMessage
+ :: Pool SqlBackend
+ -> ServerState
+ -> ServerConfig
+ -> Maybe Text
+ -> Maybe Text
+ -> Message
+ -> LoggingT (ReaderT LoggingConfig Handler) ()
+handleOwntracksMessage dbpool subscribers cfg maybeUser device msg = do
+ user <- case maybeUser of
+ Just user -> pure user
+ Nothing -> throwError err401
+
+ -- TODO: maybe get the basic json here, and put it into a log-msg table?
+
+ logDebugN $ "received msg: "+|show msg|+"."
+
+ Entity trackerId tracker@Tracker{..} <- runSql dbpool $ (selectOne do
+ tracker <- from (table @Tracker)
+ where_ (tracker ^. TrackerName E.==. val user)
+ pure tracker)
+ >>= \case
+ Just tracker -> pure tracker
+ Nothing -> throw err401
+
+ case msg of
+ MsgStatus status -> do
+ now <- liftIO getCurrentTime
+ logInfoN $ "received status msg: "+|show status|+""
+ runSql dbpool $ insert_ $ TrackerStatus trackerId now status
+ MsgLocation Location{..} -> do
+ let ping = SentPing
+ { sentPingTrackerId = trackerId
+ , sentPingGeopos = Geopos (locationLatitude, locationLongitude)
+ , sentPingTimestamp = locationTimestamp
+ }
+
+ maybeTicketId <- case trackerCurrentTicket of
+ -- if the tracker is not associated with a ticket, it is probably new
+ -- & should be auto-associated with the most fitting current ticket
+ Nothing -> runSql dbpool (guessTicketFromPing cfg ping) >>= \case
+ Just ticketId -> pure (Just ticketId)
+ Nothing -> do
+ -- unfortunately, cannot really communicate anything useful back?
+ logWarnN $ "Owntracks user "+|user|+
+ " sent a ping, but no trips are running today."
+ pure Nothing
+
+ case maybeTicketId of
+ Nothing -> do
+ runSql dbpool $ insert $ Ping
+ { pingTicket = Nothing
+ , pingTrackerId = trackerId
+ , pingGeopos = Geopos (locationLatitude, locationLongitude)
+ , pingTimestamp = locationTimestamp
+ , pingSequence = Nothing
+ }
+ pure ()
+ Just ticketId -> do
+ runSql dbpool $ insertSentPing subscribers cfg undefined tracker ticketId
+ pure ()
+
+
insertSentPing
:: ServerState
-> ServerConfig
@@ -140,9 +208,9 @@ insertSentPing subscribers cfg ping@SentPing{..} tracker@Tracker{..} ticketId =
maybeReassign <- selectFirst
- [ TrainPingTicket ==. ticketId ]
- [ Desc TrainPingTimestamp ]
- <&> find (\ping -> trainPingSequence (entityVal ping) > trainAnchorSequence anchor)
+ [ PingTicket ==. Just ticketId, PingSequence !=. Nothing ]
+ [ Desc PingTimestamp ]
+ <&> find (\ping -> fromJust (pingSequence (entityVal ping)) > trainAnchorSequence anchor)
>> guessTicketFromPing cfg ping
<&> find (/= ticketId)
@@ -154,19 +222,19 @@ insertSentPing subscribers cfg ping@SentPing{..} tracker@Tracker{..} ticketId =
case maybeReassign of
Just newTicketId -> do
- update sentPingToken
+ update sentPingTrackerId
[TrackerCurrentTicket =. Just newTicketId ]
- logInfoN $ "tracker "+|UUID.toText (coerce sentPingToken)|+
+ logInfoN $ "tracker "+|UUID.toText (coerce sentPingTrackerId)|+
"has switched direction, and was reassigned to ticket "
+|UUID.toText (coerce newTicketId)|+"."
insertSentPing subscribers cfg ping tracker newTicketId
Nothing -> do
- let trackedPing = TrainPing
- { trainPingToken = sentPingToken
- , trainPingGeopos = sentPingGeopos
- , trainPingTimestamp = sentPingTimestamp
- , trainPingSequence = trainAnchorSequence anchor
- , trainPingTicket = ticketId
+ let trackedPing = Ping
+ { pingTrackerId = sentPingTrackerId
+ , pingGeopos = sentPingGeopos
+ , pingTimestamp = sentPingTimestamp
+ , pingSequence = Just (trainAnchorSequence anchor)
+ , pingTicket = Just ticketId
}
insert trackedPing
@@ -182,11 +250,11 @@ insertSentPing subscribers cfg ping@SentPing{..} tracker@Tracker{..} ticketId =
& (\(stop, _, _) -> stopSequence stop)
& fromIntegral
when (trainAnchorSequence anchor + 0.1 >= maxSequence) $ do
- update sentPingToken
+ update sentPingTrackerId
[TrackerCurrentTicket =. Nothing]
update ticketId
[TicketCompleted =. True]
- logInfoN $ "Tracker "+|UUID.toString (coerce sentPingToken)|+
+ logInfoN $ "Tracker "+|UUID.toString (coerce sentPingTrackerId)|+
" has completed ticket "+|UUID.toString (coerce ticketId)|+
" (trip "+|ticketTripName|+")"
@@ -214,9 +282,9 @@ handleWS dbpool subscribers cfg Metrics{..} conn = do
liftIO $ WS.sendClose conn (C8.pack err)
-- TODO: send a close msg (Nothing) to the subscribed queues? decGauge metricsWSGauge
Right ping -> do
- -- if invalid token, send a "polite" close request. Note that the client may
+ -- if invalid trackerId, send a "polite" close request. Note that the client may
-- ignore this and continue sending messages, which will continue to be handled.
- handleTrainPing dbpool subscribers cfg (liftIO $ WS.sendClose conn ("" :: ByteString)) ping >>= \case
+ handlePing dbpool subscribers cfg (liftIO $ WS.sendClose conn ("" :: ByteString)) ping >>= \case
Just anchor -> liftIO $ WS.sendTextData conn (A.encode anchor)
Nothing -> pure ()
@@ -245,11 +313,11 @@ guessTicketFromPing cfg SentPing{..} = do
in smallestDistance))
logInfoN
- $ "Tracker "+|UUID.toString (coerce sentPingToken)|+
+ $ "Tracker "+|UUID.toString (coerce sentPingTrackerId)|+
" is now handling ticket "+|UUID.toString (coerce (entityKey closestTicket))|+
" (trip "+|ticketTripName (entityVal closestTicket)|+")."
- update sentPingToken
+ update sentPingTrackerId
[TrackerCurrentTicket =. Just (entityKey closestTicket)]
pure (Just (entityKey closestTicket))
@@ -260,9 +328,9 @@ spaceAndTimeDiff (pos1, time1) (pos2, time2) =
where spaceDistance = euclid pos1 pos2
timeDiff = time1 - time2
--- TODO: proper debug logging for expired tokens
-isTokenValid :: Pool SqlBackend -> TrackerId -> ServiceM (Maybe Tracker)
-isTokenValid dbpool token = runSql dbpool $ get token >>= \case
+-- TODO: proper debug logging for expired trackerIds
+isTrackerIdValid :: Pool SqlBackend -> TrackerId -> ServiceM (Maybe Tracker)
+isTrackerIdValid dbpool trackerId = runSql dbpool $ get trackerId >>= \case
Just tracker | not (trackerBlocked tracker) -> do
ifM (hasExpired (trackerExpires tracker))
(pure Nothing)