aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorstuebinm2022-09-10 22:40:38 +0200
committerstuebinm2022-09-10 22:40:38 +0200
commit1d8c2f078b4920c8813c48618bf443a7c8c767f3 (patch)
treeeca4c792bec740c9f1e9a0285ed6aabcbe1cd429
parent676dfae3263799806da1a3cf5d4162b434b84259 (diff)
use websockets for the on-board-unit
-rw-r--r--lib/Server.hs63
-rw-r--r--messages/en.msg2
-rw-r--r--site/obu.hamlet65
3 files changed, 79 insertions, 51 deletions
diff --git a/lib/Server.hs b/lib/Server.hs
index 759080c..ef5663a 100644
--- a/lib/Server.hs
+++ b/lib/Server.hs
@@ -12,9 +12,9 @@
-- Implementation of the API. This module is the main point of the program.
module Server (application) where
import Control.Monad (forever, unless, void, when)
-import Control.Monad.Extra (maybeM, unlessM, whenM)
+import Control.Monad.Extra (maybeM, unlessM, whenM, ifM)
import Control.Monad.IO.Class (MonadIO (liftIO))
-import Control.Monad.Logger (logWarnN)
+import Control.Monad.Logger (logWarnN, LoggingT)
import Control.Monad.Reader (forM)
import Control.Monad.Trans (lift)
import qualified Data.Aeson as A
@@ -57,6 +57,7 @@ import Extrapolation (Extrapolator (..),
import System.IO.Unsafe
import Config (ServerConfig)
+import Data.ByteString (ByteString)
application :: GTFS -> Pool SqlBackend -> IO Application
application gtfs dbpool = do
@@ -73,7 +74,7 @@ doMigration pool = runSql pool $
server :: GTFS -> Pool SqlBackend -> Service CompleteAPI
server gtfs@GTFS{..} dbpool = handleDebugAPI
:<|> (handleStations :<|> handleTimetable :<|> handleTrip
- :<|> handleRegister :<|> handleTrainPing :<|> handleWS
+ :<|> handleRegister :<|> handleTrainPing (throwError err401) :<|> handleWS
:<|> handleDebugState :<|> handleDebugTrain :<|> handleDebugRegister
:<|> gtfsRealtimeServer gtfs dbpool)
:<|> pure (unsafePerformIO (toWaiAppPlain (ControlRoom gtfs dbpool)))
@@ -99,22 +100,23 @@ server gtfs@GTFS{..} dbpool = handleDebugAPI
expires <- liftIO $ getCurrentTime <&> addUTCTime validityPeriod
RunningKey token <- runSql dbpool $ insert (Running expires False tripID day Nothing "debug key")
pure token
- handleTrainPing ping = do
- running@Running{..} <- lift $ checkTokenValid dbpool (coerce $ trainPingToken ping)
- let anchor = extrapolateAnchorFromPing @LinearExtrapolator gtfs running ping
- -- TODO: are these always inserted in order?
- runSql dbpool $ do
- insert ping
- last <- selectFirst
- [TrainAnchorTrip ==. runningTrip, TrainAnchorDay ==. runningDay]
- [Desc TrainAnchorWhen]
- -- only insert new estimates if they've actually changed anything
- when (fmap (trainAnchorDelay . entityVal) last /= Just (trainAnchorDelay anchor))
- $ void $ insert anchor
-
- pure NoContent
+ handleTrainPing onError ping = isTokenValid dbpool (coerce $ trainPingToken ping) >>= \case
+ Nothing -> do
+ onError
+ pure NoContent
+ Just running@Running{..} -> do
+ let anchor = extrapolateAnchorFromPing @LinearExtrapolator gtfs running ping
+ -- TODO: are these always inserted in order?
+ runSql dbpool $ do
+ insert ping
+ last <- selectFirst
+ [TrainAnchorTrip ==. runningTrip, TrainAnchorDay ==. runningDay]
+ [Desc TrainAnchorWhen]
+ -- only insert new estimates if they've actually changed anything
+ when (fmap (trainAnchorDelay . entityVal) last /= Just (trainAnchorDelay anchor))
+ $ void $ insert anchor
+ pure NoContent
handleWS conn = do
- -- TODO test this!!
liftIO $ WS.forkPingThread conn 30
forever $ do
msg <- liftIO $ WS.receiveData conn
@@ -122,9 +124,10 @@ server gtfs@GTFS{..} dbpool = handleDebugAPI
Left err -> do
logWarnN ("stray websocket message: "+|show msg|+" (could not decode: "+|err|+")")
liftIO $ WS.sendClose conn (C8.pack err)
- Right ping -> do
- lift $ checkTokenValid dbpool (coerce $ trainPingToken ping)
- void $ runSql dbpool $ insert ping
+ Right ping ->
+ -- if invalid token, send a "polite" close request. Note that the client may
+ -- ignore this and continue sending messages, which will continue to be handled.
+ liftIO $ void $ handleTrainPing (WS.sendClose conn ("" :: ByteString)) ping
handleDebugState = do
now <- liftIO getCurrentTime
runSql dbpool $ do
@@ -145,17 +148,13 @@ server gtfs@GTFS{..} dbpool = handleDebugAPI
-- TODO: proper debug logging for expired tokens
-checkTokenValid :: Pool SqlBackend -> Token -> Handler Running
-checkTokenValid dbpool token = do
- trip <- try $ runSql dbpool $ get (coerce token)
- when (runningBlocked trip)
- $ throwError err401
- whenM (hasExpired (runningExpires trip))
- $ throwError err401
- pure trip
- where try m = m >>= \case
- Just a -> pure a
- Nothing -> throwError err404
+isTokenValid :: MonadIO m => Pool SqlBackend -> Token -> m (Maybe Running)
+isTokenValid dbpool token = runSql dbpool $ get (coerce token) >>= \case
+ Just trip | not (runningBlocked trip) -> do
+ ifM (hasExpired (runningExpires trip))
+ (pure Nothing)
+ (pure (Just trip))
+ _ -> pure Nothing
hasExpired :: MonadIO m => UTCTime -> m Bool
hasExpired limit = do
diff --git a/messages/en.msg b/messages/en.msg
index 2a9c67a..ddbfdfd 100644
--- a/messages/en.msg
+++ b/messages/en.msg
@@ -29,3 +29,5 @@ OBU: Onboard-Unit
ChooseTrain: Choose a Train
TokenFailed: Failed to acquire token
PermissionFailed: permission failed
+WebsocketError: Websocket Error
+Error: Error
diff --git a/site/obu.hamlet b/site/obu.hamlet
index d96b96f..9aec4c0 100644
--- a/site/obu.hamlet
+++ b/site/obu.hamlet
@@ -12,6 +12,7 @@
<section>
<h2>Status
<p id="status">_{MsgNone}
+ <p id>_{MsgError}: <span id="error">
<script>
@@ -34,6 +35,46 @@
}
let counter = 0;
+ let ws;
+ let id;
+
+ async function geoError(error) {
+ document.getElementById("status").innerText = "error";
+ alert(`_{MsgPermissionFailed}: \n${error.message}`);
+ console.log(error);
+ }
+
+ async function wsError(error) {
+ // alert(`_{MsgWebsocketError}: \n${error.message === undefined ? error.reason : error.message}`);
+ console.log(error);
+ navigator.geolocation.clearWatch(id);
+ }
+
+ async function wsClose(error) {
+ console.log(error);
+ document.getElementById("error").innerText = `websocket closed (reason: ${error.reason}). reconnecting …`;
+ navigator.geolocation.clearWatch(id);
+ setTimeout(openWebsocket, 1000);
+ }
+
+
+ function initGeopos() {
+ document.getElementById("error").innerText = "";
+ id = navigator.geolocation.watchPosition(
+ geoPing,
+ geoError,
+ {enableHighAccuracy: true}
+ );
+ }
+
+
+ function openWebsocket () {
+ ws = new WebSocket((location.protocol == "http:" ? "ws" : "wss") + "://" + location.host + "/api/train/ping/ws");
+ ws.onerror = wsError;
+ ws.onclose = wsClose;
+ ws.onmessage = (msg) => console.log(msg.data); // TODO: display delays etc.
+ ws.onopen = (event) => initGeopos();
+ }
async function geoPing(geoloc) {
console.log("got position update " + counter);
@@ -41,26 +82,16 @@
document.getElementById("long").innerText = geoloc.coords.longitude;
document.getElementById("acc").innerText = geoloc.coords.accuracy;
- fetch(`/api/train/ping`, {
- method: "POST",
- body: JSON.stringify({
+ ws.send(JSON.stringify({
token: token,
lat: geoloc.coords.latitude,
long: geoloc.coords.longitude,
timestamp: (new Date()).toISOString()
- }),
- headers: {"Content-Type": "application/json"}
- }).then((resp) => {
- counter = counter + 1;
- document.getElementById("status").innerText = `${counter}: sent`;
- }).catch((error) => document.getElementById("status").innerText = error);
+ }));
+ counter += 1;
+ document.getElementById("status").innerText = `sent ${counter} pings`;
}
- async function geoError(error) {
- document.getElementById("status").innerText = "error";
- alert(`_{MsgPermissionFailed}: \n${error.message}`);
- console.log(error);
- }
async function main() {
let trip = await (await fetch("/api/trip/#{tripId}")).json();
@@ -81,11 +112,7 @@
document.getElementById("token").innerText = token;
- navigator.geolocation.watchPosition(
- geoPing,
- geoError,
- {enableHighAccuracy: true}
- );
+ openWebsocket();
}
}