Replies: 4 comments
-
我猜想你想表达的是: socketchannel 这个模块,在 socketchannel:close() 调用时,如果 但我认为不会。 首先, 其次, https://github.com/cloudwu/skynet/blob/master/lualib/skynet/socketchannel.lua#L538 |
Beta Was this translation helpful? Give feedback.
-
不好意思,self.__sock不是nil,是等于false,我打错了. 我的意思是 skynet/lualib/skynet/socket.lua Line 340 in 7c3942c 当read到一个完整的包唤醒时,如果排在self.__sock=false后面了,这样当处理完一个包之后再while的时候由于self.__sock==false,就退出循环了. 并不是说这个channel没有正确关闭,它确实正确地关闭了,只是正常的情况下还未返回的response会在read到socket error之后被唤醒并中断掉 skynet/lualib/skynet/socketchannel.lua Line 129 in 7c3942c |
Beta Was this translation helpful? Give feedback.
-
这个测试用例确实能复现这个问题。 local skynet = require "skynet"
local service = require "skynet.service"
local socketchannel = require "skynet.socketchannel"
--- Main function of the test server that is handed to service.new.
-- Listens on 0.0.0.0:10001 and, for every accepted connection, repeatedly
-- reads a session line and a data line, then writes them back as two
-- separate writes with a skynet.sleep(1) in between.
-- NOTE(review): the function requires its own modules instead of using the
-- enclosing file's locals, presumably because service.new runs it in a
-- separate service; keep it free of outer upvalues -- confirm.
local function server()
    local skynet = require "skynet"
    local skynet_util = require "skynet-fly.utils.skynet_util"
    local socket = require "skynet.socket"
    local CMD = {}

    -- Serves one connection until either readline yields a falsy value.
    local function serve(fd, addr)
        while true do
            local session = socket.readline(fd, '\n')
            local data = socket.readline(fd, '\n')
            if not (session and data) then
                -- Either line is missing: close the connection and stop serving it.
                socket.close(fd)
                skynet.error('disconnect from ' .. addr)
                return
            end
            skynet.error('recv: session = ' .. session .. ' data = ' .. data)
            -- The reply is deliberately split into two writes with a pause,
            -- so the session line is sent before the data line.
            socket.write(fd, session .. '\n')
            skynet.sleep(1)
            socket.write(fd, data .. '\n')
        end
    end

    -- Accept callback: start the new fd and serve it from its own coroutine.
    local function on_accept(fd, addr)
        socket.start(fd)
        skynet.error('connect from ' .. addr)
        skynet.fork(function()
            serve(fd, addr)
        end)
    end

    skynet.start(function()
        skynet_util.lua_dispatch(CMD)
        local listen_fd = socket.listen('0.0.0.0', 10001)
        skynet.error("socket listen on ", 10001)
        socket.start(listen_fd, on_accept)
    end)
end
local CMD = {}

--- Starts the session-mode reproduction scenario and returns true immediately.
-- In a forked coroutine it launches the "socket_server" service, opens a
-- socketchannel to 127.0.0.1:10001 with a channel-level response reader,
-- fires 10 concurrent requests (each with its own session id), closes the
-- channel after skynet.sleep(10), and after skynet.sleep(100) logs the
-- channel table so leftover entries can be inspected.
-- @return true
function CMD.start()
    skynet.fork(function()
        service.new("socket_server", server)

        -- Response reader: pauses first, then reads the session line and the
        -- data line and reports the reply for that session.
        local function read_reply(sock)
            skynet.sleep(10)
            local session = sock:readline('\n')
            local data = sock:readline('\n')
            session = tonumber(session)
            skynet.error(">>> ",session, data)
            return session, true, data, false
        end

        local chan = socketchannel.channel {
            host = '127.0.0.1',
            port = 10001,
            response = read_reply,
            nodelay = true,
        }

        -- Monotonically increasing session ids, starting at 1.
        local last_session = 0
        local function next_session()
            last_session = last_session + 1
            return last_session
        end

        for i = 1, 10 do
            skynet.fork(function ()
                local sid = next_session()
                local payload = string.format("%s\n%s\n", sid, "hello world " .. i)
                -- pcall so a request interrupted by the close is reported, not raised.
                local ok, reply = pcall(chan.request, chan, payload, sid)
                skynet.error("rsp : ", i, ok, reply)
            end)
        end

        -- Close the channel while requests are still outstanding.
        skynet.sleep(10)
        skynet.error("close begin")
        chan:close()
        skynet.error("close end")

        -- Give pending coroutines time to finish, then dump the channel state.
        skynet.sleep(100)
        local log = require "skynet-fly.log"
        log.info(chan)
    end)
    return true
