Skip to content

Commit

Permalink
Run message listening loop in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
joashc committed Jan 22, 2016
1 parent 1f42677 commit 2a2f635
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 5 deletions.
4 changes: 4 additions & 0 deletions network/DcServerFree.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ data DcServerOperator next =
InitServer next
| GetMessage (ServerMessage -> next)
| AddPeer Participant next
| GetFullPeerList ([Participant] -> next)
| AddStream RoundStream next
| SendBroadcast [Participant] Broadcast next
| GetServerState (ServerState -> next)
Expand Down Expand Up @@ -59,6 +60,9 @@ addStream s = liftF $ AddStream s ()
getServerState :: DcServer ServerState
getServerState = liftF $ GetServerState id

getFullPeerList :: DcServer [Participant]
getFullPeerList = liftF $ GetFullPeerList id

throw :: ServerError -> DcServer ()
throw err = liftF $ Throw err ()

19 changes: 15 additions & 4 deletions network/DcServerIO.hs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ instead of writing the interpreter directly in @Free f a@ and having to type @Pu
serverIO :: DcServerOperator (DcServerIO next) -> DcServerIO next
serverIO (InitServer next) = do
n <- liftIO getGroupSize
ss <- readState
socket <- liftIO $ listenOn $ PortNumber 6969
let initialized = ss & (numPeers .~ n) <$> (listenSocket .~ Just socket)
_ <- swapState initialized
let initialized = SS n [] [] (Just socket)
state <- get
liftIO . atomically $ putTMVar state initialized
next
serverIO (SayString s next) = do
liftIO $ putStrLn s
Expand All @@ -92,6 +92,14 @@ serverIO (SendBroadcast ps msg next) = do
liftIO $ putStrLn "Broadcasting..."
liftIO $ mapM_ (sendToPeer msg) ps
next
serverIO (GetFullPeerList next) = do
state <- get
peers <- liftIO . atomically $ do
s <- readTMVar state
if s ^. numPeers == s ^. registeredPeers . to length
then return $ s ^. registeredPeers
else retry
next peers
serverIO (Throw err next) = throwError err >> next

addStreamIfNeeded :: RoundStream -> ServerState -> ServerState
Expand Down Expand Up @@ -126,6 +134,9 @@ reportResult (Left a) = print a

-- | Run our Program
runServer = do
serverStateTMVar <- liftIO . atomically $ newTMVar $ SS 0 [] [] Nothing
serverStateTMVar <- liftIO . atomically $ newEmptyTMVar
liftIO . forkIO $ do
_ <- runStateT (runExceptT (serve listenForMessages)) serverStateTMVar
return ()
(r, _) <- runStateT (runExceptT (serve serverProg)) serverStateTMVar
reportResult r
6 changes: 5 additions & 1 deletion network/DcServerSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ import Messaging
serverProg :: DcServer ()
serverProg = do
initServer
forever $ getMessage >>= messageHandler
peers <- getFullPeerList
sendBroadcast peers $ PeerListB peers

listenForMessages :: DcServer()
listenForMessages = forever $ getMessage >>= messageHandler

messageHandler :: ServerMessage -> DcServer ()
messageHandler (PeerJoin ps) = addPeer ps
Expand Down

0 comments on commit 2a2f635

Please sign in to comment.