"""Source code for karton.core.task: task states, task priorities and the Task class."""

import enum
import fnmatch
import json
import time
import uuid
import warnings
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Iterator,
    List,
    Optional,
    Tuple,
    Union,
)

from .resource import RemoteResource, ResourceBase
from .utils import recursive_iter, recursive_iter_with_keys, recursive_map

if TYPE_CHECKING:
    from .backend import KartonBackend  # noqa

import orjson


class TaskState(enum.Enum):
    """Lifecycle states a task moves through while being routed and processed."""

    # Task declared in TASKS_QUEUE
    DECLARED = "Declared"
    # Task spawned into subsystem queue
    SPAWNED = "Spawned"
    # Task is running in subsystem
    STARTED = "Started"
    # Task finished (ready to forget)
    FINISHED = "Finished"
    # Task crashed
    CRASHED = "Crashed"


class TaskPriority(enum.Enum):
    """Priority levels a task (and its subtree) can be assigned."""

    # Members are listed from highest to lowest priority
    HIGH = "high"
    NORMAL = "normal"
    LOW = "low"


class Task(object):
    """
    Task representation with headers and resources.

    :param headers: Routing information for other systems, this is what allows for \
        evaluation of given system usefulness for given task. \
        Systems filter by these.
    :param payload: Any instance of :py:class:`dict` - contains resources \
        and additional information
    :param headers_persistent: Persistent headers for whole task subtree, \
        propagated from initial task.
    :param payload_persistent: Persistent payload set for whole task subtree, \
        propagated from initial task
    :param priority: Priority of whole task subtree, \
        propagated from initial task like `payload_persistent`
    :param parent_uid: Id of a routed task that has created this task by a karton with \
        :py:meth:`.send_task`
    :param root_uid: Id of an unrouted task that is the root of this \
        task's analysis tree
    :param orig_uid: Id of an unrouted (or crashed routed) task that was forked to \
        create this task
    :param uid: This task's unique identifier
    :param error: Traceback of an exception that happened while performing this task
    """

    __slots__ = (
        "uid",
        "root_uid",
        "orig_uid",
        "parent_uid",
        "error",
        "headers",
        "status",
        "last_update",
        "priority",
        "payload",
        "payload_persistent",
        "_headers_persistent_keys",
    )

    def __init__(
        self,
        headers: Dict[str, Any],
        payload: Optional[Dict[str, Any]] = None,
        headers_persistent: Optional[Dict[str, Any]] = None,
        payload_persistent: Optional[Dict[str, Any]] = None,
        priority: Optional[TaskPriority] = None,
        parent_uid: Optional[str] = None,
        root_uid: Optional[str] = None,
        orig_uid: Optional[str] = None,
        uid: Optional[str] = None,
        error: Optional[List[str]] = None,
        _status: Optional[TaskState] = None,
        _last_update: Optional[float] = None,
    ) -> None:
        payload = payload or {}
        payload_persistent = payload_persistent or {}
        headers_persistent = headers_persistent or {}

        if not isinstance(payload, dict):
            raise ValueError("Payload should be an instance of a dict")
        if not isinstance(payload_persistent, dict):
            raise ValueError("Persistent payload should be an instance of a dict")
        if not isinstance(headers_persistent, dict):
            raise ValueError("Persistent headers should be an instance of a dict")

        self.uid = str(uuid.uuid4()) if uid is None else uid
        # A task created without an explicit root is the root of its own tree
        self.root_uid = self.uid if root_uid is None else root_uid
        self.orig_uid = orig_uid
        self.parent_uid = parent_uid
        self.error = error

        # Persistent headers are merged into the regular headers (overriding
        # them on key conflicts); only their key names are kept separately so
        # that `headers_persistent` can be recomputed from `headers`.
        self.headers = {**headers, **headers_persistent}
        self._headers_persistent_keys = set(headers_persistent.keys())

        self.status = _status or TaskState.DECLARED
        # `is None` rather than `or`, so that a 0.0 timestamp is kept as-is
        self.last_update: float = (
            time.time() if _last_update is None else _last_update
        )
        self.priority = priority or TaskPriority.NORMAL
        # Shallow copies: the caller's dicts are not mutated by later changes
        self.payload = dict(payload)
        self.payload_persistent = dict(payload_persistent)

    @property
    def headers_persistent(self) -> Dict[str, Any]:
        """Subset of :attr:`headers` whose keys were declared persistent."""
        return {k: v for k, v in self.headers.items() if self.is_header_persistent(k)}

    @property
    def receiver(self) -> Optional[str]:
        """Value of the ``receiver`` header, or None when it is not set."""
        return self.headers.get("receiver")

    def fork_task(self) -> "Task":
        """
        Fork task to transfer single task to many queues (but use different UID).

        Used internally by karton-system

        :return: Forked copy of the original task

        :meta private:
        """
        new_task = Task(
            headers=self.headers,
            headers_persistent=self.headers_persistent,
            payload=self.payload,
            payload_persistent=self.payload_persistent,
            priority=self.priority,
            parent_uid=self.parent_uid,
            root_uid=self.root_uid,
            # The fork remembers which task it was copied from
            orig_uid=self.uid,
        )
        return new_task

    def derive_task(self, headers: Dict[str, Any]) -> "Task":
        """
        Creates copy of task with different headers,
        useful for proxying resource with added metadata.

        .. code-block:: python

            class MZClassifier(Karton):
                identity = "karton.mz-classifier"
                filters = [
                    {
                        "type": "sample",
                        "kind": "raw"
                    }
                ]

                def process(self, task: Task) -> None:
                    sample = task.get_resource("sample")
                    if sample.content.startswith(b"MZ"):
                        self.log.info("MZ detected!")
                        task = task.derive_task({
                            "type": "sample",
                            "kind": "exe"
                        })
                        self.send_task(task)
                    else:
                        self.log.info("Not a MZ :<")

        .. versionchanged:: 3.0.0

            Moved from static method to regular method:

            :code:`Task.derive_task(headers, task)` must be
            ported to :code:`task.derive_task(headers)`

        :param headers: New headers for the task
        :return: Copy of task with new headers
        """
        new_task = Task(
            headers=headers,
            headers_persistent=self.headers_persistent,
            payload=self.payload,
            payload_persistent=self.payload_persistent,
        )
        return new_task

    def matches_filters(self, filters: List[Dict[str, Any]]) -> bool:
        """
        Checks whether provided task headers match filters

        :param filters: Task header filters
        :return: True if task headers match specific filters

        :meta private:
        """

        def test_filter(headers: Dict[str, Any], task_filter: Dict[str, Any]) -> int:
            """
            Filter match follows AND logic, but it's non-boolean because filters
            may be negated (task:!platform).

            Result values are as follows:
            -  1 - positive match, no mismatched values in headers
                   (all matched)
            -  0 - no match, found value that doesn't match to the filter
                   (some are not matched)
            - -1 - negative match, found value that matches negated filter value
                   (all matched but found negative matches)
            """
            matches = 1
            for filter_key, filter_value in task_filter.items():
                # Coerce filter value to string
                filter_value_str = str(filter_value)
                negated = False
                if filter_value_str.startswith("!"):
                    negated = True
                    filter_value_str = filter_value_str[1:]

                # If expected key doesn't exist in headers
                if filter_key not in headers:
                    # Negated filter ignores non-existent values
                    if negated:
                        continue
                    # But positive filter doesn't
                    return 0

                # Coerce header value to string
                header_value_str = str(headers[filter_key])
                # fnmatch is great for handling simple wildcard patterns (?, *, [abc])
                match = fnmatch.fnmatchcase(header_value_str, filter_value_str)
                # If matches, but it's negated: it's negative match
                if match and negated:
                    matches = -1
                # If doesn't match but filter is not negated: it's not a match
                if not match and not negated:
                    return 0
            # If there are no mismatched values: filter is matched
            return matches

        # List of filter matches follow OR logic, but -1 is special
        # If there is any -1, result is False
        # (any matched, but it's negative match)
        # If there is any 1, but no -1's: result is True
        # (any matched, no negative match)
        # If there are only 0's: result is False
        # (none matched)
        matches = False
        for task_filter in filters:
            match_result = test_filter(self.headers, task_filter)
            if match_result == -1:
                # Any negative match results in False
                return False
            if match_result == 1:
                # Any positive match but without negative matches results in True
                matches = True
        return matches

    def set_task_parent(self, parent: "Task") -> None:
        """
        Bind existing Task to parent task

        :param parent: Task to bind to

        :meta private:
        """
        self.parent_uid = parent.uid
        self.root_uid = parent.root_uid

    def merge_persistent_payload(self, other_task: "Task") -> None:
        """
        Merge persistent payload from another task

        :param other_task: Task from which to merge persistent payload

        :meta private:
        """
        for name, content in other_task.payload_persistent.items():
            self.payload_persistent[name] = content
            if name in self.payload:
                # Delete conflicting non-persistent payload
                del self.payload[name]

    def merge_persistent_headers(self, other_task: "Task") -> None:
        """
        Merge persistent headers from another task

        :param other_task: Task from which to merge persistent headers

        :meta private:
        """
        self.headers.update(other_task.headers_persistent)
        self._headers_persistent_keys = self._headers_persistent_keys.union(
            other_task._headers_persistent_keys
        )

    def to_dict(self) -> Dict[str, Any]:
        """
        Transform task data into dictionary

        :return: Task data dictionary

        :meta private:
        """

        def serialize_resources(obj: Any) -> Any:
            # Exact type checks: only plain dicts/lists/tuples are walked
            # (tuples are emitted as lists); resources become marker dicts.
            if type(obj) is dict:
                return {k: serialize_resources(v) for k, v in obj.items()}
            elif type(obj) is list or type(obj) is tuple:
                return [serialize_resources(v) for v in obj]
            elif isinstance(obj, ResourceBase):
                return {"__karton_resource__": obj.to_dict()}
            else:
                return obj

        headers_persistent = self.headers_persistent
        payload_persistent = {
            **self.payload_persistent,
            # Compatibility with Karton <5.2.0
            # Consumers <5.2.0 are not merging headers_persistent
            # from previous task, so we need to hide it there to
            # let karton-system fix it for us during deserialization
            "__headers_persistent": headers_persistent,
        }
        return {
            "uid": self.uid,
            "root_uid": self.root_uid,
            "parent_uid": self.parent_uid,
            "orig_uid": self.orig_uid,
            "status": self.status.value,
            "priority": self.priority.value,
            "last_update": self.last_update,
            "payload": serialize_resources(self.payload),
            "payload_persistent": serialize_resources(payload_persistent),
            "headers": self.headers,
            "headers_persistent": headers_persistent,
            "error": self.error,
        }

    def serialize(self, indent: Optional[int] = None) -> str:
        """
        Serialize task data into JSON string

        :param indent: Indent to use while serializing
        :return: Serialized task data

        :meta private:
        """
        return json.dumps(
            self.to_dict(),
            indent=indent,
            sort_keys=True,
        )

    def walk_payload_bags(self) -> Iterator[Tuple[Dict[str, Any], str, Any]]:
        """
        Iterate over all payload bags and direct payloads contained in them

        Generates tuples (payload_bag, key, value)

        :return: An iterator over all task payload bags
        """
        for payload_bag in [self.payload, self.payload_persistent]:
            for key, value in payload_bag.items():
                yield payload_bag, key, value

    def walk_payload_items(self) -> Iterator[Tuple[str, Any]]:
        """
        Iterate recursively over all payload items

        Generates tuples (path, value).

        :return: An iterator over all task payload values
        """
        yield from recursive_iter_with_keys(self.payload, "payload")
        yield from recursive_iter_with_keys(
            self.payload_persistent, "payload_persistent"
        )

    def transform_payload_bags(self, func: Callable[[Any], Any]) -> None:
        """
        Recursively transform contents of all payload bags and payloads
        contained in them

        :meta private:
        """
        self.payload, self.payload_persistent = recursive_map(
            func, [self.payload, self.payload_persistent]
        )

    def iterate_resources(self) -> Iterator[ResourceBase]:
        """
        Get list of resource objects bound to Task

        .. versionchanged:: 5.0.0

            Returns Resource values instead of tuples (key, value)

        :return: An iterator over all task resources
        """
        for element in recursive_iter([self.payload, self.payload_persistent]):
            if isinstance(element, ResourceBase):
                yield element

    @staticmethod
    def unserialize(
        data: Union[str, bytes],
        backend: Optional["KartonBackend"] = None,
        parse_resources: bool = True,
    ) -> "Task":
        """
        Unserialize Task instance from JSON string

        :param data: JSON-serialized task
        :param backend: Backend instance to be bound to RemoteResource objects
        :param parse_resources: |
            If set to False (default is True), method doesn't
            deserialize '__karton_resource__' entries, which speeds up
            deserialization process. This flag is used mainly for multiple task
            processing e.g. filtering based on status.
            When resource deserialization is turned off, Task.unserialize uses
            the faster orjson parser and falls back to the standard json module
            (still without resource deserialization) if orjson rejects the input.
        :return: Unserialized Task object

        :meta private:
        """

        def unserialize_resources(value: Any) -> Any:
            """
            Transforms __karton_resource__ serialized entries into
            RemoteResource object instances
            """
            if isinstance(value, dict) and "__karton_resource__" in value:
                return RemoteResource.from_dict(value["__karton_resource__"], backend)
            return value

        if not isinstance(data, str):
            data = data.decode("utf8")

        if parse_resources:
            task_data = json.loads(data, object_hook=unserialize_resources)
        else:
            try:
                task_data = orjson.loads(data)
            except orjson.JSONDecodeError:
                # orjson is stricter than the stdlib parser (e.g. it rejects
                # NaN/Infinity). Fall back to json, but without the resource
                # hook so that parse_resources=False is honoured on both paths.
                task_data = json.loads(data)

        # Compatibility with Karton <5.2.0
        headers_persistent_fallback = task_data["payload_persistent"].get(
            "__headers_persistent", None
        )
        headers_persistent = task_data.get(
            "headers_persistent", headers_persistent_fallback
        )

        task = Task(
            task_data["headers"],
            headers_persistent=headers_persistent,
            uid=task_data["uid"],
            root_uid=task_data["root_uid"],
            parent_uid=task_data["parent_uid"],
            # Compatibility with <= 3.x.x (get)
            orig_uid=task_data.get("orig_uid", None),
            payload=task_data["payload"],
            payload_persistent=task_data["payload_persistent"],
            # Compatibility with <= 3.x.x (get)
            error=task_data.get("error"),
            # Compatibility with <= 2.x.x (get)
            priority=TaskPriority(
                task_data.get("priority", TaskPriority.NORMAL.value)
            ),
            _status=TaskState(task_data["status"]),
            _last_update=task_data.get("last_update", None),
        )
        return task

    def __repr__(self) -> str:
        return self.serialize()

    def add_payload(self, name: str, content: Any, persistent: bool = False) -> None:
        """
        Add payload to task

        :param name: Name of the payload
        :param content: Payload to be added
        :param persistent: Flag if the payload should be persistent
        :raises ValueError: If a payload (persistent or not) with this name exists
        """
        if name in self.payload:
            raise ValueError("Payload already exists")

        if name in self.payload_persistent:
            raise ValueError("Payload already exists in persistent payloads")

        if not persistent:
            self.payload[name] = content
        else:
            self.payload_persistent[name] = content

    def add_resource(
        self, name: str, resource: ResourceBase, persistent: bool = False
    ) -> None:
        """
        Add resource to task.

        Alias for :py:meth:`add_payload`

        .. deprecated:: 3.0.0
           Use :meth:`add_payload` instead.

        :param name: Name of the resource
        :param resource: Resource to be added
        :param persistent: Flag if the resource should be persistent
        """
        warnings.warn(
            "add_resource is deprecated, use add_payload instead",
            DeprecationWarning,
            stacklevel=2,
        )
        self.add_payload(name, resource, persistent)

    def get_payload(self, name: str, default: Any = None) -> Any:
        """
        Get payload from task

        Persistent payload takes precedence over non-persistent payload
        with the same name.

        :param name: name of the payload
        :param default: Value to be returned if payload is not present
        :return: Payload content
        """
        if name in self.payload_persistent:
            return self.payload_persistent[name]
        return self.payload.get(name, default)

    def get_resource(self, name: str) -> ResourceBase:
        """
        Get resource from task.

        Ensures that payload contains an Resource object.
        If not - raises :class:`TypeError`

        :param name: Name of the resource to get
        :return: :py:class:`karton.ResourceBase` - resource with given name
        """
        resource = self.get_payload(name)
        if not isinstance(resource, ResourceBase):
            raise TypeError("Resource was expected but not found")
        return resource

    def remove_payload(self, name: str) -> None:
        """
        Removes payload for the task

        If payload doesn't exist or is persistent - raises KeyError

        :param name: Payload name to be removed
        """
        del self.payload[name]

    def has_payload(self, name: str) -> bool:
        """
        Checks whether payload exists

        :param name: Name of the payload to be checked
        :return: If tasks payload contains a value with given name
        """
        return name in self.payload or name in self.payload_persistent

    def is_header_persistent(self, name: str) -> bool:
        """
        Checks whether header exists and is persistent

        :param name: Name of the header to be checked
        :return: If tasks header with given name is persistent
        """
        return name in self._headers_persistent_keys

    def is_payload_persistent(self, name: str) -> bool:
        """
        Checks whether payload exists and is persistent

        :param name: Name of the payload to be checked
        :return: If tasks payload with given name is persistent
        """
        return name in self.payload_persistent