"""
Summary:
stslib generates temporary credentials used to assume roles across
many AWS accounts. It is commonly used for progammatic use cases where
avoiding a multi-factor auth prompt in a cli environment is desired
Module Attributes:
logger - logging object
Example Usage:
see https://bitbucket.org/blakeca00/stslib/overview
"""
import os
import json
from json import JSONDecodeError
import datetime
import inspect
import yaml
import pytz
import boto3
from botocore.exceptions import ClientError, ProfileNotFound
from stslib import logd
from stslib.refactor import parse_awscli
from stslib.vault import STSToken, STSCredentials
from stslib.async import TimeKeeper, convert_time
from stslib.statics import defaults, global_config
from stslib._version import __version__
logger = logd.getLogger(__version__)
[docs]class StsCore():
"""
Class definition, STS credentials library
"""
def __init__(self, **kwargs):
"""
Summary: initalization, attribute assignment
Args:
:role_file (arg, TYPE: str):
optional name of a json structured file located stslib config directory
in users home directory. File contains information about roles for which
you which to generate temporary credentials
:output_file (attr, TYPE: str):
optional non-default filename
:profile_name (param, TYPE: str):
user profile configured in local awscli config with permissions to
assume roles in target aws accounts
:profile_user (attr, TYPE: str):
Instance attr for profile_name when passed as a parameter
:log_mode (attr, TYPE: str):
Parameter designation for file or stream log output. If 'file', output
defaults to filesystem location denoted in statics module in the
global_config dict.
:format (attr, TYPE: str):
format of credentials, either boto (native) or vault (stslib default format)
:debug (attr, TYPE: bool):
optional debug flag (DEFAULT = False)
"""
# validate provided kwargs
keywords = ('role_file', 'output_file', 'profile_name',
'log_mode', 'format', 'debug')
if self.filter_args(kwargs, *keywords):
boto_profiles = kwargs.get('role_file', None)
stslib_profiles = kwargs.get('output_file', defaults['output_file'])
self.profile_user = kwargs.get('profile_name', defaults['profile_user'])
self.log_mode = kwargs.get('log_mode', global_config['log_mode'])
self.format = kwargs.get('format', defaults['format'])
self.debug_mode = kwargs.get('debug', False)
else:
return
# session attributes and objects
self.config_dir = defaults['config_path']
self.refactor(
input_file=boto_profiles,
output_file=stslib_profiles,
force_rewrite=False
)
self.session = self._session_init(self.profile_user)
# static attributes
self.sts_max = defaults['sts_max'] # minutes, 36 hours
self.sts_min = defaults['sts_min'] # minutes, 0.25 hours
# token attributes
self.token = {}
# credential attributes
self.credentials = {}
self.credential_default = defaults['credential_life']
self.credential_expiration = '' # datetime str ("%Y-%m-%d %H:%M:%S")
self.refresh_credentials = False # bool, recuring credential gen
try:
iam_client = self.session.client('iam') # client scope separation
sts_client = self.session.client('sts') # client scope separation
except Exception:
return
self.users = self.get_valid_users(iam_client)
self.iam_user = self._map_identity(self.profile_user, sts_client)
self.mfa_serial = self.get_mfa_info(self.iam_user, iam_client)
self.thread = None
[docs] def local_config(self):
""" override defaults in statics with local config values """
if os.path.exists(global_config['config_file']):
with open(global_config['config_file'], 'r') as stream:
try:
yml_object = yaml.load(stream)
except yaml.YAMLError as exc:
print(exc)
else:
self.log_mode = yml_object['LocalConfiguration']['log_mode'][0]
return
def _map_identity(self, user, client):
"""
retrieves iam user info for profiles in awscli config
Args:
:type user: string, local profile user
:param user: user from which default boto3 session object created
:type client: boto3.client
:param client: the sts client used
Returns:
:type iam_user: string
:param iam_user: AWS iam user mapped to profile user in local config
"""
try:
iam_user = client.get_caller_identity()['Arn'].split('/')[1]
logger.info(
'%s: profile_name mapped to iam_user: %s' %
(inspect.stack()[0][3], iam_user)
)
except ClientError as e:
logger.warning(
'%s: Inadequate User permissions (Code: %s Message: %s)' %
(inspect.stack()[0][3], e.response['Error']['Code'],
e.response['Error']['Message']))
raise str(e)
return iam_user
[docs] def parse_profiles(self, pre_name, post_name):
"""
Creates list of account profiles from local configuration file
Args:
:type string
:param pre_name: input file containing iam role credentials in
non-default location or fname
:type string
:param post_name: json file containing role profiles for which stslib will
generate temporary credentials. This file is generated by stslib and
will be located in the ~/.stslib directory. Format:
Returns:
:type: dictionary
:param profile_dict: list of aws account profile role names, role arns
.. code-block:: javascript
{
"AliceIAMUser": {
"aws_access_key_id": "AKIAIDYCI6Q4469WORVQ",
"aws_secret_access_key": "Wf2A0dx1ApMrEdljjkjteBmqqCdPB3Ng3kx/ow",
"mfa_serial": "arn:aws:iam::715400231659:mfa/AliceIAMUser"
},
"DynamoDBAccessRole": {
"role_arn": "arn:aws:iam::357115911622:role/DynamoDBFullAccess",
"mfa_serial": "arn:aws:iam::715400231659:mfa/AliceIAMUser",
"source_profile": "default"
},
"EC2AccessRole": {
...
}
}
"""
profile_file = self.config_dir + '/' + str(post_name)
try:
if os.path.exists(profile_file):
with open(profile_file) as f1:
profile_dict = json.load(f1)
else:
raise OSError(
'%s: Problem parsing local awscli credentials file: %s' %
(inspect.stack()[0][3], str(profile_file))
)
except JSONDecodeError as e:
logger.exception(
'%s: %s file not properly formed json. Error %s' %
(inspect.stack()[0][3], profile_file, str(e)))
raise
except Exception as e:
logger.exception(e)
raise
return profile_dict
[docs] def get_mfa_info(self, user, client):
"""
Summary:
Extracts the mfa_serial arn (soft token) or SerialNumber
(if hardware token assigned)
Args:
:type user: string
:param user: iam_user in local awscli profile. user may be a
profile name which is used exclusively in the awscli but does
not represent an actual iam name recorded in the Amazon Web
Services account.
Returns:
TYPE: string
"""
# query local for mfa info
if self.profile_user in self.profiles.keys():
if 'mfa_serial' in self.profiles[self.profile_user].keys():
mfa_id = self.profiles[self.profile_user]['mfa_serial']
else:
mfa_id = ''
else:
# query aws for mfa info
try:
response = client.list_mfa_devices(UserName=user)
if response['MFADevices']:
mfa_id = response['MFADevices'][0]['SerialNumber']
else:
mfa_id = ''
except ClientError:
mfa_id = '' # no mfa assigned to user
except Exception as e:
logger.exception(
'%s: Unknown error retrieving mfa device info. Error %s' %
(inspect.stack()[0][3], str(e)))
return str(e)
return mfa_id
[docs] def generate_session_token(self, **kwargs):
"""
Summary:
generates session token for use in gen temp credentials
Args:
lifetime (int): token lifetime duration in hours
mfa_code (str): 6 digit authorization code from a multi-factor (mfa)
authentication device
Returns:
session credentials | TYPE: dict
.. code-block:: javascript
{
'AccessKeyId': 'ASIAI6QV2U3JJAYRHCJQ',
'StartTime': datetime.datetime(2017, 8, 25, 20, 2, 37, tzinfo=tzutc()),
'Expiration': datetime.datetime(2017, 8, 25, 20, 5, 37, tzinfo=tzutc()),
'SecretAccessKey': 'MdjPAkXTHl12k64LSjmgTWMsmnHk4cJfeMHdXMLA',
'SessionToken': 'FQoDYXdzEDMaDHAaP2wi/+77fNJJryKvAdVZjYKk...zQU='
}
"""
# parse parameters
keywords = ('lifetime', 'mfa_code')
if self.filter_args(kwargs, *keywords):
lifetime = kwargs.get('lifetime', 1)
mfa_code = kwargs.get('mfa_code', '')
# now, timezone offset aware
now = datetime.datetime.utcnow().replace(tzinfo=pytz.UTC)
token_life = datetime.timedelta(hours=int(lifetime))
mfa_code = str(mfa_code)
sts_client = self.session.client('sts')
try:
if (self.sts_min < token_life <= self.sts_max):
if self.mfa_serial:
response = sts_client.get_session_token(
DurationSeconds=token_life.seconds,
SerialNumber=self.mfa_serial,
TokenCode=mfa_code
)
else:
response = sts_client.get_session_token(
DurationSeconds=token_life.seconds
)
response['Credentials']['StartTime'] = now
# set token and related attributes
self.token = self._set_session_token(
token_object=response['Credentials']
)
else:
logger.warning(
'%s: Requested lifetime must be STS service limits (%s - %s hrs)'
% (inspect.stack()[0][3], self.sts_min, self.sts_max))
return {}
except ClientError as e:
logger.warning(
'%s: Exception gen session token with iam user %s (Code: %s Message: %s)' %
(inspect.stack()[0][3], self.iam_user, e.response['Error']['Code'],
e.response['Error']['Message'])
)
return {'Error': str(e)}
return self.token
[docs] def generate_credentials(self, accounts, token=None, strict=True):
"""
Summary:
generate temporary credentials for profiles
Args:
accounts: TYPE: list
List of account aliases or profile names from the local
awscli configuration in accounts to assume a role
strict: TYPE: list
Determines if strict membership checking is applied to
aliases found in accounts parameter list. if strict=True
(Default), then if 1 account profilename given in the accounts
list, all accounts will be rejected and no temporary credentials
are generated. If False, temporary credentials generated
for all profiles that are valid, only invalid profiles will
fail to generate credentials
Returns:
iam role temporary credentials | TYPE: Dict
.. code-block:: javascript
{
'sts-acme-gen-ra1-prod' : {
'AccessKeyId': 'ASIAI6QV2U3JJAYRHCJQ',
'Expiration': datetime.datetime(2017, 8, 25, 20, 5, 37, tzinfo=tzutc()),
'SecretAccessKey': 'MdjPAkXTHl12k64LSjmgTWMsmnHk4cJfeMHdXMLA',
'SessionToken': 'FQoDYXdzEDMaDHAaP2wi/+77fNJJryKvAdVZjYKk...zQU='
},
'sts-acme-gen-ra1-dev' : {
'AccessKeyId': 'ASIAI6QV2U3 ...',
}
}
"""
# prep, now - tz offset aware
now = datetime.datetime.utcnow().replace(tzinfo=pytz.UTC)
session_token = token or self.token
prefix = global_config['credential_prefix'] + '-'
credentials = {}
try:
if self._valid_token(session_token):
# instantiate client
sts_client = self.session.client(
'sts',
aws_access_key_id=session_token.access_key,
aws_secret_access_key=session_token.secret_key,
aws_session_token=session_token.session
)
# one-time (non-threaded) creation of credentials
if self._validate(accounts, strict):
for alias in accounts:
response = sts_client.assume_role(
RoleArn=self.profiles[alias]['role_arn'],
DurationSeconds=self.credential_default.seconds,
RoleSessionName=prefix + alias
)
response['Credentials']['StartTime'] = now
credentials[prefix + alias] = response['Credentials']
if self.format == 'boto':
self.credentials = credentials
else:
self.credentials = STSCredentials(credentials).credentials
else:
return {} # validation fail
# branch, auto-refresh or one-time gen of credentials
if self.refresh_credentials:
if self.debug_mode:
logger.debug('token_duration = %s min, RefreshCount = %s' %
(convert_time(self.token.duration),
str(self.token.duration.seconds //3600))
)
# stop active thread if exists before generating new one
self._halt_thread()
self.thread = TimeKeeper(
roles=accounts,
event=self.generate_credentials,
RefreshCount=(self.token.duration.seconds // 3600),
debug=self.debug_mode
)
self.thread.start()
self.refresh_credentials = False
else:
logger.warning('No credentials generated, token is expired')
return {} # token expired
except KeyError as e:
if not strict:
logger.info('Strict checking %s, role invalid, skipped' % str(e))
else:
raise e
except ClientError as e:
logger.critical(
'%s: Assume role exception in account %s (Code: %s Message: %s)' %
(inspect.stack()[0][3], alias, e.response['Error']['Code'],
e.response['Error']['Message']))
return {}
return self.credentials
[docs] def current_credentials(self):
""" returns credentials when refreshed
Args:
:type self.credentials: dict
:param self.credentials: latest credentials generated and stored as class attribute
Returns:
Valid credentials | None if expired {}
"""
if self.credentials:
if self.calc_lifetime(credentials=self.credentials)[1].seconds > 0:
return self.credentials
else:
logger.info('credentials expired')
return {}
def _active_thread(self):
"""
Summary: determine thread status
Returns:
TYPE: Boolean | True = active, False = inactive thread
"""
if self.thread:
return self.thread.is_alive()
return False
def _halt_thread(self):
"""
Summary: Stop an active thread
Returns:
TYPE: Boolean | True = stopped, False = running
"""
try:
if self._active_thread():
# thread alive
logger.info('Stopping active thread [%s]' % str(self.thread.name))
self.thread.halt()
# msg thread status, alive or dead
logger.info('Status of thread [%s]: %s' %
(str(self.thread.name), str(self.thread.is_alive())
))
else:
logger.info('thread halt requested, but no active thread')
except Exception as e:
logger.critical(
'%s: Unknown error while attempting to halt thread. Error %s' %
(inspect.stack()[0][3], str(e)))
return False
return True
def _validate(self, list, check_bit):
"""
Summary:
validates parameter list is a subsset of profiles list object
Args:
TYPE: list
Returns:
TYPE: Boolean
"""
profile_aliases = []
for profile in self.profiles.keys():
profile_aliases.append(profile)
invalid = set(list) - set(profile_aliases)
valid = set(list) - set(invalid)
if set(list).issubset(set(profile_aliases)):
logger.info('%s: Valid account profile names: %s' %
(inspect.stack()[0][3], str(list)))
return True
elif check_bit:
# strict checking
logger.info('%s: Valid account profile names: %s' %
(inspect.stack()[0][3], str(valid)))
ex = Exception('%s: Invalid account profiles: %s' %
(inspect.stack()[0][3], set(invalid)))
logger.exception(ex)
return False
else:
# relaxed checking
logger.warning('%s: Valid profile names: %s, Invalid Names: %s' %
(inspect.stack()[0][3], str(valid)), str(invalid))
return True
def _valid_token(self, token=None):
"""
validate if session token active
Returns: TYPE Boolean | True = active, False = expired
"""
# now, timezone offset aware
now = datetime.datetime.utcnow().replace(tzinfo=pytz.UTC)
if token is None:
logger.info('cannot generate credentials without a session token')
return False
elif now >= token.end:
logger.warning(
'%s: token expired on %s' %
(inspect.stack()[0][3], token['Expiration'].isoformat())
)
return False # expired
return True
def _session_init(self, user, region=''):
"""
Summary:
creates class-level boto3 session object which is shared across
StsCore method clients and resources
Args:
iam user (required) with adequate iam and sts permissions
Returns:
boto3 session object
"""
try:
if region:
session_init = boto3.Session(profile_name=user, region_name=region)
else:
session_init = boto3.Session(profile_name=user)
# FUTURE: support other local creds configs besides awscli; use real iam
# user to establish session, look up mfa_serial, etc before declaring fail
except ProfileNotFound:
return logger.warning(
'%s: iam user [%s] not found in local awscli configuration.' %
(inspect.stack()[0][3], user))
raise
except ClientError as e:
logger.critical(
'%s: Unable to establish session (Code: %s Message: %s)' %
(inspect.stack()[0][3], e.response['Error']['Code'],
e.response['Error']['Message'])
)
raise
return session_init
def _set_session_token(self, token_object):
"""
Summary:
update class attributes for newly generated session token
Args:
token_object (credentials object) | Amazon STS session token obj
Returns: (token, expiration) | TYPE: tuple
token (datetime obj) | session remaining lifetime (minutes)
expiration (datetime str) | token lifetime (minutes) datetime string
"""
drift = datetime.timedelta(minutes=3)
default_low = self.credential_default - drift
default_hi = self.credential_default + drift
# init STSToken obj
token = STSToken(token_object)
#token.duration = duration
if default_low < token.duration < default_hi:
self.refresh_credentials = False # enable async cred gen
elif token.duration >= self.credential_default - drift:
self.refresh_credentials = True # enable async cred gen
logger.debug('refresh_credentials class attribute set to: %s' %
str(self.refresh_credentials)
)
return token
def _set_credentials(self, c_object):
"""
Summary:
update class attributes for newly generated credentials
Args:
c_object (credentials object): Amazon STS temporary credentials obj
Returns:
tuple (token, expiration)
credentials (datetime obj) | session remaining lifetime (minutes)
expiration (datetime str) | remaining time credentials are valid
"""
try:
# set class attributes
expiration = c_object['Expiration'].isoformat()
except NameError:
return logger.warning('%s: there is no active session established' %
inspect.stack()[0][3])
return c_object, expiration
[docs] def calc_lifetime(self, credentials=None, human_readable=False):
""" Return remaining time on sts token, sts temporary credentials
Args:
:type credentials: STSCredentials object (if specified)
:param credentials: generated for which remaining life requested
:type self.token: STSToken object (if exists)
:param self.token: latest token generated
Returns:
tuple containing TYPE: datetime.timedelta objects (DEFAULT)
human_readable: returns tuple containing strings
.. code-block:: javascript
(
token_life_remaining,
credential_life_remaining
)
"""
try:
# now, timezone offset aware
now = datetime.datetime.utcnow().replace(tzinfo=pytz.UTC)
credentials = self.credentials
keys = []
for key in credentials:
keys.append(key)
if self.format == 'boto':
credential_expiration = credentials[keys[0]]['Expiration']
else:
credential_expiration = credentials[keys[0]].end
if self.token:
if self.token.end >= now:
token_life_reamining = self.token.end - now
else:
token_life_reamining = datetime.timedelta(minutes=0)
else:
token_life_reamining = datetime.timedelta(minutes=0)
if credentials:
if credential_expiration >= now:
credential_life_remaining = credential_expiration - now
else:
credential_life_remaining = datetime.timedelta(minutes=0)
if human_readable:
token_life_reamining = convert_time(token_life_reamining)
credential_life_remaining = convert_time(credential_life_remaining)
except Exception as e:
raise
return (token_life_reamining, credential_life_remaining)
[docs] def get_valid_users(self, client):
"""
Summary:
Retrieve list valid iam users from local config
Arg:
iam client object
Returns:
TYPE list
"""
users = []
try:
users = [x['UserName'] for x in client.list_users()['Users']]
except ClientError as e:
logger.critical(
'%s: User not valid or permissions inadequate (Code: %s Message: %s)' %
(inspect.stack()[0][3], e.response['Error']['Code'],
e.response['Error']['Message']))
raise
return users
[docs] def filter_args(self, kwarg_dict, *args):
"""
Summary:
arg, kwarg validity test
Args:
kwarg_dict: kwargs passed in to calling method or func
args: valid keywords for the caller
Returns:
True if kwargs are valid; else raise exception
"""
# unpack if iterable passed in args - TBD (here)
if kwarg_dict is not None:
keys = [key for key in kwarg_dict]
unknown_arg = list(filter(lambda x: x not in args, keys))
if unknown_arg:
raise KeyError(
'%s: unknown parameter(s) provided [%s]' %
(inspect.stack()[0][3], str(unknown_arg))
)
return True
[docs] def refactor(self, input_file=defaults['default_awscli'],
output_file=defaults['output_file'], force_rewrite=True):
"""
Summary:
Refactors native awscli credentials file into a useable form.
Credentials file in the native format used by awscli is refactored
into a json file located in the stslib configuration directory
(typically ~/.stslib) in user's home.
refactor exists as a StsCore class method so that it refactoring
operations can be initiated on an ad hoc basis whenever credentails
are refreshed
Args:
input_file (str): pathname of awscli credentails file
output_file (str): name of json formatted output file, post awscli transformation
Returns:
TYPE Boolean | Success or Failure
"""
response = False
if not force_rewrite and os.path.exists(output_file):
# local awscli credentials already refactored
return True
else:
# first-time local awscli refactor or force refresh of existing stslib obj profiles
logger.info('%s: refactoring awscli credentials file' % inspect.stack()[0][3])
response = parse_awscli(parameter_input=input_file, parameter_output=output_file)
if response:
self.profiles = self.parse_profiles(pre_name=input_file, post_name=output_file)
return True
return False