"""Enables firmware update for device."""
# Copyright 2020 WolkAbout Technology s.r.o.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import Callable
from typing import Optional
from wolk import logger_factory
from wolk.interface.firmware_handler import FirmwareHandler
from wolk.interface.firmware_update import FirmwareUpdate
from wolk.model.firmware_update_error_type import FirmwareUpdateErrorType
from wolk.model.firmware_update_status import FirmwareUpdateStatus
from wolk.model.firmware_update_status_type import FirmwareUpdateStatusType
[docs]class OSFirmwareUpdate(FirmwareUpdate):
"""Responsible for everything related to the firmware update process."""
def __init__(
self,
firmware_handler: FirmwareHandler,
status_callback: Callable[[FirmwareUpdateStatus], None],
) -> None:
"""
Enable firmware update for device.
:param firmware_handler: Install firmware and provide current version
:type firmware_handler: FirmwareHandler
:param status_callback: Reporting firmware update status
:type status_callback: Callable[[FirmwareUpdateStatus], None]
"""
self.logger = logger_factory.logger_factory.get_logger(
str(self.__class__.__name__)
)
self.logger.debug(f"firmware_handler: {firmware_handler}")
self.status_callback = status_callback
if not isinstance(firmware_handler, FirmwareHandler):
raise ValueError(
f"Given firmware handler {firmware_handler} is not "
"an instance of FirmwareHandler"
)
self.firmware_handler = firmware_handler
self.current_status: Optional[FirmwareUpdateStatus] = None
[docs] def get_current_version(self) -> str:
"""
Return device's current firmware version.
:returns: Firmware version
:rtype: str
"""
return self.firmware_handler.get_current_version()
[docs] def handle_install(self, file_path: str) -> None:
"""
Handle received firmware installation command.
:param file_path: Firmware file to install
:type file_path: str
"""
self.logger.debug(
f"Handling install command with path path: {file_path}"
)
if self.current_status is not None:
self.logger.warning(
"Not in idle status, ignoring install command."
)
return
if os.path.exists("last_firmware_version.txt"):
self.current_status = FirmwareUpdateStatus(
FirmwareUpdateStatusType.ERROR,
FirmwareUpdateErrorType.UNKNOWN,
)
self.logger.error("Previous firmware update did not complete!")
self.status_callback(self.current_status)
self._reset_state()
return
if not os.path.exists(file_path):
self.current_status = FirmwareUpdateStatus(
FirmwareUpdateStatusType.ERROR,
FirmwareUpdateErrorType.UNKNOWN_FILE,
)
self.logger.error("File not present at given path!")
self.status_callback(self.current_status)
self._reset_state()
return
with open("last_firmware_version.txt", "w") as file:
file.write(self.firmware_handler.get_current_version())
self.current_status = FirmwareUpdateStatus(
FirmwareUpdateStatusType.INSTALLING
)
self.logger.info(
"Beginning firmware installation process "
f"with file path: {file_path}"
)
self.status_callback(self.current_status)
self.firmware_handler.install_firmware(file_path)
[docs] def handle_abort(self) -> None:
"""Handle the abort command received from the platform."""
if self.current_status is not None:
self.logger.info("Aborting firmware installation")
self.current_status = FirmwareUpdateStatus(
FirmwareUpdateStatusType.ABORTED
)
self.status_callback(self.current_status)
self._reset_state()
if os.path.exists("last_firmware_version.txt"):
os.remove("last_firmware_version.txt")
[docs] def report_result(self) -> None:
"""Report the result of the firmware installation process."""
self.logger.debug("Reporting firmware update result")
if not os.path.exists("last_firmware_version.txt"):
self.logger.debug("No stored firmware version found")
return
with open("last_firmware_version.txt") as file:
last_firmware_version = file.read()
if (
last_firmware_version
== self.firmware_handler.get_current_version()
):
self.logger.warning(
"Firmware version unchanged, reporting installation failed"
)
self.current_status = FirmwareUpdateStatus(
FirmwareUpdateStatusType.ERROR,
FirmwareUpdateErrorType.INSTALLATION_FAILED,
)
self.status_callback(self.current_status)
self._reset_state()
os.remove("last_firmware_version.txt")
return
self.logger.info(
"Firmware version changed, reporting installation completed"
)
self.current_status = FirmwareUpdateStatus(
FirmwareUpdateStatusType.SUCCESS
)
self.status_callback(self.current_status)
self._reset_state()
os.remove("last_firmware_version.txt")
def _reset_state(self) -> None:
"""Reset the state of the firmware update process."""
self.current_status = None