"""OS File Management module."""
# Copyright 2020 WolkAbout Technology s.r.o.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import math
import os
import shutil
from inspect import signature
from tempfile import NamedTemporaryFile
from threading import Timer
from typing import Callable
from typing import Dict
from typing import IO
from typing import List
from typing import Optional
from typing import Union
from urllib.parse import urlparse
import requests
from wolk import logger_factory
from wolk.interface.file_management import FileManagement
from wolk.model.file_management_error_type import FileManagementErrorType
from wolk.model.file_management_status import FileManagementStatus
from wolk.model.file_management_status_type import FileManagementStatusType
from wolk.model.file_transfer_package import FileTransferPackage
[docs]class OSFileManagement(FileManagement):
"""
File transfer manager.
Enables device to transfer files from WolkAbout IoT Platform
package by package or/and URL download as well as report list of files
currently on device and delete them on request.
"""
def __init__(
self,
status_callback: Callable[[str, FileManagementStatus], None],
packet_request_callback: Callable[[str, int], None],
url_status_callback: Callable[
[str, FileManagementStatus, Optional[str]], None
],
) -> None:
"""
Enable file management for device.
:param status_callback: Reporting current file transfer status
:type status_callback: Callable[[FileManagementStatus], None]
:param packet_request_callback: Request file packet from Platform
:type packet_request_callback: Callable[[str, int], None]
:param url_status_callback: Report file URL download status
:type url_status_callback: Callable[[str, FileManagementStatus, Optional[str]], None]
"""
self.logger = logger_factory.logger_factory.get_logger(
str(self.__class__.__name__)
)
self.status_callback = status_callback
self.packet_request_callback = packet_request_callback
self.url_status_callback = url_status_callback
self.url_downloader = self.url_download
self.preferred_package_size = 0
self.file_directory = ""
self.current_status: Optional[FileManagementStatus] = None
self.max_retries = 3
self.next_package_index: Optional[int] = None
self.expected_number_of_packages: Optional[int] = None
self.retry_count = 0
self.request_timeout: Optional[Timer] = None
self.last_package_hash = 32 * b"\x00"
self.temp_file: Optional[IO[bytes]] = None
self.file_name: Optional[str] = None
self.file_size: Optional[int] = None
self.file_hash: Optional[str] = None
self.file_url: Optional[str] = None
[docs] def get_preffered_package_size(self) -> int:
"""
Return preffered package size for file transfer.
:returns: preferred_package_size
:rtype: int
"""
return self.preferred_package_size
[docs] def set_custom_url_downloader(
self, downloader: Callable[[str, str], bool]
) -> None:
"""
Set the URL file downloader to a custom implementation.
Default implementation uses `requests` and is available as a
static method within this class.
:param downloader: Function that will download the file from the URL
:type downloader: Callable[[[Arg(str, 'file_url'), Arg(str, 'file_path')], bool]
"""
self.logger.debug(f"Setting custom URL downloader to {downloader}")
if not callable(downloader):
self.logger.warning(f"{downloader} is not a callable!") # type: ignore
return
if len(signature(downloader).parameters) != 2:
self.logger.warning(f"{downloader} invalid signature!")
return
self.url_downloader = downloader # type: ignore
[docs] def supports_url_download(self) -> bool:
"""
Return if the file management module supports URL download.
:returns: supports_url_download
:rtype: bool
"""
return self.url_downloader is not None
[docs] def handle_upload_initiation(
self, file_name: str, file_size: int, file_hash: str
) -> None:
"""
Start making package requests and set status to file transfer.
:param file_name: File name
:type file_name: str
:param file_size: Size in bytes
:type file_size: int
:param file_hash: MD5 hash of file
:type file_hash: str
"""
if self.current_status is not None:
self.logger.warning(
"Not in idle state, ignoring file upload initiation"
)
return
self.logger.info("Starting file transfer")
self.logger.info(
f"File name: {file_name} ; "
f"File size: {file_size} ; "
f"File hash: {file_hash}"
)
self.expected_number_of_packages = (
math.ceil(file_size / (self.preferred_package_size * 1024))
if self.preferred_package_size != 0
else 1
)
if os.path.exists(
os.path.join(os.path.abspath(self.file_directory), file_name)
):
valid_file = False
try:
with open(
os.path.join(
os.path.abspath(self.file_directory), file_name
),
"rb",
) as existing_file:
data = existing_file.read()
if not data:
self.logger.error("Read data returned None!")
return
calculated_hash = hashlib.md5(data).hexdigest()
except Exception as e:
self.logger.exception(f"Error opening existing file: {e}")
self.current_status = FileManagementStatus(
FileManagementStatusType.ERROR,
FileManagementErrorType.FILE_SYSTEM_ERROR,
)
self.status_callback(file_name, self.current_status)
self.handle_file_upload_abort()
return
valid_file = file_hash == calculated_hash
if valid_file:
self.logger.info(
"File requested for transfer already on device"
)
self.current_status = FileManagementStatus(
FileManagementStatusType.FILE_READY
)
self.status_callback(file_name, self.current_status)
self.current_status = None
self.last_package_hash = 32 * b"\x00"
return
else:
self.logger.warning(
"File requested for transfer already on device, "
"but the hashes do not match!"
)
self.current_status = FileManagementStatus(
FileManagementStatusType.ERROR,
FileManagementErrorType.FILE_HASH_MISMATCH,
)
self.status_callback(file_name, self.current_status)
self.current_status = None
self.last_package_hash = 32 * b"\x00"
return
self.current_status = FileManagementStatus(
FileManagementStatusType.FILE_TRANSFER
)
self.temp_file = NamedTemporaryFile(mode="r+b", delete=False)
self.file_name = file_name
self.file_size = file_size
self.file_hash = file_hash
self.next_package_index = 0
self.retry_count = 0
self.logger.info(f"Initializing file transfer: '{file_name}'")
self.logger.info(
"File is expected to be received in "
+ f"{self.expected_number_of_packages}"
+ " packages"
)
self.status_callback(self.file_name, self.current_status)
self.packet_request_callback(self.file_name, self.next_package_index)
self.request_timeout = Timer(60.0, self._timeout)
self.request_timeout.start()
[docs] def handle_file_upload_abort(self) -> None:
"""Abort file upload and revert to idle status."""
self.logger.info("Aborting file upload.")
if self.temp_file:
self.temp_file.close()
self.temp_file = None
self.file_name = None
self.file_size = None
self.file_hash = None
self.current_status = None
self.next_package_index = None
self.expected_number_of_packages = None
self.retry_count = 0
self.request_timeout = None
self.last_package_hash = 32 * b"\x00"
[docs] def handle_file_binary_response(
self, package: FileTransferPackage
) -> None:
"""
Validate received package and store or use callback to request again.
:param package: Package of file being transfered.
:type package: FileTransferPackage
"""
self.logger.debug(
f"Previous hash: {package.previous_hash!r} ; "
f"Data size: {len(package.data)} ; "
f"Current hash: {package.current_hash!r}"
)
if self.current_status is None:
self.logger.warning(
"File transfer not in progress, ignoring package"
)
return
if self.request_timeout:
self.request_timeout.cancel()
valid_package = False
if (
len(package.previous_hash)
+ len(package.data)
+ len(package.current_hash)
< 65
):
self.logger.error("Received package size too small!")
else:
data_hash = hashlib.sha256(package.data).digest()
if package.current_hash != data_hash:
self.logger.error(
f"Data hash '{data_hash!r}' does not match "
f"expected hash '{package.current_hash!r}' !"
)
else:
self.logger.debug(
f"Valid package #{self.next_package_index} received"
)
valid_package = True
if (
not valid_package
or self.last_package_hash != package.previous_hash
):
self.logger.warning("Received invalid file package")
self.retry_count += 1
if self.retry_count >= self.max_retries:
self.logger.error(
"Retry count exceeded, aborting file transfer"
)
self.current_status = FileManagementStatus(
FileManagementStatusType.ERROR,
FileManagementErrorType.RETRY_COUNT_EXCEEDED,
)
self.status_callback(self.file_name, self.current_status)
self.handle_file_upload_abort()
return
self.logger.info(
f"Requesting package #{self.next_package_index} again"
)
self.packet_request_callback(
self.file_name, self.next_package_index
)
self.request_timeout = Timer(60.0, self._timeout)
self.request_timeout.start()
return
self.last_package_hash = package.current_hash
try:
self.temp_file.write(package.data)
self.temp_file.flush()
os.fsync(self.temp_file)
except Exception:
self.logger.error(
"Failed to write package, aborting file transfer"
)
self.current_status = FileManagementStatus(
FileManagementStatusType.ERROR,
FileManagementErrorType.FILE_SYSTEM_ERROR,
)
self.status_callback(self.file_name, self.current_status)
self.handle_file_upload_abort()
return
self.next_package_index += 1
if self.next_package_index < self.expected_number_of_packages:
self.logger.debug(
f"Stored package, requesting "
f"#{self.next_package_index}/"
f"#{self.expected_number_of_packages}"
)
self.packet_request_callback(
self.file_name,
self.next_package_index,
)
self.current_status = FileManagementStatus(
FileManagementStatusType.FILE_TRANSFER
)
self.request_timeout = Timer(60.0, self._timeout)
self.request_timeout.start()
return
valid_file = False
try:
self.temp_file.seek(0)
data = self.temp_file.read()
if not data:
self.logger.error("Read data from temp file returned None!")
calculated_hash = hashlib.md5(data).hexdigest()
except Exception as e:
self.logger.exception(f"Error while working with temp file: {e}")
self.current_status = FileManagementStatus(
FileManagementStatusType.ERROR,
FileManagementErrorType.FILE_SYSTEM_ERROR,
)
self.status_callback(self.file_name, self.current_status)
self.handle_file_upload_abort()
return
valid_file = self.file_hash == calculated_hash
# FIXME: remove after bugfix:
if not valid_file:
self.logger.warning(
"Hash mismatch!\n"
+ f"Received: {self.file_hash}\n"
+ f"Calculated: {calculated_hash}"
)
if len(self.file_hash) > 32:
self.logger.warning("Known bug, ignoring hash diff")
valid_file = True
if not valid_file:
self.logger.error("Invalid file - File hash does not match")
self.current_status.status = FileManagementStatusType.ERROR
self.current_status.error = (
FileManagementErrorType.FILE_HASH_MISMATCH
)
self.status_callback(self.file_name, self.current_status)
self.handle_file_upload_abort()
return
if not os.path.exists(os.path.abspath(self.file_directory)):
os.makedirs(os.path.abspath(self.file_directory))
file_path = os.path.join(
os.path.abspath(self.file_directory),
self.file_name,
)
try:
self.logger.info(
f"Finalizing file transfer for '{self.file_name}'"
)
shutil.copy2(os.path.realpath(self.temp_file.name), file_path)
self.temp_file.close()
except Exception as e:
self.logger.exception(f"Error when storing file: {e}")
if not os.path.exists(file_path):
self.logger.error(f"File failed to store to at: {file_path}")
self.current_status = FileManagementStatus(
FileManagementStatusType.ERROR,
FileManagementErrorType.FILE_SYSTEM_ERROR,
)
self.status_callback(self.file_name, self.current_status)
self.handle_file_upload_abort()
return
self.logger.info(f"Received file '{self.file_name}'")
self.current_status = FileManagementStatus(
FileManagementStatusType.FILE_READY
)
self.status_callback(self.file_name, self.current_status)
self.current_status = None
self.retry_count = 0
self.last_package_hash = 32 * b"\x00"
[docs] def handle_file_url_download_initiation(self, file_url: str) -> None:
"""
Start file transfer from specified URL.
:param file_url: URL from where to download file
:type file_url: str
"""
if self.current_status is not None:
self.logger.warning(
"Not in idle state, ignoring file upload initiation"
)
return
if self.url_downloader is None:
self.logger.warning(
"Received URL download, but no downloader is set!"
)
self.current_status = FileManagementStatus(
FileManagementStatusType.ERROR,
FileManagementErrorType.TRANSFER_PROTOCOL_DISABLED,
)
self.url_status_callback(file_url, self.current_status, None)
self.handle_file_upload_abort()
return
if not bool(urlparse(file_url).scheme):
self.logger.error(f"Received URL '{file_url}' is not valid!")
self.current_status = FileManagementStatus(
FileManagementStatusType.ERROR,
FileManagementErrorType.MALFORMED_URL,
)
self.url_status_callback(file_url, self.current_status, None)
self.handle_file_upload_abort()
return
self.file_url = file_url
self.file_name = self.file_url.split("/")[-1]
file_path = os.path.join(
os.path.abspath(self.file_directory), self.file_name
)
self.current_status = FileManagementStatus(
FileManagementStatusType.FILE_TRANSFER
)
self.url_status_callback(file_url, self.current_status, self.file_name)
self.url_downloader(file_url, file_path)
if not os.path.exists(file_path):
self.logger.error(f"File failed to store to at: {file_path}")
self.current_status = FileManagementStatus(
FileManagementStatusType.ERROR,
FileManagementErrorType.FILE_SYSTEM_ERROR,
)
self.url_status_callback(
file_url, self.current_status, self.file_name
)
self.handle_file_url_download_abort()
return
self.logger.info(f"File obtained from URL: '{file_url}'")
self.current_status = FileManagementStatus(
FileManagementStatusType.FILE_READY
)
self.url_status_callback(file_url, self.current_status, self.file_name)
self.current_status = None
[docs] def handle_file_url_download_abort(self) -> None:
"""Abort file URL download."""
self.logger.info("Aborting URL download")
self.file_url = None
self.file_name = None
self.current_status = None
[docs] def get_file_list(self) -> List[Dict[str, Union[str, int]]]:
"""
Return list of files present on device.
Each list item is a dictionary that contains the name of the file,
its size in bytes, and a MD5 checksum of the file.
:returns: file_list
:rtype: List[Dict[str, Union[str, int]]]
"""
files = os.listdir(os.path.abspath(self.file_directory))
files_list = []
for item in files:
file_path = os.path.join(
os.path.abspath(self.file_directory), item
)
if not os.path.isfile(file_path) or item.startswith("."):
continue
with open(file_path, "rb") as file:
data = file.read()
hash = hashlib.md5(data).hexdigest()
size = os.path.getsize(file_path)
files_list.append({"name": item, "size": size, "hash": hash})
self.logger.debug(f"Files on device: {files_list}")
return files_list # type: ignore
[docs] def get_file_path(self, file_name: str) -> Optional[str]:
"""
Return path to file if it exists.
:param file_name: File for which to get path
:type file_name: str
:returns: file_path
:rtype: Optional[str]
"""
self.logger.debug(f"Get file path for file: {file_name}")
file_path = None
for file in self.get_file_list():
if file["name"] == file_name:
file_path = os.path.join(
os.path.abspath(self.file_directory), file_name
)
break
self.logger.debug(f"File path: {file_path}")
return file_path
[docs] def handle_file_delete(self, file_names: List[str]) -> None:
"""
Delete files from device.
:param file_names: Files to be deleted
:type file_names: List[str]
"""
self.logger.info(f"Attempting to delete files: {file_names}")
for file in file_names:
try:
file_path = os.path.join(
os.path.abspath(self.file_directory), file
)
if os.path.exists(file_path):
os.remove(file_path)
except Exception as e:
self.logger.exception(f"'{file}' failed to delete: {e}")
[docs] def handle_file_purge(self) -> None:
"""Delete all files from device."""
for file in os.listdir(os.path.abspath(self.file_directory)):
if not os.path.isfile(
os.path.join(os.path.abspath(self.file_directory), file)
) or file.startswith("."):
continue
os.remove(os.path.join(os.path.abspath(self.file_directory), file))
def _timeout(self) -> None:
"""Revert to idle state when timeout occurs."""
self.logger.error("Timed out waiting for next package, aborting")
self.current_status = FileManagementStatus(
FileManagementStatusType.ERROR,
FileManagementErrorType.UNKNOWN,
)
self.status_callback(self.file_name, self.current_status)
self.handle_file_upload_abort()
[docs] @staticmethod
def url_download(file_url: str, file_path: str) -> bool:
"""
Attempt to download file from specified URL.
:param file_url: URL from which to download file
:type file_url: str
:param file_path: Path where to store file
:type: file_path: str
:returns: Successful download
:rtype: bool
"""
response = requests.get(file_url)
with open(file_path, "wb") as file:
file.write(response.content)
file.flush()
os.fsync(file)
return os.path.exists(file_path)