From 41bda3d76ab0b9d0064a473c8855d1e9cedaf7a6 Mon Sep 17 00:00:00 2001 From: Sam Balco Date: Wed, 21 Feb 2024 15:13:42 +0000 Subject: [PATCH 1/8] use TMChan instead of TChan to indicate when a session was terminated --- kore-rpc-types/kore-rpc-types.cabal | 1 + kore-rpc-types/src/Kore/JsonRpc/Server.hs | 38 +++++++++++++---------- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/kore-rpc-types/kore-rpc-types.cabal b/kore-rpc-types/kore-rpc-types.cabal index d649a5892f..d71bc94963 100644 --- a/kore-rpc-types/kore-rpc-types.cabal +++ b/kore-rpc-types/kore-rpc-types.cabal @@ -74,6 +74,7 @@ common library build-depends: prettyprinter >=1.2 build-depends: stm >=2.5 build-depends: stm-conduit >= 4.0 + build-depends: stm-chans build-depends: text >=1.2 build-depends: unliftio >= 0.2 build-depends: vector >= 0.12.3.1 diff --git a/kore-rpc-types/src/Kore/JsonRpc/Server.hs b/kore-rpc-types/src/Kore/JsonRpc/Server.hs index 262f03c6e2..e3079a4fff 100644 --- a/kore-rpc-types/src/Kore/JsonRpc/Server.hs +++ b/kore-rpc-types/src/Kore/JsonRpc/Server.hs @@ -15,9 +15,9 @@ module Kore.JsonRpc.Server ( ) where import Control.Concurrent (forkIO, throwTo) -import Control.Concurrent.STM.TChan (newTChan, readTChan, writeTChan) +import Control.Concurrent.STM.TMChan (newTMChan, readTMChan, writeTMChan, closeTMChan) import Control.Exception (Exception (fromException), catch, mask, throw) -import Control.Monad (forM_, forever) +import Control.Monad (forM_) import Control.Monad.IO.Class (MonadIO (liftIO)) import Control.Monad.Logger (MonadLoggerIO) import Control.Monad.Logger qualified as Log @@ -104,12 +104,12 @@ srv :: [JsonRpcHandler] -> JSONRPCT m () srv respond handlers = do - reqQueue <- liftIO $ atomically newTChan + reqQueue <- liftIO $ atomically newTMChan let mainLoop tid = let loop = receiveBatchRequest >>= \case - Nothing -> do - return () + Nothing -> + atomically $ closeTMChan reqQueue Just (SingleRequest req) | Right True <- isCancel @q <$> fromRequest req -> do Log.logInfoN "Cancel request" liftIO $ throwTo tid CancelRequest @@ -118,7 +118,7 @@ srv respond handlers = do forM_ (getRequests req) $ \r -> do Log.logInfoN $ "Process request " <> mReqId r <> " " <> getReqMethod r Log.logDebugN $ Text.pack $ show r - liftIO $ atomically $ writeTChan reqQueue req + liftIO $ atomically $ writeTMChan reqQueue req loop in loop spawnWorker reqQueue >>= mainLoop @@ -150,17 +150,20 @@ srv respond handlers = do respondTo :: Request -> Log.LoggingT IO (Maybe Response) respondTo req = buildResponse (respond req) req - cancelReq :: ErrorObj -> BatchRequest -> Log.LoggingT IO () + cancelReq :: ErrorObj -> Maybe BatchRequest -> Log.LoggingT IO Bool cancelReq err = \case - SingleRequest req@Request{} -> do + Just (SingleRequest req@Request{}) -> do let reqVersion = getReqVer req reqId = getReqId req sendResponses $ SingleResponse $ ResponseError reqVersion err reqId - SingleRequest Notif{} -> pure () - BatchRequest reqs -> do + pure True + Just (SingleRequest Notif{}) -> pure True + Just (BatchRequest reqs) -> do sendResponses $ BatchResponse $ [ResponseError (getReqVer req) err (getReqId req) | req <- reqs, isRequest req] + pure True + Nothing -> pure False processReq :: BatchRequest -> Log.LoggingT IO () processReq = \case @@ -186,9 +189,12 @@ srv respond handlers = do a <- before restore (thing a) `catch` catchesHandler a - liftIO $ - forkIO $ - forever $ - bracketOnReqException - (atomically $ readTChan reqQueue) - (withLog . processReq) + loop = + bracketOnReqException + (atomically $ readTMChan reqQueue) + (\case + Nothing -> pure False + Just req -> withLog (processReq req) >> pure True + ) + >>= \shouldContinue -> if shouldContinue then loop else pure () + liftIO $ forkIO loop From 40f461c7a5f29c37d86073911e1a91d7b32dc388 Mon Sep 17 00:00:00 2001 From: github-actions Date: Wed, 21 Feb 2024 15:15:13 +0000 Subject: [PATCH 2/8] Format with fourmolu --- kore-rpc-types/src/Kore/JsonRpc/Server.hs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kore-rpc-types/src/Kore/JsonRpc/Server.hs b/kore-rpc-types/src/Kore/JsonRpc/Server.hs index e3079a4fff..9f97214292 100644 --- a/kore-rpc-types/src/Kore/JsonRpc/Server.hs +++ b/kore-rpc-types/src/Kore/JsonRpc/Server.hs @@ -15,7 +15,7 @@ module Kore.JsonRpc.Server ( ) where import Control.Concurrent (forkIO, throwTo) -import Control.Concurrent.STM.TMChan (newTMChan, readTMChan, writeTMChan, closeTMChan) +import Control.Concurrent.STM.TMChan (closeTMChan, newTMChan, readTMChan, writeTMChan) import Control.Exception (Exception (fromException), catch, mask, throw) import Control.Monad (forM_) import Control.Monad.IO.Class (MonadIO (liftIO)) @@ -189,12 +189,12 @@ srv respond handlers = do a <- before restore (thing a) `catch` catchesHandler a - loop = + loop = bracketOnReqException (atomically $ readTMChan reqQueue) - (\case + ( \case Nothing -> pure False Just req -> withLog (processReq req) >> pure True - ) - >>= \shouldContinue -> if shouldContinue then loop else pure () + ) + >>= \shouldContinue -> if shouldContinue then loop else pure () liftIO $ forkIO loop From 360294ac11b4dc41bba420d68712ae484cf03443 Mon Sep 17 00:00:00 2001 From: Sam Balco Date: Wed, 21 Feb 2024 18:25:34 +0000 Subject: [PATCH 3/8] close TMChan --- kore-rpc-types/src/Kore/JsonRpc/Server.hs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kore-rpc-types/src/Kore/JsonRpc/Server.hs b/kore-rpc-types/src/Kore/JsonRpc/Server.hs index e3079a4fff..6f6d36ba99 100644 --- a/kore-rpc-types/src/Kore/JsonRpc/Server.hs +++ b/kore-rpc-types/src/Kore/JsonRpc/Server.hs @@ -63,7 +63,7 @@ runJSONRPCT encodeOpts ver ignore snk src f = do qs <- liftIO . atomically $ initSession ver ignore let inSnk = sinkTBMChan (inCh qs) outSrc = sourceTBMChan (outCh qs) - withAsync (runConduit $ src .| decodeConduit ver .| inSnk) $ + withAsync ((runConduit $ src .| decodeConduit ver .| inSnk) >> liftIO (atomically $ closeTBMChan $ inCh qs)) $ const $ withAsync (runConduit $ outSrc .| encodeConduit encodeOpts .| snk) $ \o -> withAsync (runReaderT processIncoming qs) $ @@ -122,6 +122,8 @@ srv respond handlers = do loop in loop spawnWorker reqQueue >>= mainLoop + liftIO $ putStrLn "session end" + where isRequest = \case Request{} -> True From 35396e4225509ce01b17a1d0a81bbb019ddadba1 Mon Sep 17 00:00:00 2001 From: github-actions Date: Wed, 21 Feb 2024 18:28:28 +0000 Subject: [PATCH 4/8] Format with fourmolu --- kore-rpc-types/src/Kore/JsonRpc/Server.hs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kore-rpc-types/src/Kore/JsonRpc/Server.hs b/kore-rpc-types/src/Kore/JsonRpc/Server.hs index 6ec621afdc..10211b2dec 100644 --- a/kore-rpc-types/src/Kore/JsonRpc/Server.hs +++ b/kore-rpc-types/src/Kore/JsonRpc/Server.hs @@ -63,7 +63,8 @@ runJSONRPCT encodeOpts ver ignore snk src f = do qs <- liftIO . atomically $ initSession ver ignore let inSnk = sinkTBMChan (inCh qs) outSrc = sourceTBMChan (outCh qs) - withAsync ((runConduit $ src .| decodeConduit ver .| inSnk) >> liftIO (atomically $ closeTBMChan $ inCh qs)) $ + withAsync + ((runConduit $ src .| decodeConduit ver .| inSnk) >> liftIO (atomically $ closeTBMChan $ inCh qs)) $ const $ withAsync (runConduit $ outSrc .| encodeConduit encodeOpts .| snk) $ \o -> withAsync (runReaderT processIncoming qs) $ @@ -123,7 +124,6 @@ srv respond handlers = do in loop spawnWorker reqQueue >>= mainLoop liftIO $ putStrLn "session end" - where isRequest = \case Request{} -> True From 3247738f59e83efa2780f4970ebc271f09552dbd Mon Sep 17 00:00:00 2001 From: github-actions Date: Wed, 21 Feb 2024 18:30:03 +0000 Subject: [PATCH 5/8] Format with fourmolu --- kore-rpc-types/src/Kore/JsonRpc/Server.hs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/kore-rpc-types/src/Kore/JsonRpc/Server.hs b/kore-rpc-types/src/Kore/JsonRpc/Server.hs index 10211b2dec..b2dab508ce 100644 --- a/kore-rpc-types/src/Kore/JsonRpc/Server.hs +++ b/kore-rpc-types/src/Kore/JsonRpc/Server.hs @@ -64,16 +64,17 @@ runJSONRPCT encodeOpts ver ignore snk src f = do let inSnk = sinkTBMChan (inCh qs) outSrc = sourceTBMChan (outCh qs) withAsync - ((runConduit $ src .| decodeConduit ver .| inSnk) >> liftIO (atomically $ closeTBMChan $ inCh qs)) $ - const $ - withAsync (runConduit $ outSrc .| encodeConduit encodeOpts .| snk) $ \o -> - withAsync (runReaderT processIncoming qs) $ - const $ do - a <- runReaderT f qs - liftIO $ do - atomically . closeTBMChan $ outCh qs - _ <- wait o - return a + ((runConduit $ src .| decodeConduit ver .| inSnk) >> liftIO (atomically $ closeTBMChan $ inCh qs)) + $ const + $ withAsync (runConduit $ outSrc .| encodeConduit encodeOpts .| snk) + $ \o -> + withAsync (runReaderT processIncoming qs) $ + const $ do + a <- runReaderT f qs + liftIO $ do + atomically . closeTBMChan $ outCh qs + _ <- wait o + return a -- | TCP server transport for JSON-RPC. jsonRpcServer :: From 42089f684ee2027d864fee1aa05c490bf2a7a0e8 Mon Sep 17 00:00:00 2001 From: Sam Balco Date: Wed, 21 Feb 2024 18:38:56 +0000 Subject: [PATCH 6/8] switch from putStrLn to logging --- kore-rpc-types/src/Kore/JsonRpc/Server.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kore-rpc-types/src/Kore/JsonRpc/Server.hs b/kore-rpc-types/src/Kore/JsonRpc/Server.hs index 6ec621afdc..1f826636b2 100644 --- a/kore-rpc-types/src/Kore/JsonRpc/Server.hs +++ b/kore-rpc-types/src/Kore/JsonRpc/Server.hs @@ -122,7 +122,7 @@ srv respond handlers = do loop in loop spawnWorker reqQueue >>= mainLoop - liftIO $ putStrLn "session end" + Log.logInfoN $ "Session terminated" where isRequest = \case From 77f796c7615049801dbfd0013e8d8d1a95f5d98b Mon Sep 17 00:00:00 2001 From: github-actions Date: Wed, 21 Feb 2024 18:40:51 +0000 Subject: [PATCH 7/8] Format with fourmolu --- kore-rpc-types/src/Kore/JsonRpc/Server.hs | 1 - 1 file changed, 1 deletion(-) diff --git a/kore-rpc-types/src/Kore/JsonRpc/Server.hs b/kore-rpc-types/src/Kore/JsonRpc/Server.hs index 5084e8e90a..c3f58d5fc6 100644 --- a/kore-rpc-types/src/Kore/JsonRpc/Server.hs +++ b/kore-rpc-types/src/Kore/JsonRpc/Server.hs @@ -125,7 +125,6 @@ srv respond handlers = do in loop spawnWorker reqQueue >>= mainLoop Log.logInfoN $ "Session terminated" - where isRequest = \case Request{} -> True From 7409d1f0efd41b9d33de72a8000a1b13463cfad9 Mon Sep 17 00:00:00 2001 From: Georgy Lukyanov Date: Thu, 22 Feb 2024 12:21:54 +0100 Subject: [PATCH 8/8] Clean-up --- kore-rpc-types/src/Kore/JsonRpc/Server.hs | 47 ++++++++++++----------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/kore-rpc-types/src/Kore/JsonRpc/Server.hs b/kore-rpc-types/src/Kore/JsonRpc/Server.hs index c3f58d5fc6..20ba4d9f70 100644 --- a/kore-rpc-types/src/Kore/JsonRpc/Server.hs +++ b/kore-rpc-types/src/Kore/JsonRpc/Server.hs @@ -17,7 +17,7 @@ module Kore.JsonRpc.Server ( import Control.Concurrent (forkIO, throwTo) import Control.Concurrent.STM.TMChan (closeTMChan, newTMChan, readTMChan, writeTMChan) import Control.Exception (Exception (fromException), catch, mask, throw) -import Control.Monad (forM_) +import Control.Monad (forM_, when) import Control.Monad.IO.Class (MonadIO (liftIO)) import Control.Monad.Logger (MonadLoggerIO) import Control.Monad.Logger qualified as Log @@ -63,18 +63,20 @@ runJSONRPCT encodeOpts ver ignore snk src f = do qs <- liftIO . atomically $ initSession ver ignore let inSnk = sinkTBMChan (inCh qs) outSrc = sourceTBMChan (outCh qs) - withAsync - ((runConduit $ src .| decodeConduit ver .| inSnk) >> liftIO (atomically $ closeTBMChan $ inCh qs)) - $ const - $ withAsync (runConduit $ outSrc .| encodeConduit encodeOpts .| snk) - $ \o -> - withAsync (runReaderT processIncoming qs) $ - const $ do - a <- runReaderT f qs - liftIO $ do - atomically . closeTBMChan $ outCh qs - _ <- wait o - return a + decodeInput = do + runConduit $ src .| decodeConduit ver .| inSnk + liftIO (atomically $ closeTBMChan $ inCh qs) + encodeOutput = runConduit $ outSrc .| encodeConduit encodeOpts .| snk + withAsync decodeInput $ + const $ + withAsync encodeOutput $ \o -> + withAsync (runReaderT processIncoming qs) $ + const $ do + a <- runReaderT f qs + liftIO $ do + atomically . closeTBMChan $ outCh qs + _ <- wait o + return a -- | TCP server transport for JSON-RPC. jsonRpcServer :: @@ -124,7 +126,7 @@ srv respond handlers = do loop in loop spawnWorker reqQueue >>= mainLoop - Log.logInfoN $ "Session terminated" + Log.logInfoN "Session terminated" where isRequest = \case Request{} -> True @@ -192,12 +194,13 @@ srv respond handlers = do a <- before restore (thing a) `catch` catchesHandler a - loop = - bracketOnReqException - (atomically $ readTMChan reqQueue) - ( \case - Nothing -> pure False - Just req -> withLog (processReq req) >> pure True - ) - >>= \shouldContinue -> if shouldContinue then loop else pure () + loop = do + shouldContinue <- + bracketOnReqException + (atomically $ readTMChan reqQueue) + ( \case + Nothing -> pure False + Just req -> withLog (processReq req) >> pure True + ) + when shouldContinue loop liftIO $ forkIO loop