Karton API reference¶
karton.core.Producer, karton.core.Consumer¶
- class karton.core.Producer(config: Optional[karton.core.config.Config] = None, identity: Optional[str] = None, backend: Optional[karton.core.backend.KartonBackend] = None)[source]¶
Producer part of Karton. Used for dispatching initial tasks into karton.
- Parameters
config (karton.Config) – Karton configuration object (optional)
identity (str) – Producer name (optional)
Usage example:
    from karton.core import Producer, Resource, Task

    producer = Producer(identity="karton.mwdb")
    task = Task(
        headers={
            "type": "sample",
            "kind": "raw"
        },
        payload={
            "sample": Resource("sample.exe", b"put content here")
        }
    )
    producer.send_task(task)
- Parameters
config – Karton config to use for service configuration
identity – Karton producer identity
backend – Karton backend to use
- classmethod args_description() str ¶
Return short description for argument parser.
- classmethod args_parser() argparse.ArgumentParser ¶
Return ArgumentParser for main() class method.
This method should be overridden and call super methods if you want to add more arguments.
- classmethod config_from_args(config: karton.core.config.Config, args: argparse.Namespace) None ¶
Updates configuration with settings from arguments
This method should be overridden and call super methods if you want to add more arguments.
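The override pattern described for args_parser() and config_from_args() can be sketched with the standard library alone. This is only an illustration of "override and call super", not Karton's implementation: ServiceBase, MyService, the --threshold flag, and the plain dict standing in for karton.core.config.Config are all hypothetical.

```python
import argparse
from typing import Any, Dict


class ServiceBase:
    """Hypothetical stand-in for a Karton service base class."""

    @classmethod
    def args_parser(cls) -> argparse.ArgumentParser:
        parser = argparse.ArgumentParser(description="example service")
        parser.add_argument("--log-level", default="INFO")
        return parser

    @classmethod
    def config_from_args(cls, config: Dict[str, Any], args: argparse.Namespace) -> None:
        config["log_level"] = args.log_level


class MyService(ServiceBase):
    @classmethod
    def args_parser(cls) -> argparse.ArgumentParser:
        # Call the parent first, then register the extra argument.
        parser = super().args_parser()
        parser.add_argument("--threshold", type=int, default=10)
        return parser

    @classmethod
    def config_from_args(cls, config: Dict[str, Any], args: argparse.Namespace) -> None:
        # Let the parent copy its own settings before adding ours.
        super().config_from_args(config, args)
        config["threshold"] = args.threshold


config: Dict[str, Any] = {}
args = MyService.args_parser().parse_args(["--threshold", "25"])
MyService.config_from_args(config, args)
print(config)
```

Calling the parent implementation in both methods keeps the base arguments available alongside the new one.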
- classmethod karton_from_args(args: Optional[argparse.Namespace] = None)¶
Returns Karton instance configured using configuration files and provided arguments
Used by the KartonServiceBase.main() method.
- property log: logging.Logger¶
Return Logger instance for Karton service
If you want to use it in code that is outside of the Consumer class, use logging.getLogger():
    import logging
    logging.getLogger("<identity>")
- Returns
logging.Logger instance
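The lookup-by-name approach works because logging.getLogger() returns the same logger object for the same name. The sketch below uses only the standard library; the identity "karton.example" and the ListHandler class are hypothetical stand-ins, and no Karton handler is involved.

```python
import logging
from typing import List


class ListHandler(logging.Handler):
    """Hypothetical handler that collects formatted records in a list."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(f"{record.name}:{record.levelname}:{record.getMessage()}")


IDENTITY = "karton.example"  # hypothetical service identity

service_logger = logging.getLogger(IDENTITY)
service_logger.setLevel(logging.INFO)
handler = ListHandler()
service_logger.addHandler(handler)


def helper_outside_service() -> None:
    # Looks the logger up by name instead of receiving it from the service.
    logging.getLogger(IDENTITY).info("helper called")


helper_outside_service()
print(handler.messages)
```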
- property log_handler: karton.core.logger.KartonLogHandler¶
Return KartonLogHandler bound to this Karton service.
Can be used to setup logging on your own by adding this handler to the chosen loggers.
- send_task(task: karton.core.task.Task) bool [source]¶
Sends a task to the unrouted task queue. Takes care of logging. Given task will be child of task we are currently handling (if such exists).
- Parameters
task – Task object to be sent
- Returns
Bool indicating if the task was delivered
- setup_logger(level: Optional[Union[str, int]] = None) None ¶
Setup logger for Karton service (StreamHandler and karton.logs handler)
Called by Consumer.loop(). If you want to use the logger for a Producer, you need to call it yourself, but remember to set the identity.
- Parameters
level – Logging level. Default is logging.INFO (unless different value is set in Karton config)
- class karton.core.Consumer(config: Optional[karton.core.config.Config] = None, identity: Optional[str] = None, backend: Optional[karton.core.backend.KartonBackend] = None)[source]¶
Base consumer class. This is the part of Karton responsible for processing incoming tasks.
- Parameters
config – Karton config to use for service configuration
identity – Karton service identity
backend – Karton backend to use
- add_post_hook(callback: Callable[[karton.core.task.Task, Optional[Exception]], None], name: Optional[str] = None) None [source]¶
Add a function to be called after processing each task.
- Parameters
callback – Function of the form callback(task, exception) where task is a karton.Task and exception is an exception thrown by the karton.Consumer.process() function or None.
name – Name of the post-hook
- add_pre_hook(callback: Callable[[karton.core.task.Task], None], name: Optional[str] = None) None [source]¶
Add a function to be called before processing each task.
- Parameters
callback – Function of the form callback(task) where task is a karton.Task
name – Name of the pre-hook
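The two callback shapes, callback(task) and callback(task, exception), can be illustrated with a small stand-alone runner. HookRunner and the dict-based Task alias are hypothetical; the ordering of hooks and the way Karton itself treats exceptions are not described in this reference, so the behaviour below is an assumption made for the sketch.

```python
from typing import Any, Callable, Dict, List, Optional

Task = Dict[str, Any]  # hypothetical stand-in for karton.Task


class HookRunner:
    """Hypothetical runner, only meant to show when each hook fires."""

    def __init__(self, process: Callable[[Task], None]) -> None:
        self._process = process
        self._pre_hooks: List[Callable[[Task], None]] = []
        self._post_hooks: List[Callable[[Task, Optional[Exception]], None]] = []

    def add_pre_hook(self, callback: Callable[[Task], None]) -> None:
        self._pre_hooks.append(callback)

    def add_post_hook(self, callback: Callable[[Task, Optional[Exception]], None]) -> None:
        self._post_hooks.append(callback)

    def run(self, task: Task) -> None:
        for hook in self._pre_hooks:
            hook(task)
        error: Optional[Exception] = None
        try:
            self._process(task)
        except Exception as exc:  # handed to the post-hooks below
            error = exc
        for hook in self._post_hooks:
            hook(task, error)


events: List[str] = []


def process(task: Task) -> None:
    if task.get("fail"):
        raise ValueError("boom")


runner = HookRunner(process)
runner.add_pre_hook(lambda task: events.append("pre"))
runner.add_post_hook(lambda task, exc: events.append(f"post:{type(exc).__name__}"))

runner.run({"fail": False})
runner.run({"fail": True})
print(events)
```

The post-hook sees None when processing succeeded and the raised exception otherwise, which matches the callback signature documented above.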
- classmethod args_description() str ¶
Return short description for argument parser.
- classmethod args_parser() argparse.ArgumentParser [source]¶
Return ArgumentParser for main() class method.
This method should be overridden and call super methods if you want to add more arguments.
- classmethod config_from_args(config: karton.core.config.Config, args: argparse.Namespace) None [source]¶
Updates configuration with settings from arguments
This method should be overridden and call super methods if you want to add more arguments.
- classmethod karton_from_args(args: Optional[argparse.Namespace] = None)¶
Returns Karton instance configured using configuration files and provided arguments
Used by the KartonServiceBase.main() method.
- property log: logging.Logger¶
Return Logger instance for Karton service
If you want to use it in code that is outside of the Consumer class, use logging.getLogger():
    import logging
    logging.getLogger("<identity>")
- Returns
logging.Logger instance
- property log_handler: karton.core.logger.KartonLogHandler¶
Return KartonLogHandler bound to this Karton service.
Can be used to setup logging on your own by adding this handler to the chosen loggers.
- main() None ¶
Main method invoked from CLI.
- abstract process(task: karton.core.task.Task) None [source]¶
Task processing method.
- Parameters
task – The incoming task object
self.current_task contains the task that triggered the invocation of karton.Consumer.process(), but you should only focus on the passed task object and shouldn’t interact with the field directly.
- setup_logger(level: Optional[Union[str, int]] = None) None ¶
Setup logger for Karton service (StreamHandler and karton.logs handler)
Called by Consumer.loop(). If you want to use the logger for a Producer, you need to call it yourself, but remember to set the identity.
- Parameters
level – Logging level. Default is logging.INFO (unless different value is set in Karton config)
- class karton.core.Karton(config: Optional[karton.core.config.Config] = None, identity: Optional[str] = None, backend: Optional[karton.core.backend.KartonBackend] = None)[source]¶
This glues together Consumer and Producer - which is the most common use case
karton.core.LogConsumer¶
- class karton.core.LogConsumer(config: Optional[karton.core.config.Config] = None, identity: Optional[str] = None, backend: Optional[karton.core.backend.KartonBackend] = None)[source]¶
Base class for log consumer subsystems.
You can consume logs from a specific logger by setting the logger_filter() class attribute.
You can also select logs of a specific level via the level() class attribute.
- Parameters
config – Karton config to use for service configuration
identity – Karton service identity
backend – Karton backend to use
karton.core.Resource¶
- karton.core.resource.Resource¶
alias of karton.core.resource.LocalResource
- class karton.core.resource.LocalResource(name: str, content: Optional[Union[str, bytes]] = None, path: Optional[str] = None, bucket: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, uid: Optional[str] = None, sha256: Optional[str] = None, fd: Optional[IO[bytes]] = None, _flags: Optional[List[str]] = None, _close_fd: bool = False)[source]¶
Represents local resource with arbitrary binary data e.g. file contents.
Local resources will be uploaded to object hub (S3) during task dispatching.
    # Creating resource from bytes
    sample = Resource("original_name.exe", content=b"X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANT...")

    # Creating resource from path
    sample = Resource("original_name.exe", path="sample/original_name.exe")
- Parameters
name – Name of the resource (e.g. name of file)
content – Resource content
path – Path of file with resource content
bucket – Alternative S3 bucket for resource
metadata – Resource metadata
uid – Alternative S3 resource id
sha256 – Resource sha256 hash
fd – Seekable file descriptor
_flags – Resource flags
_close_fd – Close file descriptor after upload (default: False)
- property content: bytes¶
Resource content. Reads the file if the file was not read before.
- Returns
Content bytes
- classmethod from_directory(name: str, directory_path: str, compression: int = 8, in_memory: bool = False, bucket: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, uid: Optional[str] = None) karton.core.resource.LocalResource [source]¶
Resource extension, allowing to pass whole directory as a zipped resource.
Reads all files contained in directory_path recursively and packs them into zip file.
    # Creating zipped resource from path
    dumps = LocalResource.from_directory("dumps", directory_path="dumps/")
- Parameters
name – Name of the resource (e.g. name of file)
directory_path – Path of the resource directory
compression – Compression setting for the zip file (default is 8, the value of zipfile.ZIP_DEFLATED)
in_memory – Don’t create temporary file and make in-memory zip file (default: False)
bucket – Alternative S3 bucket for resource
metadata – Resource metadata
uid – Alternative S3 resource id
- Returns
LocalResource instance with zipped contents
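The "read all files recursively and pack them into a zip" step can be approximated with the standard library. This is a sketch of the general technique under stated assumptions, not the code behind from_directory(): zip_directory is a hypothetical helper, it always builds the archive in memory, and it stores paths relative to the packed directory.

```python
import io
import os
import tempfile
import zipfile


def zip_directory(directory_path: str, compression: int = zipfile.ZIP_DEFLATED) -> bytes:
    """Pack every file under directory_path into an in-memory zip archive."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=compression) as zf:
        for root, _dirs, files in os.walk(directory_path):
            for file_name in files:
                abs_path = os.path.join(root, file_name)
                # Store paths relative to the directory being packed.
                zf.write(abs_path, os.path.relpath(abs_path, directory_path))
    return buffer.getvalue()


# Build a small throwaway directory tree to pack.
with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, "nested"))
    with open(os.path.join(tmp, "a.txt"), "w") as f:
        f.write("first")
    with open(os.path.join(tmp, "nested", "b.txt"), "w") as f:
        f.write("second")
    archive = zip_directory(tmp)

with zipfile.ZipFile(io.BytesIO(archive)) as zf:
    names = sorted(zf.namelist())
print(names)
```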
- property sha256: Optional[str]¶
Resource sha256
- Returns
Hexencoded resource SHA256 hash
- property size: int¶
Resource size
- Returns
Resource size
- property uid: str¶
Resource identifier (UUID)
- Returns
Resource identifier
- class karton.core.resource.RemoteResource(name: str, bucket: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, uid: Optional[str] = None, size: Optional[int] = None, backend: Optional[KartonBackend] = None, sha256: Optional[str] = None, _flags: Optional[List[str]] = None)[source]¶
Keeps reference to remote resource object shared between subsystems via object storage (S3)
Should never be instantiated directly by subsystem, but can be directly passed to outgoing payload.
- Parameters
name – Name of the resource (e.g. name of file)
bucket – Alternative S3 bucket for resource
metadata – Resource metadata
uid – Alternative S3 resource id
size – Resource size
backend – KartonBackend() to bind to this resource
sha256 – Resource sha256 hash
_flags – Resource flags
- property content: bytes¶
Resource content. Performs download when resource was not loaded before.
- Returns
Content bytes
- download() bytes [source]¶
Downloads remote resource content from object hub into memory.
    sample = self.current_task.get_resource("sample")

    # Ensure that resource will be downloaded before it will be
    # passed to processing method
    sample.download()

    self.process_sample(sample)
- Returns
Downloaded content bytes
- download_temporary_file(suffix=None) Iterator[IO[bytes]] [source]¶
Downloads remote resource into named temporary file.
    sample = self.current_task.get_resource("sample")

    with sample.download_temporary_file() as f:
        contents = f.read()
        path = f.name

    # Temporary file is deleted after exiting the "with" scope
- Returns
ContextManager with the temporary file
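The lifetime of such a temporary file can be demonstrated without object storage. The sketch below is a stdlib-only analogue: temporary_file_with is a hypothetical helper that writes given bytes instead of downloading them, and it relies on tempfile.NamedTemporaryFile removing the file when the context exits.

```python
import contextlib
import os
import tempfile
from typing import IO, Iterator, Optional


@contextlib.contextmanager
def temporary_file_with(content: bytes, suffix: Optional[str] = None) -> Iterator[IO[bytes]]:
    """Yield a named temporary file holding content; remove it on exit."""
    with tempfile.NamedTemporaryFile(suffix=suffix) as f:
        f.write(content)
        f.seek(0)  # rewind so the caller can read from the start
        yield f


with temporary_file_with(b"MZ\x90\x00", suffix=".exe") as f:
    contents = f.read()
    path = f.name
    existed_inside = os.path.exists(path)

# The file is gone once the "with" block has been left.
print(contents, existed_inside, os.path.exists(path))
```

Anything that needs the file by path has to run inside the "with" block.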
- download_to_file(path: str) None [source]¶
Downloads remote resource into file.
    sample = self.current_task.get_resource("sample")

    sample.download_to_file("sample/sample.exe")

    with open("sample/sample.exe", "rb") as f:
        contents = f.read()
- Parameters
path – Path to download the resource into
- extract_temporary() Iterator[str] [source]¶
If resource contains a Zip file, extracts files contained in Zip to the temporary directory.
Returns path of directory with extracted files. Directory is recursively deleted after leaving the context.
    import os

    dumps = self.current_task.get_resource("dumps")

    with dumps.extract_temporary() as dumps_path:
        print("Fetched dumps:", os.listdir(dumps_path))
By default, the method downloads the zip into a temporary file, which is deleted after extraction. If you want to load the zip into memory, call RemoteResource.download() first.
- Returns
ContextManager with the temporary directory
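The "extract into a temporary directory that is removed on exit" behaviour can be sketched with the standard library. extract_zip_temporary is a hypothetical helper that takes the zip as bytes already in memory; how RemoteResource fetches and buffers the archive is not shown here.

```python
import contextlib
import io
import os
import tempfile
import zipfile
from typing import Iterator


@contextlib.contextmanager
def extract_zip_temporary(zip_bytes: bytes) -> Iterator[str]:
    """Extract a zip archive into a temporary directory and yield its path."""
    with tempfile.TemporaryDirectory() as tmpdir:
        with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
            zf.extractall(tmpdir)
        yield tmpdir
    # TemporaryDirectory removes the directory tree when the block is left.


# Build a small archive in memory to extract.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as zf:
    zf.writestr("dump_0.bin", b"\x00\x01")
    zf.writestr("dump_1.bin", b"\x02\x03")

with extract_zip_temporary(buffer.getvalue()) as dumps_path:
    extracted = sorted(os.listdir(dumps_path))

print(extracted, os.path.exists(dumps_path))
```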
- extract_to_directory(path: str) None [source]¶
If resource contains a Zip file, extracts files contained in Zip into provided path.
By default, the method downloads the zip into a temporary file, which is deleted after extraction. If you want to load the zip into memory, call RemoteResource.download() first.
- Parameters
path – Directory path where the resource should be unpacked
- loaded() bool [source]¶
Checks whether resource is loaded into memory
- Returns
Flag indicating if the resource is loaded or not
- property sha256: Optional[str]¶
Resource sha256
- Returns
Hexencoded resource SHA256 hash
- property size: int¶
Resource size
- Returns
Resource size
- property uid: str¶
Resource identifier (UUID)
- Returns
Resource identifier
- zip_file() Iterator[zipfile.ZipFile] [source]¶
If resource contains a Zip file, downloads it to the temporary file and wraps it with ZipFile object.
    dumps = self.current_task.get_resource("dumps")

    with dumps.zip_file() as zipf:
        print("Fetched dumps: ", zipf.namelist())
By default, the method downloads the zip into a temporary file, which is deleted after leaving the context. If you want to load the zip into memory, call RemoteResource.download() first.
If you want to pre-download the Zip under a specified path and open it using the zipfile module, you need to do this manually:
    import zipfile

    dumps = self.current_task.get_resource("dumps")

    # Download zip file
    zip_path = "./dumps.zip"
    dumps.download_to_file(zip_path)

    zipf = zipfile.ZipFile(zip_path)
- Returns
ContextManager with zipfile
karton.core.Task¶
- class karton.core.task.Task(headers: Dict[str, Any], payload: Optional[Dict[str, Any]] = None, headers_persistent: Optional[Dict[str, Any]] = None, payload_persistent: Optional[Dict[str, Any]] = None, priority: Optional[karton.core.task.TaskPriority] = None, parent_uid: Optional[str] = None, root_uid: Optional[str] = None, orig_uid: Optional[str] = None, uid: Optional[str] = None, error: Optional[List[str]] = None, _status: Optional[karton.core.task.TaskState] = None, _last_update: Optional[float] = None)[source]¶
Task representation with headers and resources.
- Parameters
headers – Routing information for other systems. Each system filters on these headers to decide whether a given task is relevant to it.
payload – Any instance of dict; contains resources and additional information
headers_persistent – Persistent headers for whole task subtree, propagated from initial task.
payload_persistent – Persistent payload set for whole task subtree, propagated from initial task
priority – Priority of whole task subtree, propagated from initial task like payload_persistent
parent_uid – Id of a routed task that has created this task by a karton with send_task()
root_uid – Id of an unrouted task that is the root of this task’s analysis tree
orig_uid – Id of an unrouted (or crashed routed) task that was forked to create this task
uid – This task’s unique identifier
error – Traceback of an exception that happened while performing this task
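How headers can drive routing is easiest to see with a deliberately simplified matcher. The rule used here, "a filter matches when every key/value pair in it appears in the task headers", is an assumption made for this sketch; Karton's actual matching rules are not described in this reference and may support more than plain equality. The names matches and interested are hypothetical.

```python
from typing import Any, Dict, List

Headers = Dict[str, Any]


def matches(headers: Headers, task_filter: Headers) -> bool:
    """True when every key/value pair of the filter appears in the headers."""
    return all(headers.get(key) == value for key, value in task_filter.items())


def interested(headers: Headers, filters: List[Headers]) -> bool:
    """A system is interested when at least one of its filters matches."""
    return any(matches(headers, f) for f in filters)


filters = [{"type": "sample", "kind": "raw"}]
raw_task = {"type": "sample", "kind": "raw", "origin": "upload"}
exe_task = {"type": "sample", "kind": "exe"}

print(interested(raw_task, filters), interested(exe_task, filters))
```

Extra headers on the task (such as the made-up "origin" key) do not prevent a match under this rule; only the keys named in the filter are compared.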
- add_payload(name: str, content: Any, persistent: bool = False) None [source]¶
Add payload to task
- Parameters
name – Name of the payload
content – Payload to be added
persistent – Flag if the payload should be persistent
- add_resource(name: str, resource: karton.core.resource.ResourceBase, persistent: bool = False) None [source]¶
Add resource to task.
Alias for add_payload()
Deprecated since version 3.0.0: Use add_payload() instead.
- Parameters
name – Name of the resource
resource – Resource to be added
persistent – Flag if the resource should be persistent
- derive_task(headers: Dict[str, Any]) karton.core.task.Task [source]¶
Creates copy of task with different headers, useful for proxying resource with added metadata.
    class MZClassifier(Karton):
        identity = "karton.mz-classifier"
        # Filters are given as a list of header dictionaries
        filters = [
            {
                "type": "sample",
                "kind": "raw"
            }
        ]

        def process(self, task: Task) -> None:
            sample = task.get_resource("sample")
            if sample.content.startswith(b"MZ"):
                self.log.info("MZ detected!")
                task = task.derive_task({
                    "type": "sample",
                    "kind": "exe"
                })
                self.send_task(task)
            else:
                self.log.info("Not a MZ :<")
Changed in version 3.0.0: Moved from static method to regular method: Task.derive_task(headers, task) must be ported to task.derive_task(headers)
- Parameters
headers – New headers for the task
- Returns
Copy of task with new headers
- get_payload(name: str, default: Optional[Any] = None) Any [source]¶
Get payload from task
- Parameters
name – name of the payload
default – Value to be returned if payload is not present
- Returns
Payload content
- get_resource(name: str) karton.core.resource.ResourceBase [source]¶
Get resource from task.
Ensures that the payload contains a Resource object. If not, raises TypeError.
- Parameters
name – Name of the resource to get
- Returns
karton.ResourceBase - resource with given name
- has_payload(name: str) bool [source]¶
Checks whether payload exists
- Parameters
name – Name of the payload to be checked
- Returns
Whether the task’s payload contains a value with the given name
- is_header_persistent(name: str) bool [source]¶
Checks whether header exists and is persistent
- Parameters
name – Name of the header to be checked
- Returns
Whether the task’s header with the given name is persistent
- is_payload_persistent(name: str) bool [source]¶
Checks whether payload exists and is persistent
- Parameters
name – Name of the payload to be checked
- Returns
Whether the task’s payload with the given name is persistent
- iterate_resources() Iterator[karton.core.resource.ResourceBase] [source]¶
Get list of resource objects bound to Task
- Returns
An iterator over all task resources
- remove_payload(name: str) None [source]¶
Removes payload for the task
If payload doesn’t exist or is persistent - raises KeyError
- Parameters
name – Payload name to be removed
karton.core.Config¶
- class karton.core.config.Config(path: Optional[str] = None, check_sections: Optional[bool] = True)[source]¶
Simple config loader.
Loads configuration from paths specified below (in provided order):
/etc/karton/karton.ini (global)
~/.config/karton/karton.ini (user local)
./karton.ini (subsystem local)
<path> (optional, additional path provided in arguments)
It is also possible to pass configuration via environment variables. Any variable named KARTON_FOO_BAR is equivalent to setting ‘bar’ variable in section ‘foo’ (note the lowercase names).
Environment variables have higher precedence than those loaded from files.
- Parameters
path – Path to additional configuration file
check_sections – Check if sections redis and s3 are defined in the configuration
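The KARTON_FOO_BAR naming rule can be sketched as a small parser. The exact splitting rule is an assumption here: this sketch treats the first word after the KARTON_ prefix as the section and everything after the next underscore as the option, lowercasing both. The helper names and the fake environment dictionary are hypothetical.

```python
import re
from typing import Dict, Mapping, Optional, Tuple

# Assumption: the first underscore-delimited word after KARTON_ is the section.
_PATTERN = re.compile(r"KARTON_([A-Z0-9]+)_([A-Z0-9_]+)")


def parse_env_name(name: str) -> Optional[Tuple[str, str]]:
    """Return (section, option) for a KARTON_* variable name, else None."""
    match = _PATTERN.fullmatch(name)
    if match is None:
        return None
    section, option = match.groups()
    return section.lower(), option.lower()


def overrides_from_env(environ: Mapping[str, str]) -> Dict[str, Dict[str, str]]:
    """Collect configuration overrides from a mapping of environment variables."""
    result: Dict[str, Dict[str, str]] = {}
    for name, value in environ.items():
        parsed = parse_env_name(name)
        if parsed is not None:
            section, option = parsed
            result.setdefault(section, {})[option] = value
    return result


fake_environ = {"KARTON_FOO_BAR": "1", "KARTON_REDIS_HOST": "localhost", "PATH": "/usr/bin"}
print(overrides_from_env(fake_environ))
```

Applying such overrides after the files have been read is one way to give environment variables the higher precedence described above.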
- append_to_list(section_name: str, option_name: str, value: Any) None [source]¶
Appends value to a list in configuration
- get(section_name: str, option_name: str, fallback: Optional[Any] = None) Any [source]¶
Gets value from configuration or returns fallback (None by default) if value was not set.
- getboolean(section_name: str, option_name: str, fallback: bool) bool [source]¶
- getboolean(section_name: str, option_name: str) Optional[bool]
Gets value from configuration or returns fallback (None by default) if value was not set. Value is coerced to bool type.
- getint(section_name: str, option_name: str, fallback: int) int [source]¶
- getint(section_name: str, option_name: str) Optional[int]
Gets value from configuration or returns fallback (None by default) if value was not set. Value is coerced to int type.
- load_from_dict(data: Dict[str, Dict[str, Any]]) None [source]¶
Updates configuration values from a dictionary compatible with ConfigParser.read_dict. Accepts values in their native types, so you don’t need to convert them to strings.
None values are treated as missing and are not added.
    {
        "section-name": {
            "option-name": "value"
        }
    }
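The dictionary shape and the typed getters with fallbacks can be tried out against the standard library's configparser. This is a stand-in for illustration only: the load_from_dict function below is a hypothetical free function, and whether Karton's Config wraps ConfigParser in this way is an assumption. The option names port, ssl, unset, and missing are made up.

```python
import configparser
from typing import Any, Dict


def load_from_dict(parser: configparser.ConfigParser, data: Dict[str, Dict[str, Any]]) -> None:
    """Merge data into parser, skipping options whose value is None."""
    cleaned = {
        section: {key: value for key, value in options.items() if value is not None}
        for section, options in data.items()
    }
    # read_dict converts the remaining native values to strings itself.
    parser.read_dict(cleaned)


parser = configparser.ConfigParser()
load_from_dict(
    parser,
    {"section-name": {"option-name": "value", "port": 6379, "ssl": False, "unset": None}},
)

option = parser.get("section-name", "option-name")
port = parser.getint("section-name", "port")
ssl = parser.getboolean("section-name", "ssl")
unset = parser.get("section-name", "unset", fallback=None)
missing = parser.getint("section-name", "missing", fallback=5)
print(option, port, ssl, unset, missing)
```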