Module xcon.providers.dynamo
Expand source code
from __future__ import annotations
import dataclasses
import datetime as dt
import logging
import random
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, Mapping, Optional, Sequence, Tuple
from boto3.dynamodb import conditions
from xboto.dependencies import BotoResources
from xinject import Dependency
from xsentinels import Default
from xloop import xloop
from xsentinels.default import DefaultType
from xcon.directory import Directory, DirectoryListing, DirectoryOrPath, DirectoryItem, \
DirectoryChain
from xcon.exceptions import ConfigError
from xcon.provider import ProviderCacher, ProviderChain, AwsProvider, \
InternalLocalProviderCache
from .common import handle_aws_exception
log = logging.getLogger(__name__)
class DynamoProvider(AwsProvider):
"""
Access a dynamo table called `global-all-config` when searching for a config value.
This provider allows one to have a structured list or dictionary. It supports JSON
and will parse/decode it when it gets it from Dynamo into a real Python dict/list/str/etc!
"""
name = "dynamo"
_directories: Dict[Directory, DirectoryListing]
@property
def _table(self) -> _ConfigDynamoTable:
# todo: make table name configurable
return _ConfigDynamoTable(table_name='global-all-config')
def get_item(
self,
name: str,
directory: Optional[DirectoryOrPath],
directory_chain: DirectoryChain,
provider_chain: ProviderChain,
environ: Directory
) -> Optional[DirectoryItem]:
if directory is None:
return None
directory = Directory.from_path(directory)
listing = self.local_cache.get(directory)
if listing:
return listing.get_item(name)
# We need to look up the directory listing from Dynamo.
items = []
try:
if self.botocore_error_ignored_exception:
# Raise same error we previously had, and handle it the same way
# for this new directory.
log.info(
f"We've already previously had a botocore error. Botocore error's [vs client"
f"errors] are generally related to something that will keep failing. "
f"Assuming we can't do anything with the service so bailing out early via "
f"the same previous exception; for directory {directory}."
)
raise self.botocore_error_ignored_exception from None
items = list(self._table.get_items_for_directory(directory=directory))
self.log_about_items(items=items, path=directory.path)
except Exception as e:
# Will either re-raise the exception or handle it for us.
handle_aws_exception(exception=e, provider=self, directory=directory)
listing = DirectoryListing(directory=directory, items=items)
self.local_cache[directory] = listing
return listing.get_item(name)
def retrieved_items_map(
self, directory: DirectoryOrPath
) -> Optional[Mapping[str, DirectoryItem]]:
directory = Directory.from_path(directory)
listing = self.local_cache.get(directory)
if listing is None:
return None
return listing.item_mapping()
class DynamoCacher(ProviderCacher):
""" Uses a Dynamo table called `global-all-configCache`.
Generally caches what configuration values we lookup into a dynamo table.
A good summary would be in the [Config Overview - Caching](../config.html#caching)
More details about the table structure itself follow.
The table has two keys:
1. Hash key: Is the `environ` method parameter that gets passed to the methods on me.
It's normally a string in this format: `/{APP_NAME}/{APP_ENV}`.
Also, normally apps/services are only given access to one specific hash-key.
This hash-key should represent the app and its current environment.
2. Range/Sort key: Contains the variable name, providers and directory paths used
to look up the value. This makes it so the app can do various queries using various
different providers/directories and the cacher can cache those results correctly
and uniquely based on the var-name, providers and directories originally
used to get the value.
You don't need to parse the range/sort-key.
All of its components are also separate attributes in the table on the row.
## Dependency Details
Right now the `DynamoCacher` is a `xinject.dependency.Dependency`
resource, you can grab the current one by calling `DynamoCacher.grab()`.
More specifically: we are a `xinject.dependency.Dependency`, which means that there is
normally only one of us around. See xinject library for more details.
"""
name = "cacher"
_ttl: dt.datetime
def retrieved_items_map(self, directory: DirectoryOrPath) -> Mapping[str, DirectoryItem]:
""" This is mostly useful for getting this to cache, so I am not going to implement it
in the cacher (if we ever do, the `xcon.provider.ProviderChain` will have to
figure out how to skip us).
"""
return {}
@property
def _table(self) -> _ConfigDynamoTable:
# todo: make table name configurable
table = _ConfigDynamoTable(table_name='global-all-configCache', cache_table=True)
table.append_source = " - via cacher"
return table
@dataclasses.dataclass
class _LocalCache:
listings: Dict[Directory, Dict[str, Dict[str, DirectoryListing]]] = dataclasses.field(
default_factory=(
lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(DirectoryListing)))
)
)
environ_to_items: Dict[Directory, Tuple[DirectoryItem]] = dataclasses.field(
default_factory=lambda: {}
)
@property
def local_cache(self) -> _LocalCache:
# Using default dict, so I don't have to worry about allocating the dicts myself later.
maker = lambda c: DynamoCacher._LocalCache()
cacher = InternalLocalProviderCache.grab()
return cacher.get_cache_for_provider(provider=self, cache_constructor=maker)
def __init__(self):
"""
## How to Clear Cache
If you need to change the config and have it propagate asap, you can simply
delete all items in the dynamo cache table under the cache hash-key for the service/env.
So for now, I am giving a long life to the cached items, since config changes to
existing items are rare [if we need a new item and it's not in cache it will be looked
up immediately and then cached].
I use a random number to help ensure the cache-expiration times of various services
are not synchronized, to help spread load between the param store and secrets
manager AWS APIs.
12 hours in the future, with a random +/- 1500 seconds added on, is what we currently do.
Thinking about making it a shorter period of time [a couple of hours].
"""
self._table.append_source = " - via cacher"
super().__init__()
self._ttl = dt.datetime.now(dt.timezone.utc) + dt.timedelta(
hours=12, seconds=random.randint(-1500, 1500)
)
def cache_items(
self,
items: Iterable[DirectoryItem],
provider_chain: ProviderChain,
directory_chain: DirectoryChain,
environ: Directory
):
""" Cache's passed in item, using the other params to create the proper range and hash
keys that we use in Dynamo to uniquely identify the item.
See `DynamoCacher.get_item` for more details on what the Args mean. It uses many of
the same ones.
For an overview of the caching process, see
[Config Overview - Caching](../config.html#caching)
"""
environ = self._get_environ_to_use(environ)
listing = self._get_listing(
directory_chain=directory_chain,
provider_chain=provider_chain,
environ=environ
)
items_to_send = []
for new_item in listing.get_items_with_different_value(items):
if new_item.cacheable:
ttl = self._ttl
# If the item has a ttl, we want to use that. It means it's a temporary value
# that should be looked up again after the expiration date.
if new_item.ttl:
# We should never get a ttl less than the current time; but if we do, we will
# insert an item into cacher that will never be read by other processes
# [since the cacher will filter them out via a dynamo query-filter].
ttl = new_item.ttl
concat_dir_paths = directory_chain.concatenated_directory_paths
concat_provider_names = provider_chain.concatenated_provider_names
item_to_cache = DirectoryItem(
directory=new_item.directory,
name=new_item.name,
value=new_item.value,
source=f"{new_item.source} - {new_item.directory.path}",
ttl=ttl,
# DirectoryItem will calculate a cache_range_key for us with these two values:
cache_concat_directory_paths=concat_dir_paths,
cache_concat_provider_names=concat_provider_names,
cache_hash_key=environ.path,
)
items_to_send.append(item_to_cache)
listing.add_item(item_to_cache)
if not items_to_send:
return
self.log_about_items(
items=items_to_send,
path=environ.path,
msg_prefix="Sending to cache"
)
if self.directory_has_error(environ):
log.debug(
f"Not saving cached items to {environ}, it had an error reading/writing "
f"previously. See previous log messages [whenever the error happened for first "
f"time] for more details."
)
return
try:
self._table.put_items(items_to_send)
except Exception as e:
# Will either re-raise the exception or handle it for us.
# It will also mark the directory as errored for us, if needed.
handle_aws_exception(exception=e, provider=self, directory=environ)
def get_item(
self,
name: str,
directory: Optional[DirectoryOrPath],
directory_chain: DirectoryChain,
provider_chain: ProviderChain,
environ: Optional[Directory]
) -> Optional[DirectoryItem]:
"""
Returns item out of the cache. Cache-key is constructed using directory_chain,
provider_chain, environ and name.
For an overview of the caching process, see
[Config Overview - Caching](../config.html#caching)
Args:
name: Name of the config value, ie: `XYNAPI_BASE_URL`.
directory: Directory to look up the value in; this is not really used right now by the
cacher. But it's used by the other providers. This cacher acts just like
a provider and so accepts the parameter.
directory_chain: Current directory chain that is being used to look up the value.
Used as part of the range-key in the Dynamo table.
provider_chain (xcon.provider.ProviderChain): Current provider chain
that is being used to look up the value. Used as part of the range-key in the Dynamo
table.
environ:
This is the directory the cacher uses for the hash-key. It's supposed to have
the full service and environment name.
Example Directory Path: `/hubspot/testing`
Returns:
xcon.directory.DirectoryItem: If we have a cached item, this is it.
None: Otherwise we return None indicating nothing has been cached.
"""
# Cache needs all of this stuff to do proper caching.
environ = self._get_environ_to_use(environ)
if not directory_chain or not environ:
return None
return self._get_listing(
directory_chain=directory_chain,
provider_chain=provider_chain,
environ=environ,
).get_item(name)
def _get_listing(
self,
directory_chain: DirectoryChain,
provider_chain: ProviderChain,
environ: Directory
) -> DirectoryListing:
dir_paths = directory_chain.concatenated_directory_paths
provider_names = provider_chain.concatenated_provider_names
# defaultdict will provide default versions of all the objects as-needed.
listing = self.local_cache.listings[environ][dir_paths][provider_names]
# If we have a directory assigned to object [defaults to None], then we know we
# have retrieved it in the past at some point, return it.
if listing.directory is not None:
return listing
listing.directory = environ
items = self._get_items_for_environ(environ=environ)
concat_directory_paths = directory_chain.concatenated_directory_paths
concat_provider_names = provider_chain.concatenated_provider_names
for item in items:
# find all items in the environ items that match my directory/provider lists.
# IF they do match, then it's safe to use the cached value.
if item.cache_concat_directory_paths != concat_directory_paths:
continue
if item.cache_concat_provider_names != concat_provider_names:
continue
listing.add_item(item)
return listing
def _get_items_for_environ(self, environ: Directory) -> Iterable[DirectoryItem]:
items = self.local_cache.environ_to_items.get(environ)
if items is not None:
return items
now = dt.datetime.now(dt.timezone.utc)
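# Pretend we're up to two hours in the future: items whose ttl is about to lapse
# get filtered out (and looked up fresh) early, and the random offset staggers
# when different processes end up refreshing their cache.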
expire_time = now + dt.timedelta(seconds=random.randint(0, 60 * 60 * 2))
try:
items = self._table.get_items_for_directory(directory=environ, expire_time=expire_time)
# Ensure we have a concrete tuple, not a lazy generator.
items = tuple(xloop(items, default_not_iterate=[str]))
# Log about stuff we retrieved from the cache table.
self.log_about_items(items=items, path=environ.path)
except Exception as e:
# Will either re-raise the exception or handle it for us.
handle_aws_exception(exception=e, directory=environ, provider=self)
items = tuple()
self.local_cache.environ_to_items[environ] = items
if not items:
return items
# ttl per-environ?
item_ttl = items[0]
if not item_ttl:
return items
# We want the cache to expire at around the same time across processes if possible;
# check the first item for its ttl and use that.
item_ttl = item_ttl.ttl
current_ttl = self._ttl
future_limit = current_ttl + dt.timedelta(days=2)
now_limit = current_ttl + dt.timedelta(minutes=1)
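# Only adopt the cached item's ttl if it is at least a minute beyond our current
# ttl, but no more than ~2 days beyond it (sanity bounds).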
if item_ttl and now_limit <= item_ttl <= future_limit:
self._ttl = item_ttl
return items
def _get_environ_to_use(
self, passed_in_environ: Optional[Directory] = None
) -> Optional[Directory]:
"""
Looks at `xcon_settings.service` and `xcon_settings.environment` (normally sourced
from the SERVICE_NAME and APP_ENV environment variables);
if they both exist and are not blank, we will use them and return those
values inside an environ Directory object.
Otherwise, we will return the passed_in_environ, which should be the one
that came from Config and is based on that Config's APP_ENV and SERVICE_NAME.
"""
from xcon import xcon_settings
e_service = xcon_settings.service
e_env = xcon_settings.environment
if not e_service or not e_env:
return passed_in_environ
return Directory(service=e_service, env=e_env)
class DynamoDBResource(Dependency):
dynamodb_xboto_resource: BotoResources.DynamoDB | DefaultType = Default
_table_name_to_boto_resource: dict[str, Any]
def __init__(self, dynamodb_xboto_resource: BotoResources.DynamoDB | DefaultType = Default):
self.dynamodb_xboto_resource = dynamodb_xboto_resource
self._table_name_to_boto_resource = {}
def table_resource(self, table_name):
if resource := self._table_name_to_boto_resource.get(table_name):
return resource
if self.dynamodb_xboto_resource is Default:
dynamodb = BotoResources.grab().dynamodb
else:
dynamodb = self.dynamodb_xboto_resource.boto_resource
resource = dynamodb.Table(table_name)
self._table_name_to_boto_resource[table_name] = resource
return resource
# Most of this code could be shared from `xmodel_dynamo`, but we don't want to import that
# in this library (it's a bit heavy). So for now, we are duplicating some of that functionality
# for use here, in a much simpler (but WAY less feature-rich) way:
class _ConfigDynamoTable:
"""
Meant to be a simple abstraction around a dynamo table, just enough for our needs in this
`dynamo.py` module file...
After doing all the needed work for getting/updating items and so forth,
you should throw away the `_ConfigDynamoTable` object and lazily create a new one the next
time a call from the user comes in that needs the table.
This helps support dependency injection of the dynamodb boto3 resource via xinject
(always uses dependency when called, so it can be changed/injected by user).
"""
append_source = "dynamo"
@property
def table_name(self) -> str:
""" DynamoDB table name. """
return self._table_name
@property
def table(self):
""" DynamoDB table resource.
We lazily get the resource, so we don't have to verify/create it if not needed.
"""
return DynamoDBResource.grab().table_resource(self.table_name)
def __init__(
self,
table_name: str,
cache_table: bool = False
):
super().__init__()
self._table_name = table_name
self._table = None
self._verified_table_status = False
self._cache_table = cache_table
def put_item(self, item: DirectoryItem):
""" Put item into dynamo-table.
:param item:
Item to put in.
"""
resource = self._batch_writer
if not resource:
resource = self.table
resource.put_item(Item=item.json())
def put_items(self, items: Sequence[DirectoryItem]):
""" Uses a batch-writer to put the items.
WAY more efficient than doing it one at a time.
If you only give me one item, directly calls `put_item` without a batch-writer.
"""
if not items:
return
if len(items) == 1:
self.put_item(item=items[0])
return
with self._with_batch_writer():
for i in items:
self.put_item(item=i)
def delete_items(self, items: Iterable[DirectoryItem]):
# This is really only used with unit-tests, so I am not going to try to batch-delete
# the items. Just going to do it the slower/simpler way of one at a time.
# If we really need to make this faster for some reason look at how
# xyn-model-dynamo batch-deletes items.
table = self.table
for i in items:
table.delete_item(Key={
'app_key': i.cache_hash_key,
'name_key': i.cache_range_key
})
def get_items_for_directory(
self, directory: DirectoryOrPath, expire_time: dt.datetime = Default
) -> Iterable[DirectoryItem]:
"""
Gets all items for a particular directory.
:param directory:
:param expire_time:
Date to use to filter expired items by.
if Default:
By Default we calculate the current date/time and use that.
if None:
All items regardless of their expiration time will be returned. Keep in mind
that DynamoDB only guarantees an expired item will be deleted within 48 hours,
so it will only be returned if DynamoDB has not deleted the item yet.
If dt.datetime:
I'll use the provided datetime for the expiry time. Items will only be returned
if they don't have an expiration time, or if their expiration time is greater
than the provided date/time.
:return:
"""
dir_path = Directory.from_path(directory).path
expression = conditions.Key('app_key').eq(dir_path)
log.info(f"Getting Dynamo directory ({directory.path}).")
if expire_time is Default:
expire_time = dt.datetime.now(dt.timezone.utc)
filter_exp = None
if expire_time is not None:
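# Keep rows that either never expire (no ttl attribute) or expire after the given
# time; DynamoDB can take up to ~48 hours to actually delete expired rows.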
ttl_attr = conditions.Attr('ttl')
filter_exp = ttl_attr.not_exists() | ttl_attr.gt(int(expire_time.timestamp()))
def response_creator(last_key: str):
query = {
# I think we are fine without a `ConsistentRead`, we rarely write/put things,
# And if it was out-of-date it would only be by a matter of seconds which really
# does not matter to us in this context.
#
# "ConsistentRead": True,
# Expression for the directory-partition we want.
"KeyConditionExpression": expression,
}
if filter_exp:
query["FilterExpression"] = filter_exp
if last_key:
query["ExclusiveStartKey"] = last_key
return self.table.query(**query)
return self._paginate_all_items_generator(response_creator)
def get_all_items(self) -> Iterable[DirectoryItem]:
def response_creator(last_key: str):
if last_key is None:
return self.table.scan()
return self.table.scan(ExclusiveStartKey=last_key)
return self._paginate_all_items_generator(response_creator)
def _with_batch_writer(self):
""" Uses a batch-writer to put the items.
WAY more efficient than doing it one at a time.
You can use a batch writer via a `with` and then create another batch writer
via `_with_batch_writer()` and enter that one via `with` while the first one is
active without a problem. You MUST NOT use a `with` a second time with the same
batch-writer object [ie: with a single call to `_with_batch_writer()`].
You need to use this in a `with` statement, like so:
```
table = _ConfigDynamoTable(table_name='global-all-config')
with table._with_batch_writer():
for item in items:
table.put_item(item)
```
Or you can use `put_items` and just give it a list of items, and it will do
this for you (create and use a batch writer).
"""
return self._BatchTable(self)
# ----------------------------
# --------- Private ----------
_table_name: str
_verified_table_status: bool
_batch_writer = None
def _paginate_all_items_generator(
self, response_creator: Callable[[Optional[str]], dict]
) -> Iterable[DirectoryItem]:
last_key: Optional[str] = None
append_source = self.append_source
while True:
response = response_creator(last_key)
last_key = response.get('LastEvaluatedKey', None)
db_datas = response['Items']
if not db_datas:
db_datas = []
for data in db_datas:
yield DirectoryItem.from_json(
json=data,
append_source=append_source,
from_cacher=self._cache_table
)
if not last_key:
return
class _BatchTable(object):
"""
Used by `_ConfigDynamoTable` as the context manager for batch writes.
You likely don't want to try to use this object directly.
"""
table: _ConfigDynamoTable
_batch_writer = None
def __init__(self, table):
self.table = table
def __enter__(self):
if self._batch_writer is not None:
raise ConfigError(
"Must not use `with` multiple times with same dynamo batch writer object."
)
if self.table._batch_writer:
# Nothing to do if table already has a batch-writer.
return self
batch_writer = self.table._create_batch_writer()
batch_writer.__enter__()
self.table._batch_writer = batch_writer
self._batch_writer = batch_writer
return self
def __exit__(self, type, value, traceback):
if not self._batch_writer:
return
# Only remove batch-writer if we were the one who set it originally.
self.table._batch_writer = None
self._batch_writer.__exit__(type, value, traceback)
def _create_batch_writer(self):
return self.table.batch_writer(overwrite_by_pkeys=['app_key', 'name_key'])
Classes
class DynamoCacher
-
Uses a Dynamo table called `global-all-configCache`. Generally caches whatever configuration values we look up into a dynamo table. A good summary is in the Config Overview - Caching.
More details about the table structure itself follow.
The table has two keys:
1. Hash key: the `environ` method parameter that gets passed to the methods on me. It's normally a string in this format: `/{APP_NAME}/{APP_ENV}`. Also, normally apps/services are only given access to one specific hash-key. This hash-key should represent the app and its current environment.
2. Range/Sort key: Contains the variable name, providers and directory paths used to look up the value. This makes it so the app can do various queries using various different providers/directories, and the cacher can cache those results correctly and uniquely based on the var-name, providers and directories originally used to get the value.
You don't need to parse the range/sort-key. All of its components are also separate attributes on the table row.
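As a rough illustration, a cached row might look something like the sketch below. The `app_key`/`name_key` key names come from this module's `delete_items` code and `ttl` from its query filter; the remaining attribute names are assumptions for illustration, since the real row layout is produced by `DirectoryItem.json()`.

```python
# Hypothetical cached row (only app_key/name_key/ttl are confirmed attribute names):
{
    'app_key': '/hubspot/testing',    # hash key: the environ's directory path
    'name_key': '<var-name + concatenated provider names + directory paths>',  # range key
    'value': 'https://api.example.com',   # assumed: the cached config value
    'source': 'ssm - /hubspot/testing',   # assumed: where the value originally came from
    'ttl': 1700000000,                    # unix timestamp; row is filtered out once past
}
```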
Dependency Details
Right now the `DynamoCacher` is a `xinject.dependency.Dependency` resource; you can grab the current one by calling `DynamoCacher.grab()`.
More specifically: we are a `xinject.dependency.Dependency`, which means that there is normally only one of us around. See the xinject library for more details.
How to Clear Cache
If you need to change the config and have it propagate asap, you can simply delete all items in the dynamo cache table under the cache hash-key for the service/env.
So for now, I am giving a long life to the cached items, since config changes to existing items are rare [if we need a new item and it's not in cache it will be looked up immediately and then cached].
I use a random number to help ensure the cache-expiration times of various services are not synchronized, to help spread load between the param store and secrets manager AWS APIs.
12 hours in the future, with a random +/- 1500 seconds added on, is what we currently do. Thinking about making it a shorter period of time [a couple of hours].
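A minimal sketch of clearing the whole cache for one service/env with plain boto3 (an illustrative, assumption-based example, not an official xcon utility; it reuses the `app_key`/`name_key` key names seen in this module's `delete_items` and ignores query pagination):

```python
import boto3
from boto3.dynamodb import conditions

table = boto3.resource('dynamodb').Table('global-all-configCache')

# Query every cached row under this service/env's hash key...
response = table.query(
    KeyConditionExpression=conditions.Key('app_key').eq('/my-service/prod')
)

# ...and batch-delete them, so fresh values get looked up and re-cached.
with table.batch_writer() as batch:
    for row in response['Items']:
        batch.delete_item(Key={'app_key': row['app_key'], 'name_key': row['name_key']})
```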
Ancestors
- xcon.provider.ProviderCacher
Class variables
var botocore_error_ignored_exception : botocore.exceptions.BotoCoreError
- Inherited from: ProviderCacher.botocore_error_ignored_exception
  This means that any attempt to communicate with the aws service will probably fail; probably due to corrupted or missing aws credentials.
var is_cacher
- Inherited from: ProviderCacher.is_cacher
  Easy way to figure out if a provider is a ProviderCacher or just a normal provider. Should be set to True for provider subclasses that are …
var name
- Inherited from: ProviderCacher.name
  This is the value that will normally be set to the item's DirectoryItem.source, also displayed when logging out the names of providers …
var needs_directory
- Inherited from: ProviderCacher.needs_directory
  By default, providers can't really use a None for a directory when calling get_item(). If you CAN work with a None directory then set this to …
var query_before_cache_if_possible
- Inherited from: ProviderCacher.query_before_cache_if_possible
  If True, and this is before any other providers that have this set to False, the cacher will be consulted AFTER those provider(s). In this way I'll …
Static methods
def __init_subclass__(thread_sharable=Default, attributes_to_skip_while_copying: Optional[Iterable[str]] = Default, **kwargs)
- Inherited from: ProviderCacher.__init_subclass__
  Args
  thread_sharable
  - If False: While a dependency is lazily auto-created, we will ensure we do it per-thread, and not make it visible …
def grab() ‑> ~T
- Inherited from: ProviderCacher.grab
  Gets a potentially shared dependency from the current udpend.context.XContext …
def proxy() ‑> ~R
- Inherited from: ProviderCacher.proxy
  Returns a proxy-object; when an attribute is asked for, it will proxy it to the current object of cls …
def proxy_attribute(attribute_name: str) ‑> Any
- Inherited from: ProviderCacher.proxy_attribute
  Returns a proxy-object; when an attribute is asked for, it will proxy it to the current attribute value on the current object of cls …
Instance variables
var local_cache : _LocalCache
- Local in-memory cache for this provider, stored via InternalLocalProviderCache.
var obj : Self
- Inherited from: ProviderCacher.obj
  class property/attribute that will return the current dependency for the subclass it's asked on, by calling Dependency.grab, passing no extra …
Methods
def __call__(self, func)
- Inherited from: ProviderCacher.__call__
  This makes Resource subclasses have the ability to be used as function decorators by default, unless this method is overridden to provide some other …
def __copy__(self)
- Inherited from: ProviderCacher.__copy__
  Basic shallow copy protection (I am wondering if I should just remove this default copy code) …
def cache_items(self, items: Iterable[DirectoryItem], provider_chain: ProviderChain, directory_chain: DirectoryChain, environ: Directory)
-
Caches the passed-in items, using the other params to create the proper range and hash keys that we use in Dynamo to uniquely identify each item.
See DynamoCacher.get_item() for more details on what the Args mean. It uses many of the same ones. For an overview of the caching process, see Config Overview - Caching.
def directory_has_error(self, directory: Directory)
- Inherited from: ProviderCacher.directory_has_error
If a directory had an error in the past, this returns true. For informational purposes only.
def get_item(self, name: str, directory: Optional[DirectoryOrPath], directory_chain: DirectoryChain, provider_chain: ProviderChain, environ: Optional[Directory]) ‑> Optional[DirectoryItem]
-
Returns an item out of the cache. The cache-key is constructed using directory_chain, provider_chain, environ and name.
For an overview of the caching process, see Config Overview - Caching.
Args
name
- Name of the config value, ie: XYNAPI_BASE_URL.
directory
- Directory to look up the value in; this is not really used right now by the cacher, but it's used by the other providers. This cacher acts just like a provider and so accepts the parameter.
directory_chain
- Current directory chain that is being used to look up the value. Used as part of the range-key in the Dynamo table.
provider_chain : ProviderChain
- Current provider chain that is being used to look up the value. Used as part of the range-key in the Dynamo table.
environ
- This is the directory the cacher uses for the hash-key. It's supposed to have the full service and environment name. Example Directory Path: `/hubspot/testing`
Returns
DirectoryItem
- If we have a cached item, this is it.
None
- Otherwise we return None indicating nothing has been cached.
def get_value(self, name: str, directory: Optional[DirectoryOrPath], directory_chain: DirectoryChain, provider_chain: ProviderChain, environ: Directory)
- Inherited from: ProviderCacher.get_value
Gets an item's value for directory from provider. Return None if not found.
def mark_errored_directory(self, directory: Directory)
- Inherited from: ProviderCacher.mark_errored_directory
If a directory has an error, this is called. For informational purposes only.
def retrieved_items_map(self, directory: DirectoryOrPath) ‑> Mapping[str, DirectoryItem]
-
This is mostly useful for getting things into the cache, so I am not going to implement it in the cacher (if we ever do, the ProviderChain will have to figure out how to skip us).
class DynamoDBResource (dynamodb_xboto_resource: BotoResources.DynamoDB | DefaultType = Default)
-
If you have not already done so, you should also read the xinject project's README.md for an overview of the library before diving into the text below, which is more like reference material.
Summary
Allows you to create subclasses that act as sharable dependencies: things that should stick around and be created lazily.
Also allows code to temporarily create, customize and activate a dependency if you don't want the customization to stick around permanently. You can do it without your code and other code needing to be aware of each other.
This helps promote code decoupling, since it's so easy to make a Resource and activate it as the 'current' version to use.
The only coupling that takes place is to the Resource sub-class itself.
You can also easily have each thread lazily create a separate instance of your Resource, by inheriting from PerThreadResource.
Each separate piece of code that uses a particular Resource subclass can be completely unaware of the others, and yet each one can take advantage of the shared dependency.
This means that Resource can help cover dependency-injection use-cases.
Overview
A Resource represents an object in an XContext. Generally, dependencies that are added/created inside an XContext inherit from this abstract base Resource class, but are not required to. Resource just adds some class-level convenience methods and configuration options. Inheriting from Resource also helps self-document that it's a Resource.
See Resources at the top of this module for a general overview of how dependencies and XContext's work. You should also read the xinject project's README.md for a high-level overview. The text below is more like plain reference material.
Get the current dependency via Resource.dependency; you can call it on a sub-class/concrete dependency type, like so:
>>> from xinject import Dependency
>>> class MyConfig(Dependency):
...     some_setting: str = "default-setting-string"
>>>
>>> MyConfig.grab().some_setting
By default, Resources act like singletons, in that child contexts will simply get the same instance of the dependency that the parent context has.
If you inherit from this class, when you have Resource.dependency called on you, we will do our best to ensure that the same object instance is returned every time (there are two exceptions, keep reading).
These dependencies are stored in the current XContext's parent. What happens is:
If the current XContext and none of its parents have this object and it's asked for (like what happens when Resource.dependency is called on it), it will be created in the deepest/oldest parent XContext. This is the first parent XContext whose XContext.parent is None. That way it should be visible to everyone on the current thread, since it will normally be created in the app-root XContext.
If the Dependency can't be shared between multiple threads, creation would normally happen at the thread-root XContext instead of the app-root one.
If we don't already exist in any parent, then we must be created the first time we are asked for. Normally it will simply be a direct call to the dependency-type being requested; this is the normal way to create objects in Python:
>>> class MyResource(Dependency):
...     pass
>>>
>>> MyResource.grab()
When that last line is executed, and neither the current nor any parent context has a MyResource dependency, XContext will simply create one by calling the dependency type:
>>> MyResource()
You can allocate the dependency yourself with custom options and add it to the XContext.
Here are the various ways to do that, via:
- XContext.add: Adds a dependency to a specific XContext that already exists (or replaces one that was directly added to that specific Context in the past). When/while that XContext is active, these added dependencies will be the current ones.
- Decorator, ie: @MyResource()
>>> from xny_config import Config, config
>>>
>>> @DependencySubclass(service="override-service-name")
>>> def my_method():
>>>     assert config.service == "override-service-name"
- Via a with statement:
>>> def my_method():
>>>     with DependencySubclass(service="override-service-name"):
>>>         assert config.service == "override-service-name"
- Multiple in a single statement, by making your own XContext directly:
>>> def my_method():
>>>     with XContext([
>>>         DependencySubclass(service="override-service-name"),
>>>         SomeOtherDep(name='new-name')
>>>     ]):
>>>         assert config.service == "override-service-name"
Background on Unit Testing
By default, unit tests always create a new blank XContext with parent=None. This is done by an autouse fixture (xinject_test_context()). This forces every unit test run to create new dependencies when they are asked for (lazily).
This fixture is used automatically for each unit test; it clears the app-root XContext and removes all current thread-root XContext's and their children from being active, just before each run of a unit test.
That way it will recreate any shared dependency each time, and a unit test can't leak dependencies it added or changed into the next run.
One example of why this is good is for moto when mocking dynamodb in the boto3 client. We can use a dependency to ensure that we get a new dynamodb shared dependency for boto each time a unit test executes (which helps with moto, as it needs to be active when a dependency is allocated/used).
This is exactly what we want for each unit test run: to have a blank slate for all the various dependencies.
If a particular set of unit tests needs specific dependencies, you can use fixtures to modify/add various dependencies as needed for each individual unit-test function run.
When the application runs for real though, we generally do want to use the dependencies in a shared fashion. So normally we only allocate a new blank-root XContext(parent=None) either at the start of a normal application run, or during a unit test.
Ancestors
- xinject.dependency.Dependency
Class variables
var dynamodb_xboto_resource : xboto.dependencies.DynamodbResource | DefaultType
Static methods
def __init_subclass__(thread_sharable=Default, attributes_to_skip_while_copying: Optional[Iterable[str]] = Default, **kwargs)
- Inherited from: Dependency.__init_subclass__
  Args
  thread_sharable
  - If False: While a dependency is lazily auto-created, we will ensure we do it per-thread, and not make it visible …
def grab() ‑> ~T
- Inherited from: Dependency.grab
  Gets a potentially shared dependency from the current udpend.context.XContext …
def proxy() ‑> ~R
- Inherited from: Dependency.proxy
  Returns a proxy-object; when an attribute is asked for, it will proxy it to the current object of cls …
def proxy_attribute(attribute_name: str) ‑> Any
- Inherited from: Dependency.proxy_attribute
  Returns a proxy-object; when an attribute is asked for, it will proxy it to the current attribute value on the current object of cls …
Instance variables
var obj : Self
- Inherited from: Dependency.obj
  class property/attribute that will return the current dependency for the subclass it's asked on, by calling Dependency.grab, passing no extra …
Methods
def __call__(self, func)
- Inherited from: Dependency.__call__
  This makes Resource subclasses have the ability to be used as function decorators by default, unless this method is overridden to provide some other …
def __copy__(self)
- Inherited from: Dependency.__copy__
  Basic shallow copy protection (I am wondering if I should just remove this default copy code) …
def table_resource(self, table_name)
-
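Returns a boto3 Table resource for the given table name, creating it lazily on first use and caching it per table name. A short usage sketch (the table name and `my_dynamodb` below are illustrative assumptions; the `with` injection works because `DynamoDBResource` is an xinject Dependency):

```python
# Grab the shared dependency and get (or lazily create) the cached table resource:
table = DynamoDBResource.grab().table_resource('global-all-config')

# Temporarily inject a custom xboto DynamoDB resource (e.g. for tests), assuming
# `my_dynamodb` is a BotoResources.DynamoDB instance:
with DynamoDBResource(dynamodb_xboto_resource=my_dynamodb):
    table = DynamoDBResource.grab().table_resource('global-all-config')
```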
class DynamoProvider
-
Access a dynamo table called `global-all-config` when searching for a config value. This provider allows one to have a structured list or dictionary. It supports JSON and will parse/decode it when it gets it from Dynamo into a real Python dict/list/str/etc!
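For example, a value stored in Dynamo as a JSON string comes back as a real Python structure. A hedged sketch (the `ENDPOINTS` name is hypothetical, and `config` is assumed to be xcon's usual config object):

```python
from xcon import config

# Suppose the dynamo row's value holds the JSON document:
#   '{"hosts": ["a.example.com", "b.example.com"], "port": 443}'
endpoints = config.ENDPOINTS   # hypothetical config name
assert endpoints['port'] == 443
assert endpoints['hosts'][0] == 'a.example.com'
```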
Ancestors
- xcon.provider.AwsProvider
Class variables
var botocore_error_ignored_exception : botocore.exceptions.BotoCoreError
- Inherited from: AwsProvider.botocore_error_ignored_exception
  This means that any attempt to communicate with the aws service will probably fail; probably due to corrupted or missing aws credentials.
var is_cacher
- Inherited from: AwsProvider.is_cacher
  Easy way to figure out if a provider is a ProviderCacher or just a normal provider. Should be set to True for provider subclasses that are …
var name
- Inherited from: AwsProvider.name
  This is the value that will normally be set to the item's DirectoryItem.source, also displayed when logging out the names of providers …
var needs_directory
- Inherited from: AwsProvider.needs_directory
  By default, providers can't really use a None for a directory when calling get_item(). If you CAN work with a None directory then set this to …
var query_before_cache_if_possible
- Inherited from: AwsProvider.query_before_cache_if_possible
  If True, and this is before any other providers that have this set to False, the cacher will be consulted AFTER those provider(s). In this way I'll …
Static methods
def __init_subclass__(thread_sharable=Default, attributes_to_skip_while_copying: Optional[Iterable[str]] = Default, **kwargs)
- Inherited from: AwsProvider.__init_subclass__
  Args
  thread_sharable
  - If False: While a dependency is lazily auto-created, we will ensure we do it per-thread, and not make it visible …
def grab() ‑> ~T
- Inherited from: AwsProvider.grab
  Gets a potentially shared dependency from the current udpend.context.XContext …
def proxy() ‑> ~R
- Inherited from: AwsProvider.proxy
  Returns a proxy-object; when an attribute is asked for, it will proxy it to the current object of cls …
def proxy_attribute(attribute_name: str) ‑> Any
- Inherited from: AwsProvider.proxy_attribute
  Returns a proxy-object; when an attribute is asked for, it will proxy it to the current attribute value on the current object of cls …
Instance variables
var obj : Self
- Inherited from: AwsProvider.obj
  class property/attribute that will return the current dependency for the subclass it's asked on, by calling Dependency.grab, passing no extra …
Methods
def __call__(self, func)
- Inherited from: AwsProvider.__call__
  This makes Resource subclasses have the ability to be used as function decorators by default, unless this method is overridden to provide some other …
def __copy__(self)
- Inherited from: AwsProvider.__copy__
Basic shallow copy protection (I am wondering if I should just remove this default copy code) …
def directory_has_error(self, directory: Directory)
- Inherited from: AwsProvider.directory_has_error
If a directory had an error in the past, this returns true. For informational purposes only.
def get_item(self, name: str, directory: Optional[DirectoryOrPath], directory_chain: DirectoryChain, provider_chain: ProviderChain, environ: Directory) ‑> Optional[DirectoryItem]
- Inherited from: AwsProvider.get_item
Grabs a config value for name in directory …
def get_value(self, name: str, directory: Optional[DirectoryOrPath], directory_chain: DirectoryChain, provider_chain: ProviderChain, environ: Directory)
- Inherited from: AwsProvider.get_value
Gets an item's value for directory from provider. Return None if not found.
def mark_errored_directory(self, directory: Directory)
- Inherited from: AwsProvider.mark_errored_directory
If a directory has an error, this is called. For informational purposes only.
def retrieved_items_map(self, directory: DirectoryOrPath) ‑> Optional[Mapping[str, DirectoryItem]]
- Inherited from: AwsProvider.retrieved_items_map
  Should return a read-only lower-case item-name to item mapping. You can easily get one of these from a DirectoryListing object's item_mapping() …