Karton API reference¶
karton.core.Producer, karton.core.Consumer¶
- class karton.core.Producer(config: Optional[Config] = None, identity: Optional[str] = None, backend: Optional[KartonBackend] = None)[source]¶
Producer part of Karton, used for dispatching initial tasks into Karton.
- Parameters
config (karton.Config) – Karton configuration object (optional)
identity (str) – Producer name (optional)
Usage example:

```python
from karton.core import Producer, Resource, Task

producer = Producer(identity="karton.mwdb")
task = Task(
    headers={
        "type": "sample",
        "kind": "raw",
    },
    payload={
        "sample": Resource("sample.exe", b"put content here"),
    },
)
producer.send_task(task)
```
- Parameters
config – Karton config to use for service configuration
identity – Karton producer identity
backend – Karton backend to use
- classmethod args_description() str¶
Return short description for argument parser.
- classmethod args_parser() ArgumentParser¶
Return ArgumentParser for main() class method.
Override this method and call the superclass method if you want to add more arguments.
- classmethod config_from_args(config: Config, args: Namespace) None¶
Updates configuration with settings from arguments
Override this method and call the superclass method if you want to add more arguments.
- identity: str = ''¶
Karton service identity
- classmethod karton_from_args(args: Optional[Namespace] = None)¶
Returns Karton instance configured using configuration files and provided arguments
Used by the KartonServiceBase.main() method.
- property log: Logger¶
Return Logger instance for Karton service
If you want to use it in code outside of the Consumer class, use logging.getLogger():

```python
import logging

logging.getLogger("<identity>")
```

- Returns
logging.Logger instance
- property log_handler: Handler¶
Return KartonLogHandler bound to this Karton service.
Can be used to set up logging on your own by adding this handler to the chosen loggers.
- send_task(task: Task) bool[source]¶
Sends a task to the unrouted task queue and takes care of logging. The given task becomes a child of the task currently being handled (if there is one).
- Parameters
task – Task object to be sent
- Returns
Bool indicating if the task was delivered
- setup_logger(level: Optional[Union[str, int]] = None) None¶
Set up the logger for the Karton service (StreamHandler and karton.logs handler).
Called by Consumer.loop(). If you want to use the logger in a Producer, you need to call it yourself, but remember to set the identity.
- Parameters
level – Logging level. Default is logging.INFO (unless a different value is set in the Karton config)
- version: Optional[str] = None¶
Karton service version
- with_service_info: bool = False¶
Include extended service information for non-consumer services
- class karton.core.Consumer(config: Optional[Config] = None, identity: Optional[str] = None, backend: Optional[KartonBackend] = None)[source]¶
Base consumer class; this is the part of Karton responsible for processing incoming tasks.
- Parameters
config – Karton config to use for service configuration
identity – Karton service identity
backend – Karton backend to use
task_timeout – The maximum time, in seconds, this consumer waits for a task to finish processing before the task is marked as CRASHED on timeout. Set 0 for unlimited, or None to use the global value.
- add_post_hook(callback: Callable[[Task, Optional[BaseException]], None], name: Optional[str] = None) None[source]¶
Add a function to be called after processing each task.
- Parameters
callback – Function of the form callback(task, exception), where task is a karton.Task and exception is an exception raised by the karton.Consumer.process() function, or None.
name – Name of the post-hook
- add_pre_hook(callback: Callable[[Task], None], name: Optional[str] = None) None[source]¶
Add a function to be called before processing each task.
- Parameters
callback – Function of the form callback(task), where task is a karton.Task
name – Name of the pre-hook
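The hook contract above (pre-hooks receive the task; post-hooks receive the task plus the exception from process(), or None) can be illustrated with a small standalone sketch. This is not Karton's implementation: HookRunner, TaskLike and the use of a plain dict in place of karton.Task are hypothetical, and in this sketch a failure is only reported to the post-hooks rather than re-raised.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

# Hypothetical stand-ins: a plain dict plays the role of karton.Task here
TaskLike = Dict[str, Any]
PreHook = Callable[[TaskLike], None]
PostHook = Callable[[TaskLike, Optional[BaseException]], None]


class HookRunner:
    """Sketch of running pre-hooks, the processing function, then post-hooks."""

    def __init__(self) -> None:
        self.pre_hooks: List[Tuple[Optional[str], PreHook]] = []
        self.post_hooks: List[Tuple[Optional[str], PostHook]] = []

    def add_pre_hook(self, callback: PreHook, name: Optional[str] = None) -> None:
        self.pre_hooks.append((name, callback))

    def add_post_hook(self, callback: PostHook, name: Optional[str] = None) -> None:
        self.post_hooks.append((name, callback))

    def handle(self, task: TaskLike, process: Callable[[TaskLike], None]) -> None:
        for _name, hook in self.pre_hooks:
            hook(task)
        exception: Optional[BaseException] = None
        try:
            process(task)
        except Exception as exc:
            # In this sketch the failure is only reported to the post-hooks
            exception = exc
        for _name, hook in self.post_hooks:
            hook(task, exception)


# Usage: record hook calls around a processing function that fails
events: List[Tuple[Any, ...]] = []
runner = HookRunner()
runner.add_pre_hook(lambda task: events.append(("pre", task["uid"])), name="trace-pre")
runner.add_post_hook(
    lambda task, exc: events.append(("post", task["uid"], type(exc).__name__ if exc else None)),
    name="trace-post",
)


def failing_process(task: TaskLike) -> None:
    raise ValueError("unsupported sample")


runner.handle({"uid": "task-1"}, failing_process)
```

After handle() returns, events holds the pre-hook call followed by the post-hook call that saw the ValueError.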
- classmethod args_description() str¶
Return short description for argument parser.
- classmethod args_parser() ArgumentParser[source]¶
Return ArgumentParser for main() class method.
Override this method and call the superclass method if you want to add more arguments.
- classmethod config_from_args(config: Config, args: Namespace) None[source]¶
Updates configuration with settings from arguments
Override this method and call the superclass method if you want to add more arguments.
- identity: str = ''¶
Karton service identity
- classmethod karton_from_args(args: Optional[Namespace] = None)¶
Returns Karton instance configured using configuration files and provided arguments
Used by the KartonServiceBase.main() method.
- property log: Logger¶
Return Logger instance for Karton service
If you want to use it in code outside of the Consumer class, use logging.getLogger():

```python
import logging

logging.getLogger("<identity>")
```

- Returns
logging.Logger instance
- property log_handler: Handler¶
Return KartonLogHandler bound to this Karton service.
Can be used to set up logging on your own by adding this handler to the chosen loggers.
- main() None¶
Main method invoked from CLI.
- abstract process(task: Task) None[source]¶
Task processing method.
- Parameters
task – The incoming task object
self.current_task contains the task that triggered the invocation of karton.Consumer.process(), but you should focus only on the passed task object and shouldn't interact with this field directly.
- setup_logger(level: Optional[Union[str, int]] = None) None¶
Set up the logger for the Karton service (StreamHandler and karton.logs handler).
Called by Consumer.loop(). If you want to use the logger in a Producer, you need to call it yourself, but remember to set the identity.
- Parameters
level – Logging level. Default is logging.INFO (unless a different value is set in the Karton config)
- version: Optional[str] = None¶
Karton service version
- with_service_info: bool = False¶
Include extended service information for non-consumer services
karton.core.LogConsumer¶
- class karton.core.LogConsumer(config: Optional[Config] = None, identity: Optional[str] = None, backend: Optional[KartonBackend] = None)[source]¶
Base class for log consumer subsystems.
You can consume logs from a specific logger by setting the logger_filter class attribute.
You can also select logs of a specific level via the level class attribute.
- Parameters
config – Karton config to use for service configuration
identity – Karton service identity
backend – Karton backend to use
- abstract process_log(event: Dict[str, Any]) None[source]¶
The core log handler that should be overridden in implemented log handlers
- Parameters
event – Dictionary containing the log event data
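The class-attribute filtering described above can be sketched without Karton. Everything here is hypothetical: the event keys "name", "levelname" and "message", the FilteringLogHandler and MwdbWarnings classes, and the choice to treat level as a minimum threshold rather than an exact match are assumptions, not the documented behaviour of LogConsumer.

```python
import logging
from typing import Any, Dict, List, Optional

# Assumption: events carry the logger name under "name" and the level under
# "levelname"; the real Karton event layout may differ.
LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL,
}


class FilteringLogHandler:
    """Hypothetical sketch of class-attribute based log filtering."""

    logger_filter: Optional[str] = None  # None accepts every logger
    level: Optional[str] = None  # None accepts every level

    def accepts(self, event: Dict[str, Any]) -> bool:
        if self.logger_filter is not None and event.get("name") != self.logger_filter:
            return False
        if self.level is not None:
            # Treat `level` as a minimum threshold (an assumption)
            threshold = LEVELS[self.level.upper()]
            if LEVELS.get(str(event.get("levelname", "")).upper(), 0) < threshold:
                return False
        return True

    def feed(self, event: Dict[str, Any]) -> None:
        if self.accepts(event):
            self.process_log(event)

    def process_log(self, event: Dict[str, Any]) -> None:
        raise NotImplementedError


class MwdbWarnings(FilteringLogHandler):
    logger_filter = "karton.mwdb"
    level = "WARNING"

    def __init__(self) -> None:
        self.messages: List[str] = []

    def process_log(self, event: Dict[str, Any]) -> None:
        self.messages.append(event["message"])
```

A subclass only sets the two attributes and overrides process_log(); the filtering itself stays in the base class.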
- with_service_info: bool = True¶
Include extended service information for non-consumer services
karton.core.Resource¶
- karton.core.resource.Resource¶
alias of LocalResource
- class karton.core.resource.LocalResource(name: str, content: Optional[Union[str, bytes]] = None, path: Optional[str] = None, bucket: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, uid: Optional[str] = None, sha256: Optional[str] = None, fd: Optional[IO[bytes]] = None, _flags: Optional[List[str]] = None, _close_fd: bool = False)[source]¶
Represents a local resource with arbitrary binary data, e.g. file contents.
Local resources are uploaded to the object hub (S3) during task dispatching.

```python
from karton.core import Resource

# Creating resource from bytes
sample = Resource("original_name.exe", content=b"X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANT...")
# Creating resource from path
sample = Resource("original_name.exe", path="sample/original_name.exe")
```
- Parameters
name – Name of the resource (e.g. name of file)
content – Resource content
path – Path of file with resource content
bucket – Alternative S3 bucket for resource
metadata – Resource metadata
uid – Alternative S3 resource id
sha256 – Resource sha256 hash
fd – Seekable file descriptor
_flags – Resource flags
_close_fd – Close file descriptor after upload (default: False)
- property content: bytes¶
Resource content. Reads the file if the file was not read before.
- Returns
Content bytes
- classmethod from_directory(name: str, directory_path: str, compression: int = 8, in_memory: bool = False, bucket: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, uid: Optional[str] = None) LocalResourceBase¶
Resource extension that allows passing a whole directory as a zipped resource.
Reads all files contained in directory_path recursively and packs them into zip file.
```python
from karton.core.resource import LocalResource

# Creating zipped resource from path
dumps = LocalResource.from_directory("dumps", directory_path="dumps/")
```
- Parameters
name – Name of the resource (e.g. name of file)
directory_path – Path of the resource directory
compression – Compression method passed to the zip file (default is zipfile.ZIP_DEFLATED, i.e. 8)
in_memory – Build the zip file in memory instead of creating a temporary file (default: False)
bucket – Alternative S3 bucket for resource
metadata – Resource metadata
uid – Alternative S3 resource id
- Returns
LocalResource instance with zipped contents
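What from_directory() describes (recursively packing a directory into a zip, optionally in memory) can be sketched with the standard zipfile module. This is an illustration, not Karton's code: the function name zip_directory is hypothetical, and it always builds the archive in memory, roughly corresponding to in_memory=True. Note that the default compression value 8 in the signature above is the numeric value of zipfile.ZIP_DEFLATED.

```python
import io
import os
import zipfile


def zip_directory(directory_path: str, compression: int = zipfile.ZIP_DEFLATED) -> bytes:
    """Hypothetical sketch: pack a directory recursively into an in-memory zip."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=compression) as zipf:
        for root, _dirs, files in os.walk(directory_path):
            for filename in files:
                full_path = os.path.join(root, filename)
                # Store paths relative to the directory root inside the archive
                arcname = os.path.relpath(full_path, directory_path)
                zipf.write(full_path, arcname=arcname)
    return buffer.getvalue()
```

Writing to a temporary file instead would only change the target passed to ZipFile, which trades memory use for disk I/O on large directories.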
- property sha256: Optional[str]¶
Resource sha256
- Returns
Hex-encoded resource SHA256 hash
- property size: int¶
Resource size
- Returns
Resource size
- property uid: str¶
Resource identifier (UUID)
- Returns
Resource identifier
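The three properties above (hex-encoded SHA256, size in bytes, UUID identifier) correspond to standard-library primitives. The helper below is a hypothetical illustration of those values for a given content, not how LocalResource computes them internally (which may, for example, hash a file from path lazily).

```python
import hashlib
import uuid
from typing import Any, Dict, Optional


def describe_content(content: bytes, uid: Optional[str] = None) -> Dict[str, Any]:
    """Hypothetical helper mirroring the sha256/size/uid properties above."""
    return {
        "sha256": hashlib.sha256(content).hexdigest(),  # hex-encoded SHA256
        "size": len(content),  # size in bytes
        "uid": uid or str(uuid.uuid4()),  # random UUID unless one is supplied
    }
```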
- class karton.core.resource.RemoteResource(name: str, bucket: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, uid: Optional[str] = None, size: Optional[int] = None, backend: Optional[KartonBackend] = None, sha256: Optional[str] = None, _flags: Optional[List[str]] = None)[source]¶
Keeps a reference to a remote resource object shared between subsystems via object storage (S3).
It should never be instantiated directly by a subsystem, but it can be passed directly to an outgoing payload.
- Parameters
name – Name of the resource (e.g. name of file)
bucket – Alternative S3 bucket for resource
metadata – Resource metadata
uid – Alternative S3 resource id
size – Resource size
backend – KartonBackend() to bind to this resource
sha256 – Resource sha256 hash
_flags – Resource flags
- property content: bytes¶
Resource content. Performs download when resource was not loaded before.
- Returns
Content bytes
- download() bytes[source]¶
Downloads remote resource content from object hub into memory.
```python
sample = self.current_task.get_resource("sample")
# Ensure that the resource is downloaded before it is
# passed to the processing method
sample.download()
self.process_sample(sample)
```
- Returns
Downloaded content bytes
- download_temporary_file(suffix=None) Iterator[IO[bytes]][source]¶
Downloads remote resource into named temporary file.
```python
sample = self.current_task.get_resource("sample")
with sample.download_temporary_file() as f:
    contents = f.read()
    path = f.name
# Temporary file is deleted after exiting the "with" scope
```
- Returns
ContextManager with the temporary file
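The temporary-file pattern used by download_temporary_file() can be reproduced with contextlib and tempfile. In this hypothetical sketch the bytes are passed in directly instead of being streamed from object storage, and the function name temporary_file_with is invented for illustration.

```python
import contextlib
import os
import tempfile
from typing import IO, Iterator, Optional


@contextlib.contextmanager
def temporary_file_with(content: bytes, suffix: Optional[str] = None) -> Iterator[IO[bytes]]:
    """Hypothetical sketch of the download_temporary_file() pattern."""
    with tempfile.NamedTemporaryFile(suffix=suffix) as f:
        f.write(content)  # stand-in for streaming the S3 object into the file
        f.flush()
        f.seek(0)  # rewind so the caller can read from the start
        yield f
    # NamedTemporaryFile (delete=True by default) removes the file on exit
```

Inside the with block the file is readable and has a real path (f.name) that other tools can open; after the block it no longer exists.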
- download_to_file(path: str) None[source]¶
Downloads remote resource into file.
```python
sample = self.current_task.get_resource("sample")
sample.download_to_file("sample/sample.exe")
with open("sample/sample.exe", "rb") as f:
    contents = f.read()
```
- Parameters
path – Path to download the resource into
- extract_temporary() Iterator[str][source]¶
If resource contains a Zip file, extracts files contained in Zip to the temporary directory.
Returns path of directory with extracted files. Directory is recursively deleted after leaving the context.
```python
import os

dumps = self.current_task.get_resource("dumps")
with dumps.extract_temporary() as dumps_path:
    print("Fetched dumps:", os.listdir(dumps_path))
```

By default, the method downloads the zip into a temporary file, which is deleted after extraction. If you want to load the zip into memory, call RemoteResource.download() first.
- Returns
ContextManager with the temporary directory
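The extract-to-temporary-directory behaviour can be sketched with tempfile.TemporaryDirectory and zipfile. This is a hypothetical standalone illustration (extract_zip_temporary is not a Karton function) that takes the zip as in-memory bytes, similar to calling RemoteResource.download() before extracting.

```python
import contextlib
import io
import os
import tempfile
import zipfile
from typing import Iterator


@contextlib.contextmanager
def extract_zip_temporary(zip_bytes: bytes) -> Iterator[str]:
    """Hypothetical sketch of the extract_temporary() pattern."""
    with tempfile.TemporaryDirectory() as dir_path:
        with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zipf:
            zipf.extractall(dir_path)
        yield dir_path
    # TemporaryDirectory removes dir_path and its contents recursively here
```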
- extract_to_directory(path: str) None[source]¶
If resource contains a Zip file, extracts files contained in Zip into provided path.
By default, the method downloads the zip into a temporary file, which is deleted after extraction. If you want to load the zip into memory, call RemoteResource.download() first.
- Parameters
path – Directory path where the resource should be unpacked
- loaded() bool[source]¶
Checks whether resource is loaded into memory
- Returns
Flag indicating if the resource is loaded or not
- property sha256: Optional[str]¶
Resource sha256
- Returns
Hex-encoded resource SHA256 hash
- property size: int¶
Resource size
- Returns
Resource size
- property uid: str¶
Resource identifier (UUID)
- Returns
Resource identifier
- zip_file() Iterator[ZipFile][source]¶
If resource contains a Zip file, downloads it to the temporary file and wraps it with ZipFile object.
```python
dumps = self.current_task.get_resource("dumps")
with dumps.zip_file() as zipf:
    print("Fetched dumps: ", zipf.namelist())
```

By default, the method downloads the zip into a temporary file, which is deleted after leaving the context. If you want to load the zip into memory, call RemoteResource.download() first.
If you want to pre-download the zip to a specified path and open it using the zipfile module, you need to do this manually:

```python
import zipfile

dumps = self.current_task.get_resource("dumps")
# Download zip file
zip_path = "./dumps.zip"
dumps.download_to_file(zip_path)
zipf = zipfile.ZipFile(zip_path)
```
- Returns
ContextManager with zipfile
karton.core.Task¶
- class karton.core.task.Task(headers: Dict[str, Any], payload: Optional[Dict[str, Any]] = None, headers_persistent: Optional[Dict[str, Any]] = None, payload_persistent: Optional[Dict[str, Any]] = None, priority: Optional[TaskPriority] = None, parent_uid: Optional[str] = None, root_uid: Optional[str] = None, orig_uid: Optional[str] = None, uid: Optional[str] = None, error: Optional[List[str]] = None, _status: Optional[TaskState] = None, _last_update: Optional[float] = None)[source]¶
Task representation with headers and resources.
- Parameters
headers – Routing information for other systems; this is what allows evaluating whether a given system is useful for a given task. Systems filter tasks by these headers.
payload – Any instance of dict; contains resources and additional information
headers_persistent – Persistent headers for the whole task subtree, propagated from the initial task.
payload_persistent – Persistent payload set for whole task subtree, propagated from initial task
priority – Priority of whole task subtree, propagated from initial task like payload_persistent
parent_uid – Id of a routed task that created this task via a karton's send_task()
root_uid – Id of an unrouted task that is the root of this task's analysis tree
orig_uid – Id of an unrouted (or crashed routed) task that was forked to create this task
uid – This task's unique identifier
error – Traceback of an exception that happened while performing this task
- add_payload(name: str, content: Any, persistent: bool = False) None[source]¶
Add payload to task
- Parameters
name – Name of the payload
content – Payload to be added
persistent – Flag if the payload should be persistent
- add_resource(name: str, resource: ResourceBase, persistent: bool = False) None[source]¶
Add resource to task.
Alias for add_payload().
Deprecated since version 3.0.0: Use add_payload() instead.
- Parameters
name – Name of the resource
resource – Resource to be added
persistent – Flag if the resource should be persistent
- derive_task(headers: Dict[str, Any]) Task[source]¶
Creates copy of task with different headers, useful for proxying resource with added metadata.
```python
from karton.core import Karton, Task


class MZClassifier(Karton):
    identity = "karton.mz-classifier"
    filters = [
        {
            "type": "sample",
            "kind": "raw",
        }
    ]

    def process(self, task: Task) -> None:
        sample = task.get_resource("sample")
        if sample.content.startswith(b"MZ"):
            self.log.info("MZ detected!")
            task = task.derive_task({
                "type": "sample",
                "kind": "exe",
            })
            self.send_task(task)
        else:
            self.log.info("Not a MZ :<")
```

Changed in version 3.0.0: Moved from static method to regular method: Task.derive_task(headers, task) must be ported to task.derive_task(headers).
- Parameters
headers – New headers for the task
- Returns
Copy of task with new headers
- static fquid_to_uid(fquid: str) str[source]¶
Gets the task uid from a fully-qualified fquid ({root_uid}:{task_uid})
- Returns
Task uid
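Given the {root_uid}:{task_uid} format stated above, the conversion amounts to taking the part after the last colon. This is a hypothetical re-implementation for illustration; treating a value without a colon as an already-plain uid is an assumption about edge-case behaviour.

```python
def fquid_to_uid(fquid: str) -> str:
    """Hypothetical sketch: '{root_uid}:{task_uid}' -> '{task_uid}'."""
    # Assumption: a value without ':' is already a plain task uid
    return fquid.rsplit(":", 1)[-1]
```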
- get_payload(name: str, default: Optional[Any] = None) Any[source]¶
Get payload from task
- Parameters
name – name of the payload
default – Value to be returned if payload is not present
- Returns
Payload content
- get_resource(name: str) ResourceBase[source]¶
Get resource from task.
Ensures that the payload contains a Resource object. If not, raises TypeError.
- Parameters
name – Name of the resource to get
- Returns
karton.ResourceBase – resource with the given name
- has_payload(name: str) bool[source]¶
Checks whether payload exists
- Parameters
name – Name of the payload to be checked
- Returns
Whether the task's payload contains a value with the given name
- is_header_persistent(name: str) bool[source]¶
Checks whether header exists and is persistent
- Parameters
name – Name of the header to be checked
- Returns
Whether the task's header with the given name is persistent
- is_payload_persistent(name: str) bool[source]¶
Checks whether payload exists and is persistent
- Parameters
name – Name of the payload to be checked
- Returns
Whether the task's payload with the given name is persistent
- iterate_resources() Iterator[ResourceBase][source]¶
Iterate over the resource objects bound to the task
- Returns
An iterator over all task resources
- matches_filters(filters: List[Dict[str, Any]]) bool[source]¶
Check if a task matches the given filters
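Filters are given as a list of dictionaries. A common reading, sketched below, is that a task matches if any dictionary matches, and a dictionary matches when every key/value pair is present in the task headers. This is a simplified, hypothetical sketch using exact equality only; Karton's real matcher may support richer syntax that is not modelled here.

```python
from typing import Any, Dict, List


def matches_filters(headers: Dict[str, Any], filters: List[Dict[str, Any]]) -> bool:
    """Simplified sketch: OR across filter dicts, AND across keys within one dict."""
    return any(
        all(key in headers and headers[key] == value for key, value in task_filter.items())
        for task_filter in filters
    )
```

For example, headers {"type": "sample", "kind": "raw"} match [{"type": "sample", "kind": "raw"}] and also [{"type": "config"}, {"type": "sample"}], but not [{"type": "sample", "kind": "exe"}].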
- remove_payload(name: str) None[source]¶
Removes a payload from the task.
If the payload doesn't exist or is persistent, raises KeyError.
- Parameters
name – Payload name to be removed
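The interplay of add_payload(), has_payload(), is_payload_persistent(), get_payload() and remove_payload() can be modelled with two dictionaries, one for regular and one for persistent payload. PayloadStore below is a hypothetical model of the documented rules, not the Task class; in particular, giving persistent entries precedence in get_payload() is an assumption.

```python
from typing import Any, Dict, Optional


class PayloadStore:
    """Hypothetical model of the regular vs persistent payload rules above."""

    def __init__(self) -> None:
        self.payload: Dict[str, Any] = {}
        self.payload_persistent: Dict[str, Any] = {}

    def add_payload(self, name: str, content: Any, persistent: bool = False) -> None:
        target = self.payload_persistent if persistent else self.payload
        target[name] = content

    def has_payload(self, name: str) -> bool:
        return name in self.payload or name in self.payload_persistent

    def is_payload_persistent(self, name: str) -> bool:
        return name in self.payload_persistent

    def get_payload(self, name: str, default: Optional[Any] = None) -> Any:
        # Assumption: persistent entries take precedence over regular ones
        if name in self.payload_persistent:
            return self.payload_persistent[name]
        return self.payload.get(name, default)

    def remove_payload(self, name: str) -> None:
        # Only regular entries are stored here, so a missing or persistent
        # name raises KeyError, as documented for Task.remove_payload()
        del self.payload[name]
```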
karton.core.Config¶
- class karton.core.config.Config(path: Optional[str] = None, check_sections: Optional[bool] = True)[source]¶
Simple config loader.
Loads configuration from the paths below, in the given order:
- /etc/karton/karton.ini (global)
- ~/.config/karton/karton.ini (user local)
- ./karton.ini (subsystem local)
- path from the KARTON_CONFIG_FILE environment variable
- <path> (optional, additional path provided in arguments)
It is also possible to pass configuration via environment variables. Any variable named KARTON_FOO_BAR is equivalent to setting ‘bar’ variable in section ‘foo’ (note the lowercase names).
Environment variables have higher precedence than those loaded from files.
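The precedence rules above (files read in order, later files overriding earlier ones, environment variables overriding all files) can be sketched with the standard library's configparser. This is not Karton's Config implementation: the function name load_config and the exact split of KARTON_FOO_BAR into section and option on the first underscore after the prefix are assumptions.

```python
import configparser
import os
from typing import Iterable, Mapping, Optional


def load_config(
    paths: Iterable[str], environ: Optional[Mapping[str, str]] = None
) -> configparser.ConfigParser:
    """Hypothetical sketch of file-then-environment config loading."""
    config = configparser.ConfigParser()
    # read() loads files in order, so later files override earlier ones;
    # files that do not exist are silently skipped
    config.read(list(paths))
    env = os.environ if environ is None else environ
    for key, value in env.items():
        if not key.startswith("KARTON_"):
            continue
        # Assumed mapping: KARTON_FOO_BAR -> section "foo", option "bar"
        parts = key[len("KARTON_"):].lower().split("_", 1)
        if len(parts) != 2:
            continue
        section, option = parts
        if not config.has_section(section):
            config.add_section(section)
        config.set(section, option, value)  # environment wins over files
    return config
```

With this split, KARTON_S3_ACCESS_KEY would set option access_key in section s3.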
- Parameters
path – Path to additional configuration file
check_sections – Check if the redis and s3 sections are defined in the configuration
- append_to_list(section_name: str, option_name: str, value: Any) None[source]¶
Appends value to a list in configuration
- get(section_name: str, option_name: str, fallback: Optional[Any] = None) Any[source]¶
Gets a value from the configuration or returns fallback (None by default) if the value was not set.
- getboolean(section_name: str, option_name: str, fallback: bool) bool[source]¶
- getboolean(section_name: str, option_name: str) Optional[bool]
Gets a value from the configuration or returns fallback (None by default) if the value was not set. The value is coerced to bool.
- getint(section_name: str, option_name: str, fallback: int) int[source]¶
- getint(section_name: str, option_name: str) Optional[int]
- getint(section_name: str, option_name: str, fallback: Optional[int]) Optional[int]
Gets a value from the configuration or returns fallback (None by default) if the value was not set. The value is coerced to int.
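The coercion-with-fallback behaviour of getboolean() and getint() resembles the typed getters of the standard library's configparser. The sketch below uses configparser directly, not Karton's Config, and the section and option names are hypothetical.

```python
import configparser

config = configparser.ConfigParser()
# Hypothetical section and option names, for illustration only
config.read_dict({"karton": {"debug": "yes", "task_timeout": "300"}})

# Values are stored as strings and coerced on access
debug = config.getboolean("karton", "debug", fallback=False)  # "yes" -> True
timeout = config.getint("karton", "task_timeout", fallback=None)  # "300" -> 300
# Missing options (or whole sections) return the fallback instead of raising
retries = config.getint("karton", "retries", fallback=None)
verbose = config.getboolean("logging", "verbose", fallback=False)
```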
- load_from_dict(data: Dict[str, Dict[str, Any]]) None[source]¶
Updates configuration values from a dictionary compatible with ConfigParser.read_dict. Accepts values in their native types, so you don't need to convert them to strings. None values are treated as missing values and are not added.

```python
{
    "section-name": {
        "option-name": "value"
    }
}
```
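The two rules above (native values accepted, None skipped) can be sketched on top of the standard library's configparser. This is a hypothetical illustration, not Karton's Config.load_from_dict(): skipping None matters because a default ConfigParser rejects None values with a TypeError, and the explicit str() conversion mirrors what read_dict() would also do for non-None values.

```python
import configparser
from typing import Any, Dict


def load_from_dict(config: configparser.ConfigParser, data: Dict[str, Dict[str, Any]]) -> None:
    """Hypothetical sketch: accept native values and skip None before read_dict()."""
    normalized = {
        section: {
            option: str(value)  # convert native values (bool, int, ...) to strings
            for option, value in options.items()
            if value is not None  # None is treated as a missing value
        }
        for section, options in data.items()
    }
    # read_dict() updates existing sections instead of replacing them
    config.read_dict(normalized)
```

Usage: load_from_dict(config, {"s3": {"secure": True, "port": 9000, "region": None}}) stores "True" and "9000" and leaves region unset.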