From 64dd90ac41f3db7f79b3826c823b971c21a49707 Mon Sep 17 00:00:00 2001 From: stuebinm Date: Wed, 17 Feb 2021 19:00:23 +0100 Subject: Seperate room states from global server state The main server state is now a `HashMap` of `Text` to `MVar Room` instead of just `Room`. This allows for changing room states independently from the server state, which should make the entire thing scale better on multi-core architectures (nevermind that "switching slides" is presumably not something in much need of multicore servers ...) --- Main.lhs | 129 ++++++++++++++++++++++++++++++++++++++------------------------- 1 file changed, 78 insertions(+), 51 deletions(-) diff --git a/Main.lhs b/Main.lhs index cffb4c7..3799fbe 100644 --- a/Main.lhs +++ b/Main.lhs @@ -14,7 +14,7 @@ example of the haskell websockets library. > import Data.HashMap.Strict (HashMap) > import Control.Exception (finally) > import Control.Monad (forM_, forever) -> import Control.Concurrent (MVar, newMVar, modifyMVar_, readMVar) +> import Control.Concurrent (MVar, newMVar, modifyMVar_, modifyMVar, readMVar, takeMVar, putMVar) > import qualified Data.Text as T > import qualified Data.Text.Encoding as T > import qualified Data.Text.IO as T @@ -22,6 +22,8 @@ example of the haskell websockets library. > import qualified Data.HashMap.Strict as M > import qualified Network.WebSockets as WS +Application State +================= We only show slides, so clients can be represented entirely by their websocket connection and some id, which is only necessarry for bookkeeping @@ -36,9 +38,11 @@ end up with invalid values). > type Room = ([Client], Int) -The entire server state is a map of room names to rooms: +The entire server state is a map of room names to rooms, which are each +wrapped in a MVar, so we can change room states without touching the +global server state: -> type ServerState = HashMap Text Room +> type ServerState = HashMap Text (MVar Room) Since rooms are created dynamically, the initial server state is just an empty map: @@ -46,28 +50,6 @@ an empty map: > initialState :: ServerState > initialState = M.empty -Since all clients should be notified of all state changes, we define -a function to broadcast messages to all clients in one room: - -> broadcast :: Text -> [Client] -> IO () -> broadcast message cs = do -> --T.putStrLn message -> forM_ cs $ \(_,conn) -> WS.sendTextData conn message - - - - -Then there are some bookkeeping functions: - -> addClient :: Client -> Maybe Room -> Room -> addClient c room = case room of -> Nothing -> ([c],0) -> Just (cs,n) -> (c:cs,n) - -> removeClient :: Int -> Room -> Room -> removeClient i (cs,n) = (filter ((/= i) . fst) cs, n) - - Protocol @@ -113,29 +95,49 @@ dropped), and then start processing: > Left err -> putStrLn $ "error while join: " <> err > Right join -> do -Once we now that a new client wants to join, retrieve the current slide -number and a new, free index, perform some housekeeping, and send the -new client the current state. +Once we now that a new client wants to join, we can start actually +processing the connection. First, we can fork a pinging thread to +the background: + +> WS.withPingThread conn 30 (return ()) $ do + +Then we can retrieve the global server state. Note that rooms are +ephemeral — they just get created as soon as someone joins them; +so we also have to check if the room already exists, and, if not, +create a new MVar to store that room's state. + +> s <- takeMVar state +> (i,n, roomstate) <- case M.lookup (room join) s of +> Nothing -> do +> room' <- newMVar ([(0,conn)],0) +> let s' = M.insert (room join) room' s +> putMVar state s' +> return (0,0, room') +> Just room' -> do +> (i,n) <- modifyMVar room' (\state -> return $ insertClient conn state) +> putMVar state s +> return (i,n, room') + +Now the client has joined, and we can print some debug output, send the +new client the current state so it can update its view, and hand over +to the usual message handling loop, which just needs the room's state, +not the server's global state: -Additionally, we can fork a pinging thread to the background: - -> s <- readMVar state -> let (i,n) = getNewIndex s -> WS.withPingThread conn 30 (return ()) $ flip finally (disconnect i) $ do > putStrLn $ show i <> " joined room " <> (show $ room join) > WS.sendTextData conn (T.pack $ "state " <> show n) -> modifyMVar_ state $ \map -> return (M.insert (room join) (addClient (i,conn) $ M.lookup (room join) map) map) -> talk (i,conn) (room join) state +> talk (i, conn) roomstate + +Only one thing is still left to do, which is to define the `insertClient` +function that was used above for brevity. It gets an already-existing +room, adds a client to it, and then returns the new room along with +the new client's index and the room's current slide, to make the call +of `modifyMVar` above look nicer. + > where -> getNewIndex :: ServerState -> (Int, Int) -> getNewIndex s = case M.lookup (room join) s of -> Nothing -> (0,0) -> Just (cs,n) -> case cs of -> [] -> (0,0) -> (i,c):_ -> (i+1,n) -> disconnect i = do -> modifyMVar_ state $ \map -> return (M.adjust (removeClient i) (room join) map) -> putStrLn $ show i <> " disconnected" +> insertClient :: WS.Connection -> Room -> (Room, (Int,Int)) +> insertClient client room = case room of +> ([],n) -> (([(0,client)], 0), (0,0)) +> ((i,conn'):cs,n) -> (((i+1,conn):(i,conn'):cs, n), (i+1,n)) @@ -144,18 +146,43 @@ Message Loop Still todo: how to dynamically handle different incoming data types via Aeson? -> talk :: Client -> Text -> MVar ServerState -> IO () -> talk (i,conn) room s = forever $ do +Before we start the message loop, we first set up a disconnect handler +which will remove the client from the room's state once the socket closes. + +After that, we just read in new messages, parse them as json messages, +and change the room's state accordingly (note: currently, this server is +"nice" and does not drop clients which send garbage instead of json; this +isn't really much of a concern here, but it would probably be better if +it did drop them). + +> talk :: Client -> MVar Room -> IO () +> talk (i,conn) roomstate = flip finally (disconnect i) $ forever $ do > msg <- WS.receiveData conn > let d = (eitherDecode msg) :: (Either String State) > case d of > Left err -> putStrLn $ "json malformed" <> err > Right new -> do -> modifyMVar_ s $ \map -> return $ M.adjust (\(cs,_) -> (cs, state new)) room map -> state <- readMVar s -> case M.lookup room state of -> Nothing -> putStrLn $ "whoops, room " <> show room <> " somehow got lost" -> Just (cs,n) -> broadcast ("state " <> (T.pack $ show n)) cs +> peers <- modifyMVar roomstate $ \(cs,n) -> return ((cs, state new), cs) +> broadcast ("state " <> (T.pack $ show $ state new)) peers +> where +> disconnect i = do +> modifyMVar_ roomstate (\room -> return $ removeClient i room) +> putStrLn $ show i <> " disconnected" +> removeClient :: Int -> Room -> Room +> removeClient i (cs,n) = (filter ((/= i) . fst) cs, n) + +Broadcasting is equivalent to just going through the list of clients. +Note that this is a linked list (i.e. may be slow and cause some cache +misses while iterating), but it's probably going to be fine unless there's +a couple thousand clients in a room. + +> broadcast :: Text -> [Client] -> IO () +> broadcast message cs = do +> --T.putStrLn message -- log messages +> forM_ cs $ \(_,conn) -> WS.sendTextData conn message + + + -- cgit v1.2.3