import contextlib
import hashlib
import os
import shutil
import tempfile
import uuid
import zipfile
from io import BytesIO
from typing import IO, TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Union, cast
if TYPE_CHECKING:
    from .backend import KartonBackend
class ResourceBase(object):
    """
    Base resource class, contains the basic logic of both remote and
    local resources. If you're not implementing your own resource metatypes
    you'll probably want to look at :class:`LocalResource` or
    :class:`RemoteResource` instead.

    :param name: Name of the resource (e.g. name of file)
    :param content: Resource content
    :param path: Path of file with resource content
    :param bucket: Alternative S3 bucket for resource
    :param metadata: Resource metadata
    :param sha256: Resource sha256 hash
    :param _uid: Alternative S3 resource id
    :param _size: Resource size in bytes, if already known
    :param _flags: Resource flags
    """

    # Flag required by Karton 3.x.x services to recognize zipped-directory
    # resources (see LocalResource.from_directory)
    DIRECTORY_FLAG = "Directory"

    def __init__(
        self,
        name: str,
        content: Optional[Union[str, bytes]] = None,
        path: Optional[str] = None,
        bucket: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        sha256: Optional[str] = None,
        _uid: Optional[str] = None,
        _size: Optional[int] = None,
        _flags: Optional[List[str]] = None,
    ) -> None:
        self.name = name
        self.bucket = bucket
        self.metadata = metadata or {}
        # The sha256 identifier can be passed as an argument or inside metadata
        sha256 = sha256 or self.metadata.get("sha256")
        calculate_hash = sha256 is None
        self._content: Optional[bytes] = None
        # Use explicit None-checks so that empty (but provided) content is
        # still treated as content: previously b"" slipped past the truthiness
        # checks, leaving sha256/size unset and ignoring the path/content
        # mutual exclusion
        if content is not None and path is not None:
            raise ValueError("Can't set both path and content for resource")
        if path is not None:
            if not os.path.isfile(path):
                raise IOError(f"Path {path} doesn't exist or is not a file")
            if calculate_hash:
                # Hash the file in 4 KiB chunks to keep memory usage flat
                sha256_hash = hashlib.sha256()
                with open(path, "rb") as f:
                    for byte_block in iter(lambda: f.read(4096), b""):
                        sha256_hash.update(byte_block)
                sha256 = sha256_hash.hexdigest()
        elif content is not None:
            if isinstance(content, str):
                self._content = content.encode()
            elif isinstance(content, bytes):
                self._content = content
            else:
                raise TypeError("Content can be bytes or str only")
            if calculate_hash:
                sha256 = hashlib.sha256(self._content).hexdigest()
        # Empty Resource is possible here (e.g. RemoteResource)
        self.metadata["sha256"] = sha256
        self._uid = _uid or str(uuid.uuid4())
        self._path = path
        self._size = _size
        # Flags needed by 3.x.x Karton services
        self._flags = _flags or []

    @property
    def uid(self) -> str:
        """
        Resource identifier (UUID)

        :return: Resource identifier
        """
        return self._uid

    @property
    def content(self) -> bytes:
        """
        Resource content. Must be overriden in a derived class.

        :return: Resource contents
        """
        raise NotImplementedError()

    @property
    def size(self) -> int:
        """
        Resource size, evaluated lazily from the backing file or the
        in-memory content and cached afterwards.

        :return: Resource size in bytes
        """
        if self._size is None:
            if self._path is not None:
                self._size = os.path.getsize(self._path)
            elif self._content is not None:
                # is-not-None check so an empty content reports size 0
                # instead of leaving the size unset
                self._size = len(self._content)
        return cast(int, self._size)

    @property
    def sha256(self) -> Optional[str]:
        """
        Resource sha256

        :return: Hexencoded resource SHA256 hash
        """
        return self.metadata.get("sha256")

    def to_dict(self) -> Dict[str, Any]:
        # Internal serialization method
        return {
            "uid": self.uid,
            "name": self.name,
            "bucket": self.bucket,
            "size": self.size,
            "metadata": self.metadata,
            "flags": self._flags,
            "sha256": self.sha256,
        }
