From f9396304075467cab67affca7ee34a3caa7019a6 Mon Sep 17 00:00:00 2001
From: Doron Somech <somdoron@gmail.com>
Date: Wed, 13 May 2020 17:32:06 +0300
Subject: [PATCH] problem: zeromq connects peer before handshake is completed
Solution: delay connecting the peer pipe until the handshake is completed
(cherry picked from commit e7f0090b161ce6344f6bd35009816a925c070b09)
Conflicts:
src/i_engine.hpp
src/norm_engine.hpp
src/pgm_receiver.hpp
src/pgm_sender.hpp
src/raw_engine.cpp
src/session_base.cpp
src/session_base.hpp
src/stream_engine_base.cpp
src/stream_engine_base.hpp
src/udp_engine.hpp
src/ws_engine.cpp
src/zmtp_engine.cpp
---
src/i_engine.hpp | 4 ++++
src/ipc_connecter.cpp | 2 +-
src/ipc_listener.cpp | 2 +-
src/norm_engine.hpp | 2 ++
src/pgm_receiver.hpp | 1 +
src/pgm_sender.hpp | 1 +
src/session_base.cpp | 19 +++++++++++++------
src/session_base.hpp | 1 +
src/socks_connecter.cpp | 2 +-
src/stream_engine.cpp | 12 ++++++++++--
src/stream_engine.hpp | 8 +++++++-
src/tcp_connecter.cpp | 2 +-
src/tcp_listener.cpp | 2 +-
src/tipc_connecter.cpp | 2 +-
src/tipc_listener.cpp | 2 +-
src/udp_engine.hpp | 2 ++
16 files changed, 48 insertions(+), 16 deletions(-)
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index dfbdd265..a12ec6f2 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -40,6 +40,10 @@ struct i_engine
{
virtual ~i_engine () {}
+ // Indicate if the engine has an handshake stage.
+ // If engine has handshake stage, engine must call session.engine_ready when the handshake is complete.
+ virtual bool has_handshake_stage () = 0;
+
// Plug the engine to the session.
virtual void plug (zmq::io_thread_t *io_thread_,
class session_base_t *session_) = 0;
diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp
index 579c5042..bc95fc5e 100644
--- a/src/ipc_connecter.cpp
+++ b/src/ipc_connecter.cpp
@@ -127,7 +127,7 @@ void zmq::ipc_connecter_t::out_event ()
}
// Create the engine object for this connection.
stream_engine_t *engine =
- new (std::nothrow) stream_engine_t (fd, options, endpoint);
+ new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_socket);
alloc_assert (engine);
// Attach the engine to the corresponding session object.
diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp
index fa2222ca..d2316370 100644
--- a/src/ipc_listener.cpp
+++ b/src/ipc_listener.cpp
@@ -173,7 +173,7 @@ void zmq::ipc_listener_t::in_event ()
// Create the engine object for this connection.
stream_engine_t *engine =
- new (std::nothrow) stream_engine_t (fd, options, endpoint);
+ new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_socket);
alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already
diff --git a/src/norm_engine.hpp b/src/norm_engine.hpp
index 733f24fc..86095b65 100644
--- a/src/norm_engine.hpp
+++ b/src/norm_engine.hpp
@@ -28,6 +28,8 @@ class norm_engine_t : public io_object_t, public i_engine
int init (const char *network_, bool send, bool recv);
void shutdown ();
+ bool has_handshake_stage () { return false; };
+
// i_engine interface implementation.
// Plug the engine to the session.
virtual void plug (zmq::io_thread_t *io_thread_,
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index d5eca74d..7edc4c30 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -55,6 +55,7 @@ class pgm_receiver_t : public io_object_t, public i_engine
int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation.
+ bool has_handshake_stage () { return false; };
void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_);
void terminate ();
bool restart_input ();
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 1eb0b1f1..88b82946 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -54,6 +54,7 @@ class pgm_sender_t : public io_object_t, public i_engine
int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation.
+ bool has_handshake_stage () { return false; };
void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_);
void terminate ();
bool restart_input ();
diff --git a/src/session_base.cpp b/src/session_base.cpp
index ef7daf95..b627fc82 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -289,7 +289,8 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_)
}
if (unlikely (_engine == NULL)) {
- _pipe->check_read ();
+ if (_pipe)
+ _pipe->check_read ();
return;
}
@@ -388,7 +389,18 @@ bool zmq::session_base_t::zap_enabled ()
void zmq::session_base_t::process_attach (i_engine *engine_)
{
zmq_assert (engine_ != NULL);
+ zmq_assert (!_engine);
+ _engine = engine_;
+ if (!engine_->has_handshake_stage ())
+ engine_ready ();
+
+ // Plug in the engine.
+ _engine->plug (_io_thread, this);
+}
+
+void zmq::session_base_t::engine_ready ()
+{
// Create the pipe if it does not exist yet.
if (!_pipe && !is_terminating ()) {
object_t *parents[2] = {this, _socket};
@@ -412,11 +424,6 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
// Ask socket to plug into the remote end of the pipe.
send_bind (_socket, pipes[1]);
}
-
- // Plug in the engine.
- zmq_assert (!_engine);
- _engine = engine_;
- _engine->plug (_io_thread, this);
}
void zmq::session_base_t::engine_error (
diff --git a/src/session_base.hpp b/src/session_base.hpp
index 002cdf87..b106e43f 100644
--- a/src/session_base.hpp
+++ b/src/session_base.hpp
@@ -62,6 +62,7 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events
void flush ();
void rollback ();
void engine_error (zmq::stream_engine_t::error_reason_t reason_);
+ void engine_ready ();
// i_pipe_events interface implementation.
void read_activated (zmq::pipe_t *pipe_);
diff --git a/src/socks_connecter.cpp b/src/socks_connecter.cpp
index ada20726..ca4aba3a 100644
--- a/src/socks_connecter.cpp
+++ b/src/socks_connecter.cpp
@@ -152,7 +152,7 @@ void zmq::socks_connecter_t::in_event ()
else {
// Create the engine object for this connection.
stream_engine_t *engine =
- new (std::nothrow) stream_engine_t (_s, options, _endpoint);
+ new (std::nothrow) stream_engine_t (_s, options, _endpoint, !options.raw_socket);
alloc_assert (engine);
// Attach the engine to the corresponding session object.
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp
index 4b13012d..5d372780 100644
--- a/src/stream_engine.cpp
+++ b/src/stream_engine.cpp
@@ -65,7 +65,8 @@
zmq::stream_engine_t::stream_engine_t (fd_t fd_,
const options_t &options_,
- const std::string &endpoint_) :
+ const std::string &endpoint_,
+ bool has_handshake_stage_) :
_s (fd_),
_handle (static_cast<handle_t> (NULL)),
_inpos (NULL),
@@ -79,6 +80,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_,
_greeting_size (v2_greeting_size),
_greeting_bytes_read (0),
_session (NULL),
+ _has_handshake_stage (has_handshake_stage_),
_options (options_),
_endpoint (endpoint_),
_plugged (false),
@@ -290,9 +292,12 @@ void zmq::stream_engine_t::in_event ()
zmq_assert (!_io_error);
// If still handshaking, receive and process the greeting message.
- if (unlikely (_handshaking))
+ if (unlikely (_handshaking)) {
if (!handshake ())
return;
+ else if (_mechanism == NULL && _has_handshake_stage)
+ _session->engine_ready ();
+ }
zmq_assert (_decoder);
@@ -896,6 +901,9 @@ void zmq::stream_engine_t::mechanism_ready ()
_has_heartbeat_timer = true;
}
+ if (_has_handshake_stage)
+ _session->engine_ready ();
+
bool flush_session = false;
if (_options.recv_routing_id) {
diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp
index a8149fd2..05212670 100644
--- a/src/stream_engine.hpp
+++ b/src/stream_engine.hpp
@@ -70,10 +70,12 @@ class stream_engine_t : public io_object_t, public i_engine
stream_engine_t (fd_t fd_,
const options_t &options_,
- const std::string &endpoint_);
+ const std::string &endpoint_,
+ bool has_handshake_stage_);
~stream_engine_t ();
// i_engine interface implementation.
+ bool has_handshake_stage () { return _has_handshake_stage; };
void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_);
void terminate ();
bool restart_input ();
@@ -188,6 +190,10 @@ class stream_engine_t : public io_object_t, public i_engine
// The session this engine is attached to.
zmq::session_base_t *_session;
+ // Indicate if engine has an handshake stage, if it does, engine must call session.engine_ready
+ // when handshake is completed.
+ bool _has_handshake_stage;
+
const options_t _options;
// String representation of endpoint
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index b55d87fc..e8f1b573 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -155,7 +155,7 @@ void zmq::tcp_connecter_t::out_event ()
// Create the engine object for this connection.
stream_engine_t *engine =
- new (std::nothrow) stream_engine_t (fd, options, _endpoint);
+ new (std::nothrow) stream_engine_t (fd, options, _endpoint, !options.raw_socket);
alloc_assert (engine);
// Attach the engine to the corresponding session object.
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index 19baaa1d..a64b93a2 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -116,7 +116,7 @@ void zmq::tcp_listener_t::in_event ()
// Create the engine object for this connection.
stream_engine_t *engine =
- new (std::nothrow) stream_engine_t (fd, options, _endpoint);
+ new (std::nothrow) stream_engine_t (fd, options, _endpoint, !options.raw_socket);
alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already
diff --git a/src/tipc_connecter.cpp b/src/tipc_connecter.cpp
index 168a0d40..b18aafe7 100644
--- a/src/tipc_connecter.cpp
+++ b/src/tipc_connecter.cpp
@@ -129,7 +129,7 @@ void zmq::tipc_connecter_t::out_event ()
}
// Create the engine object for this connection.
stream_engine_t *engine =
- new (std::nothrow) stream_engine_t (fd, options, endpoint);
+ new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_socket);
alloc_assert (engine);
// Attach the engine to the corresponding session object.
diff --git a/src/tipc_listener.cpp b/src/tipc_listener.cpp
index 9c05f765..2cf1f5ca 100644
--- a/src/tipc_listener.cpp
+++ b/src/tipc_listener.cpp
@@ -98,7 +98,7 @@ void zmq::tipc_listener_t::in_event ()
// Create the engine object for this connection.
stream_engine_t *engine =
- new (std::nothrow) stream_engine_t (fd, options, endpoint);
+ new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_socket);
alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already
diff --git a/src/udp_engine.hpp b/src/udp_engine.hpp
index 9bf5a533..db93e9a6 100644
--- a/src/udp_engine.hpp
+++ b/src/udp_engine.hpp
@@ -22,6 +22,8 @@ class udp_engine_t : public io_object_t, public i_engine
int init (address_t *address_, bool send_, bool recv_);
+ bool has_handshake_stage () { return false; };
+
// i_engine interface implementation.
// Plug the engine to the session.
void plug (zmq::io_thread_t *io_thread_, class session_base_t *session_);
Impact
Users with TCP transport public endpoints, even with CURVE/ZAP enabled
Patches
#3913
#3973
4.3.1
4.2.5
4.2.1
4.1.4
4.0.5
Workarounds
No workarounds known
References
Found thanks to Google's oss-fuzz project:
https://oss-fuzz.com/testcase-detail/5707174518194176
For more information
If a raw TCP socket is opened and connected to an endpoint that is fully configured with CURVE/ZAP, legitimate clients will not be able to exchange any message. Handshakes complete successfully, and messages are delivered to the library, but the server application never receives them.
To reproduce simply run this test program - if the library is affected, it will deadlock and never terminate: