-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathawscli_plugin_credential_mfa.py
389 lines (319 loc) · 14.6 KB
/
awscli_plugin_credential_mfa.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
import datetime
import getpass
import json
import logging
from copy import deepcopy
from hashlib import sha1
from botocore.credentials import (CachedCredentialFetcher,
CanonicalNameCredentialSourcer,
CredentialProvider, Credentials,
DeferredRefreshableCredentials,
JSONFileCache, create_mfa_serial_refresher)
from botocore.exceptions import (CredentialRetrievalError,
InfiniteLoopConfigError, InvalidConfigError,
PartialCredentialsError, ProfileNotFound)
from dateutil.tz import tzlocal
# Version string of this plugin module.
__version__ = '1.0.1'
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def awscli_initialize(event_handlers):
    """Hook the MFA credential provider into the CLI's session start-up.

    Registers ``inject_mfa_credential_provider`` as a handler for the
    ``session-initialized`` event.

    :type event_handlers: object exposing ``register(event, handler, unique_id=...)``
    :param event_handlers: The event registry the handler is registered on.
    """
    handler = inject_mfa_credential_provider
    # The unique id is passed so the registry can identify this handler.
    event_handlers.register('session-initialized', handler, unique_id='inject_mfa_credential_provider')
def inject_mfa_credential_provider(session, **kwargs):
    """Insert an ``MfaCredentialProvider`` into the session's provider chain.

    The new provider is placed directly after the ``env`` provider. If the
    session's profile cannot be found, nothing is injected.

    :param session: The session whose ``credential_provider`` component is
        extended; also supplies the config, the client factory and the
        active profile name.
    :param kwargs: Extra event arguments; accepted and ignored.
    """
    try:
        resolver = session.get_component('credential_provider')
    except ProfileNotFound:
        # Looking up components on a session whose profile does not exist
        # raises ProfileNotFound. That can be a user error, e.g.
        #
        #   "ec2 describe-instances --profile unknown"
        #
        # or a perfectly legitimate invocation, e.g.
        #
        #   "configure set region us-west-2 --profile brand-new-profile"
        #
        # We cannot (and do not want to) tell the two apart here, so we
        # simply bail out. A genuinely invalid profile is reported by
        # something further up the stack; valid commands keep working.
        logger.debug('ProfileNotFound caught when trying to inject mfa credential provider. '
                     'Credential provider not configured.')
        return

    def read_full_config():
        # Deferred so the config is read when the provider asks for it.
        return session.full_config

    client_factory = session.create_client
    file_cache = JSONFileCache()
    active_profile = session.get_config_variable('profile') or 'default'

    # Providers that a profile's ``credential_source`` may draw from.
    source_provider_names = ('env', 'shared-credentials-file', 'config-file')
    sourcer = CanonicalNameCredentialSourcer(
        [resolver.get_provider(name) for name in source_provider_names]
    )

    mfa_provider = MfaCredentialProvider(
        load_config=read_full_config,
        client_creator=client_factory,
        cache=file_cache,
        profile_name=active_profile,
        credential_sourcer=sourcer,
    )
    resolver.insert_after('env', mfa_provider)
def _local_now():
    """Return the current time as a datetime carrying the ``tzlocal()`` timezone."""
    local_zone = tzlocal()
    return datetime.datetime.now(tz=local_zone)
