Do not run forever in request worker threads #3726

Conversation
```diff
@@ -188,7 +205,7 @@ srv respond handlers = do
         liftIO $
             forkIO $
-                forever $
+                id $
```
this `id` is here just to preserve formatting; it must be removed if we merge this
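For context, here is a minimal standalone sketch (not the kore-rpc code, just an illustration of the point of this PR) of what swapping `forever` for `id` changes in a forked worker: with `forever` the thread loops and serves every request pushed to its queue, while with `id` (a no-op wrapper, kept only so the indentation of the surrounding lines is unchanged) it handles exactly one request and then exits.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM (atomically, newTChanIO, readTChan, writeTChan)
import Control.Monad (forever)

main :: IO ()
main = do
    queue <- newTChanIO
    -- looping worker: stays alive and drains the queue indefinitely
    _ <- forkIO $ forever $ do
        req <- atomically $ readTChan queue
        putStrLn ("looping worker got: " ++ req)
    -- one-shot worker: `id` does nothing, so this thread handles a
    -- single request and then finishes
    _ <- forkIO $ id $ do
        req <- atomically $ readTChan queue
        putStrLn ("one-shot worker got: " ++ req)
    mapM_ (atomically . writeTChan queue) ["a", "b", "c"]
    threadDelay 100000 -- give both workers time to run before main exits
```

Which worker picks up which request is a race, but the one-shot worker never processes more than one of them.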
```diff
@@ -757,10 +755,7 @@ runServer port serverState mainModule runSMT Log.LoggerEnv{logAction} = do
         flip runLoggingT logFun $
             jsonRpcServer
                 srvSettings
-                ( \req parsed ->
-                    log (InfoJsonRpcProcessRequest (getReqId req) parsed)
```
we already emit a similar log message on receiving requests in Kore.JsonRpc.Server.srv
How exactly is this being tested? From the logs, it would appear that each request is using a brand new client connection (since the request ID is always 1). If this is the case, the old behaviour is to be expected, unless the client connections were actually closed, in which case it's definitely a bug.
Good point. It looks very much like each request uses a fresh connection/session in these tests but something is amiss either way:
An important part of understanding this is to understand how exactly the sessions in
Upon closer inspection, the thread status logging itself unfortunately creates a misleading picture:
I changed the code to use
i.e., the previous two worker threads have been removed (by garbage collection). So I think all is well with the current server code in that respect.

Code changes to avoid holding thread references:

```diff
diff --git a/kore-rpc-types/src/Kore/JsonRpc/Server.hs b/kore-rpc-types/src/Kore/JsonRpc/Server.hs
index fe3d8b7cf..9c05b183c 100644
--- a/kore-rpc-types/src/Kore/JsonRpc/Server.hs
+++ b/kore-rpc-types/src/Kore/JsonRpc/Server.hs
@@ -18,7 +18,7 @@ import Control.Concurrent (forkIO, throwTo)
 import Control.Concurrent.STM qualified as GHC
 import Control.Concurrent.STM.TChan (newTChan, readTChan, writeTChan)
 import Control.Exception (Exception (fromException), catch, mask, throw)
-import Control.Monad (forM, forM_)
+import Control.Monad (forM, forM_, forever)
 import Control.Monad.IO.Class (MonadIO (liftIO))
 import Control.Monad.Logger (MonadLoggerIO)
 import Control.Monad.Logger qualified as Log
@@ -33,6 +33,7 @@ import Data.Conduit.Network (ServerSettings, appSink, appSource, runGeneralTCPSe
 import Data.Conduit.TMChan (closeTBMChan, sinkTBMChan, sourceTBMChan)
 import Data.Maybe (catMaybes)
 import Data.Text qualified as Text
+import System.Mem.Weak qualified as GHC
 import GHC.Conc.Sync qualified as GHC
 import Kore.JsonRpc.Types (FromRequestCancellable (isCancel), ReqException (..), rpcJsonConfig)
 import Network.JSONRPC hiding (encodeConduit, runJSONRPCT)
@@ -88,6 +89,8 @@ jsonRpcServer ::
     m a
 jsonRpcServer serverSettings respond handlers =
     runGeneralTCPServer serverSettings $ \cl ->
+        liftIO GHC.myThreadId >>= \me ->
+            Log.logInfoNS "kore-rpc" (Text.pack $ "Json RPC Request handler in thread " <> show me) >>
         runJSONRPCT
             -- we have to ensure that the returned messages contain no newlines
             -- inside the message and have a trailing newline, which is used as the delimiter
@@ -101,7 +104,7 @@ jsonRpcServer serverSettings respond handlers =
 data JsonRpcHandler = forall e. Exception e => JsonRpcHandler (e -> Log.LoggingT IO ErrorObj)

 {-# NOINLINE workerList #-}
-workerList :: GHC.TVar [GHC.ThreadId]
+workerList :: GHC.TVar [GHC.Weak GHC.ThreadId]
 workerList = IO.unsafePerformIO $ GHC.newTVarIO []

 srv ::
@@ -130,13 +133,14 @@ srv respond handlers = do
             in loop
     newWorkerTid <- spawnWorker reqQueue
     Log.logInfoN $ "Spawned worker with thread id " <> Text.pack (show newWorkerTid)
-    atomically $ GHC.modifyTVar workerList (newWorkerTid :)
-    Log.logInfoN "Status of worker threads: "
+    weakWorkerTid <- liftIO $ GHC.mkWeakThreadId newWorkerTid
+    atomically $ GHC.modifyTVar workerList (weakWorkerTid :)
     workerIds <- atomically $ GHC.readTVar workerList
-    workerStatuses <- forM workerIds $ \tId -> do
-        status <- liftIO $ GHC.threadStatus tId
-        pure $ "  " <> show tId <> ": " <> show status
-    Log.logInfoN . Text.pack . unlines $ workerStatuses
+    workerStatuses <- forM workerIds $ \wkTId -> do
+        mbTId <- liftIO $ GHC.deRefWeak wkTId
+        status <- liftIO $ maybe (pure "Gone") (fmap show . GHC.threadStatus) mbTId
+        pure $ "  " <> show mbTId <> ": " <> status
+    Log.logInfoN . Text.pack . unlines $ "Status of worker threads: " : workerStatuses
     mainLoop newWorkerTid
   where
@@ -205,7 +209,7 @@ srv respond handlers = do
         liftIO $
            forkIO $
-                id $
+                forever $
                    bracketOnReqException
                        (atomically $ readTChan reqQueue)
                        (withLog . processReq)
```
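As a standalone illustration of the weak-reference pattern in the diff above (a simplified sketch, not the server code, which stores its weak references in a `TVar` list), the following shows how `mkWeakThreadId` plus `deRefWeak` lets a finished worker be observed as gone instead of being pinned alive just so its status can be logged:

```haskell
import Control.Concurrent (forkIO, mkWeakThreadId, threadDelay)
import GHC.Conc.Sync (threadStatus)
import System.Mem (performGC)
import System.Mem.Weak (deRefWeak)

main :: IO ()
main = do
    -- a short-lived "worker" that finishes almost immediately
    tid <- forkIO $ threadDelay 10000
    -- hold only a weak reference, so the ThreadId does not keep the
    -- finished thread's state reachable
    weakTid <- mkWeakThreadId tid
    threadDelay 50000 -- let the worker finish
    performGC         -- give the GC a chance to collect the dead thread
    mbTid <- deRefWeak weakTid
    case mbTid of
        Nothing -> putStrLn "worker thread: Gone (collected)"
        Just t -> do
            status <- threadStatus t
            putStrLn ("worker thread: " ++ show status)
```

Whether `deRefWeak` returns `Nothing` or `Just` with a `ThreadFinished` status depends on GC timing, which matches the observation above that the old strong-reference logging painted a misleading picture.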
Closing this as this is not a valid solution, see #3727 instead.
I've added code that tracks the worker threads spawned in `Kore.JsonRpc.Server.srv`, and polls their status when spawning a new one. It turns out that they never exit, because we spawn a new worker for every request. We end up with logs like the following when submitting several copies of the same request one after the other:
and with the change we get the threads to stop:
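The leak described here can be reproduced in a small standalone sketch (hypothetical names, not the server code): every "request" spawns a fresh worker wrapped in `forever`, so after three requests all three workers are still alive, blocked on the now-empty queue.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
    (atomically, modifyTVar, newTChanIO, newTVarIO, readTChan, readTVarIO, writeTChan)
import Control.Monad (forM_, forever)
import GHC.Conc.Sync (threadStatus)

main :: IO ()
main = do
    queue <- newTChanIO
    tidsVar <- newTVarIO []
    forM_ [1 .. 3 :: Int] $ \n -> do
        -- per "request": spawn a fresh worker that loops forever
        tid <- forkIO $ forever $ atomically (readTChan queue) >>= putStrLn
        atomically $ modifyTVar tidsVar (tid :)
        atomically $ writeTChan queue ("request " ++ show n)
    threadDelay 50000 -- let the workers drain the queue
    tids <- readTVarIO tidsVar
    statuses <- mapM threadStatus tids
    -- all three workers are still alive, blocked waiting for more work,
    -- even though each request only needed one of them
    mapM_ print statuses
    -- keep the queue reachable so the blocked workers are not killed
    -- early by the GC's deadlock detection
    atomically $ writeTChan queue "unused"
```

With the `forever` removed (as this PR proposed), each worker would instead exit after its single request, and the thread count would stay flat.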