class LocalResource(ResourceBase):
    """
    Represents local resource with arbitrary binary data e.g. file contents.

    Local resources will be uploaded to object hub (S3) during
    task dispatching.

    .. code-block:: python

        # Creating resource from bytes
        sample = Resource("original_name.exe", content=b"X5O!P%@AP[4\\
        PZX54(P^)7CC)7}$EICAR-STANDARD-ANT...")

        # Creating resource from path
        sample = Resource("original_name.exe", path="sample/original_name.exe")

    :param name: Name of the resource (e.g. name of file)
    :param content: Resource content
    :param path: Path of file with resource content
    :param bucket: Alternative S3 bucket for resource
    :param metadata: Resource metadata
    :param uid: Alternative S3 resource id
    :param sha256: Resource sha256 hash
    :param fd: Seekable file descriptor
    :param _flags: Resource flags
    :param _close_fd: Close file descriptor after upload (default: False)
    """

    def __init__(
        self,
        name: str,
        content: Optional[Union[str, bytes]] = None,
        path: Optional[str] = None,
        bucket: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        uid: Optional[str] = None,
        sha256: Optional[str] = None,
        fd: Optional[IO[bytes]] = None,
        _flags: Optional[List[str]] = None,
        _close_fd: bool = False,
    ) -> None:
        # Exactly one content source (path, content or fd) must be provided
        if len(list(filter(None, [path, content, fd]))) != 1:
            raise ValueError("You must exclusively provide a path, content or fd")
        super(LocalResource, self).__init__(
            name,
            content=content,
            path=path,
            bucket=bucket,
            metadata=metadata,
            sha256=sha256,
            _uid=uid,
            _flags=_flags,
        )
        self.fd = fd
        self._close_fd = _close_fd

    @property
    def content(self) -> bytes:
        """
        Resource content. Reads the file if the file was not read before.

        :return: Content bytes
        """
        if self._content is None:
            if self._path is not None:
                with open(self._path, "rb") as local_file:
                    self._content = local_file.read()
            elif self.fd is not None:
                # NOTE(review): assumes fd is positioned at the first byte,
                # matching the precondition enforced in _upload
                self._content = self.fd.read()
        return cast(bytes, self._content)

    @property
    def size(self) -> int:
        """
        Resource size

        :return: Resource size in bytes
        """
        if self._size is None and self.fd is not None:
            # Measure the stream by seeking to its end, then restore the
            # original position so a later read/upload still works
            current_pos = self.fd.tell()
            self.fd.seek(0, os.SEEK_END)
            self._size = self.fd.tell()
            self.fd.seek(current_pos, os.SEEK_SET)
        return super().size

    @classmethod
    def from_directory(
        cls,
        name: str,
        directory_path: str,
        compression: int = zipfile.ZIP_DEFLATED,
        in_memory: bool = False,
        bucket: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        uid: Optional[str] = None,
    ) -> "LocalResource":
        """
        Resource extension, allowing to pass whole directory as a zipped resource.

        Reads all files contained in directory_path recursively and packs them
        into zip file.

        .. code-block:: python

            # Creating zipped resource from path
            dumps = LocalResource.from_directory("dumps", directory_path="dumps/")

        :param name: Name of the resource (e.g. name of file)
        :param directory_path: Path of the resource directory
        :param compression: Compression level (default is zipfile.ZIP_DEFLATED)
        :param in_memory: Don't create temporary file and make in-memory zip file \
                          (default: False)
        :param bucket: Alternative S3 bucket for resource
        :param metadata: Resource metadata
        :param uid: Alternative S3 resource id
        :return: :class:`LocalResource` instance with zipped contents
        """
        out_stream: IO[bytes] = BytesIO() if in_memory else tempfile.TemporaryFile()
        # Recursively zips all files in directory_path keeping relative paths
        # File is zipped into provided out_stream
        with zipfile.ZipFile(out_stream, "w", compression=compression) as zipf:
            for root, _dirs, files in os.walk(directory_path):
                for filename in files:
                    abs_path = os.path.join(root, filename)
                    zipf.write(abs_path, os.path.relpath(abs_path, directory_path))
        # Ensure out_stream is not closed and seeked to the first byte
        assert not out_stream.closed
        out_stream.seek(0, os.SEEK_SET)
        # Flag is required by Karton 3.x.x services to recognize that resource
        # as DirectoryResource
        flags = [ResourceBase.DIRECTORY_FLAG]
        if in_memory:
            return cls(
                name,
                content=cast(BytesIO, out_stream).getvalue(),
                bucket=bucket,
                metadata=metadata,
                uid=uid,
                _flags=flags,
            )
        else:
            return cls(
                name,
                fd=out_stream,
                bucket=bucket,
                metadata=metadata,
                uid=uid,
                _flags=flags,
                # Temporary file is owned by the resource: close it after upload
                _close_fd=True,
            )

    def _upload(self, backend: "KartonBackend") -> None:
        """Internal function for uploading resources

        :param backend: KartonBackend to use while uploading the resource

        :meta private:
        """
        # Note: never transform resource into Remote
        # Multiple task dispatching with same local, in that case resource
        # can be deleted between tasks.
        if self.bucket is None:
            raise RuntimeError(
                "Resource object can't be uploaded because its bucket is not set"
            )
        if self._content:
            # Upload contents
            backend.upload_object(self.bucket, self.uid, self._content)
        elif self.fd:
            if self.fd.tell() != 0:
                raise RuntimeError(
                    f"Resource object can't be uploaded: "
                    f"file descriptor must point at first byte "
                    f"(fd.tell = {self.fd.tell()})"
                )
            try:
                # Upload contents from fd
                backend.upload_object(self.bucket, self.uid, self.fd)
            finally:
                # If file descriptor is managed by Resource, close it even when
                # the upload fails, so temporary files don't leak
                if self._close_fd:
                    self.fd.close()
        elif self._path:
            # Upload file provided by path
            backend.upload_object_from_file(self.bucket, self.uid, self._path)

    def upload(self, backend: "KartonBackend") -> None:
        """Internal function for uploading resources

        :param backend: KartonBackend to use while uploading the resource

        :meta private:
        """
        if not self._content and not self._path and not self.fd:
            raise RuntimeError("Can't upload resource without content")
        self._upload(backend)


