"""
Summary:
The vault module contains classes definitions for various types of credentials
generated by Amazon's Secure Token Service (STS):
- STSToken: type definition for Amazon STS session tokens
- STSCredentials: type definition for Amazon STS temporary credentials
Module Attributes:
- logger: logging object
"""
import datetime
import inspect
from collections import namedtuple
from stslib import logd
from stslib.statics import defaults
from stslib._version import __version__
logger = logd.getLogger(__version__)
[docs]class STSToken():
"""
structure for session tokens generated by Amazon STS
"""
def __init__(self, token=None):
self.ring = {}
if token is None:
self.start = ''
self.end = ''
self.expiration = ''
self.duration = datetime.timedelta(seconds=0)
self.boto = {}
return
else:
self.set(token)
[docs] def request_ring(self, index=None):
"""
token ring is a facility for managing multiple session tokens (FUTURE)
"""
if not self.token_ring:
self.token_ring = []
elif index:
return self.token_ring[index]
return self.token_ring
[docs] def set(self, token):
""" creates a new token object """
try:
self.start = token['StartTime']
self.end = token['Expiration']
self.expiration = token['Expiration'].isoformat()
self.default = defaults['token_life']
self.duration = token['Expiration'] - token['StartTime']
self.access_key = token['AccessKeyId']
self.secret_key = token['SecretAccessKey']
self.session = token['SessionToken']
self.boto = token
#self.add(token) # FUTURE: add init token to ring
except KeyError as e:
logger.exception(
'%s: failure on key [%s] during token initalization' %
(inspect.stack()[0][3], str(e)))
raise
return self
[docs] def add(self, new_token, overwrite=True):
"""
adds a new token to the ring (FUTURE)
"""
# calc index
if overwrite:
key_ring = max(len(self.ring) - 1, 0)
else:
key_ring = len(self.ring) # add additional to keyring
# add token to ring
self.token_ring.append(
{
'start': new_token['StartTime'],
'end': new_token['Expiration'],
'access_key': new_token['AccessKeyId'],
'secret_key': new_token['SecretAccessKey'],
'session': new_token['SessionToken']
}
)
return self.token_ring[key_ring]
[docs]class STSCredentials():
"""
structure for temporary credentials generated by Amazon STS
"""
def __init__(self, credentials):
"""
init empty credentials object with null values or add if generated
"""
self.credentials = {}
self.boto = {}
self.ring = {}
if credentials:
self.add(credentials) # v2.0, MULTIPLE CREDENTIAL SETS
else:
self.role_name = ''
self.boto = {}
[docs] def request_ring(self, index=0):
"""
a facility for managing multiple session credentials gen at diff times
"""
return self.ring[index]
[docs] def add(self, new_credentials, token=None, overwrite=True):
""" adds new credentials to the ring """
credentials = {}
self.boto = new_credentials # retained copy of the native boto format
try:
if overwrite:
for key, value in new_credentials.items():
self.credentials[key] = STSingleSet(new_credentials[key])
# self.credentials[key] = self.named_tuple(new_credentials[key])
else:
# add creds to ring but accept duplicate credentials
for key, value in new_credentials.items():
if key not in self.ring:
credentials[key] = STSingleSet(new_credentials[key])
self.ring[credentials[key]] = credentials[key]['end']
else:
# key (credentials) already exist
# check if expired; if so, overwrite them, not, raise ex
raise KeyError(
'%s: credentials [%s] already exist. Cannot add to ring.' %
(inspect.stack()[0][3], str(key)))
except KeyError as e:
logger.exception('%s: %s' % (inspect.stack()[0][3], str(e)))
except Exception as e:
logger.exception('%s: unknown Exception while adding credentials: %s' %
(inspect.stack()[0][3], str(e)))
return self.credentials
[docs] def named_tuple(self, single_set):
""" converts boto credentials into namedtuple format """
try:
start = single_set['StartTime']
end = single_set['Expiration']
expiration = single_set['Expiration'].isoformat()
access_key = single_set['AccessKeyId']
secret_key = single_set['SecretAccessKey']
session = single_set['SessionToken']
boto = single_set
container = namedtuple('container',
[
'start', 'end', 'expiration',
'access_key', 'secret_key',
'session', 'boto'
]
)
ntuple = container(
start, end, expiration, access_key,
secret_key, session, boto
)
except KeyError as e:
logger.exception(
'%s: failure on key [%s] during token initalization' %
(inspect.stack()[0][3], str(e)))
raise
return ntuple
[docs] def index_credentials(self, creds):
unknown = set(credentials.keys())
known = set(ring.keys())
if uknown not in known:
self.add(credentials) # add init credentials to ring
else:
return self.ring[credentials]
[docs]class STSingleSet():
"""
Class definition of single credential object representing temporary
credentials for a single iam role
"""
def __init__(self, credential_set):
self.set(credential_set)
[docs] def set(self, single_set):
self.start = single_set['StartTime']
self.end = single_set['Expiration']
self.expiration = single_set['Expiration'].isoformat()
self.default = defaults['token_life']
self.duration = single_set['Expiration'] - single_set['StartTime']
self.access_key = single_set['AccessKeyId']
self.secret_key = single_set['SecretAccessKey']
self.session = single_set['SessionToken']
self.boto = single_set
return self