Module xcon.provider
Expand source code
from __future__ import annotations
import dataclasses
import os
import threading
from abc import ABC, abstractmethod
from inspect import isclass
from typing import Iterable, Optional, Mapping, Set, Type, Any, Callable, Dict
from typing import Union
from botocore.exceptions import BotoCoreError
from xinject import Dependency, XContext
from xloop import xloop
from .directory import Directory, DirectoryOrPath, DirectoryItem, DirectoryChain, DirectoryListing
import datetime as dt
from logging import getLogger
log = getLogger(__name__)
class Provider(Dependency):
"""
Represents a Provider, which wraps a resource that can be used to store Config values based
on a provided directory. It caches these directories so future lookups don't keep having to
fetch them from the Dependency again in the future, while the process is still running.
Most of the time there is no need for several of the same provider in the same process/app.
The providers generally keep a cache of every directory they have already looked up, so it's nice to
share that cache with child contexts by default via the
`xinject.dependency.Dependency` mechanism.
Config will use the current/active provider when it needs to consult a Provider for values.
Each provider should be careful to communicate with any remote service in a thread-safe
manner, as Config objects from other threads could use the same Provider instance.
Most of the time, network clients are not thread-safe to use; examples include:
- boto clients/resources are not thread-safe.
- Sessions from the requests library can't be used cross-thread.
For boto, if you use `xboto` to grab the boto client/resource,
it will allow you to lazily get a shared object that is guaranteed to only be shared
for the current thread.
This allows boto to reuse connections for things running on the same thread,
but `xboto` will lazily create a new client if you're on a separate thread.
"""
name = "?"
""" This is the value that will normally be set to the items
`xcon.directory.DirectoryItem.source`, also displayed
when logging out the names of providers when something can't be found.
"""
is_cacher = False
""" Easy way to figure out if a provider is a `ProviderCacher` or just a normal provider.
Should be set to `True` for provider subclasses that are cachers.
Defaults to `False`.
"""
query_before_cache_if_possible = False
""" If True, and this is before any other providers that have this set to False, the
cacher will be consulted AFTER that provider(s). In this way I'll make my best-effort
to accommodate this particular request.
If False, then I guarantee the cacher [if there is one] will be consulted BEFORE
this provider is.
See [Config Overview - Caching](config.html#caching) for more details on how caching
works.
"""
needs_directory = True
""" By default, providers can't really use a `None` for a directory when calling `get_item()`.
If you CAN work with a None directory then set this to False (for example
`xcon.providers.environmental.EnvironmentalProvider` uses this).
A `None` normally means that we could not determine the proper directory to use.
This can happen if no SERVICE_NAME and APP_ENV are defined. But some providers don't
really use directories and are ok with getting a `None` query.
This flag lets the Config class know whether it should bother to allocate this provider,
based on whether it has a directory list to work with.
"""
_errored_directories: Set[Directory]
""" Used to keep track of directories we are excluding. """
# ------------------------------------
# --------- Abstract Methods ---------
@abstractmethod
def get_item(
self,
name: str,
directory: Optional[DirectoryOrPath],
directory_chain: DirectoryChain,
provider_chain: ProviderChain,
environ: Directory
) -> Optional[DirectoryItem]:
"""
Grabs a config value for name in directory.
The other args are mostly for the `ProviderCacher`. They describe the current environment
of the current config lookup.
Side Note: For an overview of the caching process, see
[Config Overview - Caching](../config.html#caching)
Args:
name: Name of the config value, e.g. `XYNAPI_BASE_URL`.
directory: Directory to look the value up in. The cacher does not really use this right
now, but the other providers do. The cacher acts just like
a provider and so accepts the parameter.
directory_chain: Current directory chain that is being used to lookup value.
provider_chain (xcon.provider.ProviderChain): Current provider chain
that is being used to lookup value.
environ:
This is supposed to have the full service and environment name.
Example Directory Path: `/hubspot/testing`
Returns:
xcon.directory.DirectoryItem: If we have the item, this is it.
None: Otherwise we return None indicating we don't know about it.
"""
raise NotImplementedError(f"Need to implement in ({self}).")
@abstractmethod
def retrieved_items_map(
self, directory: DirectoryOrPath
) -> Optional[Mapping[str, DirectoryItem]]:
""" Should return a read-only lower-case item name TO item mapping.
You can easily get one of these from a DirectoryList object's `item_mapping()`.
If provider has not yet retrieved the listing for the passed-in directory,
it should simply pass back None. It's important to know the difference between
a blank retrieval and no-retrieval attempt, so please pass back None in that case!!!
"""
raise NotImplementedError(f"Need to implement in ({self}).")
# ---------------------------------
# --------- Other Methods ---------
def __init__(self):
self._errored_directories = set()
def log_about_items(
self, *, items: Iterable[DirectoryItem], path: str, msg_prefix='Retrieved'
):
# We could be called before the application has configured its logging;
# ensure logging has been configured before we log out.
# Otherwise the log message may never get logged out
# (Python defaults to the WARNING log level).
# Use cache_range_key if it exists, otherwise use name.
# cache_range_key has the name + other uniquely identifying information.
names = [v.cache_range_key or v.name for v in items]
provider_class = self.__class__.__name__
thread_name = threading.current_thread().name
log.info(
f"{msg_prefix} values via provider ({self.name}/{provider_class}) "
f"for path ({path}), for thread ({thread_name}), for names ({names}).",
extra=dict(
msg_prefix=msg_prefix,
provider=self.name,
provider_class=provider_class,
names=names,
path=path,
thread_name=thread_name,
)
)
def mark_errored_directory(self, directory: Directory):
""" If a directory has an error, this is called. For informational purposes only. """
self._errored_directories.add(directory)
def directory_has_error(self, directory: Directory):
""" If a directory had an error in the past, this returns true.
For informational purposes only.
"""
return directory in self._errored_directories
def get_value(
self,
name: str,
directory: Optional[DirectoryOrPath],
directory_chain: DirectoryChain,
provider_chain: ProviderChain,
environ: Directory
):
""" Gets an item's value for directory from provider. Return None if not found.
"""
item = self.get_item(
name=name,
directory=directory,
directory_chain=directory_chain,
provider_chain=provider_chain,
environ=environ
)
return item.value if item else None
class AwsProvider(Provider):
""" AwsProvider is the Base class for Aws-associated config providers.
There is some aws specific error handing that this class helps with among the
aws providers.
This is the Default Doc message, you will want to override this doc-comment in
any subclasses.
"""
botocore_error_ignored_exception: BotoCoreError = None
""" This means that any attempt to communicat with aws service will probably fail;
probable due to a corrupted or missing aws credentials.
"""
@property
def local_cache(self) -> Dict[Directory, DirectoryListing]:
cacher = InternalLocalProviderCache.grab()
return cacher.get_cache_for_provider(provider=self, cache_constructor=lambda c: dict())
class ProviderCacher(AwsProvider):
# See `Provider.is_cacher` for docs.
is_cacher = True
""" This is set to True by default for ProviderCacher's.
See `Provider.is_cacher`.
"""
@abstractmethod
def cache_items(
self,
items: Iterable[DirectoryItem],
provider_chain: ProviderChain,
directory_chain: DirectoryChain,
environ: Directory
):
raise NotImplementedError(f"Need to implement in ({self}).")
@dataclasses.dataclass(eq=True, frozen=True)
class ProviderChain:
""" A prioritized list of providers to consult when getting a value.
"""
providers: Iterable[Union[Provider, Type[Provider]]] = dataclasses.field(
compare=False, repr=False
)
""" This will be a tuple of ordered providers.
When you create a ProviderChain, you can give it classes and/or objects.
It will convert any class types passed in into a proper object for you automatically
via the current context [as a resource].
"""
concatenated_provider_names: str = dataclasses.field(init=False, compare=True)
""" Concatenated list of all of my provider's `Provider.name` with a pipe `|` in-between.
Only includes providers that are cachable.
Cachable providers are ones assigned to myself after any providers list first that have
`Provider.query_before_cache_if_possible` set to True.
Starting with the first provider in my list has `Provider.query_before_cache_if_possible`
set to False (default) we will consider them cachable.
Normally, the `xcon.providers.environmental.EnvironmentalProvider` provider
is the only non-cacheable provider, and normally it's listed first.
This means that we will normally not cache values from this EnvironmentalProvider.
If the EnvironmentalProvider happens to be after a cachable provider, we will include
it as one of the keys in range-key of the items that gets cached into the Dynamo
config cache table.
This is because finding a value in some other provider before looking at
EnvironmentalProvider can effect the results since we would not look in the
EnvironmentalProvider in that case. As would finding a value in the environmental
provider would prevent looking at other providers.
Therefore it *might* effect the results. Just to be on safe side we use
EnvironmentalProvider as one of the range cache keys in this situations.
But like I said previously, normally the `EnvironmentalProvider` is the first provider
and so is not included in the final `concatenated_provider_names` list.
"""
have_any_cachable_providers: bool = True
""" If any providers have `Provider.query_before_cache_if_possible` set to `False
this will be `True`.
If **all** providers have `Provider.query_before_cache_if_possible` set to `True`
then this will set to `False`.
"""
def __post_init__(self):
providers = list()
context = XContext.grab()
provider_key_names = []
query_before_finished = False
for p in xloop(self.providers, default_not_iterate=[str]):
# Check to see if any of them are classes [and the type's dependency needs to be grabbed].
if isclass(p):
p = context.dependency(p)
providers.append(p)
if not query_before_finished:
if p.query_before_cache_if_possible:
continue
query_before_finished = True
provider_key_names.append(p.name)
object.__setattr__(self, 'providers', tuple(providers))
if not query_before_finished:
object.__setattr__(self, 'have_any_cachable_providers', False)
# Pre-calculate a useful field, a concatenated list of the cachable provider names.
provider_names = '|'.join(provider_key_names)
object.__setattr__(self, 'concatenated_provider_names', provider_names)
def _providers_with_cacher(
self,
directory_chain: DirectoryChain,
cacher: Optional[ProviderCacher] = None,
environ: Directory = None
) -> Iterable[Provider]:
""" Generator of providers and cacher, as needed. """
already_used_cache = False
for provider in self.providers:
if already_used_cache or provider.query_before_cache_if_possible:
yield provider
continue
already_used_cache = True
if cacher:
yield cacher
yield provider
def get_item(
self,
name: str,
directory_chain: DirectoryChain,
cacher: ProviderCacher = None,
environ: Directory = None
) -> DirectoryItem:
"""
Goes through the passed-in directory_chain, querying each provider in `ProviderChain.providers`
for a value. If it finds one, that's what we will return. Otherwise None.
We will check with the passed-in cacher at the appropriate time as we go through
our own providers via `ProviderChain.providers`.
If needed, before we return we will tell the cacher to cache the values we find.
"""
use_cacher = (cacher and environ)
items_cache = {}
item = None
places_checked = []
for directory in directory_chain.directories:
item = None
for provider in self._providers_with_cacher(
directory_chain=directory_chain, cacher=cacher, environ=environ
):
item = provider.get_item(
name=name, directory=directory,
directory_chain=directory_chain,
provider_chain=self,
environ=environ
)
had_error = provider.directory_has_error(directory)
if had_error:
result = "error"
elif item and item.directory and item.directory.is_non_existent:
result = "found(cached-as-non-existent)"
elif item:
result = "found"
else:
result = "not-found"
places_checked.append(
f"{provider.name}:{directory.path} | result={result}"
)
if item is not None:
break
if use_cacher and item and not item.cacheable:
# Optimization: Don't spend time looking at what cacher could send if our
# value is not cacheable [probably an environmental var].
use_cacher = False
# Priority for items is given in directory order; keep what we've already got
# over the new stuff from a lower-priority directory.
#
# We do this so we can cache as much as we can at a time, otherwise we would be
# caching many single items with single requests at a time.
#
# The other option is to have the app/service that uses us use a batch-writer
# for the ConfigDynamoTable, which might be better [so we only write exactly what
# we need based on what got looked up].
#
# todo: [see discussion above]
# Consider if we should batch-write the whole time the program is running
# or if we should collect everything we can based on what we currently have
# looked up and writing it to cache [only things that have not been in cache
# previously]. But we would need a way to force-write the current batch when
# we are done [think about it].
if use_cacher:
items_cache = {
**self.retrieved_items_map(directory=directory),
**items_cache
}
if item:
break
# If we did not find the item, create a 'nonExistent' item in its place.
if not item:
item = DirectoryItem(None, name, value=None, source="/_nonExistent")
item.add_supplemental_metadata("locations_searched", places_checked)
if use_cacher and item.cacheable:
items_cache[item.name] = item
cacher.cache_items(
items_cache.values(),
provider_chain=self,
directory_chain=directory_chain,
environ=environ
)
return item
def retrieved_items_map(
self, directory: DirectoryOrPath
) -> Mapping[str, DirectoryItem]:
"""
Will return a read-only lower-case item name TO item mapping by going through each
provider in my chain, starting with the highest priority and calling
`retrieved_items_map()` on them and collecting the results into a single dict that I'll
return.
Keep in mind that if a provider has not retrieved anything yet, I'll stop and return
what I have at that point. These providers can be shared with other provider chains
and if a lower-priority provider has retrieved their values before a higher-priority, we
could end up with the wrong values. Since I stop at the first provider that has not
retrieved the passed in directory yet, we are protected from that possibility.
"""
final_map = {}
for provider in self.providers:
provider_map = provider.retrieved_items_map(directory)
if provider_map is None:
# We stop when we encounter a provider that has not retrieved
# the directory listing yet [safety mechanism, see doc comment above].
break
# `final_map` is second, so it overrides the `provider_map` dict.
final_map = {**provider_map, **final_map}
return final_map
class InternalLocalProviderCache(Dependency):
"""
Used by the providers for a place to store/cache things they retrieve from the systems
they provide configuration values from.
The reason we have a central resource to keep track of the cache now,
instead of doing it directly inside the providers like they used to,
is so that all providers can have their internal/local cache expire at the same time.
If the dynamo cache provider expires, and it happens that something changed in SSM
and a key was deleted in the dynamo cache table, we want to look up the new value in SSM
and not the version we may have in the internal/local cache.
If we did not expire everything simultaneously, it could be that the dynamo cache
expires before the SSM one. We would then end up checking the SSM provider, but the new
value would not be looked up because of its internal/local cache.
Expiring all providers' internal/local caches simultaneously avoids this problem,
and simplifies the 'high-level' conceptual aspect of how the Config and its internal
caching works from a usability/user-of-the-library point of view.
At the moment, the key is id/memory-address of the provider instance.
This means a new provider instance would provide a new/blank cache for that object.
The old instance, if still used, would still have access to whatever it previously cached.
"""
_local_internal_cache = None
_time_cache_last_reset = None
expire_time_delta: dt.timedelta = dt.timedelta(minutes=15)
"""
Amount of time before cache expires.
You can change this to anything you want at any time,
as it's checked each time a provider retrieves its cache.
The providers do this every time they are asked for a value.
In addition to changing this directly
(via `InternalLocalProviderCache.grab().expire_time_delta` = ...)
you can also override this via an environmental variable:
`XCON_INTERNAL_CACHE_EXPIRATION_MINUTES`
If this variable is defined, we will take the value as the number of minutes
to wait until we expire/reset our cache.
Otherwise the default expiration is 15 minutes.
"""
def __init__(self):
super().__init__()
from xcon import xcon_settings
if minutes := xcon_settings.internal_cache_expiration_minutes:
if minutes > 0:
self.expire_time_delta = dt.timedelta(minutes=minutes)
self.reset_cache()
def get_cache_for_provider(
self, *, provider: Provider, cache_constructor: Callable[[InternalLocalProviderCache], Any]
) -> Any:
"""
Given `provider`, we will return a cached object keyed to the `provider` instance.
If there currently is no cache object for the `provider` instance in self,
and you provide a `cache_constructor`, we will call the `cache_constructor` with
a single positional argument: `self` (the InternalLocalProviderCache instance).
If you return a non-None value from this constructor, we will store this value
as the cache object, keyed under the instance of `provider`, and return this same
object in the future until the cache is expired or reset.
Otherwise, if you return None you're expected to call `set_cache_for_provider` on
the instance of InternalLocalProviderCache we give the constructor callback.
If we still don't have a value, and you provided a constructor, we will raise a ValueError.
"""
self.expire_cache_if_needed()
provider_id = id(provider)
cache = self._local_internal_cache.get(provider_id)
if cache is None and cache_constructor:
cache = cache_constructor(self)
if cache is None:
cache = self._local_internal_cache.get(provider_id)
if cache is None:
raise ValueError(
f"Provided cache_constructor returned a None value and also did not set "
f"a value either in InternalLocalProviderCache for provider ({provider})."
)
self._local_internal_cache[provider_id] = cache
return cache
def set_cache_for_provider(self, *, provider: Provider, cache: Any):
self.expire_cache_if_needed()
self._local_internal_cache[id(provider)] = cache
def expire_cache_if_needed(self):
if self._time_cache_last_reset < dt.datetime.now() - self.expire_time_delta:
self.reset_cache()
def reset_cache(self):
self._local_internal_cache = {}
self._time_cache_last_reset = dt.datetime.now()
Classes
class AwsProvider
-
AwsProvider is the base class for AWS-associated config providers.
There is some AWS-specific error handling that this class helps with among the AWS providers.
This is the default doc message; you will want to override this doc-comment in any subclasses.
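To make the role of this base class concrete, here is a rough, hypothetical sketch of an AwsProvider subclass (the _fetch_listing helper is invented purely for illustration; the real xcon providers differ):

from typing import Mapping, Optional

from botocore.exceptions import BotoCoreError

from xcon.directory import DirectoryItem, DirectoryListing
from xcon.provider import AwsProvider


class MyAwsProvider(AwsProvider):
    """Hypothetical AWS-backed provider, purely for illustration."""
    name = "my-aws"

    def get_item(self, name, directory, directory_chain, provider_chain, environ):
        if directory is None:
            # needs_directory is True (the default), so a None directory means "nothing to do".
            return None
        items = self.retrieved_items_map(directory)
        if items is None:
            try:
                # Hypothetical helper that calls AWS and builds a DirectoryListing.
                self.local_cache[directory] = self._fetch_listing(directory)
            except BotoCoreError:
                # Remember that this directory errored; the real providers' error handling differs.
                self.mark_errored_directory(directory)
                return None
            items = self.retrieved_items_map(directory) or {}
        return items.get(name.lower())

    def retrieved_items_map(self, directory) -> Optional[Mapping[str, DirectoryItem]]:
        # local_cache lives inside InternalLocalProviderCache, so it expires alongside
        # every other provider's internal cache.
        listing: Optional[DirectoryListing] = self.local_cache.get(directory)
        return listing.item_mapping() if listing is not None else None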
Ancestors
Subclasses
Class variables
var botocore_error_ignored_exception : botocore.exceptions.BotoCoreError
-
If set, this means that any attempt to communicate with the AWS service will probably fail, probably due to corrupted or missing AWS credentials.
var is_cacher
-
Inherited from: Provider.is_cacher
Easy way to figure out if a provider is a ProviderCacher or just a normal provider. Should be set to True for provider subclasses that are …
var name
-
This is the value that will normally be set as the item's DirectoryItem.source, and is also displayed when logging out the names of providers …
var needs_directory
-
Inherited from: Provider.needs_directory
By default, providers can't really use a None for a directory when calling get_item(). If you CAN work with a None directory then set this to …
var query_before_cache_if_possible
-
Inherited from: Provider.query_before_cache_if_possible
If True, and this is before any other providers that have this set to False, the cacher will be consulted AFTER that provider(s). In this way I'll …
Static methods
def __init_subclass__(thread_sharable=Default, attributes_to_skip_while_copying: Optional[Iterable[str]] = Default, **kwargs)
-
Inherited from: Provider.__init_subclass__
Args
thread_sharable
- If False: While a dependency is lazily auto-created, we will ensure we do it per-thread, and not make it visible …
def grab() ‑> ~T
-
Gets a potentially shared dependency from the current udpend.context.XContext …
def proxy() ‑> ~R
-
Inherited from: Provider.proxy
Returns a proxy-object, that when an attribute is asked for, it will proxy it to the current object of cls …
def proxy_attribute(attribute_name: str) ‑> Any
-
Inherited from: Provider.proxy_attribute
Returns a proxy-object, that when an attribute is asked for, it will proxy it to the current attribute value on the current object of cls …
Instance variables
var local_cache : Dict[Directory, DirectoryListing]
-
var obj : Self
-
class property/attribute that will return the current dependency for the subclass it's asked on by calling Dependency.grab, passing no extra …
Methods
def __call__(self, func)
-
Inherited from:
Provider
.__call__
This makes Resource subclasses have an ability to be used as function decorators by default unless this method is overridden to provide some other …
def __copy__(self)
-
Inherited from:
Provider
.__copy__
Basic shallow copy protection (I am wondering if I should just remove this default copy code) …
def directory_has_error(self, directory: Directory)
-
Inherited from:
Provider
.directory_has_error
If a directory had an error in the past, this returns true. For informational purposes only.
def get_item(self, name: str, directory: Optional[DirectoryOrPath], directory_chain: DirectoryChain, provider_chain: ProviderChain, environ: Directory) ‑> Optional[DirectoryItem]
-
Inherited from:
Provider
.get_item
Grabs a config value for name in directory …
def get_value(self, name: str, directory: Optional[DirectoryOrPath], directory_chain: DirectoryChain, provider_chain: ProviderChain, environ: Directory)
-
Inherited from:
Provider
.get_value
Gets an item's value for directory from provider. Return None if not found.
def mark_errored_directory(self, directory: Directory)
-
Inherited from:
Provider
.mark_errored_directory
If a directory has an error, this is called. For informational purposes only.
def retrieved_items_map(self, directory: DirectoryOrPath) ‑> Optional[Mapping[str, DirectoryItem]]
-
Inherited from:
Provider
.retrieved_items_map
Should return a read-only mapping of lower-case item name TO item. You can easily get one of these from a DirectoryListing object's item_mapping() …
class InternalLocalProviderCache
-
Used by the providers for a place to store/cache things they retrieve from the systems they provide configuration values from.
The reason we have a central resource to keep track of the cache now, instead of doing it directly inside the providers like they used to, is so that all providers can have their internal/local cache expire at the same time.
If the dynamo cache provider expires, and it happens that something changed in SSM and a key was deleted in the dynamo cache table, we want to look up the new value in SSM and not the version we may have in the internal/local cache.
If we did not expire everything simultaneously, it could be that the dynamo cache expires before the SSM one. We would then end up checking the SSM provider, but the new value would not be looked up because of its internal/local cache.
Expiring all providers' internal/local caches simultaneously avoids this problem, and simplifies the 'high-level' conceptual aspect of how the Config and its internal caching works from a usability/user-of-the-library point of view.
At the moment, the key is id/memory-address of the provider instance. This means a new provider instance would provide a new/blank cache for that object. The old instance, if still used, would still have access to whatever it previously cached.
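A small sketch of how this dependency is typically used (here my_provider stands in for any Provider instance):

from xcon.provider import InternalLocalProviderCache

cache_dep = InternalLocalProviderCache.grab()

# Lazily create (or fetch) this provider's private cache object, keyed by id(my_provider).
provider_cache = cache_dep.get_cache_for_provider(
    provider=my_provider, cache_constructor=lambda c: dict()
)
provider_cache["some-key"] = "whatever the provider wants to remember"

# Force every provider's internal/local cache to be thrown away right now.
cache_dep.reset_cache()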
Ancestors
Class variables
var expire_time_delta : datetime.timedelta
-
Amount of time before the cache expires. You can change this to anything you want at any time, as it's checked each time a provider retrieves its cache. The providers do this every time they are asked for a value.
In addition to changing this directly (via InternalLocalProviderCache.grab().expire_time_delta = …) you can also override this via an environmental variable: XCON_INTERNAL_CACHE_EXPIRATION_MINUTES
If this variable is defined, we will take the value as the number of minutes to wait until we expire/reset our cache.
Otherwise the default expiration is 15 minutes.
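For example, the direct override looks like this (sketch; the environment-variable route is simply setting XCON_INTERNAL_CACHE_EXPIRATION_MINUTES before the process starts):

import datetime as dt

from xcon.provider import InternalLocalProviderCache

# Shorten the internal/local cache expiration for the currently active instance.
InternalLocalProviderCache.grab().expire_time_delta = dt.timedelta(minutes=5)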
Static methods
def __init_subclass__(thread_sharable=Default, attributes_to_skip_while_copying: Optional[Iterable[str]] = Default, **kwargs)
-
Inherited from: Dependency.__init_subclass__
Args
thread_sharable
- If False: While a dependency is lazily auto-created, we will ensure we do it per-thread, and not make it visible …
def grab() ‑> ~T
-
Inherited from: Dependency.grab
Gets a potentially shared dependency from the current udpend.context.XContext …
def proxy() ‑> ~R
-
Inherited from: Dependency.proxy
Returns a proxy-object, that when an attribute is asked for, it will proxy it to the current object of cls …
def proxy_attribute(attribute_name: str) ‑> Any
-
Inherited from: Dependency.proxy_attribute
Returns a proxy-object, that when an attribute is asked for, it will proxy it to the current attribute value on the current object of cls …
Instance variables
var obj : Self
-
Inherited from: Dependency.obj
class property/attribute that will return the current dependency for the subclass it's asked on by calling Dependency.grab, passing no extra …
Methods
def __call__(self, func)
-
Inherited from:
Dependency
.__call__
This makes Resource subclasses have an ability to be used as function decorators by default unless this method is overridden to provide some other …
def __copy__(self)
-
Inherited from:
Dependency
.__copy__
Basic shallow copy protection (I am wondering if I should just remove this default copy code) …
def expire_cache_if_needed(self)
-
def get_cache_for_provider(self, *, provider: Provider, cache_constructor: Callable[[InternalLocalProviderCache], Any]) ‑> Any
-
Given provider, we will return a cached object keyed to the provider instance.
If there currently is no cache object for the provider instance in self, and you provide a cache_constructor, we will call the cache_constructor with a single positional argument: self (the InternalLocalProviderCache instance).
If you return a non-None value from this constructor, we will store this value as the cache object, keyed under the instance of provider, and return this same object in the future until the cache is expired or reset.
Otherwise, if you return None you're expected to call set_cache_for_provider on the instance of InternalLocalProviderCache we give the constructor callback.
If we still don't have a value, and you provided a constructor, we will raise a ValueError.
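A sketch of both constructor styles described above (my_provider is any Provider instance; names are illustrative):

from xcon.provider import InternalLocalProviderCache

cache_dep = InternalLocalProviderCache.grab()

# Style 1: return the new cache object and let get_cache_for_provider store it for you.
cache = cache_dep.get_cache_for_provider(provider=my_provider, cache_constructor=lambda dep: {})

# Style 2: return None, but store the cache yourself on the dependency you are given.
def construct(dep: InternalLocalProviderCache):
    dep.set_cache_for_provider(provider=my_provider, cache={})
    return None

cache = cache_dep.get_cache_for_provider(provider=my_provider, cache_constructor=construct)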
def reset_cache(self)
-
def set_cache_for_provider(self, *, provider: Provider, cache: Any)
-
class Provider
-
Represents a Provider, which wraps a resource that can be used to store Config values based on a provided directory. It caches these directories so future lookups don't keep having to fetch them from the Dependency again while the process is still running.
Most of the time there is no need for several of the same provider in the same process/app.
The providers generally keep a cache of every directory they have already looked up, so it's nice to share that cache with child contexts by default via the Dependency mechanism.
Config will use the current/active provider when it needs to consult a Provider for values.
Each provider should be careful to communicate with any remote service in a thread-safe manner, as Config objects from other threads could use the same Provider instance.
Most of the time, network clients are not thread-safe to use; examples include:
- boto clients/resources are not thread-safe.
- Sessions from the requests library can't be used cross-thread.
For boto, if you use xboto to grab the boto client/resource, it will allow you to lazily get a shared object that is guaranteed to only be shared for the current thread.
This allows boto to reuse connections for things running on the same thread, but xboto will lazily create a new client if you're on a separate thread.
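As a purely illustrative sketch (not one of the real xcon providers), a minimal in-memory subclass might look like this; the DirectoryItem constructor arguments are assumptions based on the source above:

from typing import Mapping, Optional

from xcon.directory import DirectoryItem
from xcon.provider import Provider


class StaticDictProvider(Provider):
    """Hypothetical provider backed by a plain dict of {directory_path: {NAME: value}}."""
    name = "static-dict"

    def __init__(self, values: Optional[Mapping[str, Mapping[str, str]]] = None):
        super().__init__()
        self._values = dict(values or {})

    def get_item(self, name, directory, directory_chain, provider_chain, environ):
        if directory is None:
            return None
        items = self.retrieved_items_map(directory) or {}
        return items.get(name.lower())

    def retrieved_items_map(self, directory) -> Optional[Mapping[str, DirectoryItem]]:
        # Accept either a Directory object or a plain path string.
        path = getattr(directory, "path", directory)
        values = self._values.get(path)
        if values is None:
            # None means "never retrieved", which is different from an empty mapping.
            return None
        return {
            n.lower(): DirectoryItem(directory, n, value=v, source=self.name)
            for n, v in values.items()
        }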
Ancestors
Subclasses
Class variables
var is_cacher
-
Easy way to figure out if a provider is a ProviderCacher or just a normal provider. Should be set to True for provider subclasses that are cachers. Defaults to False.
var name
-
This is the value that will normally be set as the item's DirectoryItem.source, and is also displayed when logging out the names of providers when something can't be found.
var needs_directory
-
By default, providers can't really use a None for a directory when calling get_item(). If you CAN work with a None directory then set this to False (for example EnvironmentalProvider uses this).
A None normally means that we could not determine the proper directory to use. This can happen if no SERVICE_NAME and APP_ENV are defined. But some providers don't really use directories and are ok with getting a None query.
This flag lets the Config class know whether it should bother to allocate this provider, based on whether it has a directory list to work with.
var query_before_cache_if_possible
-
If True, and this is before any other providers that have this set to False, the cacher will be consulted AFTER that provider(s). In this way I'll make my best-effort to accommodate this particular request.
If False, then I guarantee the cacher [if there is one] will be consulted BEFORE this provider is.
See Config Overview - Caching for more details on how caching works.
Static methods
def __init_subclass__(thread_sharable=Default, attributes_to_skip_while_copying: Optional[Iterable[str]] = Default, **kwargs)
-
Inherited from: Dependency.__init_subclass__
Args
thread_sharable
- If False: While a dependency is lazily auto-created, we will ensure we do it per-thread, and not make it visible …
def grab() ‑> ~T
-
Inherited from: Dependency.grab
Gets a potentially shared dependency from the current udpend.context.XContext …
def proxy() ‑> ~R
-
Inherited from: Dependency.proxy
Returns a proxy-object, that when an attribute is asked for, it will proxy it to the current object of cls …
def proxy_attribute(attribute_name: str) ‑> Any
-
Inherited from: Dependency.proxy_attribute
Returns a proxy-object, that when an attribute is asked for, it will proxy it to the current attribute value on the current object of cls …
Instance variables
var obj : Self
-
Inherited from: Dependency.obj
class property/attribute that will return the current dependency for the subclass it's asked on by calling Dependency.grab, passing no extra …
Methods
def __call__(self, func)
-
Inherited from:
Dependency
.__call__
This makes Resource subclasses have an ability to be used as function decorators by default unless this method is overridden to provide some other …
def __copy__(self)
-
Inherited from:
Dependency
.__copy__
Basic shallow copy protection (I am wondering if I should just remove this default copy code) …
def directory_has_error(self, directory: Directory)
-
If a directory had an error in the past, this returns true. For informational purposes only.
def get_item(self, name: str, directory: Optional[DirectoryOrPath], directory_chain: DirectoryChain, provider_chain: ProviderChain, environ: Directory) ‑> Optional[DirectoryItem]
-
Grabs a config value for name in directory.
The other args are mostly for the ProviderCacher. They describe the current environment of the current config lookup.
Side Note: For an overview of the caching process, see Config Overview - Caching
Args
name
- Name of the config value, e.g. XYNAPI_BASE_URL.
directory
- Directory to look the value up in. The cacher does not really use this right now, but the other providers do. The cacher acts just like a provider and so accepts the parameter.
directory_chain
- Current directory chain that is being used to look up the value.
provider_chain : ProviderChain
- Current provider chain that is being used to look up the value.
environ
- This is supposed to have the full service and environment name. Example directory path: /hubspot/testing
Returns
DirectoryItem
- If we have the item, this is it.
None
- Otherwise we return None indicating we don't know about it.
def get_value(self, name: str, directory: Optional[DirectoryOrPath], directory_chain: DirectoryChain, provider_chain: ProviderChain, environ: Directory)
-
Gets an item's value for directory from provider. Return None if not found.
def log_about_items(self, *, items: Iterable[DirectoryItem], path: str, msg_prefix='Retrieved')
-
def mark_errored_directory(self, directory: Directory)
-
If a directory has an error, this is called. For informational purposes only.
def retrieved_items_map(self, directory: DirectoryOrPath) ‑> Optional[Mapping[str, DirectoryItem]]
-
Should return a read-only mapping of lower-case item name TO item. You can easily get one of these from a DirectoryListing object's item_mapping().
If the provider has not yet retrieved the listing for the passed-in directory, it should simply pass back None. It's important to know the difference between a blank retrieval and no retrieval attempt, so please pass back None in that case.
class ProviderCacher
-
AwsProvider is the base class for AWS-associated config providers.
There is some AWS-specific error handling that this class helps with among the AWS providers.
This is the default doc message; you will want to override this doc-comment in any subclasses.
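A rough sketch of what a cacher adds on top of the normal provider interface (purely illustrative; the real Dynamo-backed cacher is more involved):

from typing import Iterable

from xcon.directory import DirectoryItem
from xcon.provider import ProviderCacher


class InMemoryCacher(ProviderCacher):
    """Hypothetical cacher that keeps items in a process-local dict."""
    name = "in-memory-cacher"

    def __init__(self):
        super().__init__()
        self._cached = {}

    def cache_items(self, items: Iterable[DirectoryItem], provider_chain, directory_chain, environ):
        # A real cacher would write these to its backing store (e.g. the Dynamo cache table),
        # keyed by the environ directory and the chain's concatenated provider names.
        for item in items:
            key = (environ.path, provider_chain.concatenated_provider_names, item.name.lower())
            self._cached[key] = item

    def get_item(self, name, directory, directory_chain, provider_chain, environ):
        if environ is None:
            return None
        key = (environ.path, provider_chain.concatenated_provider_names, name.lower())
        return self._cached.get(key)

    def retrieved_items_map(self, directory):
        # This toy cacher never exposes a per-directory listing.
        return None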
Ancestors
Subclasses
Class variables
var botocore_error_ignored_exception : botocore.exceptions.BotoCoreError
-
Inherited from: AwsProvider.botocore_error_ignored_exception
If set, this means that any attempt to communicate with the AWS service will probably fail, probably due to corrupted or missing AWS credentials.
var is_cacher
-
Inherited from: AwsProvider.is_cacher
Easy way to figure out if a provider is a ProviderCacher or just a normal provider. Should be set to True for provider subclasses that are …
var name
-
Inherited from: AwsProvider.name
This is the value that will normally be set as the item's DirectoryItem.source, and is also displayed when logging out the names of providers …
var needs_directory
-
Inherited from: AwsProvider.needs_directory
By default, providers can't really use a None for a directory when calling get_item(). If you CAN work with a None directory then set this to …
var query_before_cache_if_possible
-
Inherited from: AwsProvider.query_before_cache_if_possible
If True, and this is before any other providers that have this set to False, the cacher will be consulted AFTER that provider(s). In this way I'll …
Static methods
def __init_subclass__(thread_sharable=Default, attributes_to_skip_while_copying: Optional[Iterable[str]] = Default, **kwargs)
-
Inherited from: AwsProvider.__init_subclass__
Args
thread_sharable
- If False: While a dependency is lazily auto-created, we will ensure we do it per-thread, and not make it visible …
def grab() ‑> ~T
-
Inherited from: AwsProvider.grab
Gets a potentially shared dependency from the current udpend.context.XContext …
def proxy() ‑> ~R
-
Inherited from: AwsProvider.proxy
Returns a proxy-object, that when an attribute is asked for, it will proxy it to the current object of cls …
def proxy_attribute(attribute_name: str) ‑> Any
-
Inherited from: AwsProvider.proxy_attribute
Returns a proxy-object, that when an attribute is asked for, it will proxy it to the current attribute value on the current object of cls …
Instance variables
var obj : Self
-
Inherited from: AwsProvider.obj
class property/attribute that will return the current dependency for the subclass it's asked on by calling Dependency.grab, passing no extra …
Methods
def __call__(self, func)
-
Inherited from:
AwsProvider
.__call__
This makes Resource subclasses have an ability to be used as function decorators by default unless this method is overridden to provide some other …
def __copy__(self)
-
Inherited from:
AwsProvider
.__copy__
Basic shallow copy protection (I am wondering if I should just remove this default copy code) …
def cache_items(self, items: Iterable[DirectoryItem], provider_chain: ProviderChain, directory_chain: DirectoryChain, environ: Directory)
-
def directory_has_error(self, directory: Directory)
-
Inherited from:
AwsProvider
.directory_has_error
If a directory had an error in the past, this returns true. For informational purposes only.
def get_item(self, name: str, directory: Optional[DirectoryOrPath], directory_chain: DirectoryChain, provider_chain: ProviderChain, environ: Directory) ‑> Optional[DirectoryItem]
-
Inherited from:
AwsProvider
.get_item
Grabs a config value for name in directory …
def get_value(self, name: str, directory: Optional[DirectoryOrPath], directory_chain: DirectoryChain, provider_chain: ProviderChain, environ: Directory)
-
Inherited from:
AwsProvider
.get_value
Gets an item's value for directory from provider. Return None if not found.
def mark_errored_directory(self, directory: Directory)
-
Inherited from:
AwsProvider
.mark_errored_directory
If a directory has an error, this is called. For informational purposes only.
def retrieved_items_map(self, directory: DirectoryOrPath) ‑> Optional[Mapping[str, DirectoryItem]]
-
Inherited from:
AwsProvider
.retrieved_items_map
Should return a read-only mapping of lower-case item name TO item. You can easily get one of these from a DirectoryListing object's item_mapping() …
class ProviderChain (providers: Iterable[Union[Provider, Type[Provider]]], have_any_cachable_providers: bool = True)
-
A prioritized list of providers to consult when getting a value.
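A short construction sketch (class types are resolved to shared instances through the current XContext; StaticDictProvider is the hypothetical provider sketched earlier, and the EnvironmentalProvider import path comes from the docstrings above):

from xcon.provider import ProviderChain
from xcon.providers.environmental import EnvironmentalProvider

chain = ProviderChain([EnvironmentalProvider, StaticDictProvider])

# Classes were converted into instances grabbed from the current context.
print(chain.providers)

# Pipe-separated names of the cachable providers only; when the EnvironmentalProvider
# is listed first (the usual setup) it is typically left out of this string.
print(chain.concatenated_provider_names)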
Expand source code
@dataclasses.dataclass(eq=True, frozen=True)
class ProviderChain:
    """ A prioritized list of providers to consult when getting a value. """

    providers: Iterable[Union[Provider, Type[Provider]]] = dataclasses.field(
        compare=False, repr=False
    )
    """ This will be a tuple of ordered providers. When you create a ProviderChain, you can
    give it classes or objects. It will convert any class types passed in into a proper
    object for you automatically via the current context [as a resource].
    """

    concatenated_provider_names: str = dataclasses.field(init=False, compare=True)
    """ Concatenated list of all of my provider's `Provider.name` values with a pipe `|`
    in-between.

    Only includes providers that are cachable. Cachable providers are the ones assigned to me
    after any providers listed first that have `Provider.query_before_cache_if_possible`
    set to True. Starting with the first provider in my list that has
    `Provider.query_before_cache_if_possible` set to False (the default), we consider
    providers cachable.

    Normally, the `xcon.providers.environmental.EnvironmentalProvider` provider is the only
    non-cachable provider, and normally it's listed first. This means that we will normally
    not cache values from this EnvironmentalProvider.

    If the EnvironmentalProvider happens to be after a cachable provider, we will include it
    as one of the keys in the range-key of the items that get cached into the Dynamo config
    cache table. This is because finding a value in some other provider before looking at the
    EnvironmentalProvider can affect the results, since we would not look in the
    EnvironmentalProvider in that case. Likewise, finding a value in the environmental
    provider would prevent looking at other providers. Therefore it *might* affect the
    results. Just to be on the safe side, we use the EnvironmentalProvider as one of the
    range cache keys in this situation.

    But as said previously, normally the `EnvironmentalProvider` is the first provider and so
    is not included in the final `concatenated_provider_names` list.
    """

    have_any_cachable_providers: bool = True
    """ If any providers have `Provider.query_before_cache_if_possible` set to `False`,
    this will be `True`.

    If **all** providers have `Provider.query_before_cache_if_possible` set to `True`,
    then this will be set to `False`.
    """

    def __post_init__(self):
        providers = list()
        context = XContext.grab()
        provider_key_names = []
        query_before_finished = False
        for p in xloop(self.providers, default_not_iterate=[str]):
            # Check to see if any of them are classes [the type's resource needs to be grabbed].
            if isclass(p):
                p = context.dependency(p)
            providers.append(p)
            if not query_before_finished:
                if p.query_before_cache_if_possible:
                    continue
                query_before_finished = True
            provider_key_names.append(p.name)

        object.__setattr__(self, 'providers', tuple(providers))
        if not query_before_finished:
            object.__setattr__(self, 'have_any_cachable_providers', False)

        # Pre-calculate a useful field: a concatenated list of the cachable provider names.
        provider_names = '|'.join(provider_key_names)
        object.__setattr__(self, 'concatenated_provider_names', provider_names)

    def _providers_with_cacher(
        self,
        directory_chain: DirectoryChain,
        cacher: Optional[ProviderCacher] = None,
        environ: Directory = None
    ) -> Iterable[Provider]:
        """ Generator of providers and cacher, as needed. """
        already_used_cache = False
        for provider in self.providers:
            if already_used_cache or provider.query_before_cache_if_possible:
                yield provider
                continue
            already_used_cache = True
            if cacher:
                yield cacher
            yield provider

    def get_item(
        self,
        name: str,
        directory_chain: DirectoryChain,
        cacher: ProviderCacher = None,
        environ: Directory = None
    ) -> DirectoryItem:
        """ Goes through the passed-in directory_chain, querying each provider in
        `ProviderChain.providers` for a value. If it finds one, that's what we will return;
        otherwise None.

        We will check with the passed-in cacher at the appropriate time as we go through our
        own providers via `ProviderChain.providers`. If needed, we will tell the cacher
        before we return to cache the values we find.
        """
        use_cacher = (cacher and environ)
        items_cache = {}
        item = None
        places_checked = []
        for directory in directory_chain.directories:
            item = None
            for provider in self._providers_with_cacher(
                directory_chain=directory_chain, cacher=cacher, environ=environ
            ):
                item = provider.get_item(
                    name=name,
                    directory=directory,
                    directory_chain=directory_chain,
                    provider_chain=self,
                    environ=environ
                )
                had_error = provider.directory_has_error(directory)
                if had_error:
                    result = "error"
                elif item and item.directory and item.directory.is_non_existent:
                    result = "found(cached-as-non-existent)"
                elif item:
                    result = "found"
                else:
                    result = "not-found"
                places_checked.append(
                    f"{provider.name}:{directory.path} | result={result}"
                )
                if item is not None:
                    break

            if use_cacher and item and not item.cacheable:
                # Optimization: Don't spend time looking at what the cacher could send if our
                # value is not cacheable [probably an environmental var].
                use_cacher = False

            # Priority for items is given to directory order; keep what we've already got
            # over the new stuff from a lower-priority directory.
            #
            # We do this so we can cache as much as we can at a time, otherwise we would be
            # caching many single items with single requests at a time.
            #
            # The other option is to have the app/service that uses us use a batch-writer
            # for the ConfigDynamoTable, which might be better [so we only write exactly what
            # we need based on what got looked up].
            #
            # todo: [see discussion above]
            #   Consider if we should batch-write the whole time the program is running,
            #   or if we should collect everything we can based on what we have currently
            #   looked up and write it to cache [only things that have not been in cache
            #   previously]. But we would need a way to force-write the current batch when
            #   we are done [think about it].
            if use_cacher:
                items_cache = {
                    **self.retrieved_items_map(directory=directory),
                    **items_cache
                }
            if item:
                break

        # If we did not find the item, create a 'nonExistent' item in its place.
        if not item:
            item = DirectoryItem(None, name, value=None, source="/_nonExistent")

        item.add_supplemental_metadata("locations_searched", places_checked)

        if use_cacher and item.cacheable:
            items_cache[item.name] = item
            cacher.cache_items(
                items_cache.values(),
                provider_chain=self,
                directory_chain=directory_chain,
                environ=environ
            )
        return item

    def retrieved_items_map(
        self, directory: DirectoryOrPath
    ) -> Mapping[str, DirectoryItem]:
        """ Will return a read-only lower-case item name TO item mapping by going through
        each provider in my chain, starting with the highest priority, calling
        `retrieved_items_map()` on them and collecting the results into a single dict that
        I'll return.

        Keep in mind that if a provider has not retrieved anything yet, I'll stop and return
        what I have at that point. These providers can be shared with other provider chains,
        and if a lower-priority provider has retrieved its values before a higher-priority
        one, we could end up with the wrong values. Since I stop at the first provider that
        has not retrieved the passed-in directory yet, we are protected from that possibility.
        """
        final_map = {}
        for provider in self.providers:
            provider_map = provider.retrieved_items_map(directory)
            if provider_map is None:
                # We stop when we encounter a provider that has not retrieved
                # the directory listing yet [safety mechanism, see doc comment above].
                break
            # `final_map` is second, so it overrides the `provider_map` dict.
            final_map = {**provider_map, **final_map}
        return final_map
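The _providers_with_cacher generator above interleaves the cacher just before the first cachable provider. The following standalone sketch (plain Python, not xcon code; the provider names are made up) reproduces that ordering rule so you can see the resulting consult order:

from typing import Iterable, NamedTuple, Optional


class FakeProvider(NamedTuple):
    # Stand-in for a real Provider; only the two attributes the ordering logic needs.
    name: str
    query_before_cache_if_possible: bool = False


def providers_with_cacher(
    providers: Iterable[FakeProvider], cacher: Optional[FakeProvider]
) -> Iterable[FakeProvider]:
    # Same ordering rule as ProviderChain._providers_with_cacher: yield the leading
    # "query-before" providers untouched, then the cacher once, then everything else.
    already_used_cache = False
    for provider in providers:
        if already_used_cache or provider.query_before_cache_if_possible:
            yield provider
            continue
        already_used_cache = True
        if cacher:
            yield cacher
        yield provider


chain = [
    FakeProvider("environ", query_before_cache_if_possible=True),
    FakeProvider("ssm"),
    FakeProvider("secrets"),
]
order = [p.name for p in providers_with_cacher(chain, cacher=FakeProvider("dynamo-cache"))]
print(order)  # ['environ', 'dynamo-cache', 'ssm', 'secrets']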
Class variables
var concatenated_provider_names : str
-
Concatenated list of all of my provider's Provider.name values with a pipe | in-between.
Only includes providers that are cachable. Cachable providers are the ones assigned to me after any providers listed first that have Provider.query_before_cache_if_possible set to True. Starting with the first provider in my list that has Provider.query_before_cache_if_possible set to False (the default), we consider providers cachable. (See the sketch after this Class variables list for an illustration of which names end up in the concatenated string.)
Normally, the EnvironmentalProvider provider is the only non-cachable provider, and normally it's listed first. This means that we will normally not cache values from this EnvironmentalProvider. If the EnvironmentalProvider happens to be after a cachable provider, we will include it as one of the keys in the range-key of the items that get cached into the Dynamo config cache table.
This is because finding a value in some other provider before looking at the EnvironmentalProvider can affect the results, since we would not look in the EnvironmentalProvider in that case. Likewise, finding a value in the environmental provider would prevent looking at other providers.
Therefore it might affect the results. Just to be on the safe side, we use the EnvironmentalProvider as one of the range cache keys in this situation.
But as said previously, normally the EnvironmentalProvider is the first provider and so is not included in the final concatenated_provider_names list.
var have_any_cachable_providers : bool
-
If any providers have Provider.query_before_cache_if_possible set to False, this will be True.
If all providers have Provider.query_before_cache_if_possible set to True, then this will be set to False.
var providers : Iterable[Union[Provider, Type[Provider]]]
-
This will be a tuple of ordered providers. When you create a ProviderChain, you can give it classes or objects.
It will convert any class types passed in into a proper object for you automatically via the current context [as a resource].
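As promised above, here is a rough standalone sketch (not xcon code; the provider names are invented) that mirrors the __post_init__ logic deciding which provider names end up in concatenated_provider_names:

from typing import Iterable, NamedTuple


class FakeProvider(NamedTuple):
    # Stand-in with just the attributes the concatenation logic cares about.
    name: str
    query_before_cache_if_possible: bool = False


def concatenated_provider_names(providers: Iterable[FakeProvider]) -> str:
    # Same rule as ProviderChain.__post_init__: skip the leading run of providers
    # that want to be queried before the cache; every provider from the first
    # cachable one onward contributes its name to the pipe-separated key.
    names = []
    query_before_finished = False
    for provider in providers:
        if not query_before_finished:
            if provider.query_before_cache_if_possible:
                continue
            query_before_finished = True
        names.append(provider.name)
    return '|'.join(names)


chain = [
    FakeProvider("environ", query_before_cache_if_possible=True),  # leading, not cachable
    FakeProvider("ssm"),
    FakeProvider("secrets"),
    FakeProvider("environ-late", query_before_cache_if_possible=True),  # after a cachable one
]
print(concatenated_provider_names(chain))  # ssm|secrets|environ-late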
Methods
def get_item(self, name: str, directory_chain: DirectoryChain, cacher: ProviderCacher = None, environ: Directory = None) ‑> DirectoryItem
-
Goes through the passed-in directory_chain, querying each provider in ProviderChain.providers for a value. If it finds one, that's what we will return; otherwise None.
We will check with the passed-in cacher at the appropriate time as we go through our own providers via ProviderChain.providers.
If needed, we will tell the cacher before we return to cache the values we find.
Expand source code
def get_item(
    self,
    name: str,
    directory_chain: DirectoryChain,
    cacher: ProviderCacher = None,
    environ: Directory = None
) -> DirectoryItem:
    """ Goes through the passed-in directory_chain, querying each provider in
    `ProviderChain.providers` for a value. If it finds one, that's what we will return;
    otherwise None.

    We will check with the passed-in cacher at the appropriate time as we go through our
    own providers via `ProviderChain.providers`. If needed, we will tell the cacher
    before we return to cache the values we find.
    """
    use_cacher = (cacher and environ)
    items_cache = {}
    item = None
    places_checked = []
    for directory in directory_chain.directories:
        item = None
        for provider in self._providers_with_cacher(
            directory_chain=directory_chain, cacher=cacher, environ=environ
        ):
            item = provider.get_item(
                name=name,
                directory=directory,
                directory_chain=directory_chain,
                provider_chain=self,
                environ=environ
            )
            had_error = provider.directory_has_error(directory)
            if had_error:
                result = "error"
            elif item and item.directory and item.directory.is_non_existent:
                result = "found(cached-as-non-existent)"
            elif item:
                result = "found"
            else:
                result = "not-found"
            places_checked.append(
                f"{provider.name}:{directory.path} | result={result}"
            )
            if item is not None:
                break

        if use_cacher and item and not item.cacheable:
            # Optimization: Don't spend time looking at what the cacher could send if our
            # value is not cacheable [probably an environmental var].
            use_cacher = False

        # Priority for items is given to directory order; keep what we've already got
        # over the new stuff from a lower-priority directory.
        #
        # We do this so we can cache as much as we can at a time, otherwise we would be
        # caching many single items with single requests at a time.
        #
        # The other option is to have the app/service that uses us use a batch-writer
        # for the ConfigDynamoTable, which might be better [so we only write exactly what
        # we need based on what got looked up].
        #
        # todo: [see discussion above]
        #   Consider if we should batch-write the whole time the program is running,
        #   or if we should collect everything we can based on what we have currently
        #   looked up and write it to cache [only things that have not been in cache
        #   previously]. But we would need a way to force-write the current batch when
        #   we are done [think about it].
        if use_cacher:
            items_cache = {
                **self.retrieved_items_map(directory=directory),
                **items_cache
            }
        if item:
            break

    # If we did not find the item, create a 'nonExistent' item in its place.
    if not item:
        item = DirectoryItem(None, name, value=None, source="/_nonExistent")

    item.add_supplemental_metadata("locations_searched", places_checked)

    if use_cacher and item.cacheable:
        items_cache[item.name] = item
        cacher.cache_items(
            items_cache.values(),
            provider_chain=self,
            directory_chain=directory_chain,
            environ=environ
        )
    return item
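In miniature, the search order implemented above is: for each directory (highest priority first), ask every provider in order (with the cacher interleaved) and stop at the first hit. A simplified standalone sketch of that nested loop, using made-up directory paths and plain dicts standing in for providers:

from typing import Optional

# Made-up lookup tables standing in for providers; keys are (directory, item-name).
ssm = {("/global/prod", "db_host"): "global-host"}
secrets = {("/my-service/prod", "db_host"): "service-host"}
providers = [ssm, secrets]                           # highest priority first

directories = ["/my-service/prod", "/global/prod"]   # highest priority first


def get_value(name: str) -> Optional[str]:
    # Mirrors the shape of ProviderChain.get_item: directories are the outer loop,
    # providers the inner one, and the first hit wins.
    for directory in directories:
        for provider in providers:
            value = provider.get((directory, name))
            if value is not None:
                return value
    return None


print(get_value("db_host"))  # service-host: found in the higher-priority directory first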
def retrieved_items_map(self, directory: DirectoryOrPath) ‑> Mapping[str, DirectoryItem]
-
Will return a read-only lower-case item name TO item mapping by going through each provider in my chain, starting with the highest priority, calling retrieved_items_map() on them and collecting the results into a single dict that I'll return.
Keep in mind that if a provider has not retrieved anything yet, I'll stop and return what I have at that point. These providers can be shared with other provider chains, and if a lower-priority provider has retrieved its values before a higher-priority one, we could end up with the wrong values. Since I stop at the first provider that has not retrieved the passed-in directory yet, we are protected from that possibility.
Expand source code
def retrieved_items_map(
    self, directory: DirectoryOrPath
) -> Mapping[str, DirectoryItem]:
    """ Will return a read-only lower-case item name TO item mapping by going through
    each provider in my chain, starting with the highest priority, calling
    `retrieved_items_map()` on them and collecting the results into a single dict that
    I'll return.

    Keep in mind that if a provider has not retrieved anything yet, I'll stop and return
    what I have at that point. These providers can be shared with other provider chains,
    and if a lower-priority provider has retrieved its values before a higher-priority
    one, we could end up with the wrong values. Since I stop at the first provider that
    has not retrieved the passed-in directory yet, we are protected from that possibility.
    """
    final_map = {}
    for provider in self.providers:
        provider_map = provider.retrieved_items_map(directory)
        if provider_map is None:
            # We stop when we encounter a provider that has not retrieved
            # the directory listing yet [safety mechanism, see doc comment above].
            break
        # `final_map` is second, so it overrides the `provider_map` dict.
        final_map = {**provider_map, **final_map}
    return final_map
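The merge order in the loop above is what gives earlier (higher-priority) providers precedence: the map accumulated so far is spread last, so its entries win on key collisions. A quick standalone illustration with plain dicts and made-up item names:

# Hypothetical per-provider results, highest priority first.
ssm_items = {"db_host": "from-ssm", "db_port": "5432"}
secrets_items = {"db_host": "from-secrets", "db_password": "hunter2"}

final_map = {}
for provider_map in (ssm_items, secrets_items):
    # `final_map` is spread second, so entries already collected from
    # higher-priority providers override the lower-priority ones.
    final_map = {**provider_map, **final_map}

print(final_map)
# {'db_host': 'from-ssm', 'db_password': 'hunter2', 'db_port': '5432'}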