Source code for stslib.vault

"""
Summary:
    The vault module contains classes definitions for various types of credentials
    generated by Amazon's Secure Token Service (STS):

    - STSToken:  type definition for Amazon STS session tokens
    - STSCredentials:  type definition for Amazon STS temporary credentials

Module Attributes:
    - logger: logging object

"""

import datetime
import inspect
from collections import namedtuple
from stslib import logd
from stslib.statics import defaults
from stslib._version import __version__


logger = logd.getLogger(__version__)


[docs]class STSToken(): """ structure for session tokens generated by Amazon STS """ def __init__(self, token=None): self.ring = {} if token is None: self.start = '' self.end = '' self.expiration = '' self.duration = datetime.timedelta(seconds=0) self.boto = {} return else: self.set(token)
[docs] def request_ring(self, index=None): """ token ring is a facility for managing multiple session tokens (FUTURE) """ if not self.token_ring: self.token_ring = [] elif index: return self.token_ring[index] return self.token_ring
[docs] def set(self, token): """ creates a new token object """ try: self.start = token['StartTime'] self.end = token['Expiration'] self.expiration = token['Expiration'].isoformat() self.default = defaults['token_life'] self.duration = token['Expiration'] - token['StartTime'] self.access_key = token['AccessKeyId'] self.secret_key = token['SecretAccessKey'] self.session = token['SessionToken'] self.boto = token #self.add(token) # FUTURE: add init token to ring except KeyError as e: logger.exception( '%s: failure on key [%s] during token initalization' % (inspect.stack()[0][3], str(e))) raise return self
[docs] def add(self, new_token, overwrite=True): """ adds a new token to the ring (FUTURE) """ # calc index if overwrite: key_ring = max(len(self.ring) - 1, 0) else: key_ring = len(self.ring) # add additional to keyring # add token to ring self.token_ring.append( { 'start': new_token['StartTime'], 'end': new_token['Expiration'], 'access_key': new_token['AccessKeyId'], 'secret_key': new_token['SecretAccessKey'], 'session': new_token['SessionToken'] } ) return self.token_ring[key_ring]
[docs]class STSCredentials(): """ structure for temporary credentials generated by Amazon STS """ def __init__(self, credentials): """ init empty credentials object with null values or add if generated """ self.credentials = {} self.boto = {} self.ring = {} if credentials: self.add(credentials) # v2.0, MULTIPLE CREDENTIAL SETS else: self.role_name = '' self.boto = {}
[docs] def request_ring(self, index=0): """ a facility for managing multiple session credentials gen at diff times """ return self.ring[index]
[docs] def add(self, new_credentials, token=None, overwrite=True): """ adds new credentials to the ring """ credentials = {} self.boto = new_credentials # retained copy of the native boto format try: if overwrite: for key, value in new_credentials.items(): self.credentials[key] = STSingleSet(new_credentials[key]) # self.credentials[key] = self.named_tuple(new_credentials[key]) else: # add creds to ring but accept duplicate credentials for key, value in new_credentials.items(): if key not in self.ring: credentials[key] = STSingleSet(new_credentials[key]) self.ring[credentials[key]] = credentials[key]['end'] else: # key (credentials) already exist # check if expired; if so, overwrite them, not, raise ex raise KeyError( '%s: credentials [%s] already exist. Cannot add to ring.' % (inspect.stack()[0][3], str(key))) except KeyError as e: logger.exception('%s: %s' % (inspect.stack()[0][3], str(e))) except Exception as e: logger.exception('%s: unknown Exception while adding credentials: %s' % (inspect.stack()[0][3], str(e))) return self.credentials
[docs] def named_tuple(self, single_set): """ converts boto credentials into namedtuple format """ try: start = single_set['StartTime'] end = single_set['Expiration'] expiration = single_set['Expiration'].isoformat() access_key = single_set['AccessKeyId'] secret_key = single_set['SecretAccessKey'] session = single_set['SessionToken'] boto = single_set container = namedtuple('container', [ 'start', 'end', 'expiration', 'access_key', 'secret_key', 'session', 'boto' ] ) ntuple = container( start, end, expiration, access_key, secret_key, session, boto ) except KeyError as e: logger.exception( '%s: failure on key [%s] during token initalization' % (inspect.stack()[0][3], str(e))) raise return ntuple
[docs] def index_credentials(self, creds): unknown = set(credentials.keys()) known = set(ring.keys()) if uknown not in known: self.add(credentials) # add init credentials to ring else: return self.ring[credentials]
[docs]class STSingleSet(): """ Class definition of single credential object representing temporary credentials for a single iam role """ def __init__(self, credential_set): self.set(credential_set)
[docs] def set(self, single_set): self.start = single_set['StartTime'] self.end = single_set['Expiration'] self.expiration = single_set['Expiration'].isoformat() self.default = defaults['token_life'] self.duration = single_set['Expiration'] - single_set['StartTime'] self.access_key = single_set['AccessKeyId'] self.secret_key = single_set['SecretAccessKey'] self.session = single_set['SessionToken'] self.boto = single_set return self