Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Aug 14, 2024
1 parent 1b14a67 commit d177b5f
Showing 1 changed file with 91 additions and 27 deletions.
118 changes: 91 additions & 27 deletions tests/system_tests_tcp_adaptor_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
CA_CERT, CLIENT_CERTIFICATE, CLIENT_PRIVATE_KEY, CLIENT_PRIVATE_KEY_PASSWORD, \
SERVER_CERTIFICATE, SERVER_PRIVATE_KEY, SERVER_PRIVATE_KEY_PASSWORD, SERVER_PRIVATE_KEY_NO_PASS, BAD_CA_CERT, \
CHAINED_CERT
from system_test import CA2_CERT
from system_test import CLIENT2_CERTIFICATE, CLIENT2_PRIVATE_KEY, CLIENT2_PRIVATE_KEY_PASSWORD
from system_test import SERVER2_CERTIFICATE, SERVER2_PRIVATE_KEY, SERVER2_PRIVATE_KEY_PASSWORD
from system_test import SSL_PROFILE_TYPE
from system_test import is_pattern_present
from system_tests_ssl import RouterTestSslBase
from system_tests_tcp_adaptor import TcpAdaptorBase, CommonTcpTests, ncat_available
from http1_tests import wait_tcp_listeners_up
Expand Down Expand Up @@ -811,21 +815,22 @@ def setUpClass(cls):
wait_tcp_listeners_up(cls.router_qdra.addresses[0])

def test_ssl_profile_update(self):
# Note we do not want to update the certs held by the ssl server or
# client. We need them to be valid so the test client/server do not
# fail themselves. We only want the router to be affected!

"""
Test management updates to the listener and connector sslProfile
configurations
"""
server_ssl_info = dict()
server_ssl_info['CA_CERT'] = CA_CERT
server_ssl_info['SERVER_CERTIFICATE'] = SERVER_CERTIFICATE
server_ssl_info['SERVER_PRIVATE_KEY'] = SERVER_PRIVATE_KEY
server_ssl_info['SERVER_PRIVATE_KEY_PASSWORD'] = SERVER_PRIVATE_KEY_PASSWORD

openssl_server = self.tester.openssl_server
self.openssl_server = openssl_server(listening_port=self.openssl_server_listening_port,
ssl_info=server_ssl_info,
name="OpenSSLServerAuthPeer",
cl_args=['-Verify', '1'])
server_create = self.tester.openssl_server
openssl_server = server_create(listening_port=self.openssl_server_listening_port,
ssl_info=server_ssl_info,
name="OpenSSLServerAuthPeer",
cl_args=['-Verify', '1',
'-verify_return_error'])

client_ssl_info = dict()
client_ssl_info['CA_CERT'] = CA_CERT
Expand All @@ -835,11 +840,13 @@ def test_ssl_profile_update(self):

out, error = self.opensslclient(port=self.router_listener_port,
ssl_info=client_ssl_info,
data=b"Sanity Check the Configuration!")
self.assertIn(b"Verification: OK", out)
self.assertIn(b"Verify return code: 0 (ok)", out)
data=b"Sanity Check the Configuration!",
cl_args=['-verify', '10',
'-verify_return_error'])
self.assertIn(b"Verification: OK", out, f"{error}")
self.assertIn(b"Verify return code: 0 (ok)", out, f"{error}")

self.openssl_server.wait_out_message("Sanity Check the Configuration!")
openssl_server.wait_out_message("Sanity Check the Configuration!")

#
# Attempt to update the listener-side sslProfile with the wrong
Expand All @@ -848,52 +855,109 @@ def test_ssl_profile_update(self):
#

skmgr_a = self.router_qdra.sk_manager
l_id = skmgr_a.read(name='listener-ssl-profile')['identity']

with self.assertRaises(Exception) as emgr:
skmgr_a.update(SSL_PROFILE_TYPE, {'password': 'badpassword'}, identity=l_id)
skmgr_a.update(SSL_PROFILE_TYPE, {'password': 'badpassword'},
name='listener-ssl-profile')

self.assertIn('Failed to set TLS certFile', str(emgr.exception))

#
# Restore the proper password and verify clients can connect
#

skmgr_a.update(SSL_PROFILE_TYPE, {'password': SERVER_PRIVATE_KEY_PASSWORD}, identity=l_id)
skmgr_a.update(SSL_PROFILE_TYPE, {'password':
SERVER_PRIVATE_KEY_PASSWORD},
name='listener-ssl-profile')