Resource = LocalResource
class RemoteResource(ResourceBase):
    """
    Keeps reference to remote resource object shared between subsystems
    via object storage (S3)

    Should never be instantiated directly by subsystem, but can be directly passed to
    outgoing payload.

    :param name: Name of the resource (e.g. name of file)
    :param bucket: Alternative S3 bucket for resource
    :param metadata: Resource metadata
    :param uid: Alternative S3 resource id
    :param size: Resource size
    :param backend: :class:`KartonBackend` to bind to this resource
    :param sha256: Resource sha256 hash
    :param _flags: Resource flags
    """

    def __init__(
        self,
        name: str,
        bucket: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        uid: Optional[str] = None,
        size: Optional[int] = None,
        backend: Optional["KartonBackend"] = None,
        sha256: Optional[str] = None,
        _flags: Optional[List[str]] = None,
    ) -> None:
        super(RemoteResource, self).__init__(
            name,
            bucket=bucket,
            metadata=metadata,
            sha256=sha256,
            _uid=uid,
            _size=size,
            _flags=_flags,
        )
        self.backend = backend

    def loaded(self) -> bool:
        """
        Checks whether resource is loaded into memory

        :return: Flag indicating if the resource is loaded or not
        """
        return self._content is not None

    @classmethod
    def from_dict(
        cls, dict: Dict[str, Any], backend: Optional["KartonBackend"]
    ) -> "RemoteResource":
        """
        Internal deserialization method for remote resources

        :param dict: Serialized information about resource
        :param backend: KartonBackend object
        :return: Deserialized :class:`RemoteResource` object

        :meta private:
        """
        # NOTE: the parameter shadows the builtin `dict`; kept as-is for
        # backward compatibility with keyword callers
        # Backwards compatibility
        metadata = dict.get("metadata", {})
        if "sha256" in dict:
            metadata["sha256"] = dict["sha256"]
        return cls(
            name=dict["name"],
            metadata=metadata,
            bucket=dict["bucket"],
            uid=dict["uid"],
            size=dict.get("size"),  # Backwards compatibility (2.x.x)
            backend=backend,
            _flags=dict.get("flags"),  # Backwards compatibility (3.x.x)
        )

    @property
    def content(self) -> bytes:
        """
        Resource content. Performs download when resource was not loaded before.

        :return: Content bytes
        """
        if self._content is None:
            return self.download()
        return self._content

    def unload(self) -> None:
        """
        Unloads resource object from memory
        """
        self._content = None

    def remove(self) -> None:
        """
        Internal remote resource remove method

        :meta private:
        """
        if self.backend is None:
            raise RuntimeError(
                "Resource object can't be removed because it's not bound to the backend"
            )
        if self.bucket is None:
            raise RuntimeError(
                "Resource object can't be removed because its bucket is not set"
            )
        self.backend.remove_object(self.bucket, self.uid)

    def download(self) -> bytes:
        """
        Downloads remote resource content from object hub into memory.

        .. code-block:: python

            sample = self.current_task.get_resource("sample")

            # Ensure that resource will be downloaded before it will be
            # passed to processing method
            sample.download()

            self.process_sample(sample)

        :return: Downloaded content bytes
        """
        if self.backend is None:
            raise RuntimeError(
                (
                    "Resource object can't be downloaded because it's not bound to "
                    "the backend"
                )
            )
        if self.bucket is None:
            raise RuntimeError(
                "Resource object can't be downloaded because its bucket is not set"
            )
        self._content = self.backend.download_object(self.bucket, self.uid)
        return self._content

    def download_to_file(self, path: str) -> None:
        """
        Downloads remote resource into file.

        .. code-block:: python

            sample = self.current_task.get_resource("sample")

            sample.download_to_file("sample/sample.exe")

            with open("sample/sample.exe", "rb") as f:
                contents = f.read()

        :param path: Path to download the resource into
        """
        if self.backend is None:
            raise RuntimeError(
                (
                    "Resource object can't be downloaded because it's not bound to "
                    "the backend"
                )
            )
        if self.bucket is None:
            raise RuntimeError(
                "Resource object can't be downloaded because its bucket is not set"
            )
        self.backend.download_object_to_file(self.bucket, self.uid, path)

    @contextlib.contextmanager
    def download_temporary_file(
        self, suffix: Optional[str] = None
    ) -> Iterator[IO[bytes]]:
        """
        Downloads remote resource into named temporary file.

        .. code-block:: python

            sample = self.current_task.get_resource("sample")

            with sample.download_temporary_file() as f:
                contents = f.read()
                path = f.name

            # Temporary file is deleted after exiting the "with" scope

        :param suffix: Optional suffix for the temporary file name
        :return: ContextManager with the temporary file
        """
        # That tempfile-fu is necessary because minio.fget_object removes file
        # under provided path and renames its own part-file with downloaded content
        # under previously deleted path
        # Weird move, but ok...
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
        tmp.close()
        try:
            self.download_to_file(tmp.name)
            with open(tmp.name, "rb") as f:
                yield f
        finally:
            os.remove(tmp.name)

    @contextlib.contextmanager
    def zip_file(self) -> Iterator[zipfile.ZipFile]:
        """
        If resource contains a Zip file, downloads it to the temporary file
        and wraps it with ZipFile object.

        .. code-block:: python

            dumps = self.current_task.get_resource("dumps")

            with dumps.zip_file() as zipf:
                print("Fetched dumps: ", zipf.namelist())

        By default: method downloads zip into temporary file, which is deleted after
        leaving the context. If you want to load zip into memory,
        call :py:meth:`RemoteResource.download` first.

        If you want to pre-download Zip under specified path and open it using
        zipfile module, you need to do this manually:

        .. code-block:: python

            dumps = self.current_task.get_resource("dumps")

            # Download zip file
            zip_path = "./dumps.zip"
            dumps.download_to_file(zip_path)

            zipf = zipfile.ZipFile(zip_path)

        :return: ContextManager with zipfile
        """
        # Use the in-memory content when already loaded (same check as
        # loaded()), otherwise stream through a temporary file
        if self._content is not None:
            yield zipfile.ZipFile(BytesIO(self._content))
        else:
            with self.download_temporary_file() as f:
                yield zipfile.ZipFile(f)