mirror of
https://github.com/Laxilef/OTGateway.git
synced 2025-12-12 03:04:27 +05:00
mqtt refactoring, change version to 1.4.0-rc.1
* added MqttWriter * added MqttWiFiClient (modified WiFiClient for esp8266) * adaptation HomeAssistantHelper for MqttWriter * adaptation HaHelper for new HomeAssistantHelper
This commit is contained in:
182
src/MqttTask.h
182
src/MqttTask.h
@@ -1,5 +1,7 @@
|
||||
#include <WiFiClient.h>
|
||||
#include <PubSubClient.h>
|
||||
#include <MqttWiFiClient.h>
|
||||
#include <MqttWriter.h>
|
||||
#include "HaHelper.h"
|
||||
|
||||
|
||||
@@ -20,15 +22,20 @@ public:
|
||||
delete this->client;
|
||||
}
|
||||
|
||||
if (this->writer != nullptr) {
|
||||
delete this->writer;
|
||||
}
|
||||
|
||||
if (this->wifiClient != nullptr) {
|
||||
delete this->wifiClient;
|
||||
}
|
||||
}
|
||||
|
||||
protected:
|
||||
WiFiClient* wifiClient = nullptr;
|
||||
MqttWiFiClient* wifiClient = nullptr;
|
||||
PubSubClient* client = nullptr;
|
||||
HaHelper* haHelper = nullptr;
|
||||
MqttWriter* writer = nullptr;
|
||||
unsigned short readyForSendTime = 15000;
|
||||
unsigned long lastReconnectTime = 0;
|
||||
unsigned long connectedTime = 0;
|
||||
@@ -57,12 +64,39 @@ protected:
|
||||
void setup() {
|
||||
Log.sinfoln("MQTT", F("Started"));
|
||||
|
||||
this->wifiClient = new MqttWiFiClient();
|
||||
this->wifiClient->setSync(true);
|
||||
|
||||
// client settings
|
||||
this->client = new PubSubClient();
|
||||
this->client->setSocketTimeout(2);
|
||||
this->client->setKeepAlive(5);
|
||||
this->client->setClient(*this->wifiClient);
|
||||
this->client->setSocketTimeout(3);
|
||||
this->client->setKeepAlive(15);
|
||||
this->client->setBufferSize(768);
|
||||
this->client->setCallback(std::bind(&MqttTask::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
|
||||
this->client->setCallback([this] (char* topic, uint8_t* payload, unsigned int length) {
|
||||
this->onMessage(topic, payload, length);
|
||||
});
|
||||
|
||||
// writer
|
||||
this->writer = new MqttWriter(this->client, 256);
|
||||
this->writer->setYieldCallback([this] {
|
||||
this->delay(10);
|
||||
});
|
||||
this->writer->setEventPublishCallback([this] (const char* topic, size_t written, size_t length, bool result) {
|
||||
Log.straceln("MQTT", F("%s publish %u of %u bytes to topic: %s"), result ? F("Successfully") : F("Failed"), written, length, topic);
|
||||
|
||||
this->client->loop();
|
||||
this->delay(100);
|
||||
});
|
||||
this->writer->setEventFlushCallback([this] (size_t, size_t) {
|
||||
if (!this->wifiClient->getSync() && this->wifiClient->connected()) {
|
||||
this->wifiClient->flush();
|
||||
}
|
||||
|
||||
#ifdef ARDUINO_ARCH_ESP8266
|
||||
::yield();
|
||||
#endif
|
||||
});
|
||||
|
||||
// ha helper settings
|
||||
this->haHelper = new HaHelper();
|
||||
@@ -70,44 +104,29 @@ protected:
|
||||
this->haHelper->setDeviceVersion(PROJECT_VERSION);
|
||||
this->haHelper->setDeviceModel(PROJECT_NAME);
|
||||
this->haHelper->setDeviceName(PROJECT_NAME);
|
||||
this->haHelper->setClient(this->client);
|
||||
this->haHelper->setYieldCallback([](void* self) {
|
||||
MqttTask* task = static_cast<MqttTask*>(self);
|
||||
task->client->loop();
|
||||
task->delay(100);
|
||||
}, this);
|
||||
this->haHelper->setWriter(this->writer);
|
||||
|
||||
sprintf(buffer, CONFIG_URL, WiFi.localIP().toString().c_str());
|
||||
this->haHelper->setDeviceConfigUrl(buffer);
|
||||
}
|
||||
|
||||
void loop() {
|
||||
if (this->wifiClient == nullptr || (!this->client->connected() && millis() - this->lastReconnectTime >= MQTT_RECONNECT_INTERVAL)) {
|
||||
Log.sinfoln("MQTT", F("Not connected, state: %d"), this->client->state());
|
||||
|
||||
// bug?
|
||||
// memory leak at random times if this is not done
|
||||
if (this->wifiClient != nullptr) {
|
||||
delete this->wifiClient;
|
||||
this->wifiClient = nullptr;
|
||||
}
|
||||
|
||||
this->wifiClient = new WiFiClient();
|
||||
this->client->setClient(*this->wifiClient);
|
||||
this->client->setServer(settings.mqtt.server, settings.mqtt.port);
|
||||
|
||||
Log.sinfoln("MQTT", F("Connecting to %s:%u..."), settings.mqtt.server, settings.mqtt.port);
|
||||
this->client->connect(settings.hostname, settings.mqtt.user, settings.mqtt.password);
|
||||
|
||||
this->lastReconnectTime = millis();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!this->client->connected() && this->connected) {
|
||||
this->connected = false;
|
||||
this->onDisconnect();
|
||||
}
|
||||
|
||||
} else if (this->client->connected() && !this->connected) {
|
||||
if (this->wifiClient == nullptr || (!this->client->connected() && millis() - this->lastReconnectTime >= MQTT_RECONNECT_INTERVAL)) {
|
||||
Log.sinfoln("MQTT", F("Not connected, state: %d"), this->client->state());
|
||||
Log.sinfoln("MQTT", F("Connecting to %s:%u..."), settings.mqtt.server, settings.mqtt.port);
|
||||
|
||||
this->client->setServer(settings.mqtt.server, settings.mqtt.port);
|
||||
this->client->connect(settings.hostname, settings.mqtt.user, settings.mqtt.password);
|
||||
|
||||
this->lastReconnectTime = millis();
|
||||
}
|
||||
|
||||
if (this->client->connected() && !this->connected) {
|
||||
this->connected = true;
|
||||
this->onConnect();
|
||||
}
|
||||
@@ -132,21 +151,19 @@ protected:
|
||||
|
||||
// publish variables and status
|
||||
if (this->newConnection || millis() - this->prevPubVarsTime > settings.mqtt.interval) {
|
||||
this->client->publish(
|
||||
this->getTopicPath("status").c_str(),
|
||||
!vars.states.otStatus ? "offline" : vars.states.fault ? "fault" : "online"
|
||||
this->writer->publish(
|
||||
this->haHelper->getDeviceTopic("status").c_str(),
|
||||
!vars.states.otStatus ? "offline" : vars.states.fault ? "fault" : "online",
|
||||
true
|
||||
);
|
||||
this->client->loop();
|
||||
|
||||
this->publishVariables(this->getTopicPath("state").c_str());
|
||||
this->client->loop();
|
||||
this->publishVariables(this->haHelper->getDeviceTopic("state").c_str());
|
||||
this->prevPubVarsTime = millis();
|
||||
}
|
||||
|
||||
// publish settings
|
||||
if (this->newConnection || millis() - this->prevPubSettingsTime > settings.mqtt.interval * 10) {
|
||||
this->publishSettings(this->getTopicPath("settings").c_str());
|
||||
this->client->loop();
|
||||
this->publishSettings(this->haHelper->getDeviceTopic("settings").c_str());
|
||||
this->prevPubSettingsTime = millis();
|
||||
}
|
||||
|
||||
@@ -173,8 +190,8 @@ protected:
|
||||
Log.sinfoln("MQTT", F("Emergency mode disabled"));
|
||||
}
|
||||
|
||||
this->client->subscribe(this->getTopicPath("settings/set").c_str());
|
||||
this->client->subscribe(this->getTopicPath("state/set").c_str());
|
||||
this->client->subscribe(this->haHelper->getDeviceTopic("settings/set").c_str());
|
||||
this->client->subscribe(this->haHelper->getDeviceTopic("state/set").c_str());
|
||||
}
|
||||
|
||||
void onDisconnect() {
|
||||
@@ -184,7 +201,7 @@ protected:
|
||||
Log.swarningln("MQTT", F("Disconnected (reason: %d uptime: %u s.)"), this->client->state(), uptime);
|
||||
}
|
||||
|
||||
void onMessage(char* topic, byte* payload, unsigned int length) {
|
||||
void onMessage(char* topic, uint8_t* payload, unsigned int length) {
|
||||
if (!length) {
|
||||
return;
|
||||
}
|
||||
@@ -212,37 +229,16 @@ protected:
|
||||
JsonDocument doc;
|
||||
DeserializationError dErr = deserializeJson(doc, payload, length);
|
||||
if (dErr != DeserializationError::Ok || doc.isNull()) {
|
||||
const char* errMsg;
|
||||
switch (dErr.code()) {
|
||||
case DeserializationError::EmptyInput:
|
||||
case DeserializationError::IncompleteInput:
|
||||
case DeserializationError::InvalidInput:
|
||||
errMsg = "invalid input";
|
||||
break;
|
||||
|
||||
case DeserializationError::NoMemory:
|
||||
errMsg = "no memory";
|
||||
break;
|
||||
|
||||
case DeserializationError::TooDeep:
|
||||
errMsg = "too deep";
|
||||
break;
|
||||
|
||||
default:
|
||||
errMsg = "failed";
|
||||
break;
|
||||
}
|
||||
Log.swarningln("MQTT.MSG", F("No deserialization: %s"), errMsg);
|
||||
|
||||
Log.swarningln("MQTT.MSG", F("Error on deserialization: %s"), dErr.f_str());
|
||||
return;
|
||||
}
|
||||
|
||||
if (this->getTopicPath("state/set").compare(topic) == 0) {
|
||||
this->client->publish(this->getTopicPath("state/set").c_str(), NULL, true);
|
||||
if (this->haHelper->getDeviceTopic("state/set").equals(topic)) {
|
||||
this->writer->publish(this->haHelper->getDeviceTopic("state/set").c_str(), nullptr, 0, true);
|
||||
this->updateVariables(doc);
|
||||
|
||||
} else if (this->getTopicPath("settings/set").compare(topic) == 0) {
|
||||
this->client->publish(this->getTopicPath("settings/set").c_str(), NULL, true);
|
||||
} else if (this->haHelper->getDeviceTopic("settings/set").equals(topic)) {
|
||||
this->writer->publish(this->haHelper->getDeviceTopic("settings/set").c_str(), nullptr, 0, true);
|
||||
this->updateSettings(doc);
|
||||
}
|
||||
}
|
||||
@@ -727,27 +723,7 @@ protected:
|
||||
doc["sensors"]["indoor"]["type"] = settings.sensors.indoor.type;
|
||||
doc["sensors"]["indoor"]["offset"] = settings.sensors.indoor.offset;
|
||||
|
||||
|
||||
size_t docSize = measureJson(doc);
|
||||
uint8_t* buffer = (uint8_t*) malloc(docSize * sizeof(*buffer));
|
||||
size_t length = serializeJson(doc, buffer, docSize);
|
||||
|
||||
size_t written = 0;
|
||||
if (length != 0) {
|
||||
if (this->client->beginPublish(topic, docSize, true)) {
|
||||
for (size_t offset = 0; offset < docSize; offset += 128) {
|
||||
size_t packetSize = offset + 128 <= docSize ? 128 : docSize - offset;
|
||||
written += this->client->write(buffer + offset, packetSize);
|
||||
}
|
||||
|
||||
this->client->flush();
|
||||
}
|
||||
}
|
||||
free(buffer);
|
||||
|
||||
Log.straceln("MQTT", "Publish %u of %u bytes to topic: %s", written, docSize, topic);
|
||||
|
||||
return docSize == written;
|
||||
return this->writer->publish(topic, doc, true);
|
||||
}
|
||||
|
||||
bool publishVariables(const char* topic) {
|
||||
@@ -782,30 +758,6 @@ protected:
|
||||
doc["parameters"]["dhwMinTemp"] = vars.parameters.dhwMinTemp;
|
||||
doc["parameters"]["dhwMaxTemp"] = vars.parameters.dhwMaxTemp;
|
||||
|
||||
|
||||
size_t docSize = measureJson(doc);
|
||||
uint8_t* buffer = (uint8_t*) malloc(docSize * sizeof(*buffer));
|
||||
size_t length = serializeJson(doc, buffer, docSize);
|
||||
|
||||
size_t written = 0;
|
||||
if (length != 0) {
|
||||
if (this->client->beginPublish(topic, docSize, true)) {
|
||||
for (size_t offset = 0; offset < docSize; offset += 128) {
|
||||
size_t packetSize = offset + 128 <= docSize ? 128 : docSize - offset;
|
||||
written += this->client->write(buffer + offset, packetSize);
|
||||
}
|
||||
|
||||
this->client->flush();
|
||||
}
|
||||
}
|
||||
free(buffer);
|
||||
|
||||
Log.straceln("MQTT", "Publish %u of %u bytes to topic: %s", written, docSize, topic);
|
||||
|
||||
return docSize == written;
|
||||
}
|
||||
|
||||
static std::string getTopicPath(const char* topic) {
|
||||
return std::string(settings.mqtt.prefix) + "/" + std::string(topic);
|
||||
return this->writer->publish(topic, doc, true);
|
||||
}
|
||||
};
|
||||
Reference in New Issue
Block a user