Skip to content

Commit

Permalink
Fixes 1136: add additional message integrity tests to the legacy adaptor
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Oct 17, 2023
1 parent 7aa8077 commit 22fb5dc
Show file tree
Hide file tree
Showing 2 changed files with 214 additions and 21 deletions.
58 changes: 54 additions & 4 deletions src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,10 @@ static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_r
conn->read_eos_seen = true;
break;
case QD_MESSAGE_STREAM_DATA_INVALID:
// Corrupted message, treat like EOS since there is no way to undo what has already been sent
qd_log(LOG_TCP_ADAPTOR, QD_LOG_ERROR,
"[C%" PRIu64 "] Invalid body data for streaming message", conn->conn_id);
"[C%" PRIu64 "] Invalid body data for streaming message, closing connection", conn->conn_id);
conn->read_eos_seen = true;
break;
default:
break;
Expand Down Expand Up @@ -1996,15 +1998,19 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t
DLV_FMT " tcp_adaptor egress message incomplete, waiting for more", DLV_ARGS(delivery));
return 0; // retry later
}
assert(depth_ok == QD_MESSAGE_DEPTH_OK); // otherwise bug in message encoding?
if (depth_ok != QD_MESSAGE_DEPTH_OK) { // otherwise bug? corrupted message encoding?
qd_log(LOG_TCP_ADAPTOR, QD_LOG_WARNING, DLV_FMT " Malformed TCP message - discarding!", DLV_ARGS(delivery));
qd_message_set_send_complete(msg);
return PN_REJECTED;
}

// ISSUE-1136: check if the message format is correct. For this adaptor the content-type field must be
// unset. See comment in handle_incoming().
//
qd_iterator_t *ctype = qd_message_field_iterator(msg, QD_FIELD_CONTENT_TYPE);
if (ctype) {
qd_iterator_free(ctype);
qd_log(LOG_TCP_ADAPTOR, QD_LOG_ERROR, DLV_FMT " Misconfigured tcpConnector (wrong version)",
qd_log(LOG_TCP_ADAPTOR, QD_LOG_ERROR, DLV_FMT " Misconfigured tcpConnector (wrong encapsulation)",
DLV_ARGS(delivery));
qd_message_set_send_complete(msg);
return PN_RELEASED; // allow it to be re-forwarded to a different adaptor
Expand All @@ -2028,6 +2034,50 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t
} else if (!tc->out_dlv_stream) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG,
DLV_FMT " tcp_adaptor delivery arrived on non-egress dispatcher connection", DLV_ARGS(delivery));

if (tc->ingress) {
// Egress (connector-side) outgoing messages are validated when they arrive on the dispatcher link (see
// above). Ingress (client-side) outbound reply messages do not arrive on the dispatch link so these
// messages need to be validated here.

qd_message_depth_status_t depth_ok = qd_message_check_depth(msg, QD_DEPTH_APPLICATION_PROPERTIES);
if (depth_ok == QD_MESSAGE_DEPTH_INCOMPLETE) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG,
DLV_FMT " tcp_adaptor reply message incomplete, waiting for more", DLV_ARGS(delivery));
return 0; // retry later
}
if (depth_ok != QD_MESSAGE_DEPTH_OK) { // otherwise bug? corrupted message encoding?
qd_log(LOG_TCP_ADAPTOR, QD_LOG_WARNING, DLV_FMT " Malformed TCP message - discarding!", DLV_ARGS(delivery));
qd_message_set_send_complete(msg);
return PN_REJECTED;
}

// ISSUE-1136: check if the message format is correct. For this adaptor the content-type field must be
// unset. See comment in handle_incoming().
//
qd_iterator_t *ctype = qd_message_field_iterator(msg, QD_FIELD_CONTENT_TYPE);
if (ctype) {
qd_iterator_free(ctype);
qd_log(LOG_TCP_ADAPTOR, QD_LOG_ERROR, DLV_FMT " Misconfigured tcpListener (wrong outgoing encapsulation)",
DLV_ARGS(delivery));
qd_message_set_send_complete(msg);

// What to do? This is a reply message, so it cannot be re-delivered to another service.

if (tc->pn_raw_conn) {
// set the raw connection condition info so it will appear in the vanflow logs
// when the connection disconnects
pn_condition_t *cond = pn_raw_connection_condition(tc->pn_raw_conn);
if (!!cond) {
(void) pn_condition_set_name(cond, "delivery-failed");
(void) pn_condition_set_description(cond, "invalid message encapsulation");
}
pn_raw_connection_close(tc->pn_raw_conn);
}
return PN_REJECTED;
}
}