out = skmgr_a.read(name='listener-ssl-profile')
self.assertEqual(SERVER_PRIVATE_KEY_PASSWORD, out['password'])

out, error = self.opensslclient(port=self.router_listener_port,
ssl_info=client_ssl_info,
data=b"Hey password is good!")
self.assertIn(b"Verification: OK", out)
self.assertIn(b"Verify return code: 0 (ok)", out)
data=b"Hey password is good!",
cl_args=['-verify', '10',
'-verify_return_error'])
self.assertIn(b"Verification: OK", out, f"{error}")
self.assertIn(b"Verify return code: 0 (ok)", out, f"{error}")

self.openssl_server.wait_out_message("Hey password is good!")
openssl_server.wait_out_message("Hey password is good!")

#
# Now update the sslProfile with a valid config, but one that will not
# allow the client to connect
#

skmgr_a.update(SSL_PROFILE_TYPE, {'caCertFile': BAD_CA_CERT}, identity=l_id)
skmgr_a.update(SSL_PROFILE_TYPE, {'caCertFile': BAD_CA_CERT},
name='listener-ssl-profile')

_, _ = self.opensslclient(port=self.router_listener_port,
ssl_info=client_ssl_info,
data=b"The CA will not allow this!")
data=b"The CA will not allow this!",
cl_args=['-verify', '10',
'-verify_return_error'])
self.router_qdra.wait_log_message(r'TLS connection failed')

#
# Restore the sslProfile configuration and verify all is well
#

skmgr_a.update(SSL_PROFILE_TYPE, {'caCertFile': CA_CERT}, identity=l_id)
skmgr_a.update(SSL_PROFILE_TYPE, {'caCertFile': CA_CERT},
name='listener-ssl-profile')

out, error = self.opensslclient(port=self.router_listener_port,
ssl_info=client_ssl_info,
data=b"Hey we recovered!")
self.assertIn(b"Verification: OK", out)
self.assertIn(b"Verify return code: 0 (ok)", out)
data=b"Hey we recovered!",
cl_args=['-verify', '10',
'-verify_return_error'])
self.assertIn(b"Verification: OK", out, f"{error}")
self.assertIn(b"Verify return code: 0 (ok)", out, f"{error}")

openssl_server.wait_out_message("Hey we recovered!")

#
# Test updates on the connector sslProfile
#

# start a new ssl server that uses an incompatible TLS configuration

openssl_server.teardown()
server_ssl_info = dict()
server_ssl_info['CA_CERT'] = CA2_CERT
server_ssl_info['SERVER_CERTIFICATE'] = SERVER2_CERTIFICATE
server_ssl_info['SERVER_PRIVATE_KEY'] = SERVER2_PRIVATE_KEY
server_ssl_info['SERVER_PRIVATE_KEY_PASSWORD'] = SERVER2_PRIVATE_KEY_PASSWORD
openssl_server = server_create(listening_port=self.openssl_server_listening_port,
ssl_info=server_ssl_info,
name="OpenSSLServerAuthPeer",
cl_args=['-Verify', '1',
'-verify_return_error'])

self.openssl_server.wait_out_message("Hey we recovered!")
_, _ = self.opensslclient(port=self.router_listener_port,
ssl_info=client_ssl_info,
data=b"The server conn must fail",
cl_args=['-verify', '10',
'-verify_return_error'])
self.router_qdrb.wait_log_message(r'TLS connection failed')
with open(openssl_server.outfile_path, 'rt') as out_file:
self.assertFalse(is_pattern_present(out_file,
"The server conn must fail"),
"TLS connection did not fail")

# Now update the connectors sslProfile with a compatible client cert
# and verify a new connection succeeds

new_cfg = {'caCertFile': CA2_CERT,
'certFile': CLIENT2_CERTIFICATE,
'privateKeyFile': CLIENT2_PRIVATE_KEY,
'password': CLIENT2_PRIVATE_KEY_PASSWORD}
skmgr_b = self.router_qdrb.sk_manager
skmgr_b.update(SSL_PROFILE_TYPE, new_cfg, name='connector-ssl-profile')
out, error = self.opensslclient(port=self.router_listener_port,
ssl_info=client_ssl_info,
data=b"The server conn must succeed!",
cl_args=['-verify', '10',
'-verify_return_error'])
self.assertIn(b"Verification: OK", out, f"{error}")
self.assertIn(b"Verify return code: 0 (ok)", out, f"{error}")
openssl_server.wait_out_message("The server conn must succeed!")

0 comments on commit d177b5f

Please sign in to comment.