Module xcon.providers.secrets_manager
Expand source code
from __future__ import annotations
import dataclasses
import logging
import base64
from typing import Dict, Optional, Any, Mapping
from .common import handle_aws_exception
from ..directory import Directory, DirectoryListing, DirectoryOrPath, DirectoryItem, DirectoryChain
from botocore.exceptions import ClientError
from xcon.provider import AwsProvider, ProviderChain, InternalLocalProviderCache
from xboto import boto_clients
log = logging.getLogger(__name__)
@dataclasses.dataclass
class _LocalSecretsManagerCache:
directories: Dict[Directory, DirectoryListing] = dataclasses.field(default_factory=lambda: {})
available: Dict[Directory, DirectoryListing] = None
""" Items in here have None values, but it does list the dir/name of every item available
in the secrets manager. So if the item is in here, you know you can grab it's value.
"""
class SecretsManagerProvider(AwsProvider):
""" Allows you to use the AWS secrets manager. It works by using the
`secretsmanager:ListSecrets` aws permission to list all secrets it has access to.
This way it can discover quickly if it has a secret with a specific name or not.
This is important because it may be asked a lot due to the way we iterate though
all the directories/providers when finding the value for a config name.
It then uses `secretsmanager:GetSecretValue` as needed to get a specific secret value
when it's asked for, the first time. The secrets manager does not let us bulk-get
the secret values. So that's why we list them first and cache that. And then only
query for specific secrets if we know they exist.
## Things Left To Do
Keep in mind that the current implementation requires use of lower-case name-strings in the
last part of the path when writing a value into secrets manager service.
The directory path is case-sensitive, but the last part of the path
after the directory needs to be all-lower case.
We assume all keys in secrets manager will be lower-cased at the moment in the current
implementation below.
We could make this have like the other providers, where it's case insensitive lookup but it
can handle any actual case used for the name.
If we start using this, we would also probably want to write some unit-tests for
`SecretsManagerProvider`.
We may also want to support expiring config values from local-memory after a period
of time or restart our long-lives ECS services regularly (this would also force them
to reconnect to the database, and so perhaps is a simpler way to accomplish database
password rotation anyway).
However, since we are not currently not using `SecretsManagerProvider`, these are at
a low-priority to do right now.
"""
name = "secrets"
@property
def local_cache(self) -> _LocalSecretsManagerCache:
# Using default dict so I don't have to worry about allocating the dict's my self later.
maker = lambda c: _LocalSecretsManagerCache()
cacher = InternalLocalProviderCache.grab()
return cacher.get_cache_for_provider(provider=self, cache_constructor=maker)
def _available_names_for_directory(self) -> Dict[Directory, DirectoryListing]:
""" A dictionary with a mapping of directory to directory list.
The list will initially have None as the item values. THis indicates that we
know the secret exists in AWS but we just have not gotten the value yet.
As you update get the values and update the items in the mapped listing,
we will keep them as-is. We only retrieve the initial listing if it does not
exist. Otherwise, we will keep returning the cached list and won't change and
items/values you update in it.
"""
if self.local_cache.available is not None:
return self.local_cache.available
log.info("Getting full listing of available path/names in AWS Secrets Manager.")
dir_to_item_map = {}
try:
paginator = boto_clients.secretsmanager.get_paginator('list_secrets')
response = paginator.paginate()
for page in response:
for secret in page['SecretList']:
full_path: str = secret['Name']
split = full_path.split('/')
name = split.pop()
dir_path = '/'.join(split)
if not name:
log.warning(
f"Somehow got a false-looking name after splitting full-path "
f"({full_path}) that was retrieived from aws secrets manager."
)
continue
# Create a directory item with None as the value, so that
# we know it exists but we have not gotten the value yet.
# [secrets manager only has string values, so None is good enough for this].
item = DirectoryItem(
directory=dir_path,
# Just being paranoid, ensure it's a string.
name=str(name),
source=f"{self.name}-nameOnly"
)
directory = item.directory
dir_listing = dir_to_item_map.get(directory, None)
if not dir_listing:
dir_listing = DirectoryListing(directory=directory)
dir_to_item_map[directory] = dir_listing
dir_listing.add_item(item)
except Exception as e:
# Will either re-raise the exception or handle it for us.
# It will also communicate to us via marking the directory as error'd on us if needed.
handle_aws_exception(
exception=e, provider=self, directory=Directory(path="list_secrets")
)
self.local_cache.available = dir_to_item_map
for dir_listing in dir_to_item_map.values():
self.log_about_items(
items=dir_listing.item_mapping().values(),
path=dir_listing.directory.path,
msg_prefix="Retrieved only name"
)
return dir_to_item_map
def get_item(
self,
name: str,
directory: Optional[DirectoryOrPath],
directory_chain: DirectoryChain,
provider_chain: ProviderChain,
environ: Directory
) -> Optional[DirectoryItem]:
if directory is None:
return None
directory = Directory.from_path(directory)
listing = self.local_cache.directories.get(directory)
if listing:
item = listing.get_item(name)
if item:
return item if item.value is not None else None
available = self._available_names_for_directory()
available_listing = available.get(directory)
if not available_listing:
return None
# See if the item is available in the secrets manager.
# Consider caching the available names in secret manager in Dynamo or some such.
item = available_listing.get_item(name)
if not item:
return None
# Use original_name to grab the value from aws (to preserve original case of name).
item_path = f'{item.directory.path}/{item.original_name or item.name}'
secret = None
try:
log.info(f"Getting value at SecretsManagerProvider path ({item_path})")
item_value: Dict[str, Any] = boto_clients.secretsmanager.get_secret_value(
SecretId=item_path
)
secret = item_value.get('SecretString')
if secret is None:
binary_data = item_value.get('SecretBinary')
if binary_data is not None:
secret = base64.b64decode(binary_data)
except ClientError as e:
if not (e.response['Error']['Code'] == 'ResourceNotFoundException'):
handle_aws_exception(exception=e, provider=self, directory=directory)
listing = self.local_cache.directories.get(directory)
if not listing:
listing = DirectoryListing(directory=directory)
self.local_cache.directories[directory] = listing
item = DirectoryItem(
directory=directory, name=name, value=secret,
source=self.name
)
listing.add_item(item)
if item.value is None:
return None
return item
def retrieved_items_map(
self, directory: DirectoryOrPath
) -> Optional[Mapping[str, DirectoryItem]]:
directory = Directory.from_path(directory)
listing = self.local_cache.directories.get(directory)
if listing is None:
return None
return listing.item_mapping()
Classes
class SecretsManagerProvider
-
Allows you to use the AWS secrets manager. It works by using the
secretsmanager:ListSecrets
aws permission to list all secrets it has access to.This way it can discover quickly if it has a secret with a specific name or not. This is important because it may be asked a lot due to the way we iterate though all the directories/providers when finding the value for a config name.
It then uses
secretsmanager:GetSecretValue
as needed to get a specific secret value when it's asked for, the first time. The secrets manager does not let us bulk-get the secret values. So that's why we list them first and cache that. And then only query for specific secrets if we know they exist.Things Left To Do
Keep in mind that the current implementation requires use of lower-case name-strings in the last part of the path when writing a value into secrets manager service. The directory path is case-sensitive, but the last part of the path after the directory needs to be all-lower case.
We assume all keys in secrets manager will be lower-cased at the moment in the current implementation below.
We could make this have like the other providers, where it's case insensitive lookup but it can handle any actual case used for the name.
If we start using this, we would also probably want to write some unit-tests for
SecretsManagerProvider
.We may also want to support expiring config values from local-memory after a period of time or restart our long-lives ECS services regularly (this would also force them to reconnect to the database, and so perhaps is a simpler way to accomplish database password rotation anyway).
However, since we are not currently not using
SecretsManagerProvider
, these are at a low-priority to do right now.Expand source code
class SecretsManagerProvider(AwsProvider): """ Allows you to use the AWS secrets manager. It works by using the `secretsmanager:ListSecrets` aws permission to list all secrets it has access to. This way it can discover quickly if it has a secret with a specific name or not. This is important because it may be asked a lot due to the way we iterate though all the directories/providers when finding the value for a config name. It then uses `secretsmanager:GetSecretValue` as needed to get a specific secret value when it's asked for, the first time. The secrets manager does not let us bulk-get the secret values. So that's why we list them first and cache that. And then only query for specific secrets if we know they exist. ## Things Left To Do Keep in mind that the current implementation requires use of lower-case name-strings in the last part of the path when writing a value into secrets manager service. The directory path is case-sensitive, but the last part of the path after the directory needs to be all-lower case. We assume all keys in secrets manager will be lower-cased at the moment in the current implementation below. We could make this have like the other providers, where it's case insensitive lookup but it can handle any actual case used for the name. If we start using this, we would also probably want to write some unit-tests for `SecretsManagerProvider`. We may also want to support expiring config values from local-memory after a period of time or restart our long-lives ECS services regularly (this would also force them to reconnect to the database, and so perhaps is a simpler way to accomplish database password rotation anyway). However, since we are not currently not using `SecretsManagerProvider`, these are at a low-priority to do right now. """ name = "secrets" @property def local_cache(self) -> _LocalSecretsManagerCache: # Using default dict so I don't have to worry about allocating the dict's my self later. maker = lambda c: _LocalSecretsManagerCache() cacher = InternalLocalProviderCache.grab() return cacher.get_cache_for_provider(provider=self, cache_constructor=maker) def _available_names_for_directory(self) -> Dict[Directory, DirectoryListing]: """ A dictionary with a mapping of directory to directory list. The list will initially have None as the item values. THis indicates that we know the secret exists in AWS but we just have not gotten the value yet. As you update get the values and update the items in the mapped listing, we will keep them as-is. We only retrieve the initial listing if it does not exist. Otherwise, we will keep returning the cached list and won't change and items/values you update in it. """ if self.local_cache.available is not None: return self.local_cache.available log.info("Getting full listing of available path/names in AWS Secrets Manager.") dir_to_item_map = {} try: paginator = boto_clients.secretsmanager.get_paginator('list_secrets') response = paginator.paginate() for page in response: for secret in page['SecretList']: full_path: str = secret['Name'] split = full_path.split('/') name = split.pop() dir_path = '/'.join(split) if not name: log.warning( f"Somehow got a false-looking name after splitting full-path " f"({full_path}) that was retrieived from aws secrets manager." ) continue # Create a directory item with None as the value, so that # we know it exists but we have not gotten the value yet. # [secrets manager only has string values, so None is good enough for this]. item = DirectoryItem( directory=dir_path, # Just being paranoid, ensure it's a string. name=str(name), source=f"{self.name}-nameOnly" ) directory = item.directory dir_listing = dir_to_item_map.get(directory, None) if not dir_listing: dir_listing = DirectoryListing(directory=directory) dir_to_item_map[directory] = dir_listing dir_listing.add_item(item) except Exception as e: # Will either re-raise the exception or handle it for us. # It will also communicate to us via marking the directory as error'd on us if needed. handle_aws_exception( exception=e, provider=self, directory=Directory(path="list_secrets") ) self.local_cache.available = dir_to_item_map for dir_listing in dir_to_item_map.values(): self.log_about_items( items=dir_listing.item_mapping().values(), path=dir_listing.directory.path, msg_prefix="Retrieved only name" ) return dir_to_item_map def get_item( self, name: str, directory: Optional[DirectoryOrPath], directory_chain: DirectoryChain, provider_chain: ProviderChain, environ: Directory ) -> Optional[DirectoryItem]: if directory is None: return None directory = Directory.from_path(directory) listing = self.local_cache.directories.get(directory) if listing: item = listing.get_item(name) if item: return item if item.value is not None else None available = self._available_names_for_directory() available_listing = available.get(directory) if not available_listing: return None # See if the item is available in the secrets manager. # Consider caching the available names in secret manager in Dynamo or some such. item = available_listing.get_item(name) if not item: return None # Use original_name to grab the value from aws (to preserve original case of name). item_path = f'{item.directory.path}/{item.original_name or item.name}' secret = None try: log.info(f"Getting value at SecretsManagerProvider path ({item_path})") item_value: Dict[str, Any] = boto_clients.secretsmanager.get_secret_value( SecretId=item_path ) secret = item_value.get('SecretString') if secret is None: binary_data = item_value.get('SecretBinary') if binary_data is not None: secret = base64.b64decode(binary_data) except ClientError as e: if not (e.response['Error']['Code'] == 'ResourceNotFoundException'): handle_aws_exception(exception=e, provider=self, directory=directory) listing = self.local_cache.directories.get(directory) if not listing: listing = DirectoryListing(directory=directory) self.local_cache.directories[directory] = listing item = DirectoryItem( directory=directory, name=name, value=secret, source=self.name ) listing.add_item(item) if item.value is None: return None return item def retrieved_items_map( self, directory: DirectoryOrPath ) -> Optional[Mapping[str, DirectoryItem]]: directory = Directory.from_path(directory) listing = self.local_cache.directories.get(directory) if listing is None: return None return listing.item_mapping()
Ancestors
Class variables
var botocore_error_ignored_exception : botocore.exceptions.BotoCoreError
-
Inherited from:
AwsProvider
.botocore_error_ignored_exception
This means that any attempt to communicat with aws service will probably fail; probable due to a corrupted or missing aws credentials.
var is_cacher
-
Inherited from:
AwsProvider
.is_cacher
Easy way to figure out if a provider is a
ProviderCacher
or just a normal provider. Should be set toTrue
for provider subclasses that are … var name
-
Inherited from:
AwsProvider
.name
This is the value that will normally be set to the items
DirectoryItem.source
, also displayed when logging out the names of providers … var needs_directory
-
Inherited from:
AwsProvider
.needs_directory
By default, providers can't really use a
None
for a directory when callingget_item()
. If you CAN work with a None directory then set this to … var query_before_cache_if_possible
-
Inherited from:
AwsProvider
.query_before_cache_if_possible
If True, and this is before any other providers that have this set to False, the cacher will be consulted AFTER that provider(s). In this way I'll …
Static methods
def __init_subclass__(thread_sharable=Default, attributes_to_skip_while_copying: Optional[Iterable[str]] = Default, **kwargs)
-
Inherited from:
AwsProvider
.__init_subclass__
Args
thread_sharable
- If
False
: While a dependency is lazily auto-created, we will ensure we do it per-thread, and not make it visible …
def grab() ‑> ~T
-
Inherited from:
AwsProvider
.grab
Gets a potentially shared dependency from the current
udpend.context.XContext
… def proxy() ‑> ~R
-
Inherited from:
AwsProvider
.proxy
Returns a proxy-object, that when and attribute is asked for, it will proxy it to the current object of
cls
… def proxy_attribute(attribute_name: str) ‑> Any
-
Inherited from:
AwsProvider
.proxy_attribute
Returns a proxy-object, that when and attribute is asked for, it will proxy it to the current attribute value on the current object of
cls
…
Instance variables
var local_cache : xcon.providers.secrets_manager._LocalSecretsManagerCache
-
Expand source code
@property def local_cache(self) -> _LocalSecretsManagerCache: # Using default dict so I don't have to worry about allocating the dict's my self later. maker = lambda c: _LocalSecretsManagerCache() cacher = InternalLocalProviderCache.grab() return cacher.get_cache_for_provider(provider=self, cache_constructor=maker)
var obj : Self
-
Inherited from:
AwsProvider
.obj
class property/attribute that will return the current dependency for the subclass it's asked on by calling
Dependency.grab
, passing no extra …
Methods
def __call__(self, func)
-
Inherited from:
AwsProvider
.__call__
This makes Resource subclasses have an ability to be used as function decorators by default unless this method is overriden to provide some other …
def __copy__(self)
-
Inherited from:
AwsProvider
.__copy__
Basic shallow copy protection (I am wondering if I should just remove this default copy code) …
def directory_has_error(self, directory: Directory)
-
Inherited from:
AwsProvider
.directory_has_error
If a directory had an error in the past, this returns true. For informational purposes only.
def get_item(self, name: str, directory: Optional[DirectoryOrPath], directory_chain: DirectoryChain, provider_chain: ProviderChain, environ: Directory) ‑> Optional[DirectoryItem]
-
Inherited from:
AwsProvider
.get_item
Grabs a config value for name in directory …
Expand source code
def get_item( self, name: str, directory: Optional[DirectoryOrPath], directory_chain: DirectoryChain, provider_chain: ProviderChain, environ: Directory ) -> Optional[DirectoryItem]: if directory is None: return None directory = Directory.from_path(directory) listing = self.local_cache.directories.get(directory) if listing: item = listing.get_item(name) if item: return item if item.value is not None else None available = self._available_names_for_directory() available_listing = available.get(directory) if not available_listing: return None # See if the item is available in the secrets manager. # Consider caching the available names in secret manager in Dynamo or some such. item = available_listing.get_item(name) if not item: return None # Use original_name to grab the value from aws (to preserve original case of name). item_path = f'{item.directory.path}/{item.original_name or item.name}' secret = None try: log.info(f"Getting value at SecretsManagerProvider path ({item_path})") item_value: Dict[str, Any] = boto_clients.secretsmanager.get_secret_value( SecretId=item_path ) secret = item_value.get('SecretString') if secret is None: binary_data = item_value.get('SecretBinary') if binary_data is not None: secret = base64.b64decode(binary_data) except ClientError as e: if not (e.response['Error']['Code'] == 'ResourceNotFoundException'): handle_aws_exception(exception=e, provider=self, directory=directory) listing = self.local_cache.directories.get(directory) if not listing: listing = DirectoryListing(directory=directory) self.local_cache.directories[directory] = listing item = DirectoryItem( directory=directory, name=name, value=secret, source=self.name ) listing.add_item(item) if item.value is None: return None return item
def get_value(self, name: str, directory: Optional[DirectoryOrPath], directory_chain: DirectoryChain, provider_chain: ProviderChain, environ: Directory)
-
Inherited from:
AwsProvider
.get_value
Gets an item's value for directory from provider. Return None if not found.
def mark_errored_directory(self, directory: Directory)
-
Inherited from:
AwsProvider
.mark_errored_directory
If a directory has an error, this is called. For informational purposes only.
def retrieved_items_map(self, directory: DirectoryOrPath) ‑> Optional[Mapping[str, DirectoryItem]]
-
Inherited from:
AwsProvider
.retrieved_items_map
Should return a read-only lower-case item name TO item mapping. You can easily get one of these from a DirectoryList object's
item_mapping()
…Expand source code
def retrieved_items_map( self, directory: DirectoryOrPath ) -> Optional[Mapping[str, DirectoryItem]]: directory = Directory.from_path(directory) listing = self.local_cache.directories.get(directory) if listing is None: return None return listing.item_mapping()