"""Connectivity service based on MQTT protocol."""
# Copyright 2020 WolkAbout Technology s.r.o.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from threading import Lock
from time import sleep
from time import time
from typing import Any
from typing import Callable
from typing import List
from typing import Optional
from paho.mqtt import client as mqtt
from wolk import logger_factory
from wolk.interface.connectivity_service import ConnectivityService
from wolk.model.device import Device
from wolk.model.message import Message
MQTT_KEEP_ALIVE_INTERVAL = 90
[docs]class MQTTConnectivityService(ConnectivityService):
"""Handle sending and receiving MQTT messages."""
def __init__(
self,
device: Device,
topics: List[str],
qos: int = 2,
host: str = "insert_host",
port: int = 80, # TODO: insert port
max_retries: int = 3,
ca_cert: Optional[str] = None,
) -> None:
"""
Provide the connection to the WolkAbout IoT Platform.
:param device: Contains device key and password used for authentication
:type device: Device
:param topics: List of topics to subscribe to
:type topics: List[str]
:param max_retries: Number of retries when unexpected disconnect occurs
:type max_retries: int
:param qos: Quality of Service for MQTT connection (0, 1, 2)
:type qos: int
:param host: Address of the MQTT broker
:type host: str
:param port: Port to which to send messages
:type port: int
:param ca_cert: Path to certificate file used to encrypt the connection
:type ca_cert: str or None
"""
self.device = device
self.qos = qos
self.host = host
self.port = port
self.max_retries = max_retries
self.connected = False
self.connected_rc: Optional[int] = None
self.topics = topics
self.ca_cert = ca_cert
self.logger = logger_factory.logger_factory.get_logger(
str(self.__class__.__name__)
)
self.logger.debug(
f"Device key: {self.device.key} ; "
f"Device password: {self.device.password} ;"
f"QoS: {self.qos} ; "
f"Host: {self.host} ; "
f"Port: {self.port} ; "
f"CA certificate: {self.ca_cert}"
)
self.client = mqtt.Client(
client_id=self.device.key, clean_session=True
)
self.inbound_message_listener: Callable[
[Message], None
] = lambda message: print("\n\nNo inbound message listener set!\n\n")
self.timeout: Optional[int] = None
self.timeout_interval = 10
self.mutex = Lock()
[docs] def is_connected(self) -> bool:
"""
Return current connection state.
:returns: connected
:rtype: bool
"""
return self.connected
[docs] def set_inbound_message_listener(
self, listener: Callable[[Message], None]
) -> None:
"""
Set the callback function to handle inbound messages.
:param listener: Function that handles inbound messages
:type listener: Callable[[Message], None]
"""
self.logger.debug(f"Message listener set to: {listener}")
self.inbound_message_listener = listener
[docs] def connect(self) -> bool:
"""
Establish the connection to the WolkAbout IoT platform.
Subscribes to all topics defined by device communication protocol.
Starts a loop to handle inbound messages.
:returns: Connection state, True if connected, False otherwise
:rtype: bool
"""
if self.connected:
return True
self.mutex.acquire()
self.client.on_connect = self._on_mqtt_connect
self.client.on_disconnect = self._on_mqtt_disconnect
self.client.on_message = self._on_mqtt_message
if self.ca_cert:
try:
self.client.tls_set(self.ca_cert)
self.client.tls_insecure_set(True)
except ValueError:
pass # Ignore previously set TLS error
except Exception as exception:
self.logger.exception(
f"Something went wrong when setting TLS: {exception}"
)
self.mutex.release()
return False
self.client.username_pw_set(self.device.key, self.device.password)
self.logger.debug(
f"Connecting with parameters: host='{self.host}', "
f"port={self.port}, ca_cert='{self.ca_cert}', "
f"username='{self.device.key}', "
f"password='{self.device.password}'"
)
try:
self.client.connect(
self.host, self.port, keepalive=MQTT_KEEP_ALIVE_INTERVAL
)
except Exception as exception:
self.logger.exception(
f"Something went wrong while connecting: {exception}"
)
self.mutex.release()
return False
self.client.loop_start()
self.timeout = round(time()) + self.timeout_interval
while True:
if round(time()) > self.timeout:
self.logger.warning("Connection timed out!")
self.timeout = None
self.mutex.release()
return False
if self.connected_rc is None:
sleep(0.1)
continue
if self.connected_rc == 0:
self.logger.info("Connected!")
self.timeout = None
self.connected_rc = None
break
if self.connected_rc == 1:
self.logger.warning(
"Connection refused - incorrect protocol version"
)
self.connected_rc = None
self.timeout = None
self.mutex.release()
return False
if self.connected_rc == 2:
self.logger.warning(
"Connection refused - invalid client identifier"
)
self.connected_rc = None
self.timeout = None
self.mutex.release()
return False
if self.connected_rc == 3:
self.logger.warning("Connection refused - server unavailable")
self.timeout = None
self.connected_rc = None
self.mutex.release()
return False
if self.connected_rc == 4:
self.logger.warning(
"Connection refused - bad username or password"
)
self.connected_rc = None
self.timeout = None
self.mutex.release()
return False
if self.connected_rc == 5:
self.logger.warning("Connection refused - not authorised")
self.connected_rc = None
self.timeout = None
self.mutex.release()
return False
if self.connected_rc not in list(range(6)):
self.logger.warning("Unknown retun code")
self.connected_rc = None
self.timeout = None
self.mutex.release()
return False
self.logger.debug(f"Subscribing to topics: {self.topics}")
for topic in self.topics:
self.client.subscribe(topic, 2)
self.mutex.release()
self.connected = True
return True
[docs] def disconnect(self) -> None:
"""Disconnects the device from the WolkAbout IoT Platform."""
self.logger.info("Disconnecting")
self.client.loop_stop()
self.client.disconnect()
[docs] def publish(self, message: Message) -> bool:
"""
Publish serialized data to WolkAbout IoT Platform.
:param message: Message to be published
:type message: Message
:returns: result
:rtype: bool
"""
if not self.connected:
self.logger.warning(
f"Not connected, unable to publish message: {message}"
)
return False
self.mutex.acquire()
info = self.client.publish(message.topic, message.payload, self.qos)
if info.rc == mqtt.MQTT_ERR_SUCCESS:
self.logger.debug(f"Published message: {message}")
self.mutex.release()
return True
if info.is_published():
self.logger.debug(f"Published message: {message}")
self.mutex.release()
return True
self.logger.warning(f"Failed to publish message: {message}")
self.mutex.release()
return False
def _on_mqtt_message(
self, _client: mqtt.Client, _userdata: Any, message: mqtt.MQTTMessage
) -> None:
"""
Serialize inbound messages and pass them to inbound message listener.
:param _client: Client that received the message
:type _client: paho.mqtt.Client
:param _userdata: Private user data set in Client()
:type _userdata: str
:param message: Class with members: topic, payload, qos, retain.
:type message: paho.mqtt.MQTTMessage
"""
if not message:
return
received_message = Message(message.topic, message.payload)
if "binary" in received_message.topic: # To skip printing file binary
self.logger.debug(
"Received MQTT message: "
f"{received_message.topic} , "
f"size: {len(received_message.payload)} bytes"
)
else:
self.logger.debug(f"Received MQTT message: {received_message}")
self.inbound_message_listener(received_message)
def _on_mqtt_connect(
self,
_client: mqtt.Client,
_userdata: Any,
_flags: int,
return_code: int,
) -> None:
"""
Handle when the client receives a CONNACK response from the server.
:param _client: Client that received the message
:type _client: paho.mqtt.Client
:param _userdata: private user data set in Client()
:type _userdata: str
:param _flags: Response flags sent by the broker
:type _flags: int
:param return_code: Connection result
:type return_code: int
"""
self.logger.debug(f"Return code: {return_code}")
if return_code == 0: # Connection successful
self.connected = True
self.connected_rc = 0
# Subscribing in on_mqtt_connect() means if we lose the connection
# and reconnect then subscriptions will be renewed.
if self.topics:
for topic in self.topics:
self.client.subscribe(topic, 2)
self.logger.debug(f"Connected : {self.connected}")
elif (
return_code == 1
): # Connection refused - incorrect protocol version
self.connected_rc = 1
elif (
return_code == 2
): # Connection refused - invalid client identifier
self.connected_rc = 2
elif return_code == 3: # Connection refused - server unavailable
self.connected_rc = 3
elif return_code == 4: # Connection refused - bad username or password
self.connected_rc = 4
elif return_code == 5: # Connection refused - not authorised
self.connected_rc = 5
def _on_mqtt_disconnect(
self, _client: mqtt.Client, _userdata: Any, return_code: int
) -> None:
"""
Handle when the client disconnects from the broker.
:param _client: Client that received the message
:type _client: paho.mqtt.Client
:param _userdata: private user data set in Client()
:type _userdata: str
:param return_code: Disconnection result
:type return_code: int
"""
self.connected = False
self.connected_rc = return_code
self.logger.debug(
f"Connected : {self.connected} ;" + f" Return code : {return_code}"
)
if return_code not in [0, 5]:
self.logger.warning("Unexpected disconnect!")
retries = 0
while retries < self.max_retries:
try:
self.logger.info("Attempting to reconnect..")
self.client.reconnect()
return
except Exception as e:
retries += 1
self.logger.exception(f"Reconnect failed: {e}")
self.logger.info("Retrying in 5 seconds..")
sleep(5)
self.logger.warning("Failed to reconnect")