-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #63 from stax-labs/feat_less_dependencies
feat(sdk): remove warrant dependency to remove native dependencies
- Loading branch information
Showing
5 changed files
with
485 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,14 @@ | ||
black | ||
isort | ||
jsonschema | ||
pyjwt | ||
nose2 | ||
prance | ||
pycodestyle | ||
pylint | ||
pytest | ||
pytest-cov | ||
responses | ||
pyjwt==1.7.1 | ||
boto3 | ||
aws_requests_auth | ||
openapi-spec-validator |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,8 +7,6 @@ | |
"prance", | ||
"requests", | ||
"aws_requests_auth", | ||
"warrant", | ||
"pyjwt", | ||
"openapi-spec-validator", | ||
] | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,293 @@ | ||
""" | ||
Copyright 2021 Brian Jinwright <opensource@capless.io> | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
Imported from https://github.com/capless/warrant to reduce external dependencies required by this library and just | ||
use the SRP functions. | ||
""" | ||
|
||
import base64 | ||
import binascii | ||
import datetime | ||
import hashlib | ||
import hmac | ||
import os | ||
import re | ||
|
||
import boto3 | ||
import six | ||
|
||
|
||
class WarrantException(Exception): | ||
"""Base class for all Warrant exceptions""" | ||
|
||
|
||
class ForceChangePasswordException(WarrantException): | ||
"""Raised when the user is forced to change their password""" | ||
|
||
|
||
# https://github.com/aws/amazon-cognito-identity-js/blob/master/src/AuthenticationHelper.js#L22 | ||
n_hex = ( | ||
"FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD1" | ||
+ "29024E088A67CC74020BBEA63B139B22514A08798E3404DD" | ||
+ "EF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245" | ||
+ "E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7ED" | ||
+ "EE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3D" | ||
+ "C2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F" | ||
+ "83655D23DCA3AD961C62F356208552BB9ED529077096966D" | ||
+ "670C354E4ABC9804F1746C08CA18217C32905E462E36CE3B" | ||
+ "E39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9" | ||
+ "DE2BCBF6955817183995497CEA956AE515D2261898FA0510" | ||
+ "15728E5A8AAAC42DAD33170D04507A33A85521ABDF1CBA64" | ||
+ "ECFB850458DBEF0A8AEA71575D060C7DB3970F85A6E1E4C7" | ||
+ "ABF5AE8CDB0933D71E8C94E04A25619DCEE3D2261AD2EE6B" | ||
+ "F12FFA06D98A0864D87602733EC86A64521F2B18177B200C" | ||
+ "BBE117577A615D6C770988C0BAD946E208E24FA074E5AB31" | ||
+ "43DB5BFCE0FD108E4B82D120A93AD2CAFFFFFFFFFFFFFFFF" | ||
) | ||
# https://github.com/aws/amazon-cognito-identity-js/blob/master/src/AuthenticationHelper.js#L49 | ||
g_hex = "2" | ||
info_bits = bytearray("Caldera Derived Key", "utf-8") | ||
|
||
|
||
def hash_sha256(buf): | ||
"""AuthenticationHelper.hash""" | ||
a = hashlib.sha256(buf).hexdigest() | ||
return (64 - len(a)) * "0" + a | ||
|
||
|
||
def hex_hash(hex_string): | ||
return hash_sha256(bytearray.fromhex(hex_string)) | ||
|
||
|
||
def hex_to_long(hex_string): | ||
return int(hex_string, 16) | ||
|
||
|
||
def long_to_hex(long_num): | ||
return "%x" % long_num | ||
|
||
|
||
def get_random(nbytes): | ||
random_hex = binascii.hexlify(os.urandom(nbytes)) | ||
return hex_to_long(random_hex) | ||
|
||
|
||
def pad_hex(long_int): | ||
""" | ||
Converts a Long integer (or hex string) to hex format padded with zeroes for hashing | ||
:param {Long integer|String} long_int Number or string to pad. | ||
:return {String} Padded hex string. | ||
""" | ||
if not isinstance(long_int, six.string_types): | ||
hash_str = long_to_hex(long_int) | ||
else: | ||
hash_str = long_int | ||
if len(hash_str) % 2 == 1: | ||
hash_str = "0%s" % hash_str | ||
elif hash_str[0] in "89ABCDEFabcdef": | ||
hash_str = "00%s" % hash_str | ||
return hash_str | ||
|
||
|
||
def compute_hkdf(ikm, salt): | ||
""" | ||
Standard hkdf algorithm | ||
:param {Buffer} ikm Input key material. | ||
:param {Buffer} salt Salt value. | ||
:return {Buffer} Strong key material. | ||
@private | ||
""" | ||
prk = hmac.new(salt, ikm, hashlib.sha256).digest() | ||
info_bits_update = info_bits + bytearray(chr(1), "utf-8") | ||
hmac_hash = hmac.new(prk, info_bits_update, hashlib.sha256).digest() | ||
return hmac_hash[:16] | ||
|
||
|
||
def calculate_u(big_a, big_b): | ||
""" | ||
Calculate the client's value U which is the hash of A and B | ||
:param {Long integer} big_a Large A value. | ||
:param {Long integer} big_b Server B value. | ||
:return {Long integer} Computed U value. | ||
""" | ||
u_hex_hash = hex_hash(pad_hex(big_a) + pad_hex(big_b)) | ||
return hex_to_long(u_hex_hash) | ||
|
||
|
||
class AWSSRP(object): | ||
|
||
NEW_PASSWORD_REQUIRED_CHALLENGE = "NEW_PASSWORD_REQUIRED" | ||
PASSWORD_VERIFIER_CHALLENGE = "PASSWORD_VERIFIER" | ||
|
||
def __init__( | ||
self, | ||
username, | ||
password, | ||
pool_id, | ||
client_id, | ||
pool_region=None, | ||
client=None, | ||
client_secret=None, | ||
): | ||
if pool_region is not None and client is not None: | ||
raise ValueError( | ||
"pool_region and client should not both be specified " | ||
"(region should be passed to the boto3 client instead)" | ||
) | ||
|
||
self.username = username | ||
self.password = password | ||
self.pool_id = pool_id | ||
self.client_id = client_id | ||
self.client_secret = client_secret | ||
self.client = ( | ||
client if client else boto3.client("cognito-idp", region_name=pool_region) | ||
) | ||
self.big_n = hex_to_long(n_hex) | ||
self.g = hex_to_long(g_hex) | ||
self.k = hex_to_long(hex_hash("00" + n_hex + "0" + g_hex)) | ||
self.small_a_value = self.generate_random_small_a() | ||
self.large_a_value = self.calculate_a() | ||
|
||
def generate_random_small_a(self): | ||
""" | ||
helper function to generate a random big integer | ||
:return {Long integer} a random value. | ||
""" | ||
random_long_int = get_random(128) | ||
return random_long_int % self.big_n | ||
|
||
def calculate_a(self): | ||
""" | ||
Calculate the client's public value A = g^a%N | ||
with the generated random number a | ||
:param {Long integer} a Randomly generated small A. | ||
:return {Long integer} Computed large A. | ||
""" | ||
big_a = pow(self.g, self.small_a_value, self.big_n) | ||
# safety check | ||
if (big_a % self.big_n) == 0: | ||
raise ValueError("Safety check for A failed") | ||
return big_a | ||
|
||
def get_password_authentication_key(self, username, password, server_b_value, salt): | ||
""" | ||
Calculates the final hkdf based on computed S value, and computed U value and the key | ||
:param {String} username Username. | ||
:param {String} password Password. | ||
:param {Long integer} server_b_value Server B value. | ||
:param {Long integer} salt Generated salt. | ||
:return {Buffer} Computed HKDF value. | ||
""" | ||
u_value = calculate_u(self.large_a_value, server_b_value) | ||
username_password = "%s%s:%s" % (self.pool_id.split("_")[1], username, password) | ||
username_password_hash = hash_sha256(username_password.encode("utf-8")) | ||
|
||
x_value = hex_to_long(hex_hash(pad_hex(salt) + username_password_hash)) | ||
g_mod_pow_xn = pow(self.g, x_value, self.big_n) | ||
int_value2 = server_b_value - self.k * g_mod_pow_xn | ||
s_value = pow(int_value2, self.small_a_value + u_value * x_value, self.big_n) | ||
hkdf = compute_hkdf( | ||
bytearray.fromhex(pad_hex(s_value)), | ||
bytearray.fromhex(pad_hex(long_to_hex(u_value))), | ||
) | ||
return hkdf | ||
|
||
def get_auth_params(self): | ||
auth_params = { | ||
"USERNAME": self.username, | ||
"SRP_A": long_to_hex(self.large_a_value), | ||
} | ||
if self.client_secret is not None: | ||
auth_params.update( | ||
{ | ||
"SECRET_HASH": self.get_secret_hash( | ||
self.username, self.client_id, self.client_secret | ||
) | ||
} | ||
) | ||
return auth_params | ||
|
||
@staticmethod | ||
def get_secret_hash(username, client_id, client_secret): | ||
message = bytearray(username + client_id, "utf-8") | ||
hmac_obj = hmac.new(bytearray(client_secret, "utf-8"), message, hashlib.sha256) | ||
return base64.standard_b64encode(hmac_obj.digest()).decode("utf-8") | ||
|
||
def process_challenge(self, challenge_parameters): | ||
user_id_for_srp = challenge_parameters["USER_ID_FOR_SRP"] | ||
salt_hex = challenge_parameters["SALT"] | ||
srp_b_hex = challenge_parameters["SRP_B"] | ||
secret_block_b64 = challenge_parameters["SECRET_BLOCK"] | ||
# re strips leading zero from a day number (required by AWS Cognito) | ||
timestamp = re.sub( | ||
r" 0(\d) ", | ||
r" \1 ", | ||
datetime.datetime.utcnow().strftime("%a %b %d %H:%M:%S UTC %Y"), | ||
) | ||
hkdf = self.get_password_authentication_key( | ||
user_id_for_srp, self.password, hex_to_long(srp_b_hex), salt_hex | ||
) | ||
secret_block_bytes = base64.standard_b64decode(secret_block_b64) | ||
msg = ( | ||
bytearray(self.pool_id.split("_")[1], "utf-8") | ||
+ bytearray(user_id_for_srp, "utf-8") | ||
+ bytearray(secret_block_bytes) | ||
+ bytearray(timestamp, "utf-8") | ||
) | ||
hmac_obj = hmac.new(hkdf, msg, digestmod=hashlib.sha256) | ||
signature_string = base64.standard_b64encode(hmac_obj.digest()) | ||
response = { | ||
"TIMESTAMP": timestamp, | ||
"USERNAME": user_id_for_srp, | ||
"PASSWORD_CLAIM_SECRET_BLOCK": secret_block_b64, | ||
"PASSWORD_CLAIM_SIGNATURE": signature_string.decode("utf-8"), | ||
} | ||
if self.client_secret is not None: | ||
response.update( | ||
{ | ||
"SECRET_HASH": self.get_secret_hash( | ||
self.username, self.client_id, self.client_secret | ||
) | ||
} | ||
) | ||
return response | ||
|
||
def authenticate_user(self, client=None): | ||
boto_client = self.client or client | ||
auth_params = self.get_auth_params() | ||
response = boto_client.initiate_auth( | ||
AuthFlow="USER_SRP_AUTH", | ||
AuthParameters=auth_params, | ||
ClientId=self.client_id, | ||
) | ||
if response["ChallengeName"] == self.PASSWORD_VERIFIER_CHALLENGE: | ||
challenge_response = self.process_challenge(response["ChallengeParameters"]) | ||
tokens = boto_client.respond_to_auth_challenge( | ||
ClientId=self.client_id, | ||
ChallengeName=self.PASSWORD_VERIFIER_CHALLENGE, | ||
ChallengeResponses=challenge_response, | ||
) | ||
|
||
if tokens.get("ChallengeName") == self.NEW_PASSWORD_REQUIRED_CHALLENGE: | ||
raise ForceChangePasswordException( | ||
"Change password before authenticating" | ||
) | ||
|
||
return tokens | ||
else: | ||
raise NotImplementedError( | ||
"The %s challenge is not supported" % response["ChallengeName"] | ||
) |
Oops, something went wrong.