end
-- `exit` entry of the CMD table: performs no work and returns true.
function CMD.exit()
return true
end
return CMD 测试结果[:0000000f][20241230 17:06:39 39]socket listen on 10001
[:0000000f][20241230 17:06:39 39]connect from 127.0.0.1:56198
[:0000000f][20241230 17:06:39 39]recv: session = 1 data = hello world 1
[:0000000f][20241230 17:06:39 40]recv: session = 2 data = hello world 2
[:0000000f][20241230 17:06:39 41]recv: session = 3 data = hello world 3
[:0000000f][20241230 17:06:39 42]recv: session = 4 data = hello world 4
[:0000000f][20241230 17:06:39 43]recv: session = 5 data = hello world 5
[:0000000f][20241230 17:06:39 44]recv: session = 6 data = hello world 6
[:0000000f][20241230 17:06:39 45]recv: session = 7 data = hello world 7
[:0000000f][20241230 17:06:39 46]recv: session = 8 data = hello world 8
[:0000000f][20241230 17:06:39 47]recv: session = 9 data = hello world 9
[:0000000f][20241230 17:06:39 48]recv: session = 10 data = hello world 10
[:0000000e][20241230 17:06:39 49]close begin
[:0000000e][20241230 17:06:39 49]>>> 1 hello world 1
[:0000000e][20241230 17:06:39 49]dispatch_by_session >>> false 10
[:0000000e][20241230 17:06:39 49]rsp : 1 true hello world 1
[:0000000f][20241230 17:06:39 49]socket: error on 4 Connection reset by peer
[:0000000f][20241230 17:06:39 49]disconnect from 127.0.0.1:56198
[:0000000e][20241230 17:06:39 49]close end
[:0000000e][20241230 17:06:40 49][info][socket_channel_m][./module/socket_channel_m.lua:80]{
["__thread"] = {
[2] = thread: 0x7fefb320e8c8,
[3] = thread: 0x7fefb320e708,
[4] = thread: 0x7fefb320fce8,
[5] = thread: 0x7fefb1c0a0e8,
[6] = thread: 0x7fefb1c0a1c8,
[7] = thread: 0x7fefb1c0a2a8,
[8] = thread: 0x7fefb1c0a388,
[9] = thread: 0x7fefb1c0a468,
[10] = thread: 0x7fefb1c0a548,
}
["__overload"] = false,
["__nodelay"] = true,
["__port"] = 10001,
["__host"] = "127.0.0.1",
["__socket_meta"] = {
["__index"] = {
["read"] = function: 0x7fefb331ad40,
["readline"] = function: 0x7fefb331ad80,
["__overload"] = false,
["__nodelay"] = true,
["__port"] = 10001,
["__host"] = "127.0.0.1",
["__socket_meta"] = {
["__index"] = {
["read"] = function: 0x7fefb331ad40,
["readline"] = function: 0x7fefb331ad80,
["__nodelay"] = true,
["__port"] = 10001,
["__host"] = "127.0.0.1",
["__socket_meta"] = {
["__index"] = {
["read"] = function: 0x7fefb331ad40,
["readline"] = function: 0x7fefb331ad80,
["__port"] = 10001,
["__host"] = "127.0.0.1",
["__socket_meta"] = {
["__index"] = {
["read"] = function: 0x7fefb331ad40,
["readline"] = function: 0x7fefb331ad80,
["__socket_meta"] = {
["__index"] = {
["read"] = function: 0x7fefb331ad40,
["readline"] = function: 0x7fefb331ad80,
["__index"] = {
["read"] = function: 0x7fefb331ad40,
["readline"] = function: 0x7fefb331ad80,
["read"] = function: 0x7fefb331ad40,
["readline"] = function: 0x7fefb331ad80,
["readline"] = function: 0x7fefb331ad80,
}
["__gc"] = function: 0x7fefb3312f00,
}
["__gc"] = function: 0x7fefb3312f00,
["__gc"] = function: 0x7fefb3312f00,
}
}
["__response"] = function: 0x7fefb325ddf0,
["__sock"] = false,
["__request"] = {
["__sock"] = false,
["__request"] = {
["__request"] = {
}
["__connecting"] = {
}
["__closed"] = true,
}
["__closed"] = true,
["__closed"] = true,
["__result_data"] = {
}
["__authcoroutine"] = false,
["__result"] = {
}
} 关闭后确实有协程没被唤醒。 测试发现 local skynet = require "skynet"
local service = require "skynet.service"
local socketchannel = require "skynet.socketchannel"
--- Main function of the test server that is handed to service.new.
-- Listens on 0.0.0.0:10001 and, for every accepted connection, repeatedly
-- reads one line, calls skynet.sleep(1), and writes the line back.
-- NOTE(review): the function requires its own modules instead of using the
-- enclosing file's locals, presumably because service.new runs it in a
-- separate service; keep it free of outer upvalues -- confirm.
local function server()
    local skynet = require "skynet"
    local skynet_util = require "skynet-fly.utils.skynet_util"
    local socket = require "skynet.socket"
    local CMD = {}

    -- Serves one connection until readline yields a falsy value.
    local function serve(fd, addr)
        while true do
            local data = socket.readline(fd, '\n')
            if not data then
                -- Nothing was read: close the connection and stop serving it.
                socket.close(fd)
                skynet.error('disconnect from ' .. addr)
                return
            end
            skynet.error('recv: data = ' .. data)
            -- Pause before echoing the line back.
            skynet.sleep(1)
            socket.write(fd, data .. '\n')
        end
    end

    -- Accept callback: start the new fd and serve it from its own coroutine.
    local function on_accept(fd, addr)
        socket.start(fd)
        skynet.error('connect from ' .. addr)
        skynet.fork(function()
            serve(fd, addr)
        end)
    end

    skynet.start(function()
        skynet_util.lua_dispatch(CMD)
        local listen_fd = socket.listen('0.0.0.0', 10001)
        skynet.error("socket listen on ", 10001)
        socket.start(listen_fd, on_accept)
    end)
end
local CMD = {}

--- Starts the order-mode reproduction scenario and returns true immediately.
-- In a forked coroutine it launches the "socket_server" service, opens a
-- socketchannel to 127.0.0.1:10001 without a channel-level response
-- function, fires 10 concurrent requests that each pass the response reader
-- per call, closes the channel after skynet.sleep(10), and after
-- skynet.sleep(100) logs the channel table so leftover entries can be
-- inspected.
-- @return true
function CMD.start()
    skynet.fork(function()
        service.new("socket_server", server)

        local chan = socketchannel.channel {
            host = '127.0.0.1',
            port = 10001,
            nodelay = true,
        }

        -- Per-request response reader: pauses first, then reads one line and
        -- reports it as the reply.
        local function read_reply(sock)
            skynet.sleep(10)
            local line = sock:readline('\n')
            skynet.error(">>> ", line)
            return true, line, false
        end

        for i = 1, 10 do
            skynet.fork(function ()
                local payload = string.format("%s\n", "hello world " .. i)
                -- pcall so a request interrupted by the close is reported, not raised.
                local ok, reply = pcall(chan.request, chan, payload, read_reply)
                skynet.error("rsp : ", i, ok, reply)
            end)
        end

        -- Close the channel while requests are still outstanding.
        skynet.sleep(10)
        skynet.error("close begin")
        chan:close()
        skynet.error("close end")

        -- Give pending coroutines time to finish, then dump the channel state.
        skynet.sleep(100)
        local log = require "skynet-fly.log"
        log.info(chan)
    end)
    return true
end
-- `exit` entry of the CMD table: performs no work and returns true.
function CMD.exit()
return true
end
return CMD 测试结果[:0000000f][20241230 17:35:08 77]connect from 127.0.0.1:34218
[:0000000f][20241230 17:35:08 77]recv: data = hello world 1
[:0000000f][20241230 17:35:08 78]recv: data = hello world 2
[:0000000f][20241230 17:35:08 79]recv: data = hello world 3
[:0000000f][20241230 17:35:08 80]recv: data = hello world 4
[:0000000f][20241230 17:35:08 81]recv: data = hello world 5
[:0000000f][20241230 17:35:08 82]recv: data = hello world 6
[:0000000f][20241230 17:35:08 83]recv: data = hello world 7
[:0000000f][20241230 17:35:08 84]recv: data = hello world 8
[:0000000f][20241230 17:35:08 85]recv: data = hello world 9
[:0000000f][20241230 17:35:08 86]recv: data = hello world 10
[:0000000e][20241230 17:35:08 87]close begin
[:0000000e][20241230 17:35:08 87]>>> hello world 1
[:0000000e][20241230 17:35:08 87]dispatch_by_order >>> true true hello world 1 false
[:0000000e][20241230 17:35:08 87]rsp : 1 true hello world 1
[:0000000f][20241230 17:35:08 87]socket: error on 4 Connection reset by peer
[:0000000f][20241230 17:35:08 87]disconnect from 127.0.0.1:34218
[:0000000e][20241230 17:35:08 87]close end
[:0000000e][20241230 17:35:09 87][info][socket_channel_m][./module/socket_channel_m.lua:161]{
["__connecting"] = {
}
["__port"] = 10001,
["__authcoroutine"] = false,
["__thread"] = {
[1] = thread: 0x7f54ccc158c8,
[2] = thread: 0x7f54ccc15708,
[3] = thread: 0x7f54ccc16ce8,
[4] = thread: 0x7f54cc80e0e8,
[5] = thread: 0x7f54cc80e1c8,
[6] = thread: 0x7f54cc80e2a8,
[7] = thread: 0x7f54cc80e388,
[8] = thread: 0x7f54cc80e468,
[9] = thread: 0x7f54cc80e548,
[10] = false,
} |
Beta Was this translation helpful? Give feedback.
-
移到 #2020 继续讨论。 |
Beta Was this translation helpful? Give feedback.
-
线上运营的项目发现在调用cluster.reload设置为false的时候,大部分情况下没有返回的请求都会收到socket error中断掉,有一次发现请求没有中断,没中断的这个函数正好在queue中导致后续逻辑全部等待没有响应.
阅读代码发现在dispatch_by_session这个函数中,clustersender传进来的response函数里read是挂起的,当挂起后cluster.reload设置了false,self.__sock就被设置为nil,这时如果正好有完整的包收到,会执行到ok and session的逻辑,逻辑结束后因为self.__sock为nil,while就跳出了,那socket error事件就无法再响应上来,无法执行wakeup_all.
由于这个情况正常不好复现,我测试了发2个request后等10毫秒cluster.reload设置为false,第一个包收到回包后在socket里等50毫秒再返回到socketchannel,第二个包对端等1秒再返回,这时能复现第二个request无法中断一直挂着的情况.
不知道这个分析和测试是否有问题,希望云风大佬看一下.
Beta Was this translation helpful? Give feedback.
All reactions