From ad774b583c70a7db396301701e41076b3ebad73f Mon Sep 17 00:00:00 2001 From: Mark Wolfe Date: Mon, 25 Jan 2021 13:56:35 +1100 Subject: [PATCH] feat(sdk): remove warrant dependency to remove native dependencies The SDK currently uses a small part of the https://github.com/capless/warrant library to hand Secure Remote Password (http://srp.stanford.edu/) however this library also provides a number of utility functions for cognito which we don't use. This PR takes just the code we use from warrant and copies it into this SDK, with a header including reference to the library and it's author. This change enables customers to install the SDK on OSX and upload it to lambda and run without the need to recompile native deps in linux containers. fixes #60 --- requirements.txt | 5 +- setup.py | 2 - staxapp/auth.py | 2 +- staxapp/aws_srp.py | 293 +++++++++++++++++++++++++++++++++++++++++++++ tests/test_srp.py | 187 +++++++++++++++++++++++++++++ 5 files changed, 485 insertions(+), 4 deletions(-) create mode 100644 staxapp/aws_srp.py create mode 100644 tests/test_srp.py diff --git a/requirements.txt b/requirements.txt index 23ef32f..98adb3f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,6 @@ black isort jsonschema -pyjwt nose2 prance pycodestyle @@ -9,3 +8,7 @@ pylint pytest pytest-cov responses +pyjwt==1.7.1 +boto3 +aws_requests_auth +openapi-spec-validator \ No newline at end of file diff --git a/setup.py b/setup.py index 34847cb..d091f49 100755 --- a/setup.py +++ b/setup.py @@ -7,8 +7,6 @@ "prance", "requests", "aws_requests_auth", - "warrant", - "pyjwt", "openapi-spec-validator", ] diff --git a/staxapp/auth.py b/staxapp/auth.py index 38d55c8..fc5b82c 100644 --- a/staxapp/auth.py +++ b/staxapp/auth.py @@ -6,8 +6,8 @@ from botocore import UNSIGNED from botocore.client import Config as BotoConfig from botocore.exceptions import ClientError -from warrant import AWSSRP +from staxapp.aws_srp import AWSSRP from staxapp.config import Config as StaxConfig from staxapp.exceptions import InvalidCredentialsException diff --git a/staxapp/aws_srp.py b/staxapp/aws_srp.py new file mode 100644 index 0000000..0bbc11a --- /dev/null +++ b/staxapp/aws_srp.py @@ -0,0 +1,293 @@ +""" + Copyright 2021 Brian Jinwright + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Imported from https://github.com/capless/warrant to reduce external dependencies required by this library and just + use the SRP functions. +""" + +import base64 +import binascii +import datetime +import hashlib +import hmac +import os +import re + +import boto3 +import six + + +class WarrantException(Exception): + """Base class for all Warrant exceptions""" + + +class ForceChangePasswordException(WarrantException): + """Raised when the user is forced to change their password""" + + +# https://github.com/aws/amazon-cognito-identity-js/blob/master/src/AuthenticationHelper.js#L22 +n_hex = ( + "FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD1" + + "29024E088A67CC74020BBEA63B139B22514A08798E3404DD" + + "EF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245" + + "E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7ED" + + "EE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3D" + + "C2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F" + + "83655D23DCA3AD961C62F356208552BB9ED529077096966D" + + "670C354E4ABC9804F1746C08CA18217C32905E462E36CE3B" + + "E39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9" + + "DE2BCBF6955817183995497CEA956AE515D2261898FA0510" + + "15728E5A8AAAC42DAD33170D04507A33A85521ABDF1CBA64" + + "ECFB850458DBEF0A8AEA71575D060C7DB3970F85A6E1E4C7" + + "ABF5AE8CDB0933D71E8C94E04A25619DCEE3D2261AD2EE6B" + + "F12FFA06D98A0864D87602733EC86A64521F2B18177B200C" + + "BBE117577A615D6C770988C0BAD946E208E24FA074E5AB31" + + "43DB5BFCE0FD108E4B82D120A93AD2CAFFFFFFFFFFFFFFFF" +) +# https://github.com/aws/amazon-cognito-identity-js/blob/master/src/AuthenticationHelper.js#L49 +g_hex = "2" +info_bits = bytearray("Caldera Derived Key", "utf-8") + + +def hash_sha256(buf): + """AuthenticationHelper.hash""" + a = hashlib.sha256(buf).hexdigest() + return (64 - len(a)) * "0" + a + + +def hex_hash(hex_string): + return hash_sha256(bytearray.fromhex(hex_string)) + + +def hex_to_long(hex_string): + return int(hex_string, 16) + + +def long_to_hex(long_num): + return "%x" % long_num + + +def get_random(nbytes): + random_hex = binascii.hexlify(os.urandom(nbytes)) + return hex_to_long(random_hex) + + +def pad_hex(long_int): + """ + Converts a Long integer (or hex string) to hex format padded with zeroes for hashing + :param {Long integer|String} long_int Number or string to pad. + :return {String} Padded hex string. + """ + if not isinstance(long_int, six.string_types): + hash_str = long_to_hex(long_int) + else: + hash_str = long_int + if len(hash_str) % 2 == 1: + hash_str = "0%s" % hash_str + elif hash_str[0] in "89ABCDEFabcdef": + hash_str = "00%s" % hash_str + return hash_str + + +def compute_hkdf(ikm, salt): + """ + Standard hkdf algorithm + :param {Buffer} ikm Input key material. + :param {Buffer} salt Salt value. + :return {Buffer} Strong key material. + @private + """ + prk = hmac.new(salt, ikm, hashlib.sha256).digest() + info_bits_update = info_bits + bytearray(chr(1), "utf-8") + hmac_hash = hmac.new(prk, info_bits_update, hashlib.sha256).digest() + return hmac_hash[:16] + + +def calculate_u(big_a, big_b): + """ + Calculate the client's value U which is the hash of A and B + :param {Long integer} big_a Large A value. + :param {Long integer} big_b Server B value. + :return {Long integer} Computed U value. + """ + u_hex_hash = hex_hash(pad_hex(big_a) + pad_hex(big_b)) + return hex_to_long(u_hex_hash) + + +class AWSSRP(object): + + NEW_PASSWORD_REQUIRED_CHALLENGE = "NEW_PASSWORD_REQUIRED" + PASSWORD_VERIFIER_CHALLENGE = "PASSWORD_VERIFIER" + + def __init__( + self, + username, + password, + pool_id, + client_id, + pool_region=None, + client=None, + client_secret=None, + ): + if pool_region is not None and client is not None: + raise ValueError( + "pool_region and client should not both be specified " + "(region should be passed to the boto3 client instead)" + ) + + self.username = username + self.password = password + self.pool_id = pool_id + self.client_id = client_id + self.client_secret = client_secret + self.client = ( + client if client else boto3.client("cognito-idp", region_name=pool_region) + ) + self.big_n = hex_to_long(n_hex) + self.g = hex_to_long(g_hex) + self.k = hex_to_long(hex_hash("00" + n_hex + "0" + g_hex)) + self.small_a_value = self.generate_random_small_a() + self.large_a_value = self.calculate_a() + + def generate_random_small_a(self): + """ + helper function to generate a random big integer + :return {Long integer} a random value. + """ + random_long_int = get_random(128) + return random_long_int % self.big_n + + def calculate_a(self): + """ + Calculate the client's public value A = g^a%N + with the generated random number a + :param {Long integer} a Randomly generated small A. + :return {Long integer} Computed large A. + """ + big_a = pow(self.g, self.small_a_value, self.big_n) + # safety check + if (big_a % self.big_n) == 0: + raise ValueError("Safety check for A failed") + return big_a + + def get_password_authentication_key(self, username, password, server_b_value, salt): + """ + Calculates the final hkdf based on computed S value, and computed U value and the key + :param {String} username Username. + :param {String} password Password. + :param {Long integer} server_b_value Server B value. + :param {Long integer} salt Generated salt. + :return {Buffer} Computed HKDF value. + """ + u_value = calculate_u(self.large_a_value, server_b_value) + username_password = "%s%s:%s" % (self.pool_id.split("_")[1], username, password) + username_password_hash = hash_sha256(username_password.encode("utf-8")) + + x_value = hex_to_long(hex_hash(pad_hex(salt) + username_password_hash)) + g_mod_pow_xn = pow(self.g, x_value, self.big_n) + int_value2 = server_b_value - self.k * g_mod_pow_xn + s_value = pow(int_value2, self.small_a_value + u_value * x_value, self.big_n) + hkdf = compute_hkdf( + bytearray.fromhex(pad_hex(s_value)), + bytearray.fromhex(pad_hex(long_to_hex(u_value))), + ) + return hkdf + + def get_auth_params(self): + auth_params = { + "USERNAME": self.username, + "SRP_A": long_to_hex(self.large_a_value), + } + if self.client_secret is not None: + auth_params.update( + { + "SECRET_HASH": self.get_secret_hash( + self.username, self.client_id, self.client_secret + ) + } + ) + return auth_params + + @staticmethod + def get_secret_hash(username, client_id, client_secret): + message = bytearray(username + client_id, "utf-8") + hmac_obj = hmac.new(bytearray(client_secret, "utf-8"), message, hashlib.sha256) + return base64.standard_b64encode(hmac_obj.digest()).decode("utf-8") + + def process_challenge(self, challenge_parameters): + user_id_for_srp = challenge_parameters["USER_ID_FOR_SRP"] + salt_hex = challenge_parameters["SALT"] + srp_b_hex = challenge_parameters["SRP_B"] + secret_block_b64 = challenge_parameters["SECRET_BLOCK"] + # re strips leading zero from a day number (required by AWS Cognito) + timestamp = re.sub( + r" 0(\d) ", + r" \1 ", + datetime.datetime.utcnow().strftime("%a %b %d %H:%M:%S UTC %Y"), + ) + hkdf = self.get_password_authentication_key( + user_id_for_srp, self.password, hex_to_long(srp_b_hex), salt_hex + ) + secret_block_bytes = base64.standard_b64decode(secret_block_b64) + msg = ( + bytearray(self.pool_id.split("_")[1], "utf-8") + + bytearray(user_id_for_srp, "utf-8") + + bytearray(secret_block_bytes) + + bytearray(timestamp, "utf-8") + ) + hmac_obj = hmac.new(hkdf, msg, digestmod=hashlib.sha256) + signature_string = base64.standard_b64encode(hmac_obj.digest()) + response = { + "TIMESTAMP": timestamp, + "USERNAME": user_id_for_srp, + "PASSWORD_CLAIM_SECRET_BLOCK": secret_block_b64, + "PASSWORD_CLAIM_SIGNATURE": signature_string.decode("utf-8"), + } + if self.client_secret is not None: + response.update( + { + "SECRET_HASH": self.get_secret_hash( + self.username, self.client_id, self.client_secret + ) + } + ) + return response + + def authenticate_user(self, client=None): + boto_client = self.client or client + auth_params = self.get_auth_params() + response = boto_client.initiate_auth( + AuthFlow="USER_SRP_AUTH", + AuthParameters=auth_params, + ClientId=self.client_id, + ) + if response["ChallengeName"] == self.PASSWORD_VERIFIER_CHALLENGE: + challenge_response = self.process_challenge(response["ChallengeParameters"]) + tokens = boto_client.respond_to_auth_challenge( + ClientId=self.client_id, + ChallengeName=self.PASSWORD_VERIFIER_CHALLENGE, + ChallengeResponses=challenge_response, + ) + + if tokens.get("ChallengeName") == self.NEW_PASSWORD_REQUIRED_CHALLENGE: + raise ForceChangePasswordException( + "Change password before authenticating" + ) + + return tokens + else: + raise NotImplementedError( + "The %s challenge is not supported" % response["ChallengeName"] + ) diff --git a/tests/test_srp.py b/tests/test_srp.py new file mode 100644 index 0000000..00ed25a --- /dev/null +++ b/tests/test_srp.py @@ -0,0 +1,187 @@ +import unittest + +from botocore.stub import Stubber +from unittest.mock import patch + +from staxapp.aws_srp import AWSSRP, ForceChangePasswordException + + +def _mock_authenticate_user(_, client=None): + return { + 'AuthenticationResult': { + 'TokenType': 'admin', + 'IdToken': 'dummy_token', + 'AccessToken': 'dummy_token', + 'RefreshToken': 'dummy_token' + } + } + + +def _mock_get_params(_): + return {'USERNAME': 'bob', 'SRP_A': 'srp'} + +class AWSSRPTestCase(unittest.TestCase): + + def setUp(self): + self.app_id = "test" + self.aws = AWSSRP(username="test", + password="test", + pool_region='us-east-1', + pool_id="test", + client_id="test", + client_secret="test") + + def tearDown(self): + del self.aws + + def test_pool_region_validation(self): + with self.assertRaises(ValueError): + AWSSRP(username="test", + password="test", + pool_region='us-east-1', + pool_id="test", + client_id="test", + client_secret="test", client="test") + + def test_calculate_a_safety_check(self): + self.aws = AWSSRP(username="test", + password="test", + pool_region='us-east-1', + pool_id="test", + client_id="test", + client_secret="test") + self.aws.big_n = 1 + self.aws.small_a_value = 1 + with self.assertRaises(ValueError): + self.aws.calculate_a() + + def test_calculate_auth_request_check(self): + self.aws = AWSSRP(username="test", + password="test", + pool_region='us-east-1', + pool_id="test", + client_id="test", + client_secret="test") + self.aws.get_auth_params() + + def test_get_password_authentication_key(self): + self.aws = AWSSRP(username="test", + password="test", + pool_region='us-east-1', + pool_id="test_test", + client_id="test", + client_secret="test") + self.large_a_value = 0 + self.aws.get_password_authentication_key("test", "test", 0, 1234567890) + + def test_process_challenge_check(self): + self.aws = AWSSRP(username="test", + password="test", + pool_region='us-east-1', + pool_id="test_test", + client_id="test", + client_secret="test") + resp = self.aws.process_challenge({'USER_ID_FOR_SRP': 'test', 'SALT': '16', 'SRP_B': '16', 'SECRET_BLOCK': 'c2VjcmV0c3NlY3Jlc3Rzc2VjcmV0cwo='}) + self.assertTrue('PASSWORD_CLAIM_SIGNATURE' in resp) + + def test_get_secret_hash(self): + result = AWSSRP.get_secret_hash("test", "test", "test") + self.assertIsNotNone(result) + + @patch('staxapp.aws_srp.AWSSRP.get_auth_params', _mock_get_params) + @patch('staxapp.aws_srp.AWSSRP.process_challenge', return_value={}) + def test_authenticate_user_password_change_challenge(self, _): + + stub = Stubber(self.aws.client) + + # By the stubber nature, we need to add the sequence + # of calls for the AWS SRP auth to test the whole process + stub.add_response(method='initiate_auth', + service_response={ + 'ChallengeName': 'PASSWORD_VERIFIER', + 'ChallengeParameters': {} + }, + expected_params={ + 'AuthFlow': 'USER_SRP_AUTH', + 'AuthParameters': _mock_get_params(None), + 'ClientId': self.app_id + }) + stub.add_response(method='respond_to_auth_challenge', + service_response={ + 'ChallengeName': 'NEW_PASSWORD_REQUIRED', + 'AuthenticationResult': {} + }, + expected_params={ + 'ClientId': self.app_id, + 'ChallengeName': 'PASSWORD_VERIFIER', + 'ChallengeResponses': {} + }) + with stub: + with self.assertRaises(ForceChangePasswordException): + self.aws.authenticate_user() + + @patch('staxapp.aws_srp.AWSSRP.get_auth_params', _mock_get_params) + @patch('staxapp.aws_srp.AWSSRP.process_challenge', return_value={}) + def test_authenticate_user_bad_challenge(self, _): + + stub = Stubber(self.aws.client) + + # By the stubber nature, we need to add the sequence + # of calls for the AWS SRP auth to test the whole process + stub.add_response(method='initiate_auth', + service_response={ + 'ChallengeName': 'BOO', + 'ChallengeParameters': {} + }, + expected_params={ + 'AuthFlow': 'USER_SRP_AUTH', + 'AuthParameters': _mock_get_params(None), + 'ClientId': self.app_id + }) + with stub: + with self.assertRaises(NotImplementedError): + self.aws.authenticate_user() + + @patch('staxapp.aws_srp.AWSSRP.get_auth_params', _mock_get_params) + @patch('staxapp.aws_srp.AWSSRP.process_challenge', return_value={}) + def test_authenticate_user(self, _): + + stub = Stubber(self.aws.client) + + # By the stubber nature, we need to add the sequence + # of calls for the AWS SRP auth to test the whole process + stub.add_response(method='initiate_auth', + service_response={ + 'ChallengeName': 'PASSWORD_VERIFIER', + 'ChallengeParameters': {} + }, + expected_params={ + 'AuthFlow': 'USER_SRP_AUTH', + 'AuthParameters': _mock_get_params(None), + 'ClientId': self.app_id + }) + + stub.add_response(method='respond_to_auth_challenge', + service_response={ + 'AuthenticationResult': { + 'IdToken': 'dummy_token', + 'AccessToken': 'dummy_token', + 'RefreshToken': 'dummy_token' + } + }, + expected_params={ + 'ClientId': self.app_id, + 'ChallengeName': 'PASSWORD_VERIFIER', + 'ChallengeResponses': {} + }) + + with stub: + tokens = self.aws.authenticate_user() + self.assertTrue('IdToken' in tokens['AuthenticationResult']) + self.assertTrue('AccessToken' in tokens['AuthenticationResult']) + self.assertTrue('RefreshToken' in tokens['AuthenticationResult']) + stub.assert_no_pending_responses() + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file