Karton service examples

Here are few examples of common Karton system patterns.

Producer services

import sys
from karton.core import Config, Producer, Task, Resource

config = Config("karton.ini")
producer = Producer(config)

filename = sys.argv[1]
with open(filename, "rb") as f:
    contents = f.read()

resource = Resource(os.path.basename(filename), contents)

task = Task({"type": "sample", "kind": "raw"})

task.add_resource("sample", resource)
task.add_payload("tags", ["simple_producer"])
task.add_payload("additional_info", ["This sample has been added by simple producer example"])

logging.info('pushing file to karton %s, task %s' % (name, task))
producer.send_task(task)

Consumer services

Consumer has to define identity, a name used for identification and binding in RMQ and filters - a list of dicts determining what types of tasks the service wants to process.

Elements in the list are OR’ed and items inside dicts are AND’ed.

import sys
from karton.core import Config, Consumer, Task, Resource

class Reporter(Consumer):
    identity = "karton.reporter"
    filters = [
        {
            "type": "sample",
            "stage": "recognized"
        },
        {
            "type": "sample",
            "stage": "analyzed"
        },
        {
            "type": "config"
        }
    ]

Above example accepts headers like:

{
    "type": "sample",
    "stage": "recognized",
    "kind": "runnable",
    "platform": "win32",
    "extension": "jar"
}

or

{
    "type": "config",
    "kind": "cuckoo1"
}

but not

{
    "type": "sample",
    "stage": "something"
}

Next step is to define process method, this is handler for incoming tasks that match our filters.

def process(self, task: Task) -> None:
   if task.headers["type"] == "sample":
       return self.process_sample(task)
   else:
       return self.process_config(task)

def process_sample(self, task: Task) -> None:
    sample = task.get_resource("sample")
    # ...

def process_config(self, task: Task) -> None:
    config = task.get_payload("config")
    # ...

task.headers gives you information on why task was routed and methods like get_resource or get_payload allow you to get resources or metadata from task.

Finally, we need to run our module, we get this done with loop method, which blocks on listening for new tasks, running process when needed.

if __name__ == "__main__":
    c = Reporter()
    c.loop()

Karton services (Producer + Consumer)

Karton class is simply Producer and Consumer bundled together.

As defined in karton/core/karton.py:

class Karton(Consumer, Producer):
    """
    This glues together Consumer and Producer - which is the most common use case
    """

Receiving data is done exactly like in Consumer. Using producer is no different as well, just use self.send_task.

Full-blown example below.

from karton.core import Karton, Task

class SomeNameKarton(Karton):
    # Define identity and filters as you would in the Consumer class
    identity = "karton.somename"
    filters = [
        {
            "type": "config",
        },
        {
            "type": "analysis",
            "kind": "cuckoo1"
        },
    ]

    # Method called by Karton library
    def process(self, task: Task) -> None:
        # Getting resources we need without downloading them locally
        analysis_resource = task.get_resource('analysis')
        config_resource = task.get_resource('config')

        # Log with self.log
        self.log.info("Got resources, lets analyze them!")
        ...

        # Send our results for further processing or reporting
        # Producer part
        t = Task({"type": "sample"})
        t.add_resource("sample", Resource(filename, content))
        self.send_task(task)

Log consumer

By default, all logs created in Karton systems are published to a specialized log consumer using the Redis PUBSUB pattern.

This is a very simple example of a system that implements the LogConsumer interface and prints logs to stderr.

import sys
from karton.core.karton import LogConsumer


class StdoutLogger(LogConsumer):
    identity = "karton.stdout-logger"

    def process_log(self, event: dict) -> None:
        # there are "log" and "operation" events
        if event.get("type") == "log":
            print(f"{event['name']}: {event['message']}", file=sys.stderr, flush=True)


if __name__ == "__main__":
    StdoutLogger().loop()