Headers, payloads and resources¶
A task consists of two elements: headers and a payload.
Task headers¶
Headers specify the purpose of a task and determine how the task will be routed by karton-system. They're defined as a flat collection of keys and values.
Example:
task = Task(
    headers = {
        "type": "sample",
        "kind": "runnable",
        "platform": "win32",
        "extension": "dll"
    }
)
Consumers listen for a specific set of headers, defined by filters.
class GenericUnpacker(Karton):
    """
    Performs sample unpacking
    """
    identity = "karton.generic-unpacker"
    filters = [
        {
            "type": "sample",
            "kind": "runnable"
        },
        {
            "type": "sample",
            "kind": "script",
            "platform": "win32"
        }
    ]

    def process(self, task: Task) -> None:
        # Get incoming task headers
        headers = task.headers
        self.log.info("Got %s sample from %s", headers["kind"], headers["origin"])
If Karton-System finds that a task matches any of the filters defined for a consumer queue, the task will be routed to that queue.
Following the convention proposed in the examples above, GenericUnpacker will get all tasks containing samples that are directly runnable in sandboxes (regardless of target platform), as well as scripts targeting 32-bit Windows.
Headers can be used to process our input differently, depending on the kind of sample:
class GenericUnpacker(Karton):
    ...

    def process(self, task: Task) -> None:
        # Get incoming task headers
        headers = task.headers
        if headers["kind"] == "runnable":
            self.process_runnable()
        elif headers["kind"] == "script":
            self.process_script()
A few headers have special meaning and are added automatically by Karton to incoming/outgoing tasks:

{"origin": "<identity>"} specifies the identity of the task sender. It can be used to listen only for tasks coming from a predefined identity.

{"receiver": "<identity>"} is added by Karton when the task is routed to the consumer queue. On the receiver side, its value is always equal to self.identity.
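For example, the origin header can be included in consumer filters to accept samples coming only from a specific upstream service. The consumer class below is a hypothetical sketch of that idea (its identity and filter values are made up for illustration):

class UnpackedSampleReporter(Karton):
    """
    Hypothetical consumer that accepts samples produced by karton.generic-unpacker only
    """
    identity = "karton.unpacked-sample-reporter"
    filters = [
        {
            "type": "sample",
            "origin": "karton.generic-unpacker"
        }
    ]

    def process(self, task: Task) -> None:
        # "receiver" was set during routing and always equals our own identity here
        self.log.info("Task %s routed from %s to %s",
                      task.uid, task.headers["origin"], task.headers["receiver"])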
Filter patterns¶
New in version 5.0.0.
Filter matching follows two simple rules. If we want a task to be routed to the consumer:

- task headers must match any of the consumer filters
- task headers match a consumer filter if they match all values defined in that filter
Starting from 5.0.0, consumer filters support basic wildcards and exclusions.
Pattern             | Meaning
{"foo": "bar"}      | matches 'bar' value of 'foo' header
{"foo": "!bar"}     | matches any value other than 'bar' in 'foo' header
{"foo": "ba?"}      | matches 'ba' value followed by any character
{"foo": "ba*"}      | matches 'ba' value followed by any substring (including empty)
{"foo": "ba[rz]"}   | matches 'ba' value followed by 'r' or 'z' character
{"foo": "ba[!rz]"}  | matches 'ba' value followed by any character other than 'r' or 'z'
{"foo": "!ba[!rz]"} | matches any value of 'foo' header that doesn't match the "ba[!rz]" pattern
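To illustrate, the consumer below is a hypothetical sketch (identity and header values are made up) that uses a wildcard to match any Windows platform and an exclusion to skip Zip archives:

class WindowsSampleHandler(Karton):
    """
    Hypothetical consumer using wildcard and exclusion patterns (Karton >= 5.0.0)
    """
    identity = "karton.windows-sample-handler"
    filters = [
        {
            "type": "sample",
            "kind": "runnable",
            "platform": "win*",   # matches win32, win64, ...
            "extension": "!zip"   # any extension except zip
        }
    ]

    def process(self, task: Task) -> None:
        self.log.info("Got %s sample for %s",
                      task.headers["extension"], task.headers["platform"])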
Filter logic can be used to fulfill specific use-cases:
Filters                                    | Meaning
filters = []                               | matches no tasks (no headers allowed). Can be used to turn off the queue and consume tasks left in it.
filters = [{}]                             | matches any task (no header conditions). Can be used to intercept all tasks incoming to Karton.
filters = [{"foo": "bar"}, {"foo": "baz"}] | 'foo' header is required and must have 'bar' or 'baz' value.
filters = [{"foo": "!*"}]                  | 'foo' header must not be defined.
Warning
It's recommended to use only strings in filter and header values.
Although some non-string types are allowed, they will be converted to strings for comparison, which may lead to unexpected results.
Task payload¶
Payload is also a dictionary, but unlike headers, it's not required to be a flat structure. Its contents do not affect routing, so task semantics must be defined by headers.
task = Task(
    headers = ...,
    payload = {
        "entrypoints": [
            "_ExampleFunction@12"
        ],
        "matched_rules": {
            ...
        },
        "sample": Resource("original_name.dll", path="uploads/original_name.dll")
    }
)
Payload can be accessed by the consumer using the Task.get_payload() method.
class KartonService(Karton):
    ...

    def process(self, task: Task) -> None:
        entrypoints = task.get_payload("entrypoints", default=[])
The payload dictionary itself must still be lightweight and JSON-encodable, because it's stored in Redis along with the whole task definition.
If a task operates on a binary blob or a complex structure, which is probably the most common use-case, the payload can still be used to store a reference to that object. The only requirement is that the object must be placed in separate, shared storage, available to both the producer and the consumer. That's exactly how Resource objects work.
Resource objects¶
Resources are parts of the payload that represent references to files or other binary large objects. All objects of that kind are stored in S3-compatible storage, which serves as shared object storage between Karton subsystems.
task = Task(
    headers = ...,
    payload = {
        "sample": Resource("original_name.dll", path="uploads/original_name.dll")
    }
)
Resource objects created by the producer (LocalResource) are uploaded to S3 and transformed into RemoteResource objects.
RemoteResource is a lazy object that allows the consumer to download the object contents via the RemoteResource.content property.
class GenericUnpacker(Karton):
    ...

    def unpack(self, packed_content: bytes) -> bytes:
        ...

    def process(self, task: Task) -> None:
        # Get sample resource
        sample = task.get_resource("sample")
        # Do the job
        unpacked = self.unpack(sample.content)
        # Publish the results
        task = Task(
            headers={
                "type": "sample",
                "kind": "unpacked"
            },
            payload={
                "sample": Resource("unpacked", content=unpacked)
            }
        )
        self.send_task(task)
If the expected resource is too big for in-memory processing, or we want to launch external tools that need a file system path, resource contents can be downloaded using RemoteResource.download_to_file() or RemoteResource.download_temporary_file().
class KartonService(Karton):
    ...

    def process(self, task: Task) -> None:
        archive = task.get_resource("archive")

        with archive.download_temporary_file() as f:
            # f is a named file-like object
            archive_path = f.name
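RemoteResource.download_to_file() is useful when the service itself picks the target path, e.g. to hand the file over to an external command-line tool. A minimal sketch, assuming a hypothetical /tmp location and the external file utility:

import subprocess

class KartonService(Karton):
    ...

    def process(self, task: Task) -> None:
        sample = task.get_resource("sample")

        # Download resource contents to a path of our choosing (hypothetical location)
        sample_path = f"/tmp/{task.uid}_sample"
        sample.download_to_file(sample_path)

        # Run an external tool that expects a file system path
        output = subprocess.check_output(["file", sample_path])
        self.log.info("External tool output: %s", output.decode())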
If you want to pass the original sample along with the new task, you can just put a reference to it back into the payload.
task = Task(
    headers={
        "type": "sample",
        "kind": "unpacked"
    },
    payload={
        "sample": Resource("unpacked", content=unpacked),
        "parent": sample  # Reference to the original (packed) sample
    }
)
self.send_task(task)
Each resource has its own metadata store where we can provide additional information about the file, e.g. its SHA-256 checksum:
import hashlib

sample = Resource("sample.exe",
                  content=sample_content,
                  metadata={
                      "sha256": hashlib.sha256(sample_content).hexdigest()
                  })
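On the consumer side, this metadata travels along with the resource reference and can be read back from the resource's metadata dictionary. A minimal sketch, assuming the producer set the sha256 key as above:

class KartonService(Karton):
    ...

    def process(self, task: Task) -> None:
        sample = task.get_resource("sample")

        # Read back the metadata provided by the producer
        sha256 = sample.metadata.get("sha256")
        self.log.info("Got sample with SHA-256: %s", sha256)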
Starting from v5.0.0, resources can be nested in other objects like lists or dictionaries.
task = Task(
    headers={
        "type": "analysis",
        "kind": "artifacts"
    },
    payload={
        "artifacts": [
            Resource("file1", content=file1),
            Resource("file2", content=file2),
            Resource("file3", content=file3)
        ],
        "parent": sample  # Reference to the original (packed) sample
    }
)
self.send_task(task)
More information about resources can be found in the API documentation.
Directory resource objects¶
Resource objects work well for single files, but sometimes we need to deal with a bunch of artifacts, e.g. process memory dumps from dynamic analysis. A very common way to handle that is to pack them into a Zip archive using the Python zipfile module.
The Karton library includes a helper method for that kind of archive, called LocalResource.from_directory().
task = Task(
    headers={
        "type": "analysis"
    },
    payload={
        "dumps": LocalResource.from_directory(analysis_id,
                                              directory_path=f"analyses/{analysis_id}/dumps"),
    }
)
self.send_task(task)
Files contained in directory_path are stored under paths relative to the provided directory path. The default compression method is zipfile.ZIP_DEFLATED instead of zipfile.ZIP_STORED.
Directory resources are deserialized into the usual RemoteResource objects but, in contrast to regular resources, they can for example be extracted to directories using RemoteResource.extract_temporary():
class KartonService(Karton):
    ...

    def process(self, task: Task) -> None:
        dumps = task.get_resource("dumps")

        with dumps.extract_temporary() as dumps_path:
            ...
If we don't want to extract all files, we can work directly with the zipfile.ZipFile object, which will be internally downloaded from S3 to a temporary file using the RemoteResource.download_temporary_file() method.
class KartonService(Karton):
    ...

    def process(self, task: Task) -> None:
        dumps = task.get_resource("dumps")

        with dumps.zip_file() as zipf:
            with zipf.open("sample_info.txt") as info:
                ...
More information about resources can be found in the API documentation.
Persistent payload¶
Persistent payload is a part of the payload that is propagated to the whole task subtree. The common use-case is to keep information related not to a single artifact but to the whole analysis, so that it's available everywhere even if not explicitly passed on by each Karton service.
task = Task(
    headers=...,
    payload=...,
    payload_persistent={
        "uploader": "psrok1"
    }
)
Incoming persistent payload (from the task received by a Karton service) is merged by the Karton library into the outgoing tasks (result tasks sent by that service). A Karton service can't overwrite or delete the incoming persistent payload keys.
class KartonService(Karton):
    ...

    def process(self, task: Task) -> None:
        uploader = task.get_payload("uploader")
        assert task.is_payload_persistent("uploader")

        task = Task(
            headers=...,
            payload=...
        )
        # Outgoing task also contains the "uploader" key
        self.send_task(task)
Regular payload and persistent payload keys share a common namespace, so persistent payload can't be overwritten by regular payload either, e.g.
task = Task(
    headers=...,
    payload={
        "common_key": "<this will be ignored>"
    },
    payload_persistent={
        "common_key": "<and this value will be used>"
    }
)
Warning
Because the merging strategy is quite aggressive, it's not recommended to overuse this feature. Persistent payload should be treated as "analysis-wide payload" and set only in the initial task.
Don't store any references to resources or other heavy objects here unless you need to. Persistent payload is, as the name says, persistent, so it is propagated to the whole task subtree and can't be removed during analysis. A resource referenced by persistent payload won't be garbage-collected until the whole analysis (task subtree) ends, even if it's not needed by further analysis steps.
Persistent headers¶
New in version 5.2.0.
Persistent headers are headers that are propagated to the whole task subtree, so consumers don't need to remember to pass these values to child tasks.
Using persistent headers you can mark properties that are crucial for routing and should be kept for analysis artifacts as well:

- Analysis volatility: if we don't want to report and persist artifacts from an analysis, tasks are not routed to reporter services
- Analysis confidentiality: if artifacts shouldn't be passed to 3rd party services, they should be considered internal
- Marking analyses as test cases, so we can pass only testing analyses to testing consumers
Semantics are similar to persistent payload:
task = Task(
    headers=...,
    payload=...,
    headers_persistent={
        "uploader": "psrok1"
    }
)
headers_persistent passed to Task are merged into self.headers, with those keys marked internally as persistent.
Headers precedence is as follows:

1. headers_persistent from the parent task (most important)
2. headers_persistent from the current task
3. headers from the current task (least important)
Following these rules, persistent headers propagate to the whole subtree and always override other headers with the same key.
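A sketch of these rules, assuming the incoming task carried headers_persistent={"quality": "low"} (the header names and values here are just examples):

def process(self, task: Task) -> None:
    # Incoming task was created with headers_persistent={"quality": "low"}
    new_task = Task(
        headers={
            "type": "sample",
            "kind": "unpacked",
            "quality": "high"  # least important: overridden by the persistent header
        }
    )
    self.send_task(new_task)
    # The outgoing task carries "quality": "low" inherited from the parent task,
    # and that key stays persistent for the rest of the subtree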