API Reference
firexkit.argument_conversion module
- exception firexkit.argument_conversion.ArgumentConversionException[source]
Bases: Exception
An exception occurred while executing a converter.
- exception firexkit.argument_conversion.CircularDependencyException[source]
Bases: ConverterRegistrationException
A converter was registered with a dependency that is itself directly or indirectly dependent on it.
- class firexkit.argument_conversion.ConverterRegister[source]
Bases: object
Converters are a practical mechanism for altering the input values of microservices. They can also be used for upfront validation of the inputs.
- convert(pre_task=True, verbose=True, **kwargs) dict [source]
Run all registered converters.
- Parameters:
pre_task – Converters can be registered to run before or after a task runs.
- get_visit_order(pre_task=True)[source]
Provide a list of all converters in order of execution, accounting for dependencies.
- register(*args)[source]
Register a converter function.
- Parameters:
args – A collection of optional arguments, the function of which is based on its type:
callable (only once): A function that will be called to convert arguments.
boolean (only once): At which point should this converter be called? True is pre (before the task), False is post (after the task).
str: Dependencies. Any dependency of the current converter on the one named in the string. See the sketch below.
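- Example (an illustrative sketch; the converter names, bodies, and the single-dict calling convention are assumptions, not part of this reference):
from firexkit.argument_conversion import ConverterRegister

reg = ConverterRegister()

def normalize_path(kwargs):
    # Hypothetical pre-task converter: assumed to receive the argument
    # dict and return a dict of updated values.
    return {'path': kwargs['path'].rstrip('/')}

def validate_path(kwargs):
    # Hypothetical validator; an exception raised here would surface
    # while running the converters (see ArgumentConversionException).
    if not kwargs['path']:
        raise ValueError('path must not be empty')

reg.register(normalize_path, True)                   # callable + pre/post boolean
reg.register(validate_path, True, 'normalize_path')  # str names a dependency

converted = reg.convert(pre_task=True, path='/tmp/run/')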
- classmethod register_for_task(task: PromiseProxy, pre_task=True, *args)[source]
Register a converter function for a given task.
- Parameters:
task – A microservice signature against which to register the task.
pre_task – At which point should this converter be called? True is pre (before the task), False is post (after the task).
args – A collection of optional arguments, the function of which is based on its type:
callable (only once): A function that will be called to convert arguments.
str: Dependencies. Any dependency of the current converter on the one named in the string.
- classmethod task_convert(task_name: str, pre_task=True, **kwargs) dict [source]
Run the argument conversion for a given task.
- Parameters:
task_name – the short name of the task. If the long name is given, it will be reduced to the short name.
pre_task – Converters can be registered to run before or after a task runs
kwargs – the argument dict to be converted
- exception firexkit.argument_conversion.ConverterRegistrationException[source]
Bases: Exception
A coding error in the registration of the converter.
- exception firexkit.argument_conversion.MissingConverterDependencyError[source]
Bases: ConverterRegistrationException
A converter was registered with a dependency that does not exist.
- exception firexkit.argument_conversion.NameDuplicationException[source]
Bases: ConverterRegistrationException
A converter was registered with the same name as another converter. This creates conflicts during the dependency check, and is not allowed.
- class firexkit.argument_conversion.SingleArgDecorator(*args)[source]
Bases: object
Decorator to simplify a common use case for argument converters, in which a single argument in the bag of goodies needs to be validated or converted. The converter is only called if the argument is present in kwargs.
- Example:
@ConverterRegister.register_for_task(BirthdayCake)
@SingleArgDecorator("message")
def yell_loud(arg_value):
    return arg_value.upper()
firexkit.bag_of_goodies module
firexkit.chain module
- exception firexkit.chain.InvalidChainArgsException(msg, wrong_args: dict | None = None)[source]
Bases: Exception
- firexkit.chain.returns(*args)[source]
This decorator specifies the keys of the dict that the task returns.
This is used only to signal to the user the inputs and outputs of a task, and to deduce what arguments are required for a chain.
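- Example (an illustrative sketch; the Celery app, task name, and values are assumptions):
from celery import Celery
from firexkit.chain import returns
from firexkit.task import FireXTask

app = Celery('example')

@app.task(base=FireXTask)
@returns('quotient', 'remainder')
def divide(dividend, divisor):
    # The two returned values are published under the declared keys, so
    # downstream tasks in a chain can consume them by name.
    return dividend // divisor, dividend % divisor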
- firexkit.chain.set_execution_options(sig: Signature, **options)[source]
Set arbitrary execution options in every task in sig.
- firexkit.chain.set_priority(sig: Signature, priority: int)[source]
Set the priority execution option in every task in sig.
- firexkit.chain.set_queue(sig: Signature, queue)[source]
Set the queue execution option in every task in sig.
- firexkit.chain.set_soft_time_limit(sig: Signature, soft_time_limit)[source]
Set the soft_time_limit execution option in every task in sig.
firexkit.result module
- exception firexkit.result.ChainInterruptedByZombieTaskException(task_id=None, task_name=None, cause=None)[source]
Bases: ChainInterruptedException
- exception firexkit.result.ChainInterruptedException(task_id=None, task_name=None, cause=None)[source]
Bases: ChainException
- MESSAGE = 'The chain has been interrupted by a failure in microservice '
- exception firexkit.result.ChainRevokedException(task_id=None, task_name=None)[source]
Bases: ChainException
- MESSAGE = 'The chain has been interrupted by the revocation of microservice '
- exception firexkit.result.ChainRevokedPreRunException(task_id=None, task_name=None)[source]
Bases: ChainRevokedException
- exception firexkit.result.MultipleFailuresException(task_ids=('UNKNOWN',))[source]
Bases: ChainInterruptedException
- MESSAGE = 'The chain has been interrupted by multiple failing microservices: %s'
- class firexkit.result.WaitLoopCallBack(func, frequency, kwargs)
Bases: tuple
- frequency
Alias for field number 1
- func
Alias for field number 0
- kwargs
Alias for field number 2
- firexkit.result.find_all_unsuccessful(result: AsyncResult, ignore_non_ready=False, depth=0) {} [source]
- firexkit.result.forget_chain_results(result: AsyncResult, forget_chain_head_node_result: bool = True, do_not_forget_nodes: Iterable[AsyncResult] | None = None, **kwargs) None [source]
Forget results of the tree rooted at the “chain-head” of result, while skipping subtrees in skip_subtree_nodes, as well as nodes in do_not_forget_nodes.
If forget_chain_head_node_result is False (the default is True), the head of the result chain is not forgotten.
- firexkit.result.forget_single_async_result(r: AsyncResult)[source]
Forget the result of this task
AsyncResult.forget() also forgets the parent (which is not always desirable), so we had to implement our own.
- firexkit.result.forget_subtree_results(head_node_result: AsyncResult, skip_subtree_nodes: Iterable[AsyncResult | str] | None = None, do_not_forget_nodes: Iterable[AsyncResult | str] | None = None) None [source]
Forget results of the subtree rooted at head_node_result, while skipping subtrees in skip_subtree_nodes, as well as nodes in do_not_forget_nodes
- firexkit.result.get_all_children(node, timeout=180, skip_subtree_nodes: list[AsyncResult | str] | None = None) Iterator[AsyncResult] [source]
Iterate the children of node, skipping any nodes found in skip_subtree_nodes
- firexkit.result.get_result_id(r: AsyncResult | str) str [source]
Return the string id of r if it was an AsyncResult, otherwise returns r
- firexkit.result.get_results(result: AsyncResult, return_keys=(), parent_id: str | None = None, return_keys_only=True, merge_children_results=False, extract_from_parents=True)[source]
Extract and return task results
- Args:
result: The AsyncResult to extract actual returned results from.
return_keys: A single return key string, or a tuple of keys to extract from the AsyncResult. The default value of None will return a dictionary of key/value pairs for the returned results.
return_keys_only: If True (default), only return results for keys specified by the task’s @returns decorator or returns attribute. If False, returns will include key/value pairs from the bag of goodies.
parent_id: If extract_from_parents is set, extract results up to this parent_id, or until we can no longer traverse up the parent hierarchy.
merge_children_results: If True, traverse children of result, and merge results produced by them. The default value of False will not collect results from the children.
extract_from_parents: If True (default), will consider all results returned from tasks of the given chain (parents of the last task). Else will consider only results returned by the last task of the chain.
- Returns:
If return_keys parameter was specified, returns a tuple of the results in the same order of the return_keys. If return_keys parameter wasn’t specified, return a dictionary of the key/value pairs of the returned results.
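- Example (an illustrative sketch; 'result' is assumed to be the AsyncResult of a completed chain whose tasks declared quotient/remainder via @returns):
from firexkit.result import get_results

# Default: a dict of all key/value pairs returned along the chain.
all_results = get_results(result)

# With return_keys: a tuple, in the same order as the requested keys.
quotient, remainder = get_results(result, return_keys=('quotient', 'remainder'))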
- firexkit.result.get_results_with_default(result: AsyncResult, default=None, error_msg: str | None = None, **kwargs)[source]
- firexkit.result.is_result_ready(result: AsyncResult, timeout=900, retry_delay=1)[source]
Protect against the broker being temporarily unreachable and throwing a TimeoutError.
- firexkit.result.wait_for_any_results(results, max_wait=None, poll_max_wait=0.1, log_msg=False, callbacks: Iterator[WaitLoopCallBack] = (), **kwargs)[source]
- firexkit.result.wait_for_running_tasks_from_results(results, max_wait=120, sleep_between_iterations=0.05)[source]
firexkit.revoke module
- class firexkit.revoke.RevokedRequests(timer_expiry_secs=60, skip_first_cycle=True)[source]
Bases: object
We need to inspect the app for revoked requests because the AsyncResult.state of a task that hasn’t been de-queued and executed by a worker remains PENDING even if it was revoked (i.e., the REVOKED state is only recorded upon executing a task). This phenomenon makes wait_for_results wait on such “revoked” tasks, and therefore required us to implement this work-around.
firexkit.task module
- exception firexkit.task.CacheResultNotPopulatedYetInRedis[source]
Bases: NotInCache
- class firexkit.task.DictWillNotAllowWrites(_instrumentation_context=None, **kwargs)[source]
Bases: dict
- pop(k[, d]) v, remove specified key and return the corresponding value. [source]
If key is not found, default is returned if given, otherwise KeyError is raised
- popitem()[source]
Remove and return a (key, value) pair as a 2-tuple.
Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.
- setdefault(*args, **kwargs)[source]
Insert key with a value of default if key is not in the dictionary.
Return the value for key if key is in the dictionary, else default.
- class firexkit.task.FireXTask[source]
Bases: Task
Task object that facilitates passing of arguments and return values from one task to another, to be used in chains.
- DYNAMIC_RETURN = '__DYNAMIC_RETURN__'
- RETURN_KEYS_KEY = '__task_return_keys'
- property abog: DictWillNotAllowWrites
- add_task_result_with_report_to_db()[source]
Maintain a list in the backend of all executed tasks that will generate reports
- property all_args: MappingProxyType
- apply_async(*args, **kwargs)[source]
Apply tasks asynchronously by sending a message.
- Arguments:
args (Tuple): The positional arguments to pass on to the task.
kwargs (Dict): The keyword arguments to pass on to the task.
countdown (float): Number of seconds into the future that the task should execute. Defaults to immediate execution.
eta (~datetime.datetime): Absolute time and date of when the task should be executed. May not be specified if countdown is also supplied.
expires (float, ~datetime.datetime): Datetime or seconds in the future for the task to expire. The task won’t be executed after the expiration time.
shadow (str): Override task name used in logs/monitoring. Default is retrieved from shadow_name().
connection (kombu.Connection): Re-use existing broker connection instead of acquiring one from the connection pool.
retry (bool): If enabled, sending of the task message will be retried in the event of connection loss or failure. Default is taken from the :setting:`task_publish_retry` setting. Note that you need to handle the producer/connection manually for this to work.
retry_policy (Mapping): Override the retry policy used. See the :setting:`task_publish_retry_policy` setting.
time_limit (int): If set, overrides the default time limit.
soft_time_limit (int): If set, overrides the default soft time limit.
queue (str, kombu.Queue): The queue to route the task to. This must be a key present in :setting:`task_queues`, or :setting:`task_create_missing_queues` must be enabled. See guide-routing for more information.
exchange (str, kombu.Exchange): Named custom exchange to send the task to. Usually not used in combination with the queue argument.
routing_key (str): Custom routing key used to route the task to a worker server. If in combination with a queue argument, only used to specify custom routing keys to topic exchanges.
priority (int): The task priority, a number between 0 and 9. Defaults to the priority attribute.
serializer (str): Serialization method to use. Can be pickle, json, yaml, msgpack or any custom serialization method that’s been registered with kombu.serialization.registry. Defaults to the serializer attribute.
compression (str): Optional compression method to use. Can be one of zlib, bzip2, or any custom compression methods registered with kombu.compression.register(). Defaults to the :setting:`task_compression` setting.
link (Signature): A single, or a list of, task signatures to apply if the task returns successfully.
link_error (Signature): A single, or a list of, task signatures to apply if an error occurs while executing the task.
producer (kombu.Producer): Custom producer to use when publishing the task.
add_to_parent (bool): If set to True (default) and the task is applied while executing another task, then the result will be appended to the parent task’s request.children attribute. Trailing can also be disabled by default using the trail attribute.
ignore_result (bool): If set to False (default), the result of a task will be stored in the backend. If set to True, the result will not be stored. This can also be set using the ignore_result option in the app.task decorator.
publisher (kombu.Producer): Deprecated alias to producer.
headers (Dict): Message headers to be included in the message.
- Returns:
celery.result.AsyncResult: Promise of future evaluation.
- Raises:
TypeError: If not enough arguments are passed, or too many arguments are passed. Note that signature checks may be disabled by specifying @task(typing=False).
kombu.exceptions.OperationalError: If a connection to the transport cannot be made, or if the connection is lost.
- Note:
Also supports all keyword arguments supported by kombu.Producer.publish().
- property args: list
- property bag: MappingProxyType
- property bound_args: dict
- property called_as_orig
- property default_bound_args: dict
- property default_use_cache
- enqueue_child(chain: Signature, add_to_enqueued_children: bool = True, block: bool = False, raise_exception_on_failure: bool | None = None, apply_async_epilogue: Callable[[AsyncResult], None] | None = None, apply_async_options=None, forget: bool = False, inject_uid: bool = False, **kwargs) AsyncResult | None [source]
Schedule a child task to run
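- Example (an illustrative sketch; the app and the child_service microservice are assumptions, and block/raise_exception_on_failure are the documented keyword arguments above):
from celery import Celery
from firexkit.task import FireXTask

app = Celery('example')

@app.task(bind=True, base=FireXTask)
def parent(self):
    # Enqueue a child chain and block until it completes; a failure in
    # the child is raised here rather than silently ignored.
    child_result = self.enqueue_child(
        child_service.s(input_file='data.txt'),  # hypothetical service
        block=True,
        raise_exception_on_failure=True,
    )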
- enqueue_child_and_extract(*args, **kwargs) tuple | dict [source]
Apply a chain, and extract results from it.
- See:
_enqueue_child_and_extract
- enqueue_child_and_get_results(*args, return_keys: str | tuple = (), return_keys_only: bool = True, merge_children_results: bool = False, extract_from_parents: bool = True, **kwargs) tuple | dict [source]
Apply a chain, and extract results from it. This is a better version of enqueue_child_and_extract, where the defaults for extract_from_children and extract_task_returns_only are more intuitive. Additionally, extract_from_parents defaults to True in this API.
- Note:
This is shorthand for enqueue_child() followed by get_results().
- Args:
*args: Tuple of args required by enqueue_child().
return_keys: A single return key string, or a tuple of keys to extract from the task results. The default value of None will return a dictionary of key/value pairs for the returned results.
return_keys_only: If set, only return results for keys specified by the task’s @returns decorator or returns attribute; otherwise, returns will include key/value pairs from the BoG.
merge_children_results: If set, extract and merge results from the children tasks as well.
extract_from_parents: If set, will consider all results returned from tasks of the given chain (parents of the last task). Else will consider only results returned by the last task of the chain.
**kwargs: Other options to enqueue_child().
- Returns:
The returns of get_results.
- See Also:
get_results
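- Example (an illustrative sketch; reuses the app and the hypothetical divide service from the @returns example above):
@app.task(bind=True, base=FireXTask)
def compute(self):
    # Enqueue the child, wait for it, and pull two named results in one call.
    quotient, remainder = self.enqueue_child_and_get_results(
        divide.s(dividend=7, divisor=2),
        return_keys=('quotient', 'remainder'),
    )
    return quotient, remainder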
- enqueue_child_from_spec(task_spec: TaskEnqueueSpec, inject_args: dict | None = None)[source]
- enqueue_child_once(*args, enqueue_once_key, block=False, **kwargs)[source]
See :meth:`enqueue_child_once_and_extract`.
- enqueue_child_once_and_extract(*args, enqueue_once_key: str, **kwargs) [<class 'tuple'>, <class 'dict'>] [source]
Apply a chain with a unique key only once per FireX run, and extract results from it.
- Note:
This is like enqueue_child_and_extract(), but it sets enqueue_once_key.
- enqueue_in_parallel(chains, max_parallel_chains=15, wait_for_completion=True, raise_exception_on_failure=False, **kwargs)[source]
This method executes the provided list of Signatures/Chains in parallel and returns the associated list of “async_result” objects. The results are returned in the same order as the input Signatures/Chains.
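- Example (an illustrative sketch; reuses the hypothetical fetch/parse tasks from the chain example above):
@app.task(bind=True, base=FireXTask)
def fan_out(self, urls):
    chains = [fetch.s(url) | parse.s() for url in urls]
    # At most 15 chains are in flight at once; the returned AsyncResult
    # list preserves the order of the input chains.
    return self.enqueue_in_parallel(chains)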
- property enqueued_children
- property file_logging_dirpath
- forget_child_result(child_result: AsyncResult, do_not_forget_report_nodes: bool = True, do_not_forget_enqueue_once_nodes: bool = True, do_not_forget_cache_enabled_tasks_results: bool = True, **kwargs)[source]
Forget results of the tree rooted at the “chain-head” of child_result, while skipping subtrees in skip_subtree_nodes, as well as nodes in do_not_forget_nodes.
If do_not_forget_report_nodes is True (default), do not forget report nodes (e.g. nodes decorated with @email)
If do_not_forget_enqueue_once_nodes is True (default), do not forget subtrees rooted at nodes that were enqueued with enqueue_once
If do_not_forget_cache_enabled_tasks_results is True (default), do not forget subtrees rooted at nodes that belong to services with cached=True
- forget_enqueued_children_results(**kwargs)[source]
Forget results for the enqueued children of the current task.
- forget_specific_children_results(child_results: list[AsyncResult], **kwargs)[source]
Forget results for the explicitly provided child_results
- property from_plugin
- ignore_result = False
If enabled the worker won’t store task state and return values for this task. Defaults to the :setting:`task_ignore_result` setting.
- property kwargs: dict
- property logs_dir_for_worker
- property name_without_orig
- property nonready_enqueued_children
- property optional_args: dict
- Returns:
dict of optional arguments to the microservice, and their values.
- property pending_enqueued_children
- abstract post_task_run(results, extra_events: dict | None = None)[source]
Overrideable method to allow subclasses to do something with the BagOfGoodies after the task has been run.
- abstract pre_task_run(extra_events: dict | None = None)[source]
Overrideable method to allow subclasses to do something with the BagOfGoodies before the task is run.
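- Example (an illustrative sketch; whether the base implementations must also be invoked is not specified here, and the logging is purely illustrative):
import logging

from firexkit.task import FireXTask

logger = logging.getLogger(__name__)

class AuditedTask(FireXTask):
    def pre_task_run(self, extra_events=None):
        # Inspect the bag of goodies before the task body runs.
        logger.info('starting %s with args %s', self.name, dict(self.abog))

    def post_task_run(self, results, extra_events=None):
        # Inspect what the task produced after it has run.
        logger.info('%s returned %r', self.name, results)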
- priority = None
Default task priority.
- rate_limit = None
Rate limit for this task type. Examples: None (no rate limit), '100/s' (hundred tasks a second), '100/m' (hundred tasks a minute), '100/h' (hundred tasks an hour).
- reject_on_worker_lost = None
Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., :sig:`KILL`/:sig:`INT`, etc.). Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.
Warning: Enabling this can cause message loops; make sure you know what you’re doing.
- property request_soft_time_limit
- request_stack = <celery.utils.threads._LocalStack object>
Task request stack, the current request will be the topmost.
- property required_args: list
- Returns:
list of required arguments to the microservice.
- retry(*args, **kwargs)[source]
Retry the task, adding it to the back of the queue.
- Example:
>>> from imaginary_twitter_lib import Twitter
>>> from proj.celery import app

>>> @app.task(bind=True)
... def tweet(self, auth, message):
...     twitter = Twitter(oauth=auth)
...     try:
...         twitter.post_status_update(message)
...     except twitter.FailWhale as exc:
...         # Retry in 5 minutes.
...         raise self.retry(countdown=60 * 5, exc=exc)
- Note:
Although the task will never return above as retry raises an exception to notify the worker, we use raise in front of the retry to convey that the rest of the block won’t be executed.
- Arguments:
args (Tuple): Positional arguments to retry with.
kwargs (Dict): Keyword arguments to retry with.
exc (Exception): Custom exception to report when the max retry limit has been exceeded (default: @MaxRetriesExceededError). If this argument is set and retry is called while an exception was raised (sys.exc_info() is set), it will attempt to re-raise the current exception. If no exception was raised, it will raise the exc argument provided.
countdown (float): Time in seconds to delay the retry for.
eta (~datetime.datetime): Explicit time and date to run the retry at.
max_retries (int): If set, overrides the default retry limit for this execution. Changes to this parameter don’t propagate to subsequent task retry attempts. A value of None means “use the default”, so if you want infinite retries you’d have to set the max_retries attribute of the task to None first.
time_limit (int): If set, overrides the default time limit.
soft_time_limit (int): If set, overrides the default soft time limit.
throw (bool): If this is False, don’t raise the @Retry exception, which tells the worker to mark the task as being retried. Note that this means the task will be marked as failed if the task raises an exception, or successful if it returns after the retry call.
**options (Any): Extra options to pass on to apply_async().
- Raises:
celery.exceptions.Retry: To tell the worker that the task has been re-sent for retry. This always happens, unless the throw keyword argument has been explicitly set to False, and is considered normal operation.
- property root_logger
- property root_logger_file_handler
- property root_orig
Return the very original Task that this Task has overridden. If this task has been overridden multiple times, this will return the very first/original task. Returns self if the task was not overridden.
- send_display_collapse(task_uuid: str | None = None)[source]
Collapse the current task (default), or collapse the task with the supplied UUID.
- send_event(*args, **kwargs)[source]
Send monitoring event message.
This can be used to add custom event types in :pypi:`Flower` and other monitors.
- Arguments:
type_ (str): Type of event, e.g. "task-failed".
- Keyword Arguments:
- retry (bool): Retry sending the message
if the connection is lost. Default is taken from the :setting:`task_publish_retry` setting.
- retry_policy (Mapping): Retry settings. Default is taken
from the :setting:`task_publish_retry_policy` setting.
- **fields (Any): Map containing information about the event.
Must be JSON serializable.
- serializer = 'json'
The name of a serializer that is registered with kombu.serialization.registry. Default is ‘json’.
- property short_name
- property short_name_without_orig
- signature(*args, **kwargs)[source]
Create signature.
- Returns:
signature: object for this task, wrapping arguments and execution options for a single task invocation.
- store_errors_even_if_ignored = False
When enabled errors will be stored even if the task is otherwise configured to ignore results.
- property task_label: str
Returns a label for this task
- Examples:
8345379a-e536-4566-b5c9-3d515ec5936a
8345379a-e536-4566-b5c9-3d515ec5936a_2 (if it was the second retry)
microservices.testsuites_tasks.CreateWorkerConfigFromTestsuites (if there was no request id yet)
- property task_log_url
- property task_logfile
- property task_logging_dirpath
- track_started = False
If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.
Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.
The application default can be overridden using the :setting:`task_track_started` setting.
- typing = True
Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.
- property uid
- wait_for_any_children(pending_only=True, **kwargs)[source]
Wait for any of the enqueued child tasks to run and complete
- wait_for_children(pending_only=True, **kwargs)[source]
Wait for all enqueued child tasks to run and complete
- wait_for_specific_children(child_results, forget: bool = False, **kwargs)[source]
Wait for the explicitly provided child_results to run and complete
- property worker_log_file
- property worker_log_url
- class firexkit.task.PendingChildStrategy(value)[source]
Bases: Enum
Available strategies for handling remaining pending child tasks upon successful completion of the parent microservice.
- Block = (0, 'Default')
- Continue = 2
- Revoke = 1
- class firexkit.task.TaskEnqueueSpec(signature: celery.canvas.Signature, inject_abog: bool = True, enqueue_opts: dict[str, Any] | None = None)[source]
Bases: object
- enqueue_opts: dict[str, Any] | None = None
- inject_abog: bool = True
- signature: Signature
- firexkit.task.add_task_result_with_report_to_db(db, result_id: str)[source]
Append task id to the list of tasks with reports (e.g. tasks decorated with @email)
- firexkit.task.create_flame_config(existing_configs, formatter=<function _default_flame_formatter>, data_type='html', bind=False, on_next=False, on_next_args=())[source]
- firexkit.task.flame(flame_key=None, formatter=<function _default_flame_formatter>, data_type='html', bind=False, on_next=False, on_next_args=(), decorator_name='flame')[source]
- firexkit.task.get_attr_unwrapped(fun: callable, attr_name, *default_value)[source]
Unwraps a function and returns an attribute of the root function
- firexkit.task.get_current_cache_enabled_uids(db) set[str] [source]
Returns a set of all task/result ids whose tasks were cache-enabled
- firexkit.task.get_current_enqueue_child_once_uids(db) set[str] [source]
Returns a set of all task/result ids that were executed with enqueue_once
- firexkit.task.get_current_reports_uids(db) set[str] [source]
Return the list of task/result ids for all tasks with reports (e.g. @email) executed so far.
- firexkit.task.is_jsonable(obj) bool [source]
Returns True if the obj can be serialized via JSON, otherwise returns False.
- firexkit.task.parse_signature(sig: ~inspect.Signature) -> (<class 'set'>, <class 'dict'>)[source]
Parse the run function of a microservice and return required and optional arguments
- firexkit.task.task_prerequisite(pre_req_task: ~celery.local.PromiseProxy, key: str | None = None, trigger: callable = <class 'bool'>) callable [source]
Register a prerequisite to a microservice.
- Parameters:
pre_req_task – microservice to be invoked if trigger returns False.
key – key in kwargs to pass to the trigger. If None, all kwargs are passed.
trigger – a function returning a bool. When False is returned, pre_req_task is enqueued.
When adding multiple prerequisites, they must be added in reverse order (i.e. last one to run first)
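- Example (one plausible usage, assuming task_prerequisite is applied as a decorator above an already-created microservice; the services and the deps_installed argument are hypothetical):
from celery import Celery
from firexkit.task import FireXTask, task_prerequisite

app = Celery('example')

@app.task(base=FireXTask)
def install_deps():
    pass

# If bool(kwargs['deps_installed']) is False, install_deps is enqueued first.
@task_prerequisite(install_deps, key='deps_installed', trigger=bool)
@app.task(base=FireXTask)
def run_tests(deps_installed=False):
    pass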