class MfaCredentialFetcher(CachedCredentialFetcher):
    """Fetch temporary credentials via the STS ``get_session_token`` call.

    When a ``SerialNumber`` is present in ``extra_args`` the user is prompted
    for an MFA code, which is sent along as ``TokenCode``. The cache and the
    expiry window are handed to ``CachedCredentialFetcher``, which owns the
    caching behaviour.
    """

    def __init__(self, client_creator, source_credentials, extra_args, mfa_prompter=None,
                 cache=None, expiry_window_seconds=60 * 15):
        """
        :type client_creator: callable
        :param client_creator: A callable that creates a client taking
            arguments like ``Session.create_client``.

        :type source_credentials: Credentials
        :param source_credentials: The credentials to use to create the
            client for the call to ``get_session_token``.

        :type extra_args: dict
        :param extra_args: Any additional arguments to add to the
            ``get_session_token`` request using the format of the botocore
            operation, e.g. ``SerialNumber``. ``None`` is treated as an
            empty dict. The dict is deep-copied, so the caller's object is
            never modified.

        :type mfa_prompter: callable
        :param mfa_prompter: A callable that returns input provided by the
            user (i.e raw_input, getpass.getpass, etc.). Defaults to
            ``getpass.getpass``.

        :type cache: dict
        :param cache: An object that supports ``__getitem__``,
            ``__setitem__``, and ``__contains__``. An example of this is
            the ``JSONFileCache`` class in aws-cli.

        :type expiry_window_seconds: int
        :param expiry_window_seconds: An amount of time, in seconds, passed
            through to ``CachedCredentialFetcher`` (presumably how long
            before their real expiry cached credentials are treated as
            expired -- confirm against botocore).
        """
        self._client_creator = client_creator
        self._source_credentials = source_credentials
        self._extra_args = {} if extra_args is None else deepcopy(extra_args)
        self._mfa_serial = self._extra_args.get('SerialNumber')
        self._mfa_prompter = getpass.getpass if mfa_prompter is None else mfa_prompter

        # Must run after the attributes above are set: botocore's base
        # initializer may call ``_create_cache_key``, which reads them.
        super(MfaCredentialFetcher, self).__init__(cache, expiry_window_seconds)

    def _create_cache_key(self):
        """Return a file-safe cache key derived from the request arguments.

        The key is the SHA-1 hex digest of the JSON-serialized extra
        arguments combined with the source access key id and secret key.
        """
        args = deepcopy(self._extra_args)
        frozen_credentials = self._source_credentials.get_frozen_credentials()
        args['AccessKeyId'] = frozen_credentials.access_key
        args['SecretAccessKey'] = frozen_credentials.secret_key
        # sort_keys makes the serialization (and thus the hash) independent
        # of dict ordering.
        args = json.dumps(args, sort_keys=True)
        argument_hash = sha1(args.encode('utf-8')).hexdigest()
        return self._make_file_safe(argument_hash)

    def _get_credentials(self):
        """Call STS ``get_session_token`` and return its raw response."""
        kwargs = self._get_session_token_kwargs()
        client = self._create_client()
        return client.get_session_token(**kwargs)

    def _get_session_token_kwargs(self):
        """Build the keyword arguments for one ``get_session_token`` call.

        Prompts for an MFA code when an MFA serial number is configured.
        """
        # Work on a copy: the one-time TokenCode must not leak into the
        # stored arguments, which are also the basis of the cache key.
        kwargs = deepcopy(self._extra_args)
        if self._mfa_serial is not None:
            prompt = 'Enter MFA code for {mfa_serial}: '.format(mfa_serial=self._mfa_serial)
            kwargs['TokenCode'] = self._mfa_prompter(prompt)
        return kwargs

    def _create_client(self):
        """Create an ``sts`` client authenticated with the source credentials."""
        frozen_credentials = self._source_credentials.get_frozen_credentials()
        return self._client_creator(
            'sts',
            aws_access_key_id=frozen_credentials.access_key,
            aws_secret_access_key=frozen_credentials.secret_key,
            aws_session_token=frozen_credentials.token,
        )
