Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use TMChan instead of TChan to indicate when a session was terminated #3727

Draft
wants to merge 14 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kore-rpc-types/kore-rpc-types.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ common library
build-depends: prettyprinter >=1.2
build-depends: stm >=2.5
build-depends: stm-conduit >= 4.0
build-depends: stm-chans
build-depends: text >=1.2
build-depends: unliftio >= 0.2
build-depends: vector >= 0.12.3.1
Expand Down
46 changes: 29 additions & 17 deletions kore-rpc-types/src/Kore/JsonRpc/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ module Kore.JsonRpc.Server (
) where

import Control.Concurrent (forkIO, throwTo)
import Control.Concurrent.STM.TChan (newTChan, readTChan, writeTChan)
import Control.Concurrent.STM.TMChan (closeTMChan, newTMChan, readTMChan, writeTMChan)
import Control.Exception (Exception (fromException), catch, mask, throw)
import Control.Monad (forM_, forever)
import Control.Monad (forM_, when)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Logger (MonadLoggerIO)
import Control.Monad.Logger qualified as Log
Expand Down Expand Up @@ -63,9 +63,13 @@ runJSONRPCT encodeOpts ver ignore snk src f = do
qs <- liftIO . atomically $ initSession ver ignore
let inSnk = sinkTBMChan (inCh qs)
outSrc = sourceTBMChan (outCh qs)
withAsync (runConduit $ src .| decodeConduit ver .| inSnk) $
decodeInput = do
runConduit $ src .| decodeConduit ver .| inSnk
liftIO (atomically $ closeTBMChan $ inCh qs)
encodeOutput = runConduit $ outSrc .| encodeConduit encodeOpts .| snk
withAsync decodeInput $
const $
withAsync (runConduit $ outSrc .| encodeConduit encodeOpts .| snk) $ \o ->
withAsync encodeOutput $ \o ->
withAsync (runReaderT processIncoming qs) $
const $ do
a <- runReaderT f qs
Expand Down Expand Up @@ -104,12 +108,12 @@ srv ::
[JsonRpcHandler] ->
JSONRPCT m ()
srv respond handlers = do
reqQueue <- liftIO $ atomically newTChan
reqQueue <- liftIO $ atomically newTMChan
let mainLoop tid =
let loop =
receiveBatchRequest >>= \case
Nothing -> do
return ()
Nothing ->
atomically $ closeTMChan reqQueue
Just (SingleRequest req) | Right True <- isCancel @q <$> fromRequest req -> do
Log.logInfoN "Cancel request"
liftIO $ throwTo tid CancelRequest
Expand All @@ -118,10 +122,11 @@ srv respond handlers = do
forM_ (getRequests req) $ \r -> do
Log.logInfoN $ "Process request " <> mReqId r <> " " <> getReqMethod r
Log.logDebugN $ Text.pack $ show r
liftIO $ atomically $ writeTChan reqQueue req
liftIO $ atomically $ writeTMChan reqQueue req
loop
in loop
spawnWorker reqQueue >>= mainLoop
Log.logInfoN "Session terminated"
where
isRequest = \case
Request{} -> True
Expand Down Expand Up @@ -150,17 +155,20 @@ srv respond handlers = do
respondTo :: Request -> Log.LoggingT IO (Maybe Response)
respondTo req = buildResponse (respond req) req

cancelReq :: ErrorObj -> BatchRequest -> Log.LoggingT IO ()
cancelReq :: ErrorObj -> Maybe BatchRequest -> Log.LoggingT IO Bool
cancelReq err = \case
SingleRequest req@Request{} -> do
Just (SingleRequest req@Request{}) -> do
let reqVersion = getReqVer req
reqId = getReqId req
sendResponses $ SingleResponse $ ResponseError reqVersion err reqId
SingleRequest Notif{} -> pure ()
BatchRequest reqs -> do
pure True
Just (SingleRequest Notif{}) -> pure True
Just (BatchRequest reqs) -> do
sendResponses $
BatchResponse $
[ResponseError (getReqVer req) err (getReqId req) | req <- reqs, isRequest req]
pure True
Nothing -> pure False

processReq :: BatchRequest -> Log.LoggingT IO ()
processReq = \case
Expand All @@ -186,9 +194,13 @@ srv respond handlers = do
a <- before
restore (thing a) `catch` catchesHandler a

liftIO $
forkIO $
forever $
loop = do
shouldContinue <-
bracketOnReqException
(atomically $ readTChan reqQueue)
(withLog . processReq)
(atomically $ readTMChan reqQueue)
( \case
Nothing -> pure False
Just req -> withLog (processReq req) >> pure True
)
when shouldContinue loop
liftIO $ forkIO loop
Loading