API Reference

firexkit.argument_conversion module

exception firexkit.argument_conversion.ArgumentConversionException[source]

Bases: Exception

An exception occurred while executing a converter

exception firexkit.argument_conversion.CircularDependencyException[source]

Bases: ConverterRegistrationException

A converter was registered with a dependency that is itself directly or indirectly dependent on it.

class firexkit.argument_conversion.ConverterRegister[source]

Bases: object

Converters are a practical mechanism for altering the input values passed into microservices. They can also be used for upfront validation of those inputs.

convert(pre_task=True, verbose=True, **kwargs) dict[source]

Run all registered converters.

Parameters:
  • pre_task – Converters can be registered to run before or after a task runs.

classmethod get_register(task_name)[source]
get_visit_order(pre_task=True)[source]

Provide a list of all converters in order of execution, accounting for dependencies.

classmethod list_converters(task_name, pre_task=True)[source]
register(*args)[source]

Register a converter function.

Parameters:

args – A collection of optional arguments, the function of which is based on its type:

  • callable (only once): A function that will be called to convert arguments

  • boolean (only once): At which point should this converter be called? True is pre (before task), False is post (after task).

  • str: Dependencies. Each string names another converter that the current converter depends on.
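
A minimal sketch of registration (the task name BirthdayCake, the converter names, and the convention that converters receive the argument dict and return a dict of updates are assumptions for illustration):

    from firexkit.argument_conversion import ArgumentConversionException, ConverterRegister

    register = ConverterRegister.get_register("BirthdayCake")

    @register.register(True)  # boolean arg: True -> run before the task (pre-task)
    def normalize_message(kwargs):
        # Assumed convention: return a dict of updates to the arguments
        return {"message": kwargs["message"].strip()}

    @register.register(True, "normalize_message")  # str arg: depends on normalize_message
    def validate_message(kwargs):
        # Pure validation: raise on bad input, return nothing
        if not kwargs["message"]:
            raise ArgumentConversionException("message must be non-empty")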

classmethod register_for_task(task: PromiseProxy, pre_task=True, *args)[source]

Register a converter function for a given task.

Parameters:
  • task – A microservice signature against which to register the task

  • pre_task – At which point should this converter be called? True is pre (before task), False is post (after task).

  • args – A collection of optional arguments, the function of which is based on its type:

  • callable (only once): A function that will be called to convert arguments

  • str: Dependencies. Each string names another converter that the current converter depends on.

classmethod task_convert(task_name: str, pre_task=True, **kwargs) dict[source]

Run the argument conversion for a given task.

Parameters:
  • task_name – the short name of the task. If the long name is given, it is reduced to the short name

  • pre_task – Converters can be registered to run before or after a task runs

  • kwargs – the argument dict to be converted

exception firexkit.argument_conversion.ConverterRegistrationException[source]

Bases: Exception

A coding error in the registration of the converter

exception firexkit.argument_conversion.MissingConverterDependencyError[source]

Bases: ConverterRegistrationException

A converter was registered with a dependency that does not exist.

exception firexkit.argument_conversion.NameDuplicationException[source]

Bases: ConverterRegistrationException

A converter was registered with the same name as another converter. This creates conflicts during the dependency check, and is not allowed.

class firexkit.argument_conversion.SingleArgDecorator(*args)[source]

Bases: object

Decorator to simplify a common use case for argument converters, in which a single argument in the bag of goodies needs to be validated or converted. The converter is only called if the argument is present in kwargs.

Example:

@ConverterRegister.register_for_task(BirthdayCake)
@SingleArgDecorator("message")
def yell_loud(arg_value):
    return arg_value.upper()

firexkit.bag_of_goodies module

class firexkit.bag_of_goodies.BagOfGoodies(sig: Signature, args, kwargs, has_returns_from_previous_task=True)[source]

Bases: object

INDIRECT_ARG_CHAR = '@'
get_bag() dict[source]
pop(k, *default)[source]
resolve_circular_indirect_references(sig: Signature, args: tuple, has_returns_from_previous_task: bool) tuple[source]
split_for_signature()[source]
update(updates: dict)[source]

firexkit.chain module

class firexkit.chain.InjectArgs(**kwargs)[source]

Bases: object

exception firexkit.chain.InvalidChainArgsException(msg, wrong_args: dict | None = None)[source]

Bases: Exception

firexkit.chain.get_label(sig: Signature)[source]
firexkit.chain.returns(*args)[source]

This decorator is used to specify the keys of the dict that the task returns.

This is used only to signal to the user the inputs and outputs of a task, and deduce what arguments are required for a chain.
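
For example, a task can declare its return keys so that a chain can consume them by name (a minimal sketch; the app and task names are hypothetical):

    from celery import Celery
    from firexkit.chain import returns
    from firexkit.task import FireXTask

    app = Celery()

    @app.task(base=FireXTask)
    @returns("formal_greeting")
    def greet(name="World"):
        return f"Hello, {name}!"

    # Downstream tasks in a chain can now reference "formal_greeting" as an input.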

firexkit.chain.set_execution_options(sig: Signature, **options)[source]

Set arbitrary execution options on every task in the sig

firexkit.chain.set_label(sig: Signature, label)[source]
firexkit.chain.set_priority(sig: Signature, priority: int)[source]

Set the priority execution option in every task in sig

firexkit.chain.set_queue(sig: Signature, queue)[source]

Set the queue execution option in every task in sig

firexkit.chain.set_soft_time_limit(sig: Signature, soft_time_limit)[source]

Set the soft_time_limit execution option in every task in sig

firexkit.chain.set_use_cache(sig: Signature, use_cache: bool)[source]

Set the use_cache execution option in every task in sig
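
Taken together, the set_* helpers stamp execution options onto every task in a signature or chain; a minimal sketch (fetch and parse are assumed, previously defined Celery tasks):

    from celery import chain
    from firexkit.chain import set_execution_options, set_priority, set_queue

    # fetch and parse are hypothetical tasks defined elsewhere
    sig = chain(fetch.s(url="https://example.com"), parse.s())

    set_queue(sig, "io_bound")                      # route every task to a queue
    set_priority(sig, 3)                            # priority for every task
    set_execution_options(sig, soft_time_limit=60)  # any other execution option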

firexkit.chain.verify_chain_arguments(sig: Signature)[source]

Verifies that the chain is not missing any parameters. Asserts if any parameters are missing, or if a reference parameter (@something) has no provider.
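
A sketch of the reference-parameter check (assuming InjectArgs composes with signatures via |; parse is a hypothetical task):

    from firexkit.chain import InjectArgs, verify_chain_arguments

    # '@source' is an indirect reference, resolved from the chain at runtime
    c = InjectArgs(source="/tmp/data") | parse.s(path="@source")
    verify_chain_arguments(c)  # fails if '@source' has no provider in the chain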

firexkit.result module

exception firexkit.result.ChainException[source]

Bases: Exception

exception firexkit.result.ChainInterruptedByZombieTaskException(task_id=None, task_name=None, cause=None)[source]

Bases: ChainInterruptedException

exception firexkit.result.ChainInterruptedException(task_id=None, task_name=None, cause=None)[source]

Bases: ChainException

MESSAGE = 'The chain has been interrupted by a failure in microservice '
exception firexkit.result.ChainRevokedException(task_id=None, task_name=None)[source]

Bases: ChainException

MESSAGE = 'The chain has been interrupted by the revocation of microservice '
exception firexkit.result.ChainRevokedPreRunException(task_id=None, task_name=None)[source]

Bases: ChainRevokedException

exception firexkit.result.MultipleFailuresException(task_ids=('UNKNOWN',))[source]

Bases: ChainInterruptedException

MESSAGE = 'The chain has been interrupted by multiple failing microservices: %s'
class firexkit.result.WaitLoopCallBack(func, frequency, kwargs)

Bases: tuple

frequency

Alias for field number 1

func

Alias for field number 0

kwargs

Alias for field number 2

exception firexkit.result.WaitOnChainTimeoutError[source]

Bases: Exception

firexkit.result.climb_up_until_null_parent(result: AsyncResult) AsyncResult[source]
firexkit.result.disable_all_async_results()[source]
firexkit.result.disable_async_result(result: AsyncResult)[source]
firexkit.result.find_all_unsuccessful(result: AsyncResult, ignore_non_ready=False, depth=0) dict[source]
firexkit.result.find_unsuccessful_in_chain(result: AsyncResult) dict[source]
firexkit.result.first_non_chain_interrupted_exception(ex)[source]
firexkit.result.forget_chain_results(result: AsyncResult, forget_chain_head_node_result: bool = True, do_not_forget_nodes: Iterable[AsyncResult] | None = None, **kwargs) None[source]

Forget results of the tree rooted at the “chain-head” of result, while skipping subtrees in skip_subtree_nodes, as well as nodes in do_not_forget_nodes.

If forget_chain_head_node_result is False (default True), do not forget the head of the result chain

firexkit.result.forget_single_async_result(r: AsyncResult)[source]

Forget the result of this task

AsyncResult.forget() also forgets the parent (which is not always desirable), so we had to implement our own.

firexkit.result.forget_subtree_results(head_node_result: AsyncResult, skip_subtree_nodes: Iterable[AsyncResult | str] | None = None, do_not_forget_nodes: Iterable[AsyncResult | str] | None = None) None[source]

Forget results of the subtree rooted at head_node_result, while skipping subtrees in skip_subtree_nodes, as well as nodes in do_not_forget_nodes

firexkit.result.get_all_children(node, timeout=180, skip_subtree_nodes: list[AsyncResult | str] | None = None) Iterator[AsyncResult][source]

Iterate the children of node, skipping any nodes found in skip_subtree_nodes

firexkit.result.get_result_id(r: AsyncResult | str) str[source]

Return the string id of r if it was an AsyncResult, otherwise returns r

firexkit.result.get_result_logging_name(result: AsyncResult, name=None)[source]
firexkit.result.get_results(result: AsyncResult, return_keys=(), parent_id: str | None = None, return_keys_only=True, merge_children_results=False, extract_from_parents=True)[source]

Extract and return task results

Args:

  result: The AsyncResult to extract actual returned results from.

  return_keys: A single return key string, or a tuple of keys to extract from the AsyncResult. The default value of None will return a dictionary of key/value pairs for the returned results.

  return_keys_only: If True (default), only return results for keys specified by the task’s @returns decorator or returns attribute. If False, returns will include key/value pairs from the bag of goodies.

  parent_id: If extract_from_parents is set, extract results up to this parent_id, or until we can no longer traverse up the parent hierarchy.

  merge_children_results: If True, traverse children of result, and merge results produced by them. The default value of False will not collect results from the children.

  extract_from_parents: If True (default), will consider all results returned from tasks of the given chain (parents of the last task). Else will consider only results returned by the last task of the chain.

Returns:

If return_keys parameter was specified, returns a tuple of the results in the same order of the return_keys. If return_keys parameter wasn’t specified, return a dictionary of the key/value pairs of the returned results.
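
A usage sketch (the async_result and key names are hypothetical):

    from firexkit.result import get_results

    # async_result is assumed to come from a previously enqueued chain.
    # Tuple form: values come back in the same order as return_keys
    total, diff = get_results(async_result, return_keys=("total", "diff"))

    # Dict form: all returned key/value pairs
    all_returns = get_results(async_result)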

firexkit.result.get_results_with_default(result: AsyncResult, default=None, error_msg: str | None = None, **kwargs)[source]
firexkit.result.get_task_info_from_result(result, key: str | None = None)[source]
firexkit.result.get_task_name_from_result(result)[source]
firexkit.result.get_task_postrun_info(result)[source]
firexkit.result.get_task_queue_from_result(result)[source]
firexkit.result.get_task_results(results: dict) dict[source]
firexkit.result.get_tasks_inputs_from_result(results: dict) dict[source]
firexkit.result.get_tasks_names_from_results(results)[source]
firexkit.result.is_async_result_monkey_patched_to_track()[source]
firexkit.result.is_result_ready(result: AsyncResult, timeout=900, retry_delay=1)[source]

Protect against the broker being temporarily unreachable and throwing a TimeoutError

firexkit.result.last_causing_chain_interrupted_exception(ex)[source]
firexkit.result.mark_queues_ready(*queue_names: str)[source]
firexkit.result.mark_task_postrun(task, task_id, **_kwargs)[source]
firexkit.result.monkey_patch_async_result_to_track_instances()[source]
firexkit.result.populate_task_info(sender, declare, headers, **_kwargs)[source]
firexkit.result.results2tuple(results: dict, return_keys: str | tuple) tuple[source]
firexkit.result.send_block_task_states_to_caller_task(func)[source]
firexkit.result.teardown_monkey_patch_async_result_to_track_instances()[source]
firexkit.result.update_task_name(sender, task_id, *_args, **_kwargs)[source]
firexkit.result.wait_for_any_results(results, max_wait=None, poll_max_wait=0.1, log_msg=False, callbacks: Iterator[WaitLoopCallBack] = (), **kwargs)[source]
firexkit.result.wait_for_running_tasks_from_results(results, max_wait=120, sleep_between_iterations=0.05)[source]
firexkit.result.wait_on_async_results(*args, **kwargs)[source]
firexkit.result.wait_on_async_results_and_maybe_raise(results, raise_exception_on_failure=True, caller_task=None, **kwargs)[source]
firexkit.result.was_queue_ready(queue_name: str)[source]
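
The wait helpers above block on one or more AsyncResults; a sketch of typical usage (r1 and r2 are assumed results of enqueued chains, and wait_for_any_results is assumed to yield results as they become ready):

    from firexkit.result import (
        WaitLoopCallBack,
        wait_for_any_results,
        wait_on_async_results_and_maybe_raise,
    )

    # Block until both results complete; raises ChainInterruptedException on failure
    wait_on_async_results_and_maybe_raise([r1, r2])

    # Poll with a periodic callback while waiting on whichever result finishes first
    callback = WaitLoopCallBack(func=print, frequency=30, kwargs={})
    for ready in wait_for_any_results([r1, r2], callbacks=[callback]):
        print(ready.id)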

firexkit.revoke module

class firexkit.revoke.RevokedRequests(timer_expiry_secs=60, skip_first_cycle=True)[source]

Bases: object

Need to inspect the app for the revoked requests, because AsyncResult.state of a task that hasn’t been de-queued and executed by a worker but was revoked is PENDING (i.e., the REVOKED state is only updated upon executing a task). This phenomenon makes wait_for_results wait on such “revoked” tasks, and therefore required us to implement this work-around.

classmethod get_revoked_list_from_app()[source]
classmethod instance(existing_instance=None)[source]
is_revoked(result_id, timer_expiry_secs=None)[source]
update(verbose=False)[source]
firexkit.revoke.get_chain_head(parent, child)[source]
firexkit.revoke.revoke_nodes_up_to_parent(starting_node, parent)[source]

firexkit.task module

exception firexkit.task.CacheResultNotPopulatedYetInRedis[source]

Bases: NotInCache

class firexkit.task.DictWillNotAllowWrites(_instrumentation_context=None, **kwargs)[source]

Bases: dict

clear() None.  Remove all items from D.[source]
pop(k[, d]) v, remove specified key and return the corresponding value.[source]

If key is not found, default is returned if given, otherwise KeyError is raised

popitem()[source]

Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.

setdefault(*args, **kwargs)[source]

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

update([E, ]**F) None.  Update D from dict/iterable E and F.[source]

If E is present and has a .keys() method, then does: for k in E: D[k] = E[k]
If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v
In either case, this is followed by: for k in F: D[k] = F[k]

warn()[source]
exception firexkit.task.DyanmicReturnsNotADict[source]

Bases: Exception

class firexkit.task.FireXTask[source]

Bases: Task

Task object that facilitates passing of arguments and return values from one task to another, to be used in chains

DYNAMIC_RETURN = '__DYNAMIC_RETURN__'
RETURN_KEYS_KEY = '__task_return_keys'
property abog: DictWillNotAllowWrites
add_task_logfile_handler()[source]
add_task_result_with_report_to_db()[source]

Maintain a list in the backend of all executed tasks that will generate reports

property all_args: MappingProxyType
apply_async(*args, **kwargs)[source]

Apply tasks asynchronously by sending a message.

Arguments:

  args (Tuple): The positional arguments to pass on to the task.

  kwargs (Dict): The keyword arguments to pass on to the task.

  countdown (float): Number of seconds into the future that the task should execute. Defaults to immediate execution.

  eta (~datetime.datetime): Absolute time and date of when the task should be executed. May not be specified if countdown is also supplied.

  expires (float, ~datetime.datetime): Datetime or seconds in the future when the task should expire. The task won’t be executed after the expiration time.

  shadow (str): Override task name used in logs/monitoring. Default is retrieved from shadow_name().

  connection (kombu.Connection): Re-use existing broker connection instead of acquiring one from the connection pool.

  retry (bool): If enabled, sending of the task message will be retried in the event of connection loss or failure. Default is taken from the :setting:`task_publish_retry` setting. Note that you need to handle the producer/connection manually for this to work.

  retry_policy (Mapping): Override the retry policy used. See the :setting:`task_publish_retry_policy` setting.

  time_limit (int): If set, overrides the default time limit.

  soft_time_limit (int): If set, overrides the default soft time limit.

  queue (str, kombu.Queue): The queue to route the task to. This must be a key present in :setting:`task_queues`, or :setting:`task_create_missing_queues` must be enabled. See guide-routing for more information.

  exchange (str, kombu.Exchange): Named custom exchange to send the task to. Usually not used in combination with the queue argument.

  routing_key (str): Custom routing key used to route the task to a worker server. If in combination with a queue argument, only used to specify custom routing keys to topic exchanges.

  priority (int): The task priority, a number between 0 and 9. Defaults to the priority attribute.

  serializer (str): Serialization method to use. Can be pickle, json, yaml, msgpack or any custom serialization method that’s been registered with kombu.serialization.registry. Defaults to the serializer attribute.

  compression (str): Optional compression method to use. Can be one of zlib, bzip2, or any custom compression methods registered with kombu.compression.register(). Defaults to the :setting:`task_compression` setting.

  link (Signature): A single, or a list of task signatures to apply if the task returns successfully.

  link_error (Signature): A single, or a list of task signatures to apply if an error occurs while executing the task.

  producer (kombu.Producer): custom producer to use when publishing the task.

  add_to_parent (bool): If set to True (default) and the task is applied while executing another task, then the result will be appended to the parent task’s request.children attribute. Trailing can also be disabled by default using the trail attribute.

  ignore_result (bool): If set to False (default) the result of a task will be stored in the backend. If set to True the result will not be stored. This can also be set using the ignore_result in the app.task decorator.

  publisher (kombu.Producer): Deprecated alias to producer.

  headers (Dict): Message headers to be included in the message.

Returns:

  celery.result.AsyncResult: Promise of future evaluation.

Raises:

  TypeError: If not enough arguments are passed, or too many arguments are passed. Note that signature checks may be disabled by specifying @task(typing=False).

  kombu.exceptions.OperationalError: If a connection to the transport cannot be made, or if the connection is lost.

Note:

  Also supports all keyword arguments supported by kombu.Producer.publish().

property args: list
property bag: MappingProxyType
property bound_args: dict
cache_call()[source]
property called_as_orig
classmethod convert_cached_results(result)[source]
convert_results_if_returns_defined_by_task_definition(result)[source]
classmethod convert_returns_to_dict(return_keys, result) dict[source]
property default_bound_args: dict
property default_use_cache
duration()[source]
enqueue_child(chain: Signature, add_to_enqueued_children: bool = True, block: bool = False, raise_exception_on_failure: bool | None = None, apply_async_epilogue: Callable[[AsyncResult], None] | None = None, apply_async_options=None, forget: bool = False, inject_uid: bool = False, **kwargs) AsyncResult | None[source]

Schedule a child task to run
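
Children are scheduled from within a running FireXTask-based service via self; a minimal sketch (ChildService is a hypothetical service):

    # Inside the body of a FireXTask-based service:
    result = self.enqueue_child(ChildService.s(path="/tmp/data"), block=True)

    # Non-blocking variant: enqueue now, wait later
    self.enqueue_child(ChildService.s(path="/tmp/other"))
    self.wait_for_children()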

enqueue_child_and_extract(*args, **kwargs) tuple | dict[source]

Apply a chain, and extract results from it.

See:

_enqueue_child_and_extract

enqueue_child_and_get_results(*args, return_keys: str | tuple = (), return_keys_only: bool = True, merge_children_results: bool = False, extract_from_parents: bool = True, **kwargs) tuple | dict[source]

Apply a chain, and extract results from it.

This is a better version of enqueue_child_and_extract where the defaults for extract_from_children and extract_task_returns_only are more intuitive. Additionally, extract_from_parents defaults to True in this API.

Note:

This is shorthand for enqueue_child() followed with get_results().

Args:

  *args: Tuple of args required by enqueue_child().

  return_keys: A single return key string, or a tuple of keys to extract from the task results. The default value of None will return a dictionary of key/value pairs for the returned results.

  return_keys_only: If set, only return results for keys specified by the tasks’ @returns decorator or returns attribute; otherwise, returns will include key/value pairs from the BoG.

  merge_children_results: If set, extract and merge results from the children tasks as well.

  extract_from_parents: If set, will consider all results returned from tasks of the given chain (parents of the last task). Else will consider only results returned by the last task of the chain.

  **kwargs: Other options to enqueue_child()

Returns:

The returns of get_results.

See Also:

get_results
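
A usage sketch (Greet and its return key are hypothetical):

    # Inside a FireXTask-based service: run a child chain and extract one key
    (formal_greeting,) = self.enqueue_child_and_get_results(
        Greet.s(name="Ada"),
        return_keys=("formal_greeting",),
    )

    # Without return_keys: a dict of all returned key/value pairs
    results = self.enqueue_child_and_get_results(Greet.s(name="Ada"))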

enqueue_child_from_spec(task_spec: TaskEnqueueSpec, inject_args: dict | None = None)[source]
enqueue_child_once(*args, enqueue_once_key, block=False, **kwargs)[source]

See :meth:`enqueue_child_once_and_extract`

enqueue_child_once_and_extract(*args, enqueue_once_key: str, **kwargs) tuple | dict[source]

Apply a chain with a unique key only once per FireX run, and extract results from it.

Note:

This is like enqueue_child_and_extract(), but it sets enqueue_once_key.

enqueue_in_parallel(chains, max_parallel_chains=15, wait_for_completion=True, raise_exception_on_failure=False, **kwargs)[source]

This method executes the provided list of Signatures/Chains in parallel and returns the associated list of “async_result” objects. The results are returned in the same order as the input Signatures/Chains.
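
A sketch (ProcessItem is a hypothetical service):

    # Inside a FireXTask-based service:
    chains = [ProcessItem.s(item=i) for i in range(50)]
    # At most 10 chains in flight at a time; async_results preserves input order
    async_results = self.enqueue_in_parallel(chains, max_parallel_chains=10)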

property enqueued_children
property file_logging_dirpath
final_call(*args, **kwargs)[source]
forget_child_result(child_result: AsyncResult, do_not_forget_report_nodes: bool = True, do_not_forget_enqueue_once_nodes: bool = True, do_not_forget_cache_enabled_tasks_results: bool = True, **kwargs)[source]

Forget results of the tree rooted at the “chain-head” of child_result, while skipping subtrees in skip_subtree_nodes, as well as nodes in do_not_forget_nodes.

If do_not_forget_report_nodes is True (default), do not forget report nodes (e.g. nodes decorated with @email)

If do_not_forget_enqueue_once_nodes is True (default), do not forget subtrees rooted at nodes that were enqueued with enqueue_once

If do_not_forget_cache_enabled_tasks_results is True (default), do not forget subtrees rooted at nodes that belong to services with cached=True

forget_enqueued_children_results(**kwargs)[source]

Forget results for the enqueued children of current task

forget_specific_children_results(child_results: list[AsyncResult], **kwargs)[source]

Forget results for the explicitly provided child_results

property from_plugin
get_module_file_location()[source]
static get_short_name(task_name)[source]
classmethod get_short_name_without_orig(task_name)[source]
get_task_flame_configs() OrderedDict[source]
classmethod get_task_logfile(task_logging_dirpath, task_name, uuid)[source]
get_task_logfile_from_request(request)[source]
static get_task_logfilename(task_name, uuid)[source]
get_task_logging_dirpath_from_request(request)[source]
get_task_return_keys() tuple[source]
handle_exception(e, logging_extra: dict | None = None, raise_exception=True)[source]
has_report_meta() bool[source]

Does this task generate a report (e.g. decorated with @email)?

ignore_result = False

If enabled the worker won’t store task state and return values for this task. Defaults to the :setting:`task_ignore_result` setting.

initialize_context()[source]
is_cache_enabled()[source]
classmethod is_dynamic_return(value)[source]
property kwargs: dict
property logs_dir_for_worker
map_args(*args, **kwargs) dict[source]
map_input_args_kwargs(*args, **kwargs)[source]
property name_without_orig
property nonready_enqueued_children
property optional_args: dict
Returns:

dict of optional arguments to the microservice, and their values.

property pending_enqueued_children
abstract post_task_run(results, extra_events: dict | None = None)[source]

Overrideable method to allow subclasses to do something with the BagOfGoodies after the task has been run

abstract pre_task_run(extra_events: dict | None = None)[source]

Overrideable method to allow subclasses to do something with the BagOfGoodies before the task is run
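
Subclasses override these hooks to observe the arguments and results around execution; a minimal sketch (the base implementations are assumed safe to call via super()):

    from firexkit.task import FireXTask

    class AuditedTask(FireXTask):
        def pre_task_run(self, extra_events=None):
            super().pre_task_run(extra_events)     # keep base bookkeeping (assumed)
            print("inputs:", dict(self.abog))      # read-only view of the bag

        def post_task_run(self, results, extra_events=None):
            print("results:", results)
            super().post_task_run(results, extra_events)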

print_postcall_header(result)[source]
print_precall_header(bound_args, default_bound_args)[source]
priority = None

Default task priority.

rate_limit = None

Rate limit for this task type. Examples: None (no rate limit), ‘100/s’ (hundred tasks a second), ‘100/m’ (hundred tasks a minute), ‘100/h’ (hundred tasks an hour)

real_call()[source]
reject_on_worker_lost = None

Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., :sig:`KILL`/:sig:`INT`, etc).

Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.

Warning: Enabling this can cause message loops; make sure you know what you’re doing.

remove_task_logfile_handler()[source]
property request_soft_time_limit
request_stack = <celery.utils.threads._LocalStack object>

Task request stack, the current request will be the topmost.

property required_args: list
Returns:

list of required arguments to the microservice.

retry(*args, **kwargs)[source]

Retry the task, adding it to the back of the queue.

Example:
>>> from imaginary_twitter_lib import Twitter
>>> from proj.celery import app
>>> @app.task(bind=True)
... def tweet(self, auth, message):
...     twitter = Twitter(oauth=auth)
...     try:
...         twitter.post_status_update(message)
...     except twitter.FailWhale as exc:
...         # Retry in 5 minutes.
...         raise self.retry(countdown=60 * 5, exc=exc)
Note:

Although the task will never return above as retry raises an exception to notify the worker, we use raise in front of the retry to convey that the rest of the block won’t be executed.

Arguments:

  args (Tuple): Positional arguments to retry with.

  kwargs (Dict): Keyword arguments to retry with.

  exc (Exception): Custom exception to report when the max retry limit has been exceeded (default: @MaxRetriesExceededError). If this argument is set and retry is called while an exception was raised (sys.exc_info() is set) it will attempt to re-raise the current exception. If no exception was raised it will raise the exc argument provided.

  countdown (float): Time in seconds to delay the retry for.

  eta (~datetime.datetime): Explicit time and date to run the retry at.

  max_retries (int): If set, overrides the default retry limit for this execution. Changes to this parameter don’t propagate to subsequent task retry attempts. A value of None means “use the default”, so if you want infinite retries you’d have to set the max_retries attribute of the task to None first.

  time_limit (int): If set, overrides the default time limit.

  soft_time_limit (int): If set, overrides the default soft time limit.

  throw (bool): If this is False, don’t raise the @Retry exception, which tells the worker to mark the task as being retried. Note that this means the task will be marked as failed if the task raises an exception, or successful if it returns after the retry call.

  **options (Any): Extra options to pass on to apply_async().

Raises:

  celery.exceptions.Retry: To tell the worker that the task has been re-sent for retry. This always happens, unless the throw keyword argument has been explicitly set to False, and is considered normal operation.

revoke_child(result: AsyncResult, terminate=True, wait=False, timeout=None)[source]
revoke_nonready_children()[source]
property root_logger
property root_logger_file_handler
property root_orig

Return the very original Task that this Task had overridden. If this task has been overridden multiple times, this will return the very first/original task. Returns self if the task was not overridden.

run(*args, **kwargs)[source]

The body of the task executed by workers.

send_display_collapse(task_uuid: str | None = None)[source]

Collapse the current task (default), or collapse the task with the supplied UUID.

send_event(*args, **kwargs)[source]

Send monitoring event message.

This can be used to add custom event types in :pypi:`Flower` and other monitors.

Arguments:

type_ (str): Type of event, e.g. "task-failed".

Keyword Arguments:

  retry (bool): Retry sending the message if the connection is lost. Default is taken from the :setting:`task_publish_retry` setting.

  retry_policy (Mapping): Retry settings. Default is taken from the :setting:`task_publish_retry_policy` setting.

  **fields (Any): Map containing information about the event. Must be JSON serializable.

send_firex_data(data)[source]
send_firex_event_raw(data)[source]
send_firex_html(**kwargs)[source]
serializer = 'json'

The name of a serializer that is registered with kombu.serialization.registry. Default is ‘json’.

property short_name
property short_name_without_orig
signature(*args, **kwargs)[source]

Create signature.

Returns:

  signature: object for this task, wrapping arguments and execution options for a single task invocation.

start_time()[source]
store_errors_even_if_ignored = False

When enabled errors will be stored even if the task is otherwise configured to ignore results.

static strip_orig_from_name(task_name)[source]
task_context()[source]
property task_label: str

Returns a label for this task

Examples:

  8345379a-e536-4566-b5c9-3d515ec5936a
  8345379a-e536-4566-b5c9-3d515ec5936a_2 (if it was the second retry)
  microservices.testsuites_tasks.CreateWorkerConfigFromTestsuites (if there was no request id yet)

property task_log_url
property task_logfile
property task_logging_dirpath
track_started = False

If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.

Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.

The application default can be overridden using the :setting:`task_track_started` setting.

typing = True

Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.

property uid
update_firex_data(**kwargs)[source]
wait_for_any_children(pending_only=True, **kwargs)[source]

Wait for any of the enqueued child tasks to run and complete

wait_for_children(pending_only=True, **kwargs)[source]

Wait for all enqueued child tasks to run and complete

wait_for_specific_children(child_results, forget: bool = False, **kwargs)[source]

Wait for the explicitly provided child_results to run and complete

property worker_log_file
property worker_log_url
write_task_log_html_header()[source]
exception firexkit.task.IllegalTaskNameException[source]

Bases: Exception

exception firexkit.task.NotInCache[source]

Bases: Exception

class firexkit.task.PendingChildStrategy(value)[source]

Bases: Enum

Available strategies for handling remaining pending child tasks upon successful completion of the parent microservice.

Block = (0, 'Default')
Continue = 2
Revoke = 1
exception firexkit.task.ReturnsCodingException[source]

Bases: Exception

class firexkit.task.TaskContext[source]

Bases: object

class firexkit.task.TaskEnqueueSpec(signature: celery.canvas.Signature, inject_abog: bool = True, enqueue_opts: dict[str, Any] | None = None)[source]

Bases: object

enqueue_opts: dict[str, Any] | None = None
inject_abog: bool = True
signature: Signature
exception firexkit.task.UidNotInjectedInAbog[source]

Bases: Exception

firexkit.task.add_task_result_with_report_to_db(db, result_id: str)[source]

Append task id to the list of tasks with reports (e.g. tasks decorated with @email)

firexkit.task.banner(text, ch='=', length=78, content='')[source]
firexkit.task.convert_to_serializable(obj, max_recursive_depth=10, _depth=0)[source]
firexkit.task.create_collapse_ops(flex_collapse_ops_spec)[source]
firexkit.task.create_flame_config(existing_configs, formatter=<function _default_flame_formatter>, data_type='html', bind=False, on_next=False, on_next_args=())[source]
firexkit.task.expand_self_op()[source]
firexkit.task.flame(flame_key=None, formatter=<function _default_flame_formatter>, data_type='html', bind=False, on_next=False, on_next_args=(), decorator_name='flame')[source]
firexkit.task.flame_collapse(flex_collapse_ops)[source]
firexkit.task.flame_collapse_formatter(ops, task)[source]
firexkit.task.get_attr_unwrapped(fun: callable, attr_name, *default_value)[source]

Unwraps a function and returns an attribute of the root function

firexkit.task.get_cache_enabled_uid_dbkey(cache_key_info: str) str[source]
firexkit.task.get_current_cache_enabled_uid_dbkeys(db) list[str][source]
firexkit.task.get_current_cache_enabled_uids(db) set[str][source]

Returns a set of all task/result ids whose tasks were cache-enabled

firexkit.task.get_current_enqueue_child_once_uid_dbkeys(db) list[str][source]
firexkit.task.get_current_enqueue_child_once_uids(db) set[str][source]

Returns a set of all task/result ids that were executed with enqueue_once

firexkit.task.get_current_reports_uids(db) set[str][source]

Return the list of task/results ids for all tasks with reports (e.g. @email) executed so far

firexkit.task.get_enqueue_child_once_count_dbkey(enqueue_once_key: str) str[source]
firexkit.task.get_enqueue_child_once_uid_dbkey(enqueue_once_key: str) str[source]
firexkit.task.get_starttime_dbkey(task_id)[source]
firexkit.task.get_task_start_time(task_id, backend)[source]
firexkit.task.get_time_from_task_start(task_id, backend)[source]
firexkit.task.is_jsonable(obj) bool[source]

Returns True if the obj can be serialized via Json, otherwise returns False
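
For example:

    from firexkit.task import is_jsonable

    is_jsonable({"a": 1})   # True: plain dicts of primitives serialize to JSON
    is_jsonable({1, 2, 3})  # False: sets are not JSON serializable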

firexkit.task.parse_signature(sig: ~inspect.Signature) (set, dict)[source]

Parse the run function of a microservice and return required and optional arguments

firexkit.task.send_task_completed_event(task, task_id, backend)[source]
firexkit.task.statsd_task_postrun(sender, task, task_id, args, kwargs, **donotcare)[source]
firexkit.task.statsd_task_prerun(sender, task, task_id, args, kwargs, **donotcare)[source]
firexkit.task.statsd_task_revoked(sender, request, terminated, signum, expired, **kwargs)[source]
firexkit.task.task_prerequisite(pre_req_task: ~celery.local.PromiseProxy, key: str | None = None, trigger: callable = <class 'bool'>) callable[source]

Register a prerequisite to a microservice.

Parameters:
  • pre_req_task – microservice to be invoked if trigger returns False

  • key – key in kwargs to pass to the trigger. If None, all kwargs are passed

  • trigger – a function returning a bool. When False is returned, pre_req_task is enqueued

When adding multiple prerequisites, they must be added in reverse order (i.e. the last one to run is added first).
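
A sketch of the presumed decorator usage (FetchCredentials, call_remote_api, and app are hypothetical):

    from firexkit.task import FireXTask, task_prerequisite

    # app is an existing Celery app; FetchCredentials is a previously defined service
    @task_prerequisite(FetchCredentials, key="token", trigger=bool)
    @app.task(base=FireXTask)
    def call_remote_api(token):
        ...

    # If bool(token) is falsy when arguments are converted, FetchCredentials is
    # enqueued first so it can provide the missing value.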

firexkit.task.undecorate(task)[source]
Returns:

the original function that was used to create a microservice

firexkit.task.undecorate_func(func)[source]