tc->out_dlv_stream = delivery;
qdr_delivery_incref(delivery, "tcp_adaptor - new out_dlv_stream");
if (tc->ingress) {
Expand Down Expand Up @@ -2115,7 +2165,7 @@ static void qdr_tcp_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t
pn_condition_t *cond = pn_raw_connection_condition(tc->pn_raw_conn);
if (!!cond) {
(void) pn_condition_set_name(cond, "delivery-failed");
(void) pn_condition_set_description(cond, "destination unreachable");
(void) pn_condition_set_description(cond, (disp == PN_REJECTED) ? "invalid/corrupt message" : "destination unreachable");
}
pn_raw_connection_close(tc->pn_raw_conn);
}
Expand Down
177 changes: 160 additions & 17 deletions tests/system_tests_tcp_adaptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from subprocess import STDOUT
from typing import List, Optional, Mapping, Tuple

from proton import Message
from proton import Message, Disposition
from proton.handlers import MessagingHandler
from proton.reactor import Container

Expand Down Expand Up @@ -2195,53 +2195,100 @@ def check_connection_deleted():
client_conn.close()


class TcpInvalidEncodingTest(TestCase):
class TcpLegacyInvalidEncodingTest(TestCase):
"""
Ensure that the TCP adaptor can recover from receiving an improperly
formatted/wrong version AMQP encoded stream message.
"""
@classmethod
def setUpClass(cls):
super(TcpInvalidEncodingTest, cls).setUpClass()
super(TcpLegacyInvalidEncodingTest, cls).setUpClass()

