Module xdynamo.client
Global variables
var dyn_client_options
-
Proxy to the current
DynClientOptions
currently used/injected at the current moment. Used this object use like you would use normal instance of DynClientOptions.
Classes
class DynClient (api: RemoteApi[M])
-
Skeleton/Placeholder Class for future work, see story and the other classes in this file for more details: https://app.clubhouse.io/xyngular/story/13989
Args
api
- The
xmodel.remote.api.RemoteApi
object that is creating this object.
Expand source code
class DynClient(RemoteClient[M]): """ Skeleton/Placeholder Class for future work, see story and the other classes in this file for more details: https://app.clubhouse.io/xyngular/story/13989 """ # This is only here to give IDE's a more concrete-class to use for type/code completion. # The type-hint is not otherwise used. That var is valid/gettable on an instance. # See `xmodel.remote.client.RemoteClient.api` for more details. api: 'DynApi[M]' def delete_obj(self, obj: Union[M, DynKey], *, condition: Query = None): """ Attempts to delete passed in object from dynamodb table. If you pass in a condition, it will be evaluated on the dynamodb-service side and the deletes will only happen if the condition is met. This can help prevent race conditions if used correctly. If there is a batch-writer currently in use, we will try to use that to batch the deletes. Keep in mind that if you pass in a `condition`, we can't use the batch write. We will instead send of a single-item delete request with the condition attached (bypassing any current batch-writer that may be in use). Args: obj: Object to delete, it MUST have values for it's primary key(s) (no other values are required, the primary-key values are the only ones used). condition: Optional, if passed in we will send this as a `ConditionExpression` to dynamo. You can pass in a Query, ie: the same structure that is used for `self.query()` and `self.scan()`. We will take the Query correctly format it for you into the `ConditionExpression`. If the condition is met, the item will be deleted. If the condition is NOT met, we will handle it for you by catch the exception, logging about it, setting error on response_state of object, and then returning. Conditions are generally used to help prevent race-conditions, ie: to prevent a deleted on purpose. Most of the time the intent is to ignore what the result is, it was either deleted or not; ie: it will eventually be handled in some way it if it was not deleted. If you want to know when a conditional delete fails, you can look at deleted objects `obj.api.response_state`. It's `had_error` will be `True`, there will be some info in `errors`, but also a field error with name `_conditional_check` and code `failed` you can easily check for via: `....response_state.has_field_error('_conditional_check', 'failed')` or you can use `xyn_model_dynamo.const.CONDITIONAL_CHECK_FAILED_KEY` for the field key. """ # Reset object error response state, we are try afresh. obj.api.response_state.reset() # Get the DynKey # If we don't get a dyn-key, check for that and raise nicer, higher-level error. key = obj if isinstance(obj, DynKey) else DynKey.via_obj(obj) params = { 'Key': key.key_as_dict(), } # To keep things simple, I am using 'put' which replaces entire item, # so get all properties of item regardless if they changed or not. # todo: Check for primary key and raise a nicer, higher-level exception in that case. if not condition: resource = self._table_or_batch_writer() resource.delete_item(**params) return # Add the conditional query to the dynamodb params dict... # (can't batch-delete things with conditions, so we do them one at a time instead). self._add_conditions_from_query( query=condition, params=params, filter_key='ConditionExpression', consistent_read=False ) try: self.api.table.delete_item(**params) except dynamodb.meta.client.exceptions.ConditionalCheckFailedException as e: # By default, we ignore conditional check failed, because this means we did not want # to delete the item on purpose; ie: it's not an error. # conditions are mostly used to prevent race-conditions, hence ignored by default. # If someone in the future ends up wanting the exception, we can have an argument # or some other way to indicate that we should re-raise the condition check exception. # See `self.delete_obj` doc comment (above) for more details. log.info( f"Didn't delete dynamo obj ({obj}) due to condition not met ({condition}), " f"exception from dynamo ({e})." ) state = obj.api.response_state state.had_error = True state.errors = [ 'The conditional check failed, item was not deleted.', {'conditional-check-failed-exception': e} ] state.add_field_error(field=CONDITIONAL_CHECK_FAILED_KEY, code='failed') def delete_objs(self, objs: Sequence[Union[M, DynKey]], condition: Query = None): """ Uses a batch-writer to put the items. Much more efficient than doing it one at a time. If you pass in a `condition`, the batch-writer can't be used and normal single-deletes will automatically be used instead. If you only give me one item, directly calls `delete_obj` without a batch-writer. """ if not objs: return if len(objs) == 1: self.delete_obj(obj=objs[0], condition=condition) return with _DynBatchResource.grab().current_writer(create_if_none=True): for i in objs: self.delete_obj(obj=i, condition=condition) # todo: Someday support an iterable for `objs` # (ie: open it wider then a Sequence, ie: to support generators). def send_objs( self, objs: Sequence[M], *, condition: Query = None, url: UrlStr = None, send_limit: int = None ): """ Used to send any number of objects to Dynamo in as efficient a manner as possible. If possible, uses a batch-writer to put the items. It's WAY more efficient than doing it one at a time. If a condition is supplied, won't use a batch-writer as only one item can be put at a time with a condition. In the future at some point, will support transactions and at that point could put more items per-request at a time with condition(s). Args: objs: Objects to send to dynamo. url: Not used in Dynamo, ignore send_limit: Currently unused, we try to push as much as possible. condition: Optional, if passed in we will send this as a `ConditionExpression` to dynamo. You can pass in a Query, ie: the same structure that is used for `self.query()` and `self.scan()`. We will take the Query correctly format it for you into the `ConditionExpression`. If the condition is met, the item will be put into table. If the condition is NOT met, we will handle it for you by catch the exception, logging about it, setting error on response_state of object, and then returning. Conditions are generally used to help prevent race-conditions, ie: to prevent a put on purpose; so we don't raise an exception. Most of the time the intent is to ignore what the result is, it was either put into table or not; ie: it will eventually be handled in some way it if it was not put into table. If you want to know when a conditional delete fails, you can look at deleted objects `obj.api.response_state`. It's `had_error` will be `True`, there will be some info in `errors`, but also a field error with name `_conditional_check` and code `failed` you can easily check for via: `....response_state.has_field_error('_conditional_check', 'failed')` or you can use `xdynamo.const.CONDITIONAL_CHECK_FAILED_KEY` for the field key. """ if not objs: return if len(objs) == 1: self._put_item(item=objs[0], condition=condition) return if condition: for i in objs: self._put_item(item=i, condition=condition) return with _DynBatchResource.grab().current_writer(create_if_none=True): for i in objs: self._put_item(item=i) def get( self, query: Query = None, *, top: int = None, fields: FieldNames = Default, allow_scan: bool = False, consistent_read: bool | DefaultType = Default, reverse: bool = False, **dynamo_params, ) -> Iterable[M]: """ This is the standard/abstract interface method that all RestClient's support. The idea behind this method is to figure out how to route this query in the most efficient manner we can do it in. If you want to guarantee a specific type of query you can use `DynClient.query`, etc. Generally, this is the best, generally method to use since it can adapt your request to the most efficient way to query Dynamo. The `DynApi` will use this method generally when it's asked to get something (for example, when using `DynApi.get_via_id`). For more info on how you can query, see: - [Advanced Queries](#advanced-queries) - [Examples](#examples_1) Here is the general process this method goes though to determine how to query dynamo for a given query: 1. If provided query is empty. - Will paginate though all items in table efficiently; multiple items will be returned per-request in parallel. You'll get back a generator that gives you an object at time, but behind the scenes we are getting a page at a time from dynamo. 2. We will try to batch it en-mass in parallel if we can. We can do this if one of the following is true: - Query only contains 'id' key; this is because an 'id' should have full primary key; meaning it will contain both hash and range key (if needed). You can provide a list of strings or `DynKey` objects. - See `DynModel.id` for more details on how to use `str` with an 'id'. - See `DynKey` for more details on what a composite primary-key as a `str` is like. - Table only has a hash-key (no range/sort key) and you provide nothing but hash-keys (ie: no other attributes in query) 3. Next, we see if we can use a Dynamo query via `DynClient.query`. - This will use multiple queries if/as needed, but while minimizing the number of queries that are needed. Just depends on the query that is provided. More then one query is needed if there is more then one hash-key in the query, and there are other attributes you are filtering on (and so #2 above, batch-get, can't be used) Doing multiple queries is still far-faster then doing a scan. We return objects via the generator and only execute the second (or more) queries - In the future, this will automatically querying a Secondary Index if one is available. (Not Implemented Yet, we will add this when needed) 4. Next, fallback to using a Global Index if one is available. - (Not Implemented Yet, we will do it when we need it) 5. If that's not possible, in the future we fallback to scan operation. - (Not Implemented Yet; This isn't implemented currently and will raise NotImplemented. Scan operations can be very slow and so it's not something we really want to do normally... I did implement this if getting everything, but with filters/attributes to query on I decided to wait until we need it before implementing this currently). .. todo:: Implement querying via Secondary/Global indexes; Scanning with query values. see above for details. Args: query: Dict keys are the attribute/key names and values are what to filter on for that name. Operators after double `__` work just like you would expect for our xyngular API's... here is an example: ```python { "some_attr_name__gte": 2 } ``` In this case, we look for `some_attr_name` greater than or equal to integer `2`. You MUST provide at least one value for the hash key of the table for right now. In the future, we will support doing a table-scan to support queries without a hash-key. But for right now it's required. For more information with examples see [Advanced Queries](#advanced-queries). top: This is supposed to only return this mean records; currently not implemented. fields: This is supposed to only retrieve provided field names in returned objects; currently not implemented. allow_scan: Defaults to False, which means this will raise an exception if a scan is required to execute your get. Set to True to allow a scan if needed (it will still do a query, batch-get, etc; if it can, it only does a scan if there is no other choice). If the query is blank or None will still do a scan regardless of what you pass (to return all items in the table). consistent_read: Defaults to Model.api.structure.dyn_consistent_read, which can be set via class arguments when DynModel subclass is defined. You can use this to override the model default. True means we use consistent reads, otherwise false. reverse: Defaults to False, which means sort order is ascending. Set to True to reverse the order, which will set the "ScanIndexForward" parameter to False in the query. **dynamo_params: Extra parameters to include on the Dynamo request(s) that are generated. This is 100% optional. Returns: """ if reverse and (not query or allow_scan): log.warning('The `reverse` param has been set along with no query or allow_scan=True. ' 'There is no way to Scan in reverse.') if not query: # If no query.... just get all items via a bulk-scan. return self._get_all_items(consistent_read=consistent_read) # todo: We want basic logic in here to decide on batch_get vs query # vs [eventually] dyn_scan. # We sort of implement basic logic here to determin # the best way to query dynamo so we have that logic mostly centralized instead # of scattered everywhere. The logic is cetnered around how we will mostly be using # dynamo. The times where we have exceptions, the outside world can just directly # call `dyn_scan/query` them selves and customize the dynamo call more. # `get` is the thing that figured out the best to this to in the general # case. # Future Vision of logic flow [this is not fully implemented or final yet]: # 1. Only have id/hash/range keys # A. Have more than one id/hash/range keys. # I. We can use the batch method. # B. Use query # 2. Have id/hash + other attributes # A. More than one hash # I. Use several query calls. # B. Only one hash # I. Use single query call. # 3. Have no hash key, just other attributes. # A. Must use `scan`. # I. This is just something we will do later, don't need `scan` at moment. # query = _ProcessedQuery.process_query(query, api=self.api) structure = self.api.structure have_range_key = bool(structure.dyn_range_key_name) # Check to see if we only have key fields (without other filtering criteria); # if so we can do a batch-get, which is the fastest way to get a number of specific # values. If we if query.contains_only_keys(): dyn_keys = query.dyn_keys() # If we have no range-key, they all dyn-gets support get-batch. # it's only the lack of range-key, or a non 'eq' operator for range-key # that would disqualify a specific DynKey. # # todo: Some of these keys could support batch-get, we might consider getting # the ones that do via batch-get, get others via query? all_support_batch_get = True if have_range_key: for dyn_key in dyn_keys: if not dyn_key.range_key: all_support_batch_get = False break if dyn_key.range_operator and dyn_key.range_operator not in ('eq', 'is_in'): all_support_batch_get = False break # We have just DynKey's, so we can do a batch get (no other conditions/filters). # This will automatically batch a 100 at a time for us via a generator. # Dynamo will fetch these in parallel! # TODO: If we only have one key, use `get_item` instead of `batch_get_item`. if all_support_batch_get and dyn_keys: return self.batch_get(keys=dyn_keys, consistent_read=consistent_read, reverse=reverse) # If we have some sort of key(s) we can use (a hash key with an optional range key). if query.dyn_keys(): # todo: Support `top` and `fields`. # todo: Support multiple hash-keys [one query per hash key]. # todo: unless this table does not have a range key [no hash/range to tie]. # We have a query that has the hash-key in it, that's good enough to use a query. return self.query(query=query, consistent_read=consistent_read, reverse=reverse) if allow_scan: return self.scan(query=query, consistent_read=consistent_read) # todo: Support 'scans' or always raise error? Scans are very expensive. # todo: Support Global + Secondary Indexes # (secondary indexes are partially supported now, since they require the hash-key # so we would currently do a query with a filter, and scan whole hash-key/page). raise NotImplementedError( "There are no hash-keys or id's in query, and I don't support auto routing to a scan " "when `allow_scan` is False, or using a global indexes at the moment. " "This is what you need to do without a hash-key/id. " "Scan operations are slow, so for being conservative to prevent accidentally doing " "one. For now you need to explicitly do them via self.scan or pass in True to the " "`allow_scan` parameter of this `get` method; unless your " "retrieving all records (ie: blank query). This may change in the future, but for " "now a scan operation requires an explicit opt-in." "\n" "TODO: Support Global Indexes - if there is a global index " "then we should use them if query is using the global-index hash-key and other" "attributes are in global-index as well." ) def batch_get( self, keys: Iterable[DynKey], *, consistent_read: bool | DefaultType = Default, reverse: bool = False, **params: DynParams, ) -> Iterable[M]: """ Will fetch keys in the largest batch it can at a time it can from Dynamo; Dynamo will fetch each page of values in parallel! We split up the keys into 100 increments at a time automatically right now For each unique hash key in the set of keys provided, Dynamo will parallel fetch the keys (if two keys have the same hash but different range key, dynamo will do them sequentially). In the future, we may attempt to fetch multiple 100 blocks of keys asynchronously. As the returned generator/iterable is gone through to increase the speed. We don't do that yet. Args: keys (Iterable[DynKey]): Keys to fetch. Can be of any size, a generator will be returned to minimize memory use. .. tip:: If you pass in a `set`, we will be slightly more efficient. We need to ensure the results are uniquified, if you pass a set we can skip doing it. consistent_read: Defaults to Model.api.structure.dyn_consistent_read, which can be set via class arguments when DynModel subclass is defined. You can use this to override the model default. True means we use consistent reads, otherwise false. reverse: Defaults to False, which means sort order is ascending. Set to True to reverse the order, which will set the "ScanIndexForward" parameter to False in the query. **params: An optional set of extra parameters to include in request to Dynamo, if so desired. Returns: An Iterable/Generator that will efficiently paginate though the results for you. """ structure = self.api.structure hash_name = structure.dyn_hash_key_name range_name = structure.dyn_range_key_name base_params = {**params} if consistent_read is Default: consistent_read = self.consistent_reads if not keys: return [] def batch_pagination_generator(items): if not items: return xloop() table_name = structure.fully_qualified_table_name() # We want to merge our items with anything that could already be there... copy_params = base_params.copy() req_items_param = copy_params.setdefault('RequestItems', {}) table_items = req_items_param.setdefault(table_name, {}) if consistent_read: table_items['ConsistentRead'] = True if reverse: params['ScanIndexForward'] = False table_keys = table_items.setdefault('Keys', []) table_keys.extend(items) # batch_get_item is only available on dynamo-resource, not table-resource. return self._paginate_all_items_generator( method='batch_get_item', params=copy_params, use_table=False ) # Go though all the keys and grab them 100 at a time from Dynamo. # Dynamo only supports a max of 100 keys at a time when doing a 'batch_get_item'. items_requested = [] have_range = bool(range_name) uniquified_keys = keys if not isinstance(keys, set): uniquified_keys = set(uniquified_keys) uniquified_keys = list(uniquified_keys) for i in range(0, len(uniquified_keys), 100): key_subset = [key.key_as_dict() for key in set(uniquified_keys[i:i + 100])] for x in batch_pagination_generator(list(key_subset)): yield x def _parse_keys_from_query(self, query: Query) -> Optional[List[DynKey]]: query = _ProcessedQuery.process_query(query, api=self.api) def _parse_id( self, _id: Union[str, Iterable[str]] ) -> List[DynKey]: keys = [] if not _id: return keys api = self.api for current_id in xloop(_id, iterate_dicts=True): keys.append(DynKey(api=api, id=current_id)) if not keys: return keys return keys @property def consistent_reads(self) -> bool: # Check for injected default, use that if it exists. injected_default = dyn_client_options.consistent_read if injected_default is not Default: return injected_default # Otherwise use the model's default for consistent reads. return self.api.structure.dyn_consistent_read or False def query( self, query: Query = None, *, consistent_read: bool | DefaultType = Default, reverse: bool = False, **dynamo_params: DynParams ) -> Iterable[M]: """ Forces `DynClient` to use a query. If you want a way for client to automatically figure out the best way to execute your query, use one of these instead: - `DynApi.get` - `DynClient.get` For more info see: - [Advanced Queries](#advanced-queries) - [Examples](#examples_1) For a quick summary on how to provide query, see 'query' argument doc (just a few lines down). But I would highly recommend looking at [Advanced Queries](#advanced-queries) for more details with examples! Args: api (DynApi): BaseApi object to use, this is how we know the table name, model class, etc. query (Query): You can give a simple dict here, modeled after how the standard rest-api query dict's work. Dict keys are the attribute/key names and values are what to filter on for that name. Operators after double `__` work just like you would expect for our xyngular API's... here is an example: ```python { "some_attr_name__gte": 2 } ``` In this case, we look for `some_attr_name` greater than or equal to integer `2`. You MUST provide at least one value for the hash key of the table when using `query` or boto3/dynamo will raise an exception. This is an easy way to fill out `KeyConditionExpression` and/or `FilterExpression`. This method can figure out which attribute goes with which one and construct both expressions as needed. For more information with examples see [Advanced Queries](#advanced-queries). consistent_read: Defaults to Model.api.structure.dyn_consistent_read, which can be set via class arguments when DynModel subclass is defined. You can use this to override the model default. True means we use consistent reads, otherwise false. reverse: Defaults to False, which means sort order is ascending. Set to True to reverse the order, which will set the "ScanIndexForward" parameter to False in the query. **params (DynParams): You can provide other standard boto3 query parameters here as you need. If you provide both dynamo_params and query, the ones in query will overwrite ones in dynamo_params if there is a conflict; The `query` param could use either a `KeyConditionExpression` or `FilterExpression` depending on what attributes are in the query dict. Yields: M: (DynModel subclass instances) - The next object we got from dynamo. This method returns a generator that will eventually go through all the results from dynamo in a memory-efficient manner. """ # todo: support in/lists as values.... # todo: support `id`. structure = self.api.structure hash_key = structure.dyn_hash_key_name query = _ProcessedQuery.process_query(query, api=self.api) # if 'id' in query: # # # hash_key in query: # 1. If we have 'id', iterate though that and get DynKey's out of them # 2. Look at hash/range keys and try to match them up if they are lists into DynKey's # 3. Considering auto-finding out if we have a list of keys and can just do batch-get # Query for each dyn-key we find. keys = query.dyn_keys() if not keys: raise XRemoteError( "query got called with a query that had no valid DynKey's in it. " "This means we could not find any part(s) of the primary key we could use to do " "a query on (a Dynamo query requires at least a hash-key). " "" "If you have no conditions and just want to " "simply retrieve every item in the table use `DynClient.get` with no parameters. " "" "If you do have conditions/filters in query you need to do a `DynClient.dyn_scan` " "and have Dynamo scan the entire table. This will allow dynamo to evaluate your " "conditions on every item in the table." ) for dyn_key in query.dyn_keys(): params = {**dynamo_params} self._add_conditions_from_query( query=query, params=params, dyn_key=dyn_key, consistent_read=consistent_read, reverse=reverse ) for value in self._paginate_all_items_generator(method='query', params=params): yield value def scan( self, query: Query = None, *, consistent_read: bool | DefaultType = Default, **dynamo_params: DynParams ) -> Iterable[M]: """ Scans entire table (vs doing a `DynClient.query`, which is much more efficient). Looks at every item in the table, evaluating `query` to filter which ones to return. The scanning/filtering happens on the server-side. If provided query is empty, will return all items in the table. """ params = {**dynamo_params} self._add_conditions_from_query( query=query, params=params, consistent_read=consistent_read, ) return self._paginate_all_items_generator(method='scan', params=params) def _add_conditions_from_query( self, query: Query, params: DynParams, dyn_key: DynKey = None, filter_key: str = 'FilterExpression', consistent_read: bool | DefaultType = Default, reverse: bool = False ): if consistent_read is Default: consistent_read = self.consistent_reads if consistent_read: params['ConsistentRead'] = True if reverse: params['ScanIndexForward'] = False if not query and not dyn_key: return query = _ProcessedQuery.process_query(query, api=self.api) key_names: Set[str] = set() api = self.api structure = self.api.structure if dyn_key: key_names.add('id') key_names.add(structure.dyn_hash_key_name) range_name = structure.dyn_range_key_name if range_name: key_names.add(range_name) def add_criterion(cond_list, condition_base, name, operator, value): # It just so happens the basic Django filter operators are generally named the same # as the ones in the boto3 dynamo library. So we grab the condition/operator # via the same names. _ProcessedQuery will normalize the names for us. # exists/not_exists don't require a 'value' parameter, # so we need to interpret/parse query value ourselves and do the right thing. pre_lookup_operator = operator operator_needs_param = True if operator == 'exists': operator_needs_param = False if not bool_value(value): # False value, so swap to the inverse/opposite operator. operator = 'not_exists' elif operator == 'not_exists': operator_needs_param = False if not bool_value(value): # False value, so swap to the inverse/opposite operator. operator = 'exists' elif operator == 'range': operator = 'between' # Construct condition by allocating base, grabbing operator and assigning value. op_name = operator operator = getattr(condition_base(name), operator, None) condition = None if operator: field = structure.get_field(name) if operator_needs_param and field and field.converter: if isinstance(value, list) and op_name == 'between': sub_results = [] for sub_v in value: sub_results.append(field.converter(api, Converter.Direction.to_json, field, sub_v)) value = sub_results else: value = [field.converter(api, Converter.Direction.to_json, field, value)] operator_params = value if operator_needs_param else [] condition = operator(*operator_params) # If we found a condition operator, use it. # Otherwise, we construct and raise a helpful error message. if operator is not None: cond_list.append(condition) return # Get all available conditions/operators from boto3 class so we can list them # in the exception message. available = [ f for f in dir(condition_base) if callable(getattr(condition_base, f)) and not f.startswith("__") ] supplemental_msg = "" if condition_base is conditions.Key: supplemental_msg = ( f"Attr ({name}) is part of primary key, there are reduced " f"operators available for keys when using a query. " f"We could route this to a 'scan' operation, that would work.... " f"Right now we don't automatically route this to a 'scan' operation " f"because that's much slower and you probably really are wanting " f"to do a query. You can use `dyn_scan` your self directly if that's " f"what you really want to do. Or we could implement an Option/Flag to " f"auto-route to a scan operation when needed." ) raise XRemoteError( f"Using unknown boto3/dynamo operator ({pre_lookup_operator}), " f"for query on attr ({name}); " f"the available ones are ({available}). " f"{supplemental_msg}" ) filters = [] keys = [] for (name, criterion) in query.items(): # TODO: If there are other filters beyond what we use for the RANGE-KEY query, # we should not skip them and still add them as regular attribute filters. # ___ # Update: You can't add regular filters for the hash/range, but I could query for them # and then filter the results further myself here on the Python side. if name in key_names: # This is handled later... continue for (operator, value) in criterion.items(): add_criterion( cond_list=filters, condition_base=conditions.Attr, name=name, operator=operator, value=value ) # Add the dyn-key conditions if needed... if dyn_key: add_criterion( cond_list=keys, condition_base=conditions.Key, name=structure.dyn_hash_key_name, operator='eq', value=dyn_key.hash_key ) range_key = dyn_key.range_key if range_key: operator = dyn_key.range_operator or 'eq' # If we have an 'in' operator, we translate that to 'eq' for this purpose. # We should be called with separate values if there is more than one dyn_key, # and so are 'simulating' the `is_in` operator aspect. if operator == 'is_in': operator = 'eq' add_criterion( cond_list=keys, condition_base=conditions.Key, name=structure.dyn_range_key_name, operator=operator, value=range_key ) params_to_mod = (('KeyConditionExpression', keys), (filter_key, filters)) for (param_key, exp_list) in params_to_mod: for key in exp_list: exp = params.get(param_key) exp = exp & key if exp is not None else key params[param_key] = exp def _table_or_batch_writer(self) -> Union[BatchWriter, TableResource]: """ Gets either a table or a batch-writer. So you should only call methods that are supported by a BatchWriter on this [since it could be one]. All BatchWriter methods are also supported by a Dynamo TableResource so you'll be safe as long as you limit calls to what BatchWriter supports. """ batch_writer = _DynBatchResource.grab().current_writer() if batch_writer: return batch_writer.batch_writer(api=self.api) return self.api.table # todo: BaseModel objects are capable of letting us know if something actually changed or not. # At some point take advantage of that. # This would allow us to prevent putting an unchanged item into dynamo [saves cost]. def _put_item(self, item: 'DynModel', condition: Query = None): """ Put item into dynamo-table. WON'T use any current batch-writer if a condition is supplied. If a condition is supplied, we always use the table and execute put immediately. Object will only be sent if there are any changes in object (compared to what was originally retrieved from table). If there is a change, entire object with all current attributes will be sent (a put fully replaces the item in table, it's not a PATCH). Condition will be sent with put if provided, it will be checked against anything that is currently in table before it's replaced in a transaction-safe way. Args: item: Item to put into dynamo table; if condition not supplied we check for a current batch-writer resource and use that if there is one. condition: Conditional query; query is sent to dynamodb, and it will check condition against any existing item in a transaction-safe way. If condition checks out then the put is done. Otherwise, it won't be. If it is not, we add a field-error to object to indicate conditional check failed: `item.api.response_state.add_field_error('_conditional_check', 'failed')` or you can use `xyn_model_dynamo.const.CONDITIONAL_CHECK_FAILED_KEY` for the field key. """ # Check to see if there is anything I actually need to send. if not item.api.json(only_include_changes=True, log_output=True, include_removals=True): log.info(f"Dynamo - {item} did not have any changes to send, skipping.") return structure = self.api.structure hash_name = structure.dyn_hash_key_name if not getattr(item, hash_name, None): raise XRemoteError(f"Item {item} needs a value for hash key ({hash_name}).") range_name = structure.dyn_range_key_name if range_name and not getattr(item, range_name, None): raise XRemoteError(f"Item {item} needs a value for range key ({range_name}).") # To keep things simple, I am using 'put' which replaces entire item, # so get all properties of item regardless if they changed or not. # todo: Check for primary key and raise a nicer, higher-level exception in that case. item.api.response_state.reset() params = { "Item": item.api.json() } resource = self._table_or_batch_writer() if condition: # Can't batch-put things with conditions, so we do them one at a time instead. resource = self.api.table # Add the conditional query to the dynamodb params dict... self._add_conditions_from_query( query=condition, params=params, filter_key='ConditionExpression', consistent_read=False ) # Finally, tell the boto resource to put the item: try: resource.put_item(**params) except dynamodb.meta.client.exceptions.ConditionalCheckFailedException as e: # By default, we ignore conditional check failed, because this means we did not want # to delete the item on purpose; ie: it's not an error. # conditions are mostly used to prevent race-conditions, hence ignored by default. # If someone in the future ends up wanting the exception, we can have an argument # or some other way to indicate that we should re-raise the condition check exception. # See `self.delete_obj` doc comment (above) for more details. log.info( f"Didn't send/put dynamo obj ({item}) due to condition not met ({condition}), " f"exception from dynamo ({e})." ) state = item.api.response_state state.had_error = True state.errors = [ 'The conditional check failed, item was not deleted.', {'conditional-check-failed-exception': e} ] state.add_field_error(field=CONDITIONAL_CHECK_FAILED_KEY, code='failed') def _get_all_items(self, consistent_read: bool | DefaultType = Default): params = {} if consistent_read is Default: consistent_read = self.consistent_reads if consistent_read: params['ConsistentRead'] = True return self._paginate_all_items_generator(method='scan', params=params) def _paginate_all_items_generator( self, *, method: str, params: Dict[str, Any], use_table=True, ) -> Iterable[M]: api = self.api model_type = api.model_type # Get table name, and also ensures table exists. table = api.table table_name = table.name resource = table if use_table else DynamoDB.grab().db while True: table_method = getattr(resource, method) # Execute Scan/Query on table: response = table_method(**params) last_key = response.get('LastEvaluatedKey', None) db_datas = response.get('Items') if not db_datas: responses: Dict[str, List] = response.get("Responses") if responses: db_datas = responses[table_name] else: db_datas = tuple() for data in db_datas: yield model_type(data) if last_key: params['ExclusiveStartKey'] = last_key continue unprocessed = response.get('UnprocessedKeys') if not unprocessed: return # We need to try the fetch again for the remaining items... # todo: Boto/AWS recommend an exponential backoff when we retry in this case... # see BatchGetItem / batch_get_item params['RequestItems'] = unprocessed continue def create_table(self): """ This is mainly here to create table when mocking aws for unit tests. If the table really does not exist in reality, this also can create it. Time To Live Notes: We can't enable TimeToLive during table creation, we have to wait until after it's created. This is a minor issue, since the table will still function correctly, the queries will still filter out expired items like normal. The only difference is we could get charged extra for storage we are not using. We need to still filter items out of our queries because deletion does not happen immediately [could take up to 48 hours]. At this point, it's expected that you'll have to go into the AWS dynamo console to setup automatic TimeToLive item deletion for a table if you want this to create it for you. In reality, serverless framework is expected to setup the real tables for services that are running directly in aws; and that's where you should setup the TTL stuff for real tables. """ structure = self.api.structure hash_key = structure.dyn_hash_key_name hash_type = structure.get_field(hash_key).type_hint key_schemas = [ # Partition Key {'AttributeName': hash_key, 'KeyType': 'HASH'} ] attribute_definitions = [ { 'AttributeName': hash_key, 'AttributeType': get_dynamo_type_from_python_type(hash_type) } ] # If we have a range-key, add that in. range_key = structure.dyn_range_key_name if range_key: range_type = structure.get_field(range_key).type_hint key_schemas.append({ 'AttributeName': range_key, 'KeyType': 'RANGE' }) attribute_definitions.append({ 'AttributeName': range_key, 'AttributeType': get_dynamo_type_from_python_type(range_type) }) return DynamoDB.grab().db.create_table( TableName=structure.fully_qualified_table_name(), KeySchema=key_schemas, AttributeDefinitions=attribute_definitions, BillingMode='PAY_PER_REQUEST', Tags=[{'Key': 'DDBTableGroupKey', 'Value': xcon_settings.service}] )
Ancestors
- xmodel.remote.client.RemoteClient
- typing.Generic
Instance variables
prop api : RemoteApi[M]
-
Every RemoteModel has a class-based RemoteApi And RestClient instance, this will point to the class based RemoteApi instance, the same one you would get via
xmodel.remote.model.RemoteModel.api
.When
xmodel.remote.api.RemoteApi
creates us (it looks at type-hint onxmodel.remote.api.RemoteApi.client
) it passes it's self to us here.Only ONE
RestClient
class is allocated per-xmodel.remote.model.RemoteModel
class/type. The object instances of the RemoteModel all get a separate instance ofxmodel.remote.api.RemoteApi
; but all instances of that model type share the sameRemoteClient
instance viaxmodel.remote.api.RemoteApi.client
.The typehint on this var is only here to support the IDE, it allows the IDE to know what
xmodel.remote.model.RemoteModel
type this client is going to be using. IDE knows based on how you get the RestClient, when code uses thexmodel.remote.model.RemoteModel.api
attribute; the IDE knows the concrete genericM
type represents the realxmodel.remote.model.RemoteModel
subclass type (for type completion).Expand source code
@property def api(self) -> "RemoteApi[M]": """ Every RemoteModel has a class-based RemoteApi And RestClient instance, this will point to the class based RemoteApi instance, the same one you would get via `xmodel.remote.model.RemoteModel.api`. When `xmodel.remote.api.RemoteApi` creates us (it looks at type-hint on `xmodel.remote.api.RemoteApi.client`) it passes it's self to us here. Only ONE `RestClient` class is allocated per-`xmodel.remote.model.RemoteModel` class/type. The object instances of the RemoteModel all get a separate instance of `xmodel.remote.api.RemoteApi`; but all instances of that model type share the same `RemoteClient` instance via `xmodel.remote.api.RemoteApi.client`. The typehint on this var is only here to support the IDE, it allows the IDE to know what `xmodel.remote.model.RemoteModel` type this client is going to be using. IDE knows based on how you get the RestClient, when code uses the `xmodel.remote.model.RemoteModel.api` attribute; the IDE knows the concrete generic `M` type represents the real `xmodel.remote.model.RemoteModel` subclass type (for type completion). """ return self._api
prop consistent_reads : bool
-
Expand source code
@property def consistent_reads(self) -> bool: # Check for injected default, use that if it exists. injected_default = dyn_client_options.consistent_read if injected_default is not Default: return injected_default # Otherwise use the model's default for consistent reads. return self.api.structure.dyn_consistent_read or False
Methods
def batch_get(self, keys: Iterable[DynKey], *, consistent_read: bool | xsentinels.default.DefaultType = Default, reverse: bool = False, **params: Dict[str, Any]) ‑> Iterable[~M]
-
Will fetch keys in the largest batch it can at a time it can from Dynamo; Dynamo will fetch each page of values in parallel!
We split up the keys into 100 increments at a time automatically right now For each unique hash key in the set of keys provided, Dynamo will parallel fetch the keys (if two keys have the same hash but different range key, dynamo will do them sequentially).
In the future, we may attempt to fetch multiple 100 blocks of keys asynchronously. As the returned generator/iterable is gone through to increase the speed. We don't do that yet.
Args
keys
:Iterable[DynKey]
-
Keys to fetch. Can be of any size, a generator will be returned to minimize memory use.
Tip: If you pass in a
set
, we will be slightly more efficient.We need to ensure the results are uniquified, if you pass a set we can skip doing it.
consistent_read
-
Defaults to Model.api.structure.dyn_consistent_read, which can be set via class arguments when DynModel subclass is defined.
You can use this to override the model default. True means we use consistent reads, otherwise false.
reverse
- Defaults to False, which means sort order is ascending. Set to True to reverse the order, which will set the "ScanIndexForward" parameter to False in the query.
**params
- An optional set of extra parameters to include in request to Dynamo, if so desired.
Returns: An Iterable/Generator that will efficiently paginate though the results for you.
def create_table(self)
-
This is mainly here to create table when mocking aws for unit tests. If the table really does not exist in reality, this also can create it.
Time To Live Notes:
We can't enable TimeToLive during table creation, we have to wait until after it's created. This is a minor issue, since the table will still function correctly, the queries will still filter out expired items like normal. The only difference is we could get charged extra for storage we are not using. We need to still filter items out of our queries because deletion does not happen immediately [could take up to 48 hours].
At this point, it's expected that you'll have to go into the AWS dynamo console to setup automatic TimeToLive item deletion for a table if you want this to create it for you.
In reality, serverless framework is expected to setup the real tables for services that are running directly in aws; and that's where you should setup the TTL stuff for real tables.
def delete_obj(self, obj: Union[~M, DynKey], *, condition: Dict[str, Union[str, int, datetime.date, xurls.url._FormattedQueryValue, ForwardRef(None), uuid.UUID, Iterable[str | int | uuid.UUID | datetime.date | xurls.url._FormattedQueryValue]]] = None)
-
Attempts to delete passed in object from dynamodb table.
If you pass in a condition, it will be evaluated on the dynamodb-service side and the deletes will only happen if the condition is met. This can help prevent race conditions if used correctly.
If there is a batch-writer currently in use, we will try to use that to batch the deletes.
Keep in mind that if you pass in a
condition
, we can't use the batch write. We will instead send of a single-item delete request with the condition attached (bypassing any current batch-writer that may be in use).Args
obj
- Object to delete, it MUST have values for it's primary key(s) (no other values are required, the primary-key values are the only ones used).
condition
-
Optional, if passed in we will send this as a
ConditionExpression
to dynamo. You can pass in a Query, ie: the same structure that is used forself.query()
andself.scan()
.We will take the Query correctly format it for you into the
ConditionExpression
.If the condition is met, the item will be deleted.
If the condition is NOT met, we will handle it for you by catch the exception, logging about it, setting error on response_state of object, and then returning. Conditions are generally used to help prevent race-conditions, ie: to prevent a deleted on purpose.
Most of the time the intent is to ignore what the result is, it was either deleted or not; ie: it will eventually be handled in some way it if it was not deleted.
If you want to know when a conditional delete fails, you can look at deleted objects
obj.api.response_state
.It's
had_error
will beTrue
, there will be some info inerrors
, but also a field error with name_conditional_check
and codefailed
you can easily check for via:....response_state.has_field_error('_conditional_check', 'failed')
or you can use
xyn_model_dynamo.const.CONDITIONAL_CHECK_FAILED_KEY
for the field key.
def delete_objs(self, objs: Sequence[Union[~M, DynKey]], condition: Dict[str, Union[str, int, datetime.date, xurls.url._FormattedQueryValue, ForwardRef(None), uuid.UUID, Iterable[str | int | uuid.UUID | datetime.date | xurls.url._FormattedQueryValue]]] = None)
-
Uses a batch-writer to put the items. Much more efficient than doing it one at a time.
If you pass in a
condition
, the batch-writer can't be used and normal single-deletes will automatically be used instead.If you only give me one item, directly calls
delete_obj
without a batch-writer. def get(self, query: Dict[str, Union[str, int, datetime.date, xurls.url._FormattedQueryValue, ForwardRef(None), uuid.UUID, Iterable[str | int | uuid.UUID | datetime.date | xurls.url._FormattedQueryValue]]] = None, *, top: int = None, fields: Sequence[str] = Default, allow_scan: bool = False, consistent_read: bool | xsentinels.default.DefaultType = Default, reverse: bool = False, **dynamo_params) ‑> Iterable[~M]
-
This is the standard/abstract interface method that all RestClient's support.
The idea behind this method is to figure out how to route this query in the most efficient manner we can do it in. If you want to guarantee a specific type of query you can use
DynClient.query()
, etc.Generally, this is the best, generally method to use since it can adapt your request to the most efficient way to query Dynamo. The
DynApi
will use this method generally when it's asked to get something (for example, when usingDynApi.get_via_id
).For more info on how you can query, see:
Here is the general process this method goes though to determine how to query dynamo for a given query:
- If provided query is empty.
- Will paginate though all items in table efficiently; multiple items will be returned per-request in parallel. You'll get back a generator that gives you an object at time, but behind the scenes we are getting a page at a time from dynamo.
- We will try to batch it en-mass in parallel if we can.
We can do this if one of the following is true:
- Query only contains 'id' key; this is because an 'id' should have full primary key;
meaning it will contain both hash and range key (if needed).
You can provide a list of strings or
DynKey
objects.- See
DynModel.id
for more details on how to usestr
with an 'id'. - See
DynKey
for more details on what a composite primary-key as astr
is like.
- See
- Table only has a hash-key (no range/sort key) and you provide nothing but hash-keys (ie: no other attributes in query)
- Query only contains 'id' key; this is because an 'id' should have full primary key;
meaning it will contain both hash and range key (if needed).
You can provide a list of strings or
- Next, we see if we can use a Dynamo query via
DynClient.query()
.- This will use multiple queries if/as needed, but while minimizing the number of queries that are needed. Just depends on the query that is provided. More then one query is needed if there is more then one hash-key in the query, and there are other attributes you are filtering on (and so #2 above, batch-get, can't be used) Doing multiple queries is still far-faster then doing a scan. We return objects via the generator and only execute the second (or more) queries
- In the future, this will automatically querying a Secondary Index if one is available. (Not Implemented Yet, we will add this when needed)
- Next, fallback to using a Global Index if one is available.
- (Not Implemented Yet, we will do it when we need it)
- If that's not possible, in the future we fallback to scan operation.
- (Not Implemented Yet; This isn't implemented currently and will raise NotImplemented. Scan operations can be very slow and so it's not something we really want to do normally… I did implement this if getting everything, but with filters/attributes to query on I decided to wait until we need it before implementing this currently).
TODO
Implement querying via Secondary/Global indexes; Scanning with query values. see above for details.
Args
query
-
Dict keys are the attribute/key names and values are what to filter on for that name. Operators after double
__
work just like you would expect for our xyngular API's… here is an example:python { "some_attr_name__gte": 2 }
In this case, we look for
some_attr_name
greater than or equal to integer2
.You MUST provide at least one value for the hash key of the table for right now. In the future, we will support doing a table-scan to support queries without a hash-key. But for right now it's required.
For more information with examples see Advanced Queries.
top
- This is supposed to only return this mean records; currently not implemented.
fields
- This is supposed to only retrieve provided field names in returned objects; currently not implemented.
allow_scan
-
Defaults to False, which means this will raise an exception if a scan is required to execute your get. Set to True to allow a scan if needed (it will still do a query, batch-get, etc; if it can, it only does a scan if there is no other choice).
If the query is blank or None will still do a scan regardless of what you pass (to return all items in the table).
consistent_read
-
Defaults to Model.api.structure.dyn_consistent_read, which can be set via class arguments when DynModel subclass is defined.
You can use this to override the model default. True means we use consistent reads, otherwise false.
reverse
- Defaults to False, which means sort order is ascending. Set to True to reverse the order, which will set the "ScanIndexForward" parameter to False in the query.
**dynamo_params
- Extra parameters to include on the Dynamo request(s) that are generated. This is 100% optional.
Returns:
- If provided query is empty.
def query(self, query: Dict[str, Union[str, int, datetime.date, xurls.url._FormattedQueryValue, ForwardRef(None), uuid.UUID, Iterable[str | int | uuid.UUID | datetime.date | xurls.url._FormattedQueryValue]]] = None, *, consistent_read: bool | xsentinels.default.DefaultType = Default, reverse: bool = False, **dynamo_params: Dict[str, Any]) ‑> Iterable[~M]
-
Forces
DynClient
to use a query. If you want a way for client to automatically figure out the best way to execute your query, use one of these instead:DynApi.get
DynClient.get()
For more info see:
For a quick summary on how to provide query, see 'query' argument doc (just a few lines down). But I would highly recommend looking at Advanced Queries for more details with examples!
Args
api
:DynApi
- BaseApi object to use, this is how we know the table name, model class, etc.
query
:Query
-
You can give a simple dict here, modeled after how the standard rest-api query dict's work. Dict keys are the attribute/key names and values are what to filter on for that name. Operators after double
__
work just like you would expect for our xyngular API's… here is an example:python { "some_attr_name__gte": 2 }
In this case, we look for
some_attr_name
greater than or equal to integer2
.You MUST provide at least one value for the hash key of the table when using
query
or boto3/dynamo will raise an exception.This is an easy way to fill out
KeyConditionExpression
and/orFilterExpression
. This method can figure out which attribute goes with which one and construct both expressions as needed.For more information with examples see Advanced Queries.
consistent_read
-
Defaults to Model.api.structure.dyn_consistent_read, which can be set via class arguments when DynModel subclass is defined.
You can use this to override the model default. True means we use consistent reads, otherwise false.
reverse
- Defaults to False, which means sort order is ascending. Set to True to reverse the order, which will set the "ScanIndexForward" parameter to False in the query.
**params
:DynParams
-
You can provide other standard boto3 query parameters here as you need. If you provide both dynamo_params and query, the ones in query will overwrite ones in dynamo_params if there is a conflict;
The
query
param could use either aKeyConditionExpression
orFilterExpression
depending on what attributes are in the query dict.
Yields
M
- (DynModel subclass instances) - The next object we got from dynamo. This method returns a generator that will eventually go through all the results from dynamo in a memory-efficient manner.
def scan(self, query: Dict[str, Union[str, int, datetime.date, xurls.url._FormattedQueryValue, ForwardRef(None), uuid.UUID, Iterable[str | int | uuid.UUID | datetime.date | xurls.url._FormattedQueryValue]]] = None, *, consistent_read: bool | xsentinels.default.DefaultType = Default, **dynamo_params: Dict[str, Any]) ‑> Iterable[~M]
-
Scans entire table (vs doing a
DynClient.query()
, which is much more efficient). Looks at every item in the table, evaluatingquery
to filter which ones to return. The scanning/filtering happens on the server-side.If provided query is empty, will return all items in the table.
def send_objs(self, objs: Sequence[~M], *, condition: Dict[str, Union[str, int, datetime.date, xurls.url._FormattedQueryValue, ForwardRef(None), uuid.UUID, Iterable[str | int | uuid.UUID | datetime.date | xurls.url._FormattedQueryValue]]] = None, url: xurls.url.Url | str | None = None, send_limit: int = None)
-
Used to send any number of objects to Dynamo in as efficient a manner as possible.
If possible, uses a batch-writer to put the items. It's WAY more efficient than doing it one at a time.
If a condition is supplied, won't use a batch-writer as only one item can be put at a time with a condition.
In the future at some point, will support transactions and at that point could put more items per-request at a time with condition(s).
Args
objs
- Objects to send to dynamo.
url
- Not used in Dynamo, ignore
send_limit
- Currently unused, we try to push as much as possible.
condition
-
Optional, if passed in we will send this as a
ConditionExpression
to dynamo. You can pass in a Query, ie: the same structure that is used forself.query()
andself.scan()
.We will take the Query correctly format it for you into the
ConditionExpression
.If the condition is met, the item will be put into table.
If the condition is NOT met, we will handle it for you by catch the exception, logging about it, setting error on response_state of object, and then returning. Conditions are generally used to help prevent race-conditions, ie: to prevent a put on purpose; so we don't raise an exception.
Most of the time the intent is to ignore what the result is, it was either put into table or not; ie: it will eventually be handled in some way it if it was not put into table.
If you want to know when a conditional delete fails, you can look at deleted objects
obj.api.response_state
.It's
had_error
will beTrue
, there will be some info inerrors
, but also a field error with name_conditional_check
and codefailed
you can easily check for via:....response_state.has_field_error('_conditional_check', 'failed')
or you can use
xdynamo.const.CONDITIONAL_CHECK_FAILED_KEY
for the field key.
class DynClientOptions (*, consistent_read: bool | xsentinels.default.DefaultType = Default)
-
If you have not already done so, you should also read the xinject project's README.md for an overview of the library before diving into the below text, that's more of like reference material.
Summary
Allows you to create subclasses that act as sharable dependencies. Things that should stick around and should be created lazily.
Also allows code to temporarily create, customize and activate a dependency if you don't want the customization to stick around permanently. You can do it without your or other code needing to be aware of each other.
This helps promote code decoupling, since it's so easy to make a Resource activate it as the 'current' version to use.
The only coupling that takes place is to the Resource sub-class it's self.
You can also easily have each thread lazily create seperate instance of your Resource, by inheriting from
PerThreadResource
.Each separate piece of code that uses a particular Resource subclass can be completely unaware of each other, and yet each one can take advantage of the shared dependency.
This means that Resource can help cover dependency-injection use-cases.
Overview
A
Resource
represents an object in axinject.context.XContext
. Generally, dependencies that are added/created inside aXContext
inherit from this abstract baseResource
class, but are not required too.Resource
just adds some class-level conveince methods and configuratino options. Inheriting from Resource also helps self-document that it's a Resource.See Resources at top of this module for a general overview of how dependencies and
XContext
's work. You should also read the xinject project's README.md for a high-level overview. The text below is more like plain refrence matrial.Get the current dependency via
Resource.dependency
, you can call it on sub-class/concreate dependency type, like so:>>> from xinject import Dependency >>> class MyConfig(Dependency): ... some_setting: str = "default-setting-string" >>> >>> MyConfig.grab().some_setting
By default, Resource's act like a singletons; in that child contexs will simply get the same instance of the dependency that the parent context has.
If you inherit from this class, when you have
Resource.dependency
called on you, we will do our best to ensure that the same object instance is returned every time (there are two exceptions, keep reading).These dependencies are stored in the current
XContext
's parent. What happens is:If the current
XContext
and none of their parents have this object and it's asked for (like what happens whenResource.dependency
is called on it), it will be created in the deepest/oldest parent XContext.This is the first parent
xinject.context.XContext
who'sXContext.parent
is None. That way it should be visible to everyone on the current thread since it will normally be created in the app-rootxinject.context.XContext
.If the Dependency can't be shared between multiple threads, creation would normally happen at the thread-root XContext instead of the app-root one.
If we don't already exist in any parent, then we must be created the first time we are asked for. Normally it will simply be a direct call the dependency-type being requested, this is the normal way to create objects in python:
>>> class MyResource(Dependency): >>> pass >>> >>> MyResource.grab()
When that last line is executed, and the current or any parent context has a
MyResource
dependency;XContext
will simply create one via calling the dependency type:>>> MyResource()
You can allocate the dependency yourself with custom options and add it to the XContext your self.
Here are the various ways to do that, via:
-
XContext.add
Adds dependency to a specific XContext that already exists (or replaces if one has already been directly added in the past to that specific Context). When/While XContext is active, these added dependencies will be thecurrent
ones. -
Decorator, ie:
@MyResource()
>>> from xny_config import Config, config >>> >>> @DependencySubclass(service="override-service-name") >>> def my_method(): >>> assert config.service == "override-service-name"
-
via a with statement.
>>> def my_method(): >>> with @DependencySubclass(service="override-service-name") >>> assert config.service == "override-service-name"
-
multiple in single statement by making your own XContext directly:
>>> def my_method(): >>> with @XContext([ >>> DependencySubclass(service="override-service-name"), >>> SomeOtherDep(name='new-name') >>> ]): >>> assert config.service == "override-service-name"
Background on Unit Testing
By default, unit tests always create a new blank
XContext
withparent=None
. THis is done by an autouse fixture (xinject.pytest_plugin.xinject_test_context
) THis forces every unit test run to create new dependencies when they are asked for (lazily).This fixture is used automatically for each unit test, it clears the app-root XContext, removes all current thread-root XContext's and their children from being
active
. just beofre each run of a unit test.That way it will recreate any shared dependency each time and a unit test can't leak dependencies it added or changed into the next run.
One example of why this is good is for
moto
when mocking dynamodb in boto3 client. Can use dependency to ensure that we get a new dynamodb shared dependency forboto
each time a unit test executes (which helps withmoto
, it needs to be active when a dependency is allocated/used).This is exactly what we want for each unit test run, to have a blank-slate for all the vairous dependencies.
If a particulre set of unit-tests need to have specific dependcies, you can use fixtures to modify/add various dependcies as needed for each indivirual unit-test function run.
When the application runs for real though, we do generally want to use the dependencies in a shared fashion. So normally we only allocate a new blank-root
@XContext(parent=None)
either at the start of a normal application run, or during a unit-test.Expand source code
class DynClientOptions(Dependency): def __init__(self, *, consistent_read: bool | DefaultType = Default): self.consistent_read = consistent_read consistent_read: bool | DefaultType = Default """ Way to change what the default consistent read value should be, If set it will be used over the default value for the model (but won't override True/False passed directly to client as method paramter to get/scan/etc. """
Ancestors
- xinject.dependency.Dependency
Class variables
var consistent_read : bool | xsentinels.default.DefaultType
-
Way to change what the default consistent read value should be, If set it will be used over the default value for the model (but won't override True/False passed directly to client as method paramter to get/scan/etc.
-