class MfaCredentialProvider(CredentialProvider):
    """Credential provider for profiles that declare an ``mfa_serial``.

    For such a profile the provider resolves source credentials (from a
    ``source_profile`` or a ``credential_source``) and returns deferred,
    refreshable credentials backed by an ``MfaCredentialFetcher``.
    """

    METHOD = 'mfa-credential-provider'
    CANONICAL_NAME = 'custom-MfaCredentials'

    # Profile keys read from the loaded configuration.
    ACCESS_KEY = 'aws_access_key_id'
    SECRET_KEY = 'aws_secret_access_key'
    MFA_SERIAL = 'mfa_serial'

    def __init__(self, load_config, client_creator, cache, profile_name, prompter=getpass.getpass,
                 credential_sourcer=None):
        """Initialize MfaCredentialProvider.

        :type load_config: callable
        :param load_config: A function that accepts no arguments, and
            when called, will return the full configuration dictionary
            for the session (``session.full_config``).

        :type client_creator: callable
        :param client_creator: A factory function that will create
            a client when called. Has the same interface as
            ``botocore.session.Session.create_client``.

        :type cache: dict
        :param cache: An object that supports ``__getitem__``,
            ``__setitem__``, and ``__contains__``. An example
            of this is the ``JSONFileCache`` class in the CLI.

        :type profile_name: str
        :param profile_name: The name of the profile.

        :type prompter: callable
        :param prompter: A callable that returns input provided
            by the user (i.e raw_input, getpass.getpass, etc.).

        :type credential_sourcer: CanonicalNameCredentialSourcer
        :param credential_sourcer: A credential provider that takes a
            configuration, which is used to provide the source credentials
            for the STS call.
        """
        self.cache = cache
        self._load_config = load_config
        self._client_creator = client_creator
        self._profile_name = profile_name
        self._prompter = prompter
        self._loaded_config = {}
        self._credential_sourcer = credential_sourcer
        # Profiles seen while following source_profile references; used to
        # detect reference loops.
        self._visited_profiles = [self._profile_name]

    def load(self):
        """Return MFA-backed credentials for the configured profile.

        :return: ``DeferredRefreshableCredentials`` when the profile contains
            ``mfa_serial``; ``None`` otherwise.
        """
        self._loaded_config = self._load_config()
        profiles = self._loaded_config.get('profiles', {})
        profile = profiles.get(self._profile_name, {})
        if not self._has_mfa_serial(profile):
            return None
        # Start every resolution with a fresh visit list. Without this, a
        # second ``load()`` on the same instance would find the source
        # profile already recorded and report a bogus infinite loop.
        self._visited_profiles = [self._profile_name]
        return self._load_mfa_creds(self._profile_name)

    def _has_mfa_serial(self, profile):
        """Return True if the profile dict contains the ``mfa_serial`` key."""
        return self.MFA_SERIAL in profile

    def _load_mfa_creds(self, profile_name):
        """Build deferred refreshable credentials for ``profile_name``."""
        mfa_config = self._get_mfa_config(profile_name)
        source_credentials = self._resolve_source_credentials(mfa_config, profile_name)

        mfa_serial = mfa_config.get('mfa_serial')
        extra_args = {}
        if mfa_serial is not None:
            extra_args['SerialNumber'] = mfa_serial

        fetcher = MfaCredentialFetcher(
            client_creator=self._client_creator,
            source_credentials=source_credentials,
            extra_args=extra_args,
            mfa_prompter=self._prompter,
            cache=self.cache,
        )
        refresher = fetcher.fetch_credentials
        if mfa_serial is not None:
            # Wrap the refresher with botocore's MFA-specific refresher.
            refresher = create_mfa_serial_refresher(refresher)

        return DeferredRefreshableCredentials(
            method=self.METHOD,
            refresh_using=refresher,
            time_fetcher=_local_now,
        )

    def _get_mfa_config(self, profile_name):
        """Extract and validate the MFA-related settings of a profile.

        :return: dict with the keys ``mfa_serial``, ``source_profile`` and
            ``credential_source``.
        :raises InvalidConfigError: if both ``source_profile`` and
            ``credential_source`` are set, or the one that is set is invalid.
        :raises PartialCredentialsError: if neither is set.
        """
        profiles = self._loaded_config.get('profiles', {})
        profile = profiles[profile_name]
        source_profile = profile.get('source_profile')
        credential_source = profile.get('credential_source')

        mfa_config = {
            'mfa_serial': profile.get('mfa_serial'),
            'source_profile': source_profile,
            'credential_source': credential_source,
        }

        # Exactly one of source_profile / credential_source must be given.
        if credential_source is not None and source_profile is not None:
            raise InvalidConfigError(
                error_msg=(
                    'The profile "{profile_name}" contains both source_profile and '
                    'credential_source.'.format(profile_name=profile_name)
                ),
            )
        elif credential_source is None and source_profile is None:
            raise PartialCredentialsError(
                provider=self.METHOD,
                cred_var='source_profile or credential_source',
            )
        elif credential_source is not None:
            self._validate_credential_source(profile_name, credential_source)
        else:
            self._validate_source_profile(profile_name, source_profile)

        return mfa_config

    def _validate_credential_source(self, profile_name, credential_source):
        """Ensure ``credential_source`` can be served by the configured sourcer.

        :raises InvalidConfigError: if no credential sourcer was configured
            or the sourcer does not support ``credential_source``.
        """
        if self._credential_sourcer is None:
            raise InvalidConfigError(
                error_msg=(
                    'The credential source "{credential_source}" is specified in profile '
                    '"{profile_name}", but no source_provider was configured.'.format(
                        credential_source=credential_source,
                        profile_name=profile_name,
                    ),
                ),
            )
        if not self._credential_sourcer.is_supported(credential_source):
            raise InvalidConfigError(
                error_msg=(
                    'The credential source "{credential_source}" referenced in profile '
                    '"{profile_name}" is not valid.'.format(
                        credential_source=credential_source,
                        profile_name=profile_name,
                    ),
                ),
            )

    def _source_profile_has_credentials(self, profile):
        """Return True if a source profile carries usable credentials."""
        return self._has_static_credentials(profile)

    def _validate_source_profile(self, parent_profile_name, source_profile_name):
        """Validate a ``source_profile`` reference.

        :raises InvalidConfigError: if the source profile does not exist or
            has no static credentials.
        :raises InfiniteLoopConfigError: if following the reference would
            revisit a profile that is not a plain self-reference.
        """
        profiles = self._loaded_config.get('profiles', {})
        if source_profile_name not in profiles:
            raise InvalidConfigError(
                error_msg=(
                    'The source_profile "{source_profile}" referenced in the profile '
                    '"{parent_profile}" does not exist.'.format(
                        source_profile=source_profile_name,
                        parent_profile=parent_profile_name,
                    ),
                ),
            )

        source_profile = profiles[source_profile_name]

        if not self._source_profile_has_credentials(source_profile):
            raise InvalidConfigError(
                error_msg=(
                    'The source_profile "{source_profile}" must specify static '
                    'credentials.'.format(
                        source_profile=source_profile_name,
                    ),
                ),
            )

        # A profile that has not been visited yet cannot form a loop.
        if source_profile_name not in self._visited_profiles:
            return

        # Already visited and not merely referencing itself: a loop.
        if source_profile_name != parent_profile_name:
            raise InfiniteLoopConfigError(
                source_profile=source_profile_name,
                visited_profiles=self._visited_profiles,
            )

        # A self-reference is acceptable only when the profile itself holds
        # the static credentials to source from.
        if not self._has_static_credentials(source_profile):
            raise InfiniteLoopConfigError(
                source_profile=source_profile_name,
                visited_profiles=self._visited_profiles,
            )

    def _has_static_credentials(self, profile):
        """Return True if the profile has an access key id or a secret key.

        A profile holding only one of the two still counts here; the
        missing key is reported when the credentials are actually built.
        """
        static_keys = [self.ACCESS_KEY, self.SECRET_KEY]
        return any(static_key in profile for static_key in static_keys)

    def _resolve_source_credentials(self, mfa_config, profile_name):
        """Return the credentials used to authenticate the STS call."""
        credential_source = mfa_config.get('credential_source')
        if credential_source is not None:
            return self._resolve_credentials_from_source(credential_source, profile_name)

        source_profile = mfa_config['source_profile']
        self._visited_profiles.append(source_profile)
        return self._resolve_credentials_from_profile(source_profile)

    def _resolve_credentials_from_profile(self, profile_name):
        """Return credentials for ``profile_name``.

        Static credentials are used when present; otherwise the profile is
        itself resolved as an MFA profile.
        """
        profiles = self._loaded_config.get('profiles', {})
        profile = profiles[profile_name]

        if self._has_static_credentials(profile):
            return self._resolve_static_credentials_from_profile(profile)

        return self._load_mfa_creds(profile_name)

    def _resolve_static_credentials_from_profile(self, profile):
        """Build ``Credentials`` from the static keys stored in a profile.

        :raises PartialCredentialsError: if the access key id or the secret
            key is missing from the profile.
        """
        try:
            return Credentials(
                access_key=profile['aws_access_key_id'],
                secret_key=profile['aws_secret_access_key'],
                token=profile.get('aws_session_token'),
            )
        except KeyError as e:
            raise PartialCredentialsError(
                provider=self.METHOD,
                cred_var=str(e),
            )

    def _resolve_credentials_from_source(self, credential_source, profile_name):
        """Return credentials obtained from the named ``credential_source``.

        :raises CredentialRetrievalError: if the sourcer yields no
            credentials.
        """
        credentials = self._credential_sourcer.source_credentials(credential_source)
        if credentials is None:
            raise CredentialRetrievalError(
                provider=credential_source,
                error_msg=(
                    'No credentials found in credential_source referenced '
                    'in profile "{profile_name}".'.format(
                        profile_name=profile_name,
                    ),
                ),
            )
        # Hand the sourced credentials back to the caller; without this the
        # method would always yield None.
        return credentials