diff --git a/kore-rpc-types/src/Kore/JsonRpc/Server.hs b/kore-rpc-types/src/Kore/JsonRpc/Server.hs index 262f03c6e2..fe3d8b7cf2 100644 --- a/kore-rpc-types/src/Kore/JsonRpc/Server.hs +++ b/kore-rpc-types/src/Kore/JsonRpc/Server.hs @@ -15,9 +15,10 @@ module Kore.JsonRpc.Server ( ) where import Control.Concurrent (forkIO, throwTo) +import Control.Concurrent.STM qualified as GHC import Control.Concurrent.STM.TChan (newTChan, readTChan, writeTChan) import Control.Exception (Exception (fromException), catch, mask, throw) -import Control.Monad (forM_, forever) +import Control.Monad (forM, forM_) import Control.Monad.IO.Class (MonadIO (liftIO)) import Control.Monad.Logger (MonadLoggerIO) import Control.Monad.Logger qualified as Log @@ -32,8 +33,10 @@ import Data.Conduit.Network (ServerSettings, appSink, appSource, runGeneralTCPSe import Data.Conduit.TMChan (closeTBMChan, sinkTBMChan, sourceTBMChan) import Data.Maybe (catMaybes) import Data.Text qualified as Text +import GHC.Conc.Sync qualified as GHC import Kore.JsonRpc.Types (FromRequestCancellable (isCancel), ReqException (..), rpcJsonConfig) import Network.JSONRPC hiding (encodeConduit, runJSONRPCT) +import System.IO.Unsafe qualified as IO import UnliftIO (MonadUnliftIO, atomically, wait, withAsync) -- Conduit to encode JSON to ByteString. @@ -97,6 +100,10 @@ jsonRpcServer serverSettings respond handlers = data JsonRpcHandler = forall e. Exception e => JsonRpcHandler (e -> Log.LoggingT IO ErrorObj) +{-# NOINLINE workerList #-} +workerList :: GHC.TVar [GHC.ThreadId] +workerList = IO.unsafePerformIO $ GHC.newTVarIO [] + srv :: forall m q r. (MonadLoggerIO m, FromRequestCancellable q, ToJSON r) => @@ -121,7 +128,17 @@ srv respond handlers = do liftIO $ atomically $ writeTChan reqQueue req loop in loop - spawnWorker reqQueue >>= mainLoop + newWorkerTid <- spawnWorker reqQueue + Log.logInfoN $ "Spawned worker with thread id " <> Text.pack (show newWorkerTid) + atomically $ GHC.modifyTVar workerList (newWorkerTid :) + Log.logInfoN "Status of worker threads: " + workerIds <- atomically $ GHC.readTVar workerList + workerStatuses <- forM workerIds $ \tId -> do + status <- liftIO $ GHC.threadStatus tId + pure $ " " <> show tId <> ": " <> show status + Log.logInfoN . Text.pack . unlines $ workerStatuses + + mainLoop newWorkerTid where isRequest = \case Request{} -> True @@ -188,7 +205,7 @@ srv respond handlers = do liftIO $ forkIO $ - forever $ + id $ bracketOnReqException (atomically $ readTChan reqQueue) (withLog . processReq) diff --git a/kore/src/Kore/JsonRpc.hs b/kore/src/Kore/JsonRpc.hs index 810ba8df53..643c1a5133 100644 --- a/kore/src/Kore/JsonRpc.hs +++ b/kore/src/Kore/JsonRpc.hs @@ -57,7 +57,6 @@ import Kore.JsonRpc.Error import Kore.JsonRpc.Server ( ErrorObj (..), JsonRpcHandler (JsonRpcHandler), - Request (getReqId), Respond, jsonRpcServer, ) @@ -69,7 +68,6 @@ import Kore.Log.DecidePredicateUnknown ( srcLoc, ) import Kore.Log.InfoExecDepth (ExecDepth (..)) -import Kore.Log.InfoJsonRpcProcessRequest (InfoJsonRpcProcessRequest (..)) import Kore.Log.JsonRpc (LogJsonRpcServer (..)) import Kore.Parser (parseKoreModule) import Kore.Reachability.Claim qualified as Claim @@ -757,10 +755,7 @@ runServer port serverState mainModule runSMT Log.LoggerEnv{logAction} = do flip runLoggingT logFun $ jsonRpcServer srvSettings - ( \req parsed -> - log (InfoJsonRpcProcessRequest (getReqId req) parsed) - >> respond serverState mainModule runSMT parsed - ) + (const (respond serverState mainModule runSMT)) [ handleDecidePredicateUnknown , handleErrorCall , handleSomeException @@ -771,9 +766,6 @@ runServer port serverState mainModule runSMT Log.LoggerEnv{logAction} = do logFun loc src level msg = Log.logWith logAction $ LogJsonRpcServer{loc, src, level, msg} - log :: MonadIO m => Log.Entry entry => entry -> m () - log = Log.logWith $ Log.hoistLogAction liftIO logAction - handleDecidePredicateUnknown :: JsonRpcHandler handleDecidePredicateUnknown = JsonRpcHandler $ \(err :: DecidePredicateUnknown) -> pure (backendError SmtSolverError $ externaliseDecidePredicateUnknown err)