Source code for karton.core.karton

"""
Base library for karton subsystems.
"""
import abc
import argparse
import sys
import time
import traceback
from typing import Any, Callable, Dict, List, Optional, Tuple, cast

from .__version__ import __version__
from .backend import KartonBackend, KartonBind, KartonMetrics
from .base import KartonBase, KartonServiceBase
from .config import Config
from .exceptions import TaskTimeoutError
from .resource import LocalResource
from .task import Task, TaskState
from .utils import timeout


class Producer(KartonBase):
    """
    Producer part of Karton. Used for dispatching initial tasks into karton.

    Usage example:

    .. code-block:: python

        from karton.core import Producer

        producer = Producer(identity="karton.mwdb")
        task = Task(
            headers={
                "type": "sample",
                "kind": "raw"
            },
            payload={
                "sample": Resource("sample.exe", b"put content here")
            }
        )
        producer.send_task(task)

    :param config: Karton config to use for service configuration
    :param identity: Karton producer identity
    :param backend: Karton backend to use
    """

    def __init__(
        self,
        config: Optional[Config] = None,
        identity: Optional[str] = None,
        backend: Optional[KartonBackend] = None,
    ) -> None:
        super().__init__(config=config, identity=identity, backend=backend)
    def send_task(self, task: Task) -> bool:
        """
        Sends a task to the unrouted task queue. Takes care of logging.
        The given task will be a child of the task currently being handled
        (if one exists).

        :param task: Task object to be sent
        :return: Bool indicating if the task was delivered
        """
        self.log.debug("Dispatched task %s", task.uid)

        # Complete information about the task
        if self.current_task is not None:
            task.set_task_parent(self.current_task)
            task.merge_persistent_payload(self.current_task)
            task.merge_persistent_headers(self.current_task)
            task.priority = self.current_task.priority

        task.last_update = time.time()
        task.headers.update({"origin": self.identity})

        # Ensure all local resources have proper buckets
        for resource in task.iterate_resources():
            if isinstance(resource, LocalResource) and not resource.bucket:
                resource.bucket = self.backend.default_bucket_name

        # Register the new task
        self.backend.register_task(task)

        # Upload local resources
        for resource in task.iterate_resources():
            if isinstance(resource, LocalResource):
                resource.upload(self.backend)

        # Add the task to the unrouted task queue (karton.tasks)
        self.backend.produce_unrouted_task(task)
        self.backend.increment_metrics(KartonMetrics.TASK_PRODUCED, self.identity)
        return True
class Consumer(KartonServiceBase):
    """
    Base consumer class. This is the part of Karton responsible for processing
    incoming tasks.

    :param config: Karton config to use for service configuration
    :param identity: Karton service identity
    :param backend: Karton backend to use
    """

    filters: List[Dict[str, Any]] = []
    persistent: bool = True
    version: Optional[str] = None

    def __init__(
        self,
        config: Optional[Config] = None,
        identity: Optional[str] = None,
        backend: Optional[KartonBackend] = None,
    ) -> None:
        super().__init__(config=config, identity=identity, backend=backend)

        if self.filters is None:
            raise ValueError("Cannot bind consumer on empty binds")

        self.persistent = (
            self.config.getboolean("karton", "persistent", self.persistent)
            and not self.debug
        )
        self.task_timeout = self.config.getint("karton", "task_timeout")

        self.current_task: Optional[Task] = None
        self._pre_hooks: List[Tuple[Optional[str], Callable[[Task], None]]] = []
        self._post_hooks: List[
            Tuple[
                Optional[str],
                Callable[[Task, Optional[BaseException]], None],
            ]
        ] = []
    @abc.abstractmethod
    def process(self, task: Task) -> None:
        """
        Task processing method.

        :param task: The incoming task object

        self.current_task contains the task that triggered this invocation of
        :py:meth:`karton.Consumer.process`, but you should rely on the passed
        task object and not interact with that field directly.
        """
        raise NotImplementedError()
    def internal_process(self, task: Task) -> None:
        """
        The internal side of the :py:meth:`Consumer.process` function. Takes care
        of synchronizing the task state, handling errors and running task hooks.

        :param task: Task object to process

        :meta private:
        """
        self.current_task = task
        self.log_handler.set_task(self.current_task)

        if not self.current_task.matches_filters(self.filters):
            self.log.info("Task rejected because binds are no longer valid.")
            self.backend.set_task_status(self.current_task, TaskState.FINISHED)
            # Task rejected: end of processing
            return

        exception_str = None
        try:
            self.log.info("Received new task - %s", self.current_task.uid)
            self.backend.set_task_status(self.current_task, TaskState.STARTED)

            self._run_pre_hooks()

            saved_exception = None
            try:
                if self.task_timeout:
                    with timeout(self.task_timeout):
                        self.process(self.current_task)
                else:
                    self.process(self.current_task)
            except (Exception, TaskTimeoutError) as exc:
                saved_exception = exc
                raise
            finally:
                self._run_post_hooks(saved_exception)

            self.log.info("Task done - %s", self.current_task.uid)
        except (Exception, TaskTimeoutError):
            exc_info = sys.exc_info()
            exception_str = traceback.format_exception(*exc_info)

            self.backend.increment_metrics(KartonMetrics.TASK_CRASHED, self.identity)
            self.log.exception("Failed to process task - %s", self.current_task.uid)
        finally:
            self.backend.increment_metrics(KartonMetrics.TASK_CONSUMED, self.identity)

            task_state = TaskState.FINISHED

            # Report the task status as crashed
            # if an exception was caught while processing
            if exception_str is not None:
                task_state = TaskState.CRASHED
                self.current_task.error = exception_str

            self.backend.set_task_status(self.current_task, task_state)

    @property
    def _bind(self) -> KartonBind:
        return KartonBind(
            identity=self.identity,
            info=self.__class__.__doc__,
            version=__version__,
            filters=self.filters,
            persistent=self.persistent,
            service_version=self.__class__.version,
        )
    @classmethod
    def args_parser(cls) -> argparse.ArgumentParser:
        parser = super().args_parser()
        # Not store_false: that would default to True,
        # and we intentionally want None when the flag is not given
        parser.add_argument(
            "--non-persistent",
            action="store_const",
            const=False,
            dest="persistent",
            help="Run service with non-persistent queue",
        )
        parser.add_argument(
            "--task-timeout",
            type=int,
            help="Limit task execution time",
        )
        return parser
    @classmethod
    def config_from_args(cls, config: Config, args: argparse.Namespace) -> None:
        super().config_from_args(config, args)
        config.load_from_dict(
            {
                "karton": {
                    "persistent": args.persistent,
                    "task_timeout": args.task_timeout,
                }
            }
        )
    def add_pre_hook(
        self, callback: Callable[[Task], None], name: Optional[str] = None
    ) -> None:
        """
        Add a function to be called before processing each task.

        :param callback: Function of the form ``callback(task)`` where ``task``
            is a :class:`karton.Task`
        :param name: Name of the pre-hook
        """
        self._pre_hooks.append((name, callback))
    def add_post_hook(
        self,
        callback: Callable[[Task, Optional[BaseException]], None],
        name: Optional[str] = None,
    ) -> None:
        """
        Add a function to be called after processing each task.

        :param callback: Function of the form ``callback(task, exception)``
            where ``task`` is a :class:`karton.Task` and ``exception`` is
            an exception thrown by the :meth:`karton.Consumer.process` function
            or ``None``.
        :param name: Name of the post-hook
        """
        self._post_hooks.append((name, callback))
    def _run_pre_hooks(self) -> None:
        """
        Run registered preprocessing hooks

        :meta private:
        """
        for name, callback in self._pre_hooks:
            try:
                callback(cast(Task, self.current_task))
            except Exception:
                if name:
                    self.log.exception("Pre-hook (%s) failed", name)
                else:
                    self.log.exception("Pre-hook failed")

    def _run_post_hooks(self, exception: Optional[BaseException]) -> None:
        """
        Run registered postprocessing hooks

        :param exception: Exception object that was caught while processing the task

        :meta private:
        """
        for name, callback in self._post_hooks:
            try:
                callback(cast(Task, self.current_task), exception)
            except Exception:
                if name:
                    self.log.exception("Post-hook (%s) failed", name)
                else:
                    self.log.exception("Post-hook failed")

    def loop(self) -> None:
        """
        Blocking loop that consumes tasks and runs
        :py:meth:`karton.Consumer.process` as a handler

        :meta private:
        """
        self.log.info("Service %s started", self.identity)
        if self.task_timeout is not None:
            self.log.info(f"Task timeout is set to {self.task_timeout} seconds")

        # Get the old binds and set the new ones atomically
        old_bind = self.backend.register_bind(self._bind)

        if not old_bind:
            self.log.info("Service binds created.")
        elif old_bind != self._bind:
            self.log.info("Binds changed, old service instances should exit soon.")

        for task_filter in self.filters:
            self.log.info("Binding on: %s", task_filter)

        with self.graceful_killer():
            while not self.shutdown:
                if self.backend.get_bind(self.identity) != self._bind:
                    self.log.info("Binds changed, shutting down.")
                    break

                task = self.backend.consume_routed_task(self.identity)
                if task:
                    self.internal_process(task)
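

# A minimal usage sketch (illustrative, not part of karton.core): a Consumer
# subclass with a pre/post hook pair. The identity, filters, payload key and
# hook behaviour below are hypothetical examples, not library defaults.
class ExampleReporter(Consumer):
    identity = "karton.example-reporter"
    filters = [{"type": "sample", "kind": "raw"}]

    def process(self, task: Task) -> None:
        # Fetch a resource attached by an upstream producer
        sample = task.get_resource("sample")
        self.log.info("Processing sample %s", sample.name)


def _example_pre_hook(task: Task) -> None:
    print("starting", task.uid)


def _example_post_hook(task: Task, exception: Optional[BaseException]) -> None:
    print("finished", task.uid, "with error" if exception else "ok")


# To run the service (blocking):
#     reporter = ExampleReporter()
#     reporter.add_pre_hook(_example_pre_hook, "announce_start")
#     reporter.add_post_hook(_example_post_hook, "announce_end")
#     reporter.loop()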
class LogConsumer(KartonServiceBase):
    """
    Base class for log consumer subsystems.

    You can consume logs from a specific logger by setting the
    :py:attr:`logger_filter` class attribute. You can also select logs of
    a specific level via the :py:attr:`level` class attribute.

    :param config: Karton config to use for service configuration
    :param identity: Karton service identity
    :param backend: Karton backend to use
    """

    logger_filter: Optional[str] = None
    level: Optional[str] = None
    with_service_info = True

    def __init__(
        self,
        config: Optional[Config] = None,
        identity: Optional[str] = None,
        backend: Optional[KartonBackend] = None,
    ) -> None:
        super().__init__(config=config, identity=identity, backend=backend)
    @abc.abstractmethod
    def process_log(self, event: Dict[str, Any]) -> None:
        """
        The core log handler that should be overridden in implemented log handlers.

        :param event: Dictionary containing the log event data
        """
        raise NotImplementedError()
    def loop(self) -> None:
        """
        Internal loop that consumes the log queues and deals with exceptions
        and graceful exits

        :meta private:
        """
        self.log.info("Logger %s started", self.identity)

        with self.graceful_killer():
            for log in self.backend.consume_log(
                logger_filter=self.logger_filter, level=self.level
            ):
                if self.shutdown:
                    # Consumer shutdown has been requested
                    break
                if not log:
                    # No log record received until timeout, try again.
                    continue
                try:
                    self.process_log(log)
                except Exception:
                    # This is the log handler itself failing,
                    # so DO NOT USE self.log here!
                    traceback.print_exc()
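

# A minimal usage sketch (illustrative, not part of karton.core): a LogConsumer
# subclass that prints every received log record. The identity and level values
# are hypothetical; process_log() receives the raw log record dictionary.
class ExampleLogPrinter(LogConsumer):
    identity = "karton.example-log-printer"
    level = "ERROR"

    def process_log(self, event: Dict[str, Any]) -> None:
        # Avoid self.log here for the same reason as in loop() above
        print(event)


# To run: ExampleLogPrinter().loop()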
class Karton(Consumer, Producer):
    """
    This glues together Consumer and Producer - which is the most common use case
    """

    def __init__(
        self,
        config: Optional[Config] = None,
        identity: Optional[str] = None,
        backend: Optional[KartonBackend] = None,
    ) -> None:
        super().__init__(config=config, identity=identity, backend=backend)

        if self.config.getboolean("signaling", "status", fallback=False):
            self.log.info("Using status signaling")
            self.add_pre_hook(self._send_signaling_status_task_begin, "task_begin")
            self.add_post_hook(self._send_signaling_status_task_end, "task_end")

    def _send_signaling_status_task_begin(self, task: Task) -> None:
        """Send a begin status signaling task.

        :meta private:
        """
        self._send_signaling_status_task("task_begin")

    def _send_signaling_status_task_end(
        self, task: Task, ex: Optional[BaseException]
    ) -> None:
        """Send an end status signaling task.

        :meta private:
        """
        self._send_signaling_status_task("task_end")

    def _send_signaling_status_task(self, status: str) -> None:
        """Send a status signaling task.

        :param status: Status task identifier.

        :meta private:
        """
        task = Task({"type": "karton.signaling.status", "status": status})
        self.send_task(task)
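

# A minimal usage sketch (illustrative, not part of karton.core): a Karton
# service that both consumes and produces. The identity, filters, headers and
# payload keys are hypothetical examples.
class ExampleClassifier(Karton):
    identity = "karton.example-classifier"
    filters = [{"type": "sample", "kind": "raw"}]

    def process(self, task: Task) -> None:
        sample = task.get_resource("sample")
        # send_task() links the new task to the current one and propagates
        # persistent headers/payload automatically (see Producer.send_task)
        derived = Task(
            headers={"type": "sample", "kind": "classified"},
            payload={"sample": sample},
        )
        self.send_task(derived)


# To run: ExampleClassifier().loop()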