diff options
author | stuebinm | 2022-09-10 22:40:38 +0200 |
---|---|---|
committer | stuebinm | 2022-09-10 22:40:38 +0200 |
commit | 1d8c2f078b4920c8813c48618bf443a7c8c767f3 (patch) | |
tree | eca4c792bec740c9f1e9a0285ed6aabcbe1cd429 | |
parent | 676dfae3263799806da1a3cf5d4162b434b84259 (diff) |
use websockets for the on-board-unit
-rw-r--r-- | lib/Server.hs | 63 | ||||
-rw-r--r-- | messages/en.msg | 2 | ||||
-rw-r--r-- | site/obu.hamlet | 65 |
3 files changed, 79 insertions, 51 deletions
diff --git a/lib/Server.hs b/lib/Server.hs index 759080c..ef5663a 100644 --- a/lib/Server.hs +++ b/lib/Server.hs @@ -12,9 +12,9 @@ -- Implementation of the API. This module is the main point of the program. module Server (application) where import Control.Monad (forever, unless, void, when) -import Control.Monad.Extra (maybeM, unlessM, whenM) +import Control.Monad.Extra (maybeM, unlessM, whenM, ifM) import Control.Monad.IO.Class (MonadIO (liftIO)) -import Control.Monad.Logger (logWarnN) +import Control.Monad.Logger (logWarnN, LoggingT) import Control.Monad.Reader (forM) import Control.Monad.Trans (lift) import qualified Data.Aeson as A @@ -57,6 +57,7 @@ import Extrapolation (Extrapolator (..), import System.IO.Unsafe import Config (ServerConfig) +import Data.ByteString (ByteString) application :: GTFS -> Pool SqlBackend -> IO Application application gtfs dbpool = do @@ -73,7 +74,7 @@ doMigration pool = runSql pool $ server :: GTFS -> Pool SqlBackend -> Service CompleteAPI server gtfs@GTFS{..} dbpool = handleDebugAPI :<|> (handleStations :<|> handleTimetable :<|> handleTrip - :<|> handleRegister :<|> handleTrainPing :<|> handleWS + :<|> handleRegister :<|> handleTrainPing (throwError err401) :<|> handleWS :<|> handleDebugState :<|> handleDebugTrain :<|> handleDebugRegister :<|> gtfsRealtimeServer gtfs dbpool) :<|> pure (unsafePerformIO (toWaiAppPlain (ControlRoom gtfs dbpool))) @@ -99,22 +100,23 @@ server gtfs@GTFS{..} dbpool = handleDebugAPI expires <- liftIO $ getCurrentTime <&> addUTCTime validityPeriod RunningKey token <- runSql dbpool $ insert (Running expires False tripID day Nothing "debug key") pure token - handleTrainPing ping = do - running@Running{..} <- lift $ checkTokenValid dbpool (coerce $ trainPingToken ping) - let anchor = extrapolateAnchorFromPing @LinearExtrapolator gtfs running ping - -- TODO: are these always inserted in order? - runSql dbpool $ do - insert ping - last <- selectFirst - [TrainAnchorTrip ==. runningTrip, TrainAnchorDay ==. runningDay] - [Desc TrainAnchorWhen] - -- only insert new estimates if they've actually changed anything - when (fmap (trainAnchorDelay . entityVal) last /= Just (trainAnchorDelay anchor)) - $ void $ insert anchor - - pure NoContent + handleTrainPing onError ping = isTokenValid dbpool (coerce $ trainPingToken ping) >>= \case + Nothing -> do + onError + pure NoContent + Just running@Running{..} -> do + let anchor = extrapolateAnchorFromPing @LinearExtrapolator gtfs running ping + -- TODO: are these always inserted in order? + runSql dbpool $ do + insert ping + last <- selectFirst + [TrainAnchorTrip ==. runningTrip, TrainAnchorDay ==. runningDay] + [Desc TrainAnchorWhen] + -- only insert new estimates if they've actually changed anything + when (fmap (trainAnchorDelay . entityVal) last /= Just (trainAnchorDelay anchor)) + $ void $ insert anchor + pure NoContent handleWS conn = do - -- TODO test this!! liftIO $ WS.forkPingThread conn 30 forever $ do msg <- liftIO $ WS.receiveData conn @@ -122,9 +124,10 @@ server gtfs@GTFS{..} dbpool = handleDebugAPI Left err -> do logWarnN ("stray websocket message: "+|show msg|+" (could not decode: "+|err|+")") liftIO $ WS.sendClose conn (C8.pack err) - Right ping -> do - lift $ checkTokenValid dbpool (coerce $ trainPingToken ping) - void $ runSql dbpool $ insert ping + Right ping -> + -- if invalid token, send a "polite" close request. Note that the client may + -- ignore this and continue sending messages, which will continue to be handled. + liftIO $ void $ handleTrainPing (WS.sendClose conn ("" :: ByteString)) ping handleDebugState = do now <- liftIO getCurrentTime runSql dbpool $ do @@ -145,17 +148,13 @@ server gtfs@GTFS{..} dbpool = handleDebugAPI -- TODO: proper debug logging for expired tokens -checkTokenValid :: Pool SqlBackend -> Token -> Handler Running -checkTokenValid dbpool token = do - trip <- try $ runSql dbpool $ get (coerce token) - when (runningBlocked trip) - $ throwError err401 - whenM (hasExpired (runningExpires trip)) - $ throwError err401 - pure trip - where try m = m >>= \case - Just a -> pure a - Nothing -> throwError err404 +isTokenValid :: MonadIO m => Pool SqlBackend -> Token -> m (Maybe Running) +isTokenValid dbpool token = runSql dbpool $ get (coerce token) >>= \case + Just trip | not (runningBlocked trip) -> do + ifM (hasExpired (runningExpires trip)) + (pure Nothing) + (pure (Just trip)) + _ -> pure Nothing hasExpired :: MonadIO m => UTCTime -> m Bool hasExpired limit = do diff --git a/messages/en.msg b/messages/en.msg index 2a9c67a..ddbfdfd 100644 --- a/messages/en.msg +++ b/messages/en.msg @@ -29,3 +29,5 @@ OBU: Onboard-Unit ChooseTrain: Choose a Train TokenFailed: Failed to acquire token PermissionFailed: permission failed +WebsocketError: Websocket Error +Error: Error diff --git a/site/obu.hamlet b/site/obu.hamlet index d96b96f..9aec4c0 100644 --- a/site/obu.hamlet +++ b/site/obu.hamlet @@ -12,6 +12,7 @@ <section> <h2>Status <p id="status">_{MsgNone} + <p id>_{MsgError}: <span id="error"> <script> @@ -34,6 +35,46 @@ } let counter = 0; + let ws; + let id; + + async function geoError(error) { + document.getElementById("status").innerText = "error"; + alert(`_{MsgPermissionFailed}: \n${error.message}`); + console.log(error); + } + + async function wsError(error) { + // alert(`_{MsgWebsocketError}: \n${error.message === undefined ? error.reason : error.message}`); + console.log(error); + navigator.geolocation.clearWatch(id); + } + + async function wsClose(error) { + console.log(error); + document.getElementById("error").innerText = `websocket closed (reason: ${error.reason}). reconnecting …`; + navigator.geolocation.clearWatch(id); + setTimeout(openWebsocket, 1000); + } + + + function initGeopos() { + document.getElementById("error").innerText = ""; + id = navigator.geolocation.watchPosition( + geoPing, + geoError, + {enableHighAccuracy: true} + ); + } + + + function openWebsocket () { + ws = new WebSocket((location.protocol == "http:" ? "ws" : "wss") + "://" + location.host + "/api/train/ping/ws"); + ws.onerror = wsError; + ws.onclose = wsClose; + ws.onmessage = (msg) => console.log(msg.data); // TODO: display delays etc. + ws.onopen = (event) => initGeopos(); + } async function geoPing(geoloc) { console.log("got position update " + counter); @@ -41,26 +82,16 @@ document.getElementById("long").innerText = geoloc.coords.longitude; document.getElementById("acc").innerText = geoloc.coords.accuracy; - fetch(`/api/train/ping`, { - method: "POST", - body: JSON.stringify({ + ws.send(JSON.stringify({ token: token, lat: geoloc.coords.latitude, long: geoloc.coords.longitude, timestamp: (new Date()).toISOString() - }), - headers: {"Content-Type": "application/json"} - }).then((resp) => { - counter = counter + 1; - document.getElementById("status").innerText = `${counter}: sent`; - }).catch((error) => document.getElementById("status").innerText = error); + })); + counter += 1; + document.getElementById("status").innerText = `sent ${counter} pings`; } - async function geoError(error) { - document.getElementById("status").innerText = "error"; - alert(`_{MsgPermissionFailed}: \n${error.message}`); - console.log(error); - } async function main() { let trip = await (await fetch("/api/trip/#{tripId}")).json(); @@ -81,11 +112,7 @@ document.getElementById("token").innerText = token; - navigator.geolocation.watchPosition( - geoPing, - geoError, - {enableHighAccuracy: true} - ); + openWebsocket(); } } |