config = [
('router', {'mode': 'interior', 'id': 'TcpInvalidEncoding'}),
# Listener for handling router management requests.
('listener', {'role': 'normal',
'port': cls.tester.get_port()}),
('tcpConnector', {'host': "localhost",
('tcpConnector', {'host': "127.0.0.1",
'port': cls.tester.get_port(),
'address': 'tcp-adaptor',
'address': 'tcp-connector',
'encapsulation': 'legacy',
'siteId': "mySite"}),
('tcpListener', {'host': "0.0.0.0",
'port': cls.tester.get_port(),
'address': 'tcp-listener',
'encapsulation': 'legacy',
'siteId': "mySite"}),
('address', {'prefix': 'closest', 'distribution': 'closest'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
]

cls.router = cls.tester.qdrouterd('TcpInvalidEncoding',
Qdrouterd.Config(config), wait=True)
cls.address = cls.router.addresses[0]

def test_invalid_amqp_message(self):
cls.amqp_address = cls.router.addresses[0]
cls.listener_address = cls.router.tcp_addresses[0]

def test_invalid_egress_client_request_encaps(self):
"""
Send an AMQP message addressed to the TCP service via the amqp
listener. Set values in the AMQP header which will conflict with what
is expected by the adaptor. Verify the message is RELEASED and an an
error has been logged.
Simulate an invalid message arriving at the egress connector. Verify
the message is RELEASED and an error has been logged.
"""

# send a request message with an incompatible encapsulation

msg = Message()
msg.to = "tcp-adaptor"
msg.to = "tcp-connector"
msg.subject = "stuff"
msg.reply_to = "invalid/reply/to"
msg.content_type = "application/octet-stream"
test = SendAMQPMessage(msg, self.address, 'tcp-adaptor')
msg.content_type = "This-is-wrong"
test = InvalidClientSendRequest(msg, self.amqp_address, 'tcp-connector')
test.run()
self.assertIsNone(test.error)
self.router.wait_log_message(pattern=r"Misconfigured tcpConnector \(wrong version\)")
self.router.wait_log_message(pattern=r"Misconfigured tcpConnector \(wrong encapsulation\)")

def test_invalid_ingress_server_reply_encaps(self):
"""
Simulate an invalid reply message arriving at the ingress listener. Verify
the message is RELEASED and an error has been logged.
"""

# send a reply message with an incompatible encapsulation

class SendAMQPMessage(MessagingHandler):
msg = Message()
msg.subject = "Subject"
msg.annotations = {":flowid": "whatever"}
msg.content_type = "This-is-wrong"
test = InvalidServerSendReply(msg, self.amqp_address,
self.listener_address, 'tcp-listener',
Disposition.REJECTED)
test.run()
self.assertIsNone(test.error)
self.router.wait_log_message(pattern=r"Misconfigured tcpListener \(wrong outgoing encapsulation\)")

def test_invalid_ingress_server_reply_body(self):
"""
Simulate an invalid reply message arriving at the ingress listener. Verify
the message is RELEASED and an error has been logged.
"""

# send a reply message with an incompatible body format

msg = Message()
msg.subject = "Subject"
msg.annotations = {":flowid": "whatever"}
msg.body = "This is a STRING, NOT VBIN!"
test = InvalidServerSendReply(msg, self.amqp_address,
self.listener_address, 'tcp-listener',
Disposition.ACCEPTED)
test.run()
self.assertIsNone(test.error)
self.router.wait_log_message(pattern=r"Invalid body data for streaming message")


class InvalidClientSendRequest(MessagingHandler):
def __init__(self, msg, address, destination):
super(SendAMQPMessage, self).__init__(auto_settle=False)
super(InvalidClientSendRequest, self).__init__(auto_settle=False)
self.msg = msg
self.address = address
self.destination = destination
Expand Down Expand Up @@ -2283,6 +2330,102 @@ def run(self):
Container(self).run()


class InvalidServerSendReply(MessagingHandler):
def __init__(self, msg, server_address, listener_address, service_address, dispo):
super(InvalidServerSendReply, self).__init__(auto_settle=False)
self.msg = msg
self.service_address = service_address
self.error = None
self.timer = None
self.expected_dispo = dispo

# fake server connection, receive link for request, send link for reply-to
self.server_address = server_address
self.server_conn = None
self.server_sender = None
self.server_receiver = None
self.server_sent = False

# The request message that arrives at the "server" is streaming. Proton
# does not give us an "on_message" callback since it never
# completes. Wait long enough for the headers to arrive so we can
# extract the reply-to
self.request_dlv = None
self.dlv_drain_timer = None

# fack tcp client, just sends a request message
self.listener_address = listener_address
self.client_conn = None
self.client_sent = False

def done(self, error=None):
self.error = error
if self.timer:
self.timer.cancel()
self.server_conn.close()
if self.client_conn is not None:
self.client_conn.close()
if self.dlv_drain_timer:
self.dlv_drain_timer.cancel()

def timeout(self):
self.timer = None
self.done(error=f"Timeout Expired: sent={self.sent}")

def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.server_conn = event.container.connect(self.server_address)
self.server_receiver = event.container.create_receiver(self.server_conn,
self.service_address)

def on_timer_task(self, event):
# at this point we expect the reply-to header to have arrived
try:
data = self.server_receiver.recv(self.request_dlv.pending)
#print(f"len={len(xxx)}\nBODY=[{xxx}]", flush=True)
msg = Message()
msg.decode(data)
self.server_sender = event.container.create_sender(self.server_conn,
msg.reply_to)
except Exception as exc:
self.bail(error=f"Incomplete request msg headers {data}")

self.request_dlv.settle()

def on_delivery(self, event):
if event.receiver == self.server_receiver:
if self.request_dlv is None and event.delivery.readable:
# sleep a bit to allow all the header data to arrive on the
# delivery
self.request_dlv = event.delivery
self.dlv_drain_timer = event.reactor.schedule(1.0, self)

def on_link_opened(self, event):
if event.receiver == self.server_receiver:
# "server" ready to take requests, fire up the "client". All we
# need is to connect since that will activate the tcp adaptor
self.client_conn = event.container.connect(self.listener_address)

def on_sendable(self, event):
if event.sender == self.server_sender:
if not self.server_sent:
# send the invalid reply
self.server_sender.send(self.msg)
self.server_sent = True

def on_released(self, event):
self.done(None if self.expected_dispo == Disposition.RELEASED else "Unexpected PN_RELEASED")

def on_accepted(self, event):
self.done(None if self.expected_dispo == Disposition.ACCEPTED else "Unexpected PN_ACCEPTED")

def on_rejected(self, event):
self.done(None if self.expected_dispo == Disposition.REJECTED else "Unexpected PN_REJECTED")

def run(self):
Container(self).run()


class TcpAdaptorConnCounter(TestCase):
"""
Validate the TCP service connection counter
Expand Down

0 comments on commit 22fb5dc

Please sign in to comment.