Advanced concepts

Routed and unrouted tasks (task forking)

During its lifetime, the task will transfer between various states and its reference will be passed through several queues, a simple way to understand it is to see how the tasks state changes in various moments:

_images/task-lifetime.svg

Each new task is registered in the system by a call to karton.Producer.send_task() and starts its life in the unrouted task queue with a TaskState.Declared state.

All actual task data is stored in the Karton.task namespace and all other (routed and unrouted) queues will be always only holding a reference to a record from this place.

The main broker - karton.System constantly looks over the unrouted (karton.tasks) queue and keeps the tasks running as well as clears up leftover unneeded data.

Because task headers can be accepted by more than one consumer the task has to be forked before it goes to the appropriate consumer (routed) queues. Based on unrouted task, Karton.System generates as many routed tasks as there are matching queues. These tasks are separate, independent instances, so they have different uid than original unrouted task.

Note

While uid of routed and unrouted tasks are different, parent_uid stays the same. parent_uid always identifies the routed task.

Reference to the unrouted task is called orig_uid.

Each registered consumer monitors its (routed) queue and performs analysis on all tasks that appear there. As soon as the consumer starts working on a given task, it sends a signal to the broker to mark the tasks state as TaskState.Started.

If everything goes smoothly, the consumer finishes the tasks and sends a similar signal, this time marking the task as TaskState.Finished. If there is a problem and an exception is thrown within the self.process function, TaskState.Crashed is used instead.

As a part of its housekeeping, Karton.System removes all TaskState.Finished tasks immediately and TaskState.Crashed tasks after a certain grace period to allow for inspection and optional retry.

Task tree (analysis) and task life cycle

Every analysis starts from initial task spawned by karton.Producer. Initial task is consumed by consumers, which then produce next tasks for further processing. These various tasks originating from initial task can be grouped together into a task tree, representing the analysis.

_images/forking-task.svg

Each task is identified by a tuple of four identifiers:

  • uid - unique task identifier

  • parent_uid - identifier of task that spawned current task as a result of processing

  • root_uid - task tree identifier (analysis identifier, derived from uid of initial unrouted task)

  • orig_uid - identifier of the original task that was forked to create this task (unrouted task or retried crashed task)

In order to better understand how those identifiers are inherited and passed between tasks take a look at the following example:

_images/task-tree.svg

Handling logging

By default, all systems inheriting from karton.core.KartonBase() will have a custom logging.Logger() instance exposed as log(). It publishes all logged messages to a special PUBSUB key on the central Redis database.

In order to store the logs into a persistent storage like Splunk or Rsyslog you have to implement a service that will consume the log entries and send them to the final database, for an example of such service see Log consumer.

The logging level can be configured using the standard karton config and setting level in the logging section to appropriate level like "DEBUG", "INFO" or "ERROR".

Consumer queue persistence

Consumer queue is created on the first registration of consumer and it gets new tasks even if all consumer instances are offline. It guarantees that analysis will complete even after short downtime of part of subsystems. Unfortunately, it also blocks completion of the analysis when we connect a Karton Service which is currently developed or temporary.

We can turn off queue persistence using the persistent = False attribute in the Karton subsystem class.

class TemporaryConsumer(Karton):
    identity = "karton.temporary-consumer"
    filters = ...
    persistent = False

    def process(self, task: Task) -> None:
        ...

This is also the (hacky) way to remove persistent queue from the system. Just launch empty consumer with identity you want to remove, wait until all tasks will be consumed and shut down the consumer.

from karton.core import Karton

class DeleteThisConsumer(Karton):
    identity = "karton.identity-to-be-removed"
    filters = {}
    persistent = False

    def process(self, task: Task) -> None:
        pass

DeleteThisConsumer().loop()

Prioritized tasks

Karton allows to set priority for task tree: TaskPriority.HIGH, TaskPriority.NORMAL (default) or TaskPriority.LOW. Priority is determined by producer spawning an initial task.

producer = Producer()
task = Task(
    headers=...,
    priority=TaskPriority.HIGH
)
producer.send_task(task)

All tasks within the same task tree have the same priority, which is derived from the priority of initial task. If consumer will try to set different priority for spawned tasks, new priority settings will be simply ignored.

Extending configuration

During processing we may need to fetch data from external service or use libraries that need to be pre-configured. The simplest approach is to use separate configuration file, but this is a bit messy.

Karton configuration is represented by special object karton.Config, which can be explicitly provided as an argument to the Karton constructor. Config is based on configparser.ConfigParser, so we can extend it with additional sections for custom configuration.

For example, if we need to communicate with MWDB, we can make MWDB binding available via self.config.mwdb

import mwdblib

class MWDBConfig(Config):
    def __init__(self, path=None) -> None:
        super().__init__(path)
        self.mwdb_config = dict(self.config.items("mwdb"))

    def mwdb(self) -> mwdblib.MWDB:
        api_key=self.mwdb_config.get("api_key")
        api_url=self.mwdb_config.get("api_url", mwdblib.api.API_URL)

        mwdb = mwdblib.MWDB(api_key=api_key, api_url=api_url)
        if not api_key:
            mwdb.login(
                self.mwdb_config["username"],
                self.mwdb_config["password"])
        return mwdb

class GenericUnpacker(Karton):
    ...

    def process(self, task: Task) -> None:
        file_hash = task.get_payload("file_hash")
        sample = self.config.mwdb().query_file(file_hash)

if __name__ == "__main__":
    GenericUnpacker(MWDBConfig()).loop()

and provide additional section in karton.ini file:

[minio]
secret_key = <redacted>
access_key = <redacted>
address = 127.0.0.1:9000
bucket = karton
secure = 0

[redis]
host = 127.0.0.1
port = 6379

[mwdb]
api_url = http://127.0.0.1:5000/api
api_key = <redacted>

Karton-wide and instance-wide configuration

By default the configuration is searched in the following locations (by searching order):

  • /etc/karton/karton.ini

  • ~/.config/karton/karton.ini

  • ./karton.ini

  • environment variables

Each next level overrides and merges with the values loaded from the previous path. It means that we can provide karton-wide configuration and specialized instance-wide extended configuration specific for subsystem.

Contents of /etc/karton/karton.ini:

[minio]
secret_key = <redacted>
access_key = <redacted>
address = 127.0.0.1:9000
bucket = karton
secure = 0

[redis]
host = 127.0.0.1
port = 6379

and specialized configuration in the working directory ./karton.ini

[mwdb]
api_url = http://127.0.0.1:5000/api
api_key = <redacted>

Passing tasks to the external queue

Karton can be used to delegate tasks to separate queues e.g. external sandbox. External sandboxes usually have their own concurrency and queueing mechanisms, so Karton subsystem needs to:

  • dispatch task to the external service

  • wait until service ends processing

  • fetch results and spawn result tasks keeping the root_uid and parent_uid

We tried to solve this using asynchronous tasks but it turned out to be very hard to be implemented correctly and didn’t really fit in to with the Karton model.

Busy waiting

The simplest way to do that is to perform all of these actions synchronously, inside the process() method.

def process(self, task: Task) -> None:
    sample = task.get_resource("sample")

    # Dispatch task, getting the analysis_id
    with sample.download_temporary_file() as f:
        analysis_id = sandbox.push_file(f)

    # Wait until analysis finish
    while sandbox.is_finished(analysis_id):
        # Check every 5 seconds
        time.sleep(5)

    # If analysis has been finished: get the results and process them
    analysis = sandbox.get_results(analysis_id)
    self.process_results(analysis)