Karton API reference

karton.core.Producer, karton.core.Consumer

class karton.core.Producer(config: Optional[karton.core.config.Config] = None, identity: Optional[str] = None, backend: Optional[karton.core.backend.KartonBackend] = None)

Producer part of Karton, used for dispatching initial tasks into Karton.

Parameters
  • config (karton.Config) – Karton config to use for service configuration (optional)

  • identity (str) – Karton producer identity (optional)

  • backend (karton.core.backend.KartonBackend) – Karton backend to use (optional)

Usage example:

from karton.core import Producer, Resource, Task

producer = Producer(identity="karton.mwdb")
task = Task(
    headers={
        "type": "sample",
        "kind": "raw"
    },
    payload={
        "sample": Resource("sample.exe", b"put content here")
    }
)
producer.send_task(task)

classmethod args_description() -> str

Return short description for argument parser.

classmethod args_parser() -> argparse.ArgumentParser

Return ArgumentParser for main() class method.

Override this method, calling the superclass implementation, if you want to add more arguments.

classmethod config_from_args(config: karton.core.config.Config, args: argparse.Namespace) -> None

Updates configuration with settings from arguments.

Override this method, calling the superclass implementation, if you want to handle more arguments.

classmethod karton_from_args(args: Optional[argparse.Namespace] = None)

Returns a Karton instance configured using configuration files and the provided arguments.

Used by the KartonServiceBase.main() method.
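The override pattern for args_parser() and config_from_args() can be shown without Karton itself. The following standalone sketch is illustrative only. ServiceBase, MyService, build_config, the --identity and --max-size options, and the plain nested dict used in place of karton.Config are all hypothetical. Only the structure is meant to carry over: extend through super(), then add your own options.

```python
import argparse
from typing import Any, Dict, List, Optional

# Plain nested dict standing in for karton.Config in this sketch
ConfigDict = Dict[str, Dict[str, Any]]


class ServiceBase:
    """Hypothetical stand-in for a Karton service base class (not Karton's code)."""

    @classmethod
    def args_parser(cls) -> argparse.ArgumentParser:
        parser = argparse.ArgumentParser(description="Example service")
        # Illustrative option owned by the base class
        parser.add_argument("--identity", help="Alternative identity")
        return parser

    @classmethod
    def config_from_args(cls, config: ConfigDict, args: argparse.Namespace) -> None:
        if args.identity is not None:
            config.setdefault("karton", {})["identity"] = args.identity


class MyService(ServiceBase):
    @classmethod
    def args_parser(cls) -> argparse.ArgumentParser:
        # Extend the parser built by the superclass instead of replacing it
        parser = super().args_parser()
        parser.add_argument("--max-size", type=int, default=1024)
        return parser

    @classmethod
    def config_from_args(cls, config: ConfigDict, args: argparse.Namespace) -> None:
        # Let the superclass apply its own options first
        super().config_from_args(config, args)
        config.setdefault("my-service", {})["max_size"] = args.max_size


def build_config(argv: Optional[List[str]] = None) -> ConfigDict:
    """Parse argv with the combined parser and apply all options to a fresh config."""
    args = MyService.args_parser().parse_args(argv)
    config: ConfigDict = {}
    MyService.config_from_args(config, args)
    return config
```

Because each override calls super() first, options defined by the base class keep working when a subclass adds its own.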

property log: logging.Logger

Return Logger instance for Karton service

If you want to use it in code that is outside of the service class, use logging.getLogger():

import logging
logging.getLogger("<identity>")
Returns

logging.Logger instance

property log_handler: karton.core.logger.KartonLogHandler

Return KartonLogHandler bound to this Karton service.

Can be used to set up logging on your own by adding this handler to the chosen loggers.
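The following minimal sketch attaches one handler to several chosen loggers. KartonLogHandler needs a Karton backend, so this example uses CollectingHandler instead: a hypothetical stand-in that lets the code run on the standard library alone. With Karton you would pass service.log_handler in its place.

```python
import logging
from typing import List


class CollectingHandler(logging.Handler):
    """Hypothetical stand-in for KartonLogHandler: keeps formatted records in a list."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(self.format(record))


def attach_handler(handler: logging.Handler, logger_names: List[str]) -> None:
    """Add the same handler to every chosen logger and let INFO and above through."""
    for name in logger_names:
        logger = logging.getLogger(name)
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
```

For example, after attach_handler(handler, ["my.library"]), a call to logging.getLogger("my.library").info("hello") reaches the handler.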

send_task(task: karton.core.task.Task) -> bool

Sends a task to the unrouted task queue and takes care of logging. The given task becomes a child of the task currently being handled (if there is one).

Parameters

task – Task object to be sent

Returns

Bool indicating if the task was delivered

setup_logger(level: Optional[Union[str, int]] = None) -> None

Set up the logger for the Karton service (StreamHandler and karton.logs handler).

Called by Consumer.loop(). If you want to use the logger in a Producer, you need to call it yourself, but remember to set the identity.

Parameters

level – Logging level. Default is logging.INFO (unless a different value is set in the Karton config)

class karton.core.Consumer(config: Optional[karton.core.config.Config] = None, identity: Optional[str] = None, backend: Optional[karton.core.backend.KartonBackend] = None)

Base consumer class; this is the part of Karton responsible for processing incoming tasks.

Parameters
  • config – Karton config to use for service configuration

  • identity – Karton service identity

  • backend – Karton backend to use

add_post_hook(callback: Callable[[karton.core.task.Task, Optional[Exception]], None], name: Optional[str] = None) -> None

Add a function to be called after processing each task.

Parameters
  • callback – Function of the form callback(task, exception) where task is a karton.Task and exception is an exception thrown by the karton.Consumer.process() function or None.

  • name – Name of the post-hook

add_pre_hook(callback: Callable[[karton.core.task.Task], None], name: Optional[str] = None) -> None

Add a function to be called before processing each task.

Parameters
  • callback – Function of the form callback(task) where task is a karton.Task

  • name – Name of the pre-hook
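The pre- and post-hook contract can be sketched without Karton. Pre-hooks receive the task. Post-hooks receive the task plus the exception raised during processing, or None. HookRunner and its handle() method are hypothetical and only model the documented callback signatures. Recording the error instead of re-raising it is an assumption of this sketch, not a description of how Karton reports crashed tasks.

```python
from typing import Callable, List, Optional, Tuple

# Hooks follow the documented callback shapes; a "task" can be any object here
PreHook = Callable[[object], None]
PostHook = Callable[[object, Optional[Exception]], None]


class HookRunner:
    """Hypothetical sketch of running hooks around a processing function."""

    def __init__(self, process: Callable[[object], None]) -> None:
        self.process = process
        self.pre_hooks: List[Tuple[Optional[str], PreHook]] = []
        self.post_hooks: List[Tuple[Optional[str], PostHook]] = []

    def add_pre_hook(self, callback: PreHook, name: Optional[str] = None) -> None:
        self.pre_hooks.append((name, callback))

    def add_post_hook(self, callback: PostHook, name: Optional[str] = None) -> None:
        self.post_hooks.append((name, callback))

    def handle(self, task: object) -> None:
        # Pre-hooks run before processing, in registration order
        for _name, hook in self.pre_hooks:
            hook(task)
        exception: Optional[Exception] = None
        try:
            self.process(task)
        except Exception as exc:
            # This sketch records the error instead of re-raising it
            exception = exc
        # Post-hooks always run and receive the exception, or None on success
        for _name, hook in self.post_hooks:
            hook(task, exception)
```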

classmethod args_description() -> str

Return short description for argument parser.

classmethod args_parser() -> argparse.ArgumentParser

Return ArgumentParser for main() class method.

Override this method, calling the superclass implementation, if you want to add more arguments.

classmethod config_from_args(config: karton.core.config.Config, args: argparse.Namespace) -> None

Updates configuration with settings from arguments.

Override this method, calling the superclass implementation, if you want to handle more arguments.

classmethod karton_from_args(args: Optional[argparse.Namespace] = None)

Returns a Karton instance configured using configuration files and the provided arguments.

Used by the KartonServiceBase.main() method.

property log: logging.Logger

Return Logger instance for Karton service

If you want to use it in code that is outside of the Consumer class, use logging.getLogger():

import logging
logging.getLogger("<identity>")
Returns

logging.Logger instance

property log_handler: karton.core.logger.KartonLogHandler

Return KartonLogHandler bound to this Karton service.

Can be used to set up logging on your own by adding this handler to the chosen loggers.

main() -> None

Main method invoked from CLI.

abstract process(task: karton.core.task.Task) -> None

Task processing method.

Parameters

task – The incoming task object

self.current_task contains the task that triggered the invocation of karton.Consumer.process(), but you should work only with the passed task object and not interact with this field directly.

setup_logger(level: Optional[Union[str, int]] = None) -> None

Set up the logger for the Karton service (StreamHandler and karton.logs handler).

Called by Consumer.loop(). If you want to use the logger in a Producer, you need to call it yourself, but remember to set the identity.

Parameters

level – Logging level. Default is logging.INFO (unless a different value is set in the Karton config)

class karton.core.Karton(config: Optional[karton.core.config.Config] = None, identity: Optional[str] = None, backend: Optional[karton.core.backend.KartonBackend] = None)

This glues together Consumer and Producer, which is the most common use case.

karton.core.LogConsumer

class karton.core.LogConsumer(config: Optional[karton.core.config.Config] = None, identity: Optional[str] = None, backend: Optional[karton.core.backend.KartonBackend] = None)

Base class for log consumer subsystems.

You can consume logs from a specific logger by setting the logger_filter class attribute.

You can also select logs of a specific level via the level class attribute.

Parameters
  • config – Karton config to use for service configuration

  • identity – Karton service identity

  • backend – Karton backend to use

abstract process_log(event: Dict[str, Any]) -> None

The core log handler that should be overridden in implemented log handlers.

Parameters

event – Dictionary containing the log event data
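A process_log() body often just turns each event into a single line. The following sketch shows one way to do that. The keys levelname, name and message are assumptions modelled on logging.LogRecord attribute names, so check the events your backend actually delivers before relying on them. format_log_event is a hypothetical helper.

```python
from typing import Any, Dict


def format_log_event(event: Dict[str, Any]) -> str:
    """Render one log event as a single line (keys are assumed, not guaranteed)."""
    level = event.get("levelname", "UNKNOWN")
    logger = event.get("name", "?")
    message = event.get("message", "")
    return f"[{level}] {logger}: {message}"
```

Inside a LogConsumer subclass, process_log(self, event) could then simply call print(format_log_event(event)).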

karton.core.Resource

karton.core.resource.Resource

alias of karton.core.resource.LocalResource

class karton.core.resource.LocalResource(name: str, content: Optional[Union[str, bytes]] = None, path: Optional[str] = None, bucket: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, uid: Optional[str] = None, sha256: Optional[str] = None, fd: Optional[IO[bytes]] = None, _flags: Optional[List[str]] = None, _close_fd: bool = False)

Represents a local resource with arbitrary binary data, e.g. file contents.

Local resources will be uploaded to the object hub (S3) during task dispatching.

from karton.core import Resource

# Creating resource from bytes
sample = Resource("original_name.exe", content=rb"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANT...")

# Creating resource from path
sample = Resource("original_name.exe", path="sample/original_name.exe")

Parameters
  • name – Name of the resource (e.g. name of file)

  • content – Resource content

  • path – Path of file with resource content

  • bucket – Alternative S3 bucket for resource

  • metadata – Resource metadata

  • uid – Alternative S3 resource id

  • sha256 – Resource sha256 hash

  • fd – Seekable binary file object (IO[bytes])

  • _flags – Resource flags

  • _close_fd – Close file descriptor after upload (default: False)

property content: bytes

Resource content. Reads the file if the file was not read before.

Returns

Content bytes

classmethod from_directory(name: str, directory_path: str, compression: int = 8, in_memory: bool = False, bucket: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, uid: Optional[str] = None) -> karton.core.resource.LocalResource

Resource extension that allows passing a whole directory as a zipped resource.

Reads all files contained in directory_path recursively and packs them into a zip file.

from karton.core.resource import LocalResource

# Creating zipped resource from path
dumps = LocalResource.from_directory("dumps", directory_path="dumps/")

Parameters
  • name – Name of the resource (e.g. name of file)

  • directory_path – Path of the resource directory

  • compression – Compression method (default: 8, i.e. zipfile.ZIP_DEFLATED)

  • in_memory – Build the zip file in memory instead of creating a temporary file (default: False)

  • bucket – Alternative S3 bucket for resource

  • metadata – Resource metadata

  • uid – Alternative S3 resource id

Returns

LocalResource instance with zipped contents
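The packing step described for from_directory() can be sketched with the standard zipfile module. This is not Karton's implementation. zip_directory, zip_directory_in_memory and zip_directory_to_tempfile are hypothetical helpers that roughly mirror the in_memory=True mode and the default temporary-file mode. They use ZIP_DEFLATED (8) as the default compression method.

```python
import io
import os
import tempfile
import zipfile
from typing import IO


def zip_directory(directory_path: str, fd: IO[bytes], compression: int = zipfile.ZIP_DEFLATED) -> None:
    """Recursively pack every file under directory_path into a zip written to fd."""
    with zipfile.ZipFile(fd, mode="w", compression=compression) as zipf:
        for root, _dirs, files in os.walk(directory_path):
            for filename in files:
                full_path = os.path.join(root, filename)
                # Store paths relative to the packed directory
                arcname = os.path.relpath(full_path, directory_path)
                zipf.write(full_path, arcname)


def zip_directory_in_memory(directory_path: str) -> bytes:
    """Analogue of in_memory=True: build the archive in a BytesIO buffer."""
    buffer = io.BytesIO()
    zip_directory(directory_path, buffer)
    return buffer.getvalue()


def zip_directory_to_tempfile(directory_path: str) -> str:
    """Analogue of the default mode: write the archive to a temporary file, return its path."""
    with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp:
        zip_directory(directory_path, tmp)
    return tmp.name
```

The caller owns the file returned by zip_directory_to_tempfile() and should remove it when done.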

property sha256: Optional[str]

Resource sha256

Returns

Hexencoded resource SHA256 hash

property size: int

Resource size

Returns

Resource size

property uid: str

Resource identifier (UUID)

Returns

Resource identifier
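The following stdlib sketch shows the kinds of values that the sha256, size and uid properties describe. It assumes that sha256 is the hex-encoded SHA-256 digest of the content and that size is the content length in bytes. It also assumes that a fresh uid is a random UUID string; this is the least certain part, since the exact uid generation is not specified above. describe_content is a hypothetical helper.

```python
import hashlib
import uuid
from typing import Dict, Union


def describe_content(content: bytes) -> Dict[str, Union[str, int]]:
    """Compute values analogous to the sha256 / size / uid properties (sketch)."""
    return {
        "sha256": hashlib.sha256(content).hexdigest(),  # hex-encoded digest
        "size": len(content),  # size in bytes
        "uid": str(uuid.uuid4()),  # assumed: a random UUID string
    }
```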

class karton.core.resource.RemoteResource(name: str, bucket: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, uid: Optional[str] = None, size: Optional[int] = None, backend: Optional[KartonBackend] = None, sha256: Optional[str] = None, _flags: Optional[List[str]] = None)

Keeps a reference to a remote resource object shared between subsystems via object storage (S3).

Should never be instantiated directly by a subsystem, but can be passed directly to an outgoing payload.

Parameters
  • name – Name of the resource (e.g. name of file)

  • bucket – Alternative S3 bucket for resource

  • metadata – Resource metadata

  • uid – Alternative S3 resource id

  • size – Resource size

  • backend – KartonBackend to bind to this resource

  • sha256 – Resource sha256 hash

  • _flags – Resource flags

property content: bytes

Resource content. Performs a download if the resource was not loaded before.

Returns

Content bytes

download() -> bytes

Downloads remote resource content from object hub into memory.

sample = self.current_task.get_resource("sample")

# Ensure that resource will be downloaded before it will be
# passed to processing method
sample.download()

self.process_sample(sample)
Returns

Downloaded content bytes

download_temporary_file(suffix=None) -> Iterator[IO[bytes]]

Downloads remote resource into named temporary file.

sample = self.current_task.get_resource("sample")

with sample.download_temporary_file() as f:
    contents = f.read()
    path = f.name

# Temporary file is deleted after exiting the "with" scope
Returns

ContextManager with the temporary file
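The temporary-file behaviour described for download_temporary_file() follows a common stdlib pattern. The following sketch applies that pattern to bytes already in memory. bytes_to_temporary_file is a hypothetical helper, not Karton's implementation. It writes the content, rewinds the file, yields the open file, and relies on NamedTemporaryFile to delete it on exit.

```python
import tempfile
from contextlib import contextmanager
from typing import IO, Iterator, Optional


@contextmanager
def bytes_to_temporary_file(content: bytes, suffix: Optional[str] = None) -> Iterator[IO[bytes]]:
    """Write content into a named temporary file that is removed on exit (sketch)."""
    with tempfile.NamedTemporaryFile(suffix=suffix) as f:
        f.write(content)
        f.flush()
        # Rewind so the caller reads from the beginning
        f.seek(0)
        yield f
```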

download_to_file(path: str) -> None

Downloads remote resource into file.

sample = self.current_task.get_resource("sample")

sample.download_to_file("sample/sample.exe")

with open("sample/sample.exe", "rb") as f:
    contents = f.read()
Parameters

path – Path to download the resource into

extract_temporary() -> Iterator[str]

If the resource contains a zip file, extracts the files contained in the zip to a temporary directory.

Returns the path of the directory with the extracted files. The directory is recursively deleted after leaving the context.

import os

dumps = self.current_task.get_resource("dumps")

with dumps.extract_temporary() as dumps_path:
    print("Fetched dumps:", os.listdir(dumps_path))

By default, the method downloads the zip into a temporary file, which is deleted after extraction. If you want to load the zip into memory, call RemoteResource.download() first.

Returns

ContextManager with the temporary directory
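The following stdlib sketch illustrates the extract-to-temporary-directory pattern described for extract_temporary(). extract_zip_temporary is hypothetical. It accepts either a path or a seekable binary file object, because zipfile.ZipFile accepts both. TemporaryDirectory removes the whole tree recursively when the context exits.

```python
import tempfile
import zipfile
from contextlib import contextmanager
from typing import IO, Iterator, Union


@contextmanager
def extract_zip_temporary(source: Union[str, IO[bytes]]) -> Iterator[str]:
    """Extract a zip archive into a temporary directory removed on exit (sketch)."""
    with tempfile.TemporaryDirectory() as tmpdir:
        with zipfile.ZipFile(source) as zipf:
            zipf.extractall(tmpdir)
        # The directory tree is deleted when the outer "with" block exits
        yield tmpdir
```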

extract_to_directory(path: str) -> None

If the resource contains a zip file, extracts the files contained in the zip into the provided path.

By default, the method downloads the zip into a temporary file, which is deleted after extraction. If you want to load the zip into memory, call RemoteResource.download() first.

Parameters

path – Directory path where the resource should be unpacked

loaded() -> bool

Checks whether resource is loaded into memory

Returns

Flag indicating if the resource is loaded or not

property sha256: Optional[str]

Resource sha256

Returns

Hexencoded resource SHA256 hash

property size: int

Resource size

Returns

Resource size

property uid: str

Resource identifier (UUID)

Returns

Resource identifier

unload() -> None

Unloads resource object from memory

zip_file() -> Iterator[zipfile.ZipFile]

If the resource contains a zip file, downloads it to a temporary file and wraps it with a ZipFile object.

dumps = self.current_task.get_resource("dumps")

with dumps.zip_file() as zipf:
    print("Fetched dumps: ", zipf.namelist())

By default, the method downloads the zip into a temporary file, which is deleted after leaving the context. If you want to load the zip into memory, call RemoteResource.download() first.

If you want to pre-download the zip to a specified path and open it with the zipfile module, you need to do this manually:

import zipfile

dumps = self.current_task.get_resource("dumps")

# Download zip file
zip_path = "./dumps.zip"
dumps.download_to_file(zip_path)

zipf = zipfile.ZipFile(zip_path)
Returns

ContextManager with zipfile
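When the archive is already in memory (for example, after calling download()), you can open the bytes without touching the disk by wrapping them in io.BytesIO. list_zip_members is a hypothetical helper that sketches this approach.

```python
import io
import zipfile
from typing import List


def list_zip_members(content: bytes) -> List[str]:
    """Open zip bytes held in memory and list the member names (sketch)."""
    with zipfile.ZipFile(io.BytesIO(content)) as zipf:
        return zipf.namelist()
```

For a loaded resource, list_zip_members(dumps.content) would list the same names as zipf.namelist() in the zip_file() example.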

karton.core.Task

class karton.core.task.Task(headers: Dict[str, Any], payload: Optional[Dict[str, Any]] = None, payload_persistent: Optional[Dict[str, Any]] = None, priority: Optional[karton.core.task.TaskPriority] = None, parent_uid: Optional[str] = None, root_uid: Optional[str] = None, orig_uid: Optional[str] = None, uid: Optional[str] = None, error: Optional[List[str]] = None)

Task representation with headers and resources.

Parameters
  • headers – Routing information for other systems; this is what allows evaluating whether a given system is useful for a given task. Systems filter by these.

  • payload – Any instance of dict; contains resources and additional information

  • payload_persistent – Persistent payload set for the whole task subtree, propagated from the initial task

  • priority – Priority of the whole task subtree, propagated from the initial task like payload_persistent

  • parent_uid – Id of the routed task that created this task via a karton’s send_task()

  • root_uid – Id of an unrouted task that is the root of this task’s analysis tree

  • orig_uid – Id of an unrouted (or crashed routed) task that was forked to create this task

  • uid – This task’s unique identifier

  • error – Traceback of an exception that happened while performing this task

add_payload(name: str, content: Any, persistent: bool = False) -> None

Add payload to task

Parameters
  • name – Name of the payload

  • content – Payload to be added

  • persistent – Flag if the payload should be persistent

add_resource(name: str, resource: karton.core.resource.ResourceBase, persistent: bool = False) -> None

Add resource to task.

Alias for add_payload()

Deprecated since version 3.0.0: Use add_payload() instead.

Parameters
  • name – Name of the resource

  • resource – Resource to be added

  • persistent – Flag if the resource should be persistent

derive_task(headers: Dict[str, Any]) -> karton.core.task.Task

Creates a copy of the task with different headers; useful for proxying a resource with added metadata.

from karton.core import Karton, Task

class MZClassifier(Karton):
    identity = "karton.mz-classifier"
    # filters is a list of header patterns; a task matching any of them is routed here
    filters = [
        {
            "type": "sample",
            "kind": "raw"
        }
    ]

    def process(self, task: Task) -> None:
        sample = task.get_resource("sample")
        if sample.content.startswith(b"MZ"):
            self.log.info("MZ detected!")
            new_task = task.derive_task({
                "type": "sample",
                "kind": "exe"
            })
            self.send_task(new_task)
        else:
            self.log.info("Not a MZ :<")

Changed in version 3.0.0: Moved from static method to regular method:

Task.derive_task(headers, task) must be ported to task.derive_task(headers)

Parameters

headers – New headers for the task

Returns

Copy of task with new headers

get_payload(name: str, default: Optional[Any] = None) -> Any

Get payload from task

Parameters
  • name – name of the payload

  • default – Value to be returned if payload is not present

Returns

Payload content

get_resource(name: str) -> karton.core.resource.ResourceBase

Get resource from task.

Ensures that the payload contains a Resource object. If not, raises TypeError.

Parameters

name – Name of the resource to get

Returns

karton.ResourceBase - resource with given name

has_payload(name: str) -> bool

Checks whether payload exists

Parameters

name – Name of the payload to be checked

Returns

Whether the task’s payload contains a value with the given name

is_payload_persistent(name: str) -> bool

Checks whether payload exists and is persistent

Parameters

name – Name of the payload to be checked

Returns

Whether the task’s payload with the given name is persistent

iterate_resources() -> Iterator[karton.core.resource.ResourceBase]

Iterate over the resource objects bound to the task

Returns

An iterator over all task resources

remove_payload(name: str) -> None

Removes payload from the task

If the payload doesn’t exist or is persistent, raises KeyError.

Parameters

name – Payload name to be removed

walk_payload_bags() -> Iterator[Tuple[Dict[str, Any], str, Any]]

Iterate over all payload bags and direct payloads contained in them

Generates tuples (payload_bag, key, value)

Returns

An iterator over all task payload bags
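Yielding the owning bag alongside each key lets a caller replace a value in place with bag[key] = new_value. The following sketch is standalone and hypothetical (walk_bags, redact_strings). It assumes that nested dicts also count as payload bags and ignores lists, which may not match exactly how Karton walks payloads.

```python
from typing import Any, Dict, Iterator, List, Tuple

Bag = Dict[str, Any]


def walk_bags(*roots: Bag) -> Iterator[Tuple[Bag, str, Any]]:
    """Yield (bag, key, value) for each entry, descending into nested dicts (sketch)."""
    pending: List[Bag] = list(roots)
    while pending:
        bag = pending.pop(0)
        # Snapshot the items so callers can reassign bag[key] during iteration
        for key, value in list(bag.items()):
            yield bag, key, value
            if isinstance(value, dict):
                pending.append(value)


def redact_strings(payload: Bag, payload_persistent: Bag, secret: str) -> None:
    """Usage: replace matching string values in place through the owning bag."""
    for bag, key, value in walk_bags(payload, payload_persistent):
        if isinstance(value, str) and secret in value:
            bag[key] = value.replace(secret, "***")
```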

walk_payload_items() -> Iterator[Tuple[str, Any]]

Iterate recursively over all payload items

Generates tuples (path, value).

Returns

An iterator over all task payload values

karton.core.Config

class karton.core.config.Config(path: Optional[str] = None, check_sections: Optional[bool] = True)

Simple config loader.

Loads configuration from paths specified below (in provided order):

  • /etc/karton/karton.ini (global)

  • ~/.config/karton/karton.ini (user local)

  • ./karton.ini (subsystem local)

  • <path> optional, additional path provided in arguments

It is also possible to pass configuration via environment variables. Any variable named KARTON_FOO_BAR is equivalent to setting the ‘bar’ variable in section ‘foo’ (note the lowercase names).

Environment variables have higher precedence than those loaded from files.

Parameters
  • path – Path to additional configuration file

  • check_sections – Check if sections redis and s3 are defined in the configuration
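The following sketch illustrates the environment-variable mapping described above. config_from_env is hypothetical. It assumes that everything after the section name is the option name, so KARTON_S3_ACCESS_KEY would map to option access_key in section s3. Variables without an option part are skipped.

```python
import os
from typing import Dict, Mapping, Optional


def config_from_env(environ: Optional[Mapping[str, str]] = None) -> Dict[str, Dict[str, str]]:
    """Collect KARTON_<SECTION>_<OPTION> variables into {section: {option: value}}."""
    if environ is None:
        environ = os.environ
    result: Dict[str, Dict[str, str]] = {}
    for name, value in environ.items():
        lowered = name.lower()  # section and option names are lowercase
        if not lowered.startswith("karton_"):
            continue
        # Split into at most three parts: prefix, section, rest-as-option
        parts = lowered.split("_", 2)
        if len(parts) < 3 or not parts[1] or not parts[2]:
            continue  # e.g. KARTON_FOO has no option part
        _, section, option = parts
        result.setdefault(section, {})[option] = value
    return result
```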

append_to_list(section_name: str, option_name: str, value: Any) -> None

Appends value to a list in configuration

get(section_name: str, option_name: str, fallback: Optional[Any] = None) -> Any

Gets value from configuration or returns fallback (None by default) if value was not set.

getboolean(section_name: str, option_name: str, fallback: bool) -> bool
getboolean(section_name: str, option_name: str) -> Optional[bool]

Gets value from configuration or returns fallback (None by default) if value was not set. Value is coerced to bool type.

getint(section_name: str, option_name: str, fallback: int) -> int
getint(section_name: str, option_name: str) -> Optional[int]

Gets value from configuration or returns fallback (None by default) if value was not set. Value is coerced to int type.

has_option(section_name: str, option_name: str) -> bool

Checks if configuration value is set

has_section(section_name: str) -> bool

Checks if configuration section exists

load_from_dict(data: Dict[str, Dict[str, Any]]) -> None

Updates configuration values from a dictionary compatible with ConfigParser.read_dict. Accepts values of native types, so you don’t need to convert them to strings.

None values are treated like missing value and are not added.

{
   "section-name": {
       "option-name": "value"
   }
}
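The same merge rules (native values accepted, None treated as missing) can be sketched on top of configparser.ConfigParser.read_dict. The load_from_dict below is a hypothetical standalone function, not the Config method itself.

```python
import configparser
from typing import Any, Dict


def load_from_dict(parser: configparser.ConfigParser, data: Dict[str, Dict[str, Any]]) -> None:
    """Merge a nested dict into a ConfigParser: stringify native values, skip None (sketch)."""
    parser.read_dict({
        section: {option: str(value) for option, value in options.items() if value is not None}
        for section, options in data.items()
    })
```

Because values are stored as strings, typed getters such as getboolean() and getint() coerce them back when read.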
set(section_name: str, option_name: str, value: Any) -> None

Sets value in configuration