During the May holidays I had a lot of free time, so I simply had to continue the experiment begun in the previous article. Previously, I managed to get temperature and humidity data from a Xiaomi sensor; this time the task was to learn how to send this data to an MQTT broker.

Changes to the data acquisition code
To begin with, it is worth noting that several changes had to be made to the code for receiving data from sensors.
After a number of tests it turned out that, for an unknown reason (I wanted to say "unexplored", but that would not make it any better known, so let's leave it at that), the BLE advertising packets started to be split up. That is, whereas earlier the sensor name and its data arrived in a single packet, now the data arrives in separate packets. For this reason, I had to drop the device name check:
// Device-name filter: bail out when the advertisement carries no name or when
// the name is not "MJ_HT_V1" (compare() yields non-zero on a mismatch,
// assuming getName() returns a std::string — TODO confirm).
if (!advertisedDevice.haveName() || advertisedDevice.getName().compare("MJ_HT_V1"))
return;
, . 10 , Bluetooth, , . , . . , 0xfe95
. , , . 0xfe95
(data+4
):
// Accept the block only when its type is 0x16, its service id is 0xfe95 and
// the byte at data+4 equals 0x50.
// NOTE(review): 0x16 / 0xfe95 presumably mean "service data" / the Xiaomi service — confirm.
if (blockType == 0x16 && serviceType == 0xfe95 && *(data + 4) == 0x50) {
// Report the block length reduced by 3 to the caller through the out-parameter.
*foundBlockLength = blockLength-3;
// Return the position data+4, i.e. the same byte that was tested above.
return data+4;
}
, .
MQTT
MQTT-. Eclipse Mosquitto. , docker-. Linux- ( Windows ), :
# Container-side mount targets must be the directories the eclipse-mosquitto image actually uses (/mosquitto/data and /mosquitto/log).
docker run -it -p 1883:1883 -p 9001:9001 -v /mosquitto/data:/mosquitto/data -v /mosquitto/log:/mosquitto/log eclipse-mosquitto
, . , ( ).
Windows — MQTT Explorer.
MQTT- ESP32 (, ). PubSubClient, Arduino IDE Library Manager. . Wifi ( , ), :
// Minimal publish sequence: the MQTT client is constructed on top of a WiFiClient.
WiFiClient wifiClient;
PubSubClient pubsubClient(wifiClient);
// Point the client at the broker (MqttServer) on port 1883.
pubsubClient.setServer(MqttServer, 1883);
// Open the MQTT session under the client name MqttClientName.
pubsubClient.connect(MqttClientName);
// Send one message: payload is published to topic.
pubsubClient.publish(topic, payload);
, , .
, , . - , - (, MQTT). 2- ( ).
. ESP32 FreeRTOS . — xTaskCreate()
. 2 : BLE Wifi. :
// Start the BLE scanning task: entry point taskScanBleDevices, stack depth 2048,
// no task argument, idle priority; the created handle is stored in bleScanHandle.
TaskHandle_t bleScanHandle = nullptr;
xTaskCreate(taskScanBleDevices, "ble-scan", 2048, nullptr, tskIDLE_PRIORITY, &bleScanHandle);
// Start the Wifi/MQTT task the same way, but with an eight times larger stack depth (2048*8).
TaskHandle_t wifiActivityHandle = nullptr;
xTaskCreate(taskWifiActivity, "wifi-activity", 2048*8, nullptr, tskIDLE_PRIORITY, &wifiActivityHandle);
BLE — :
void taskScanBleDevices(void* pvParameters) {
while (true) {
BLEScan * pBLEScan = BLEDevice::getScan();
pBLEScan->start(g_scanTime, false);
Serial.println("Scan done!");
vTaskDelay(2000 / portTICK_RATE_MS);
}
}
Wifi MQTT .
, , - . WiFi.begin()
, Wifi WiFi.status()
(). Wifi- , . . Arduino (, ), . switch
.
:
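WifiReconnect — start (or restart) connecting to the Wifi network;
WifiConnecting — waiting for the Wifi connection to be established;
WifiConnected — the Wifi connection is established;
MqttReconnect — start (or restart) connecting to the MQTT broker;
MqttConnecting — waiting for the MQTT connection to be established;
MqttConnected — connected to the MQTT broker, events are being published;
Error — a connection error occurred, the task stops.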
:

, . switch
:
/**
 * FreeRTOS task body: state machine that maintains the Wifi connection and the
 * MQTT session and periodically publishes the collected events.
 *
 * State flow:
 *   WifiReconnect  -> WifiConnecting -> WifiConnected -> MqttReconnect
 *   MqttReconnect  -> MqttConnecting -> MqttConnected (publishing loop)
 * Whenever Wifi is found to be down, the machine falls back to WifiReconnect.
 * A failed or lost MQTT session falls back to MqttReconnect.
 * WL_CONNECT_FAILED leads to Error, which ends the loop; the task then deletes
 * itself, because a FreeRTOS task function must not simply return.
 *
 * @param pvParameters task argument (unused).
 */
void taskWifiActivity(void* pvParameters) {
    Serial.println("Wifi Task Started");
    WifiActivityState state = WifiActivityState::WifiReconnect;
    bool shouldStop = false;
    while (!shouldStop)
    {
        wl_status_t wifiStatus = WiFi.status();
        // A scoped enum is not promoted to int when passed through varargs,
        // so convert explicitly to match the %d conversions.
        Serial.printf("Wifi status: %d State: %d\n", static_cast<int>(wifiStatus), static_cast<int>(state));
        switch (state)
        {
        case WifiActivityState::WifiReconnect:
            // Back off before (re)starting the Wifi connection attempt.
            vTaskDelay(5000 / portTICK_RATE_MS);
            WiFi.begin(WifiSsid, WifiPassword);
            state = WifiActivityState::WifiConnecting;
            Serial.println("Connecting...");
            break;
        case WifiActivityState::WifiConnecting:
            switch (wifiStatus)
            {
            case WL_CONNECTED:
                state = WifiActivityState::WifiConnected;
                Serial.println("Wifi Connected");
                break;
            case WL_CONNECT_FAILED:
                state = WifiActivityState::Error;
                break;
            default:
                // Still in progress: poll the status again in a second.
                vTaskDelay(1000 / portTICK_RATE_MS);
                break;
            }
            break;
        case WifiActivityState::WifiConnected:
            if (wifiStatus == WL_CONNECTED) {
                Serial.println(WiFi.localIP());
                state = WifiActivityState::MqttReconnect;
            } else {
                state = WifiActivityState::WifiReconnect;
            }
            break;
        case WifiActivityState::MqttReconnect:
            if (wifiStatus == WL_CONNECTED) {
                Serial.println("Mqtt server connecting");
                g_pubsubClient.setServer(MqttServer, 1883);
                g_pubsubClient.connect(MqttClientName);
                state = WifiActivityState::MqttConnecting;
            } else {
                state = WifiActivityState::WifiReconnect;
            }
            break;
        case WifiActivityState::MqttConnecting:
            if (wifiStatus == WL_CONNECTED) {
                if (g_pubsubClient.connected()) {
                    Serial.println("Mqtt server connected");
                    state = WifiActivityState::MqttConnected;
                    vTaskDelay(1000 / portTICK_RATE_MS);
                } else {
                    // connect() has already returned, so the session will not
                    // come up by itself: go back and issue a new attempt
                    // instead of polling connected() forever.
                    Serial.println("Mqtt server connection failed, retrying");
                    state = WifiActivityState::MqttReconnect;
                    vTaskDelay(5000 / portTICK_RATE_MS);
                }
            } else {
                state = WifiActivityState::WifiReconnect;
            }
            break;
        case WifiActivityState::MqttConnected:
            if (wifiStatus == WL_CONNECTED) {
                Serial.println("...Activity...");
                if (g_pubsubClient.connected()) {
                    // Move freshly collected events to the send buffer, publish
                    // them, then sleep until the next delivery slot.
                    prepareSendBuffer();
                    publishEvents();
                    vTaskDelay(g_eventsDeliverInterval * 1000 / portTICK_RATE_MS);
                } else {
                    Serial.println("Client Disconnected");
                    state = WifiActivityState::MqttReconnect;
                    vTaskDelay(5000 / portTICK_RATE_MS);
                }
            } else {
                state = WifiActivityState::WifiReconnect;
            }
            break;
        case WifiActivityState::Error:
            Serial.println("Connection error");
            shouldStop = true;
            break;
        default:
            break;
        }
    }
    // Passing a null handle deletes the calling task.
    vTaskDelete(nullptr);
}
state
, . Wifi- . , .
, MQTT- prepareSendBuffer()
publishEvents()
. .
, Bluetooth . Event
, ( , , ). vector
Event
.
// Events collected so far, stored as raw owning pointers: each element is later
// either moved to g_eventsSendBuffer or deleted by prepareSendBuffer().
// prepareSendBuffer() accesses this vector only while holding g_eventsBufferMutex.
std::vector<Event*> g_eventsBuffer;
Bluetooth :
// Queue a heap-allocated humidity event for deviceAddress; the consumer side
// (prepareSendBuffer / publishEvents) is responsible for deleting it.
// NOTE(review): presumably executed while holding g_eventsBufferMutex — confirm at the call site.
g_eventsBuffer.push_back(new Event(deviceAddress, EventType::Humidity, humidity));
MQTT- . , , . FreeRTOS — .
. xSemaphoreCreateMutex()
, 2 xSemaphoreTake()
xSemaphoreGive()
.
, .
, ( ), , . . , prepareSendBuffer()
:
void prepareSendBuffer() {
Serial.println("Send buffer prepare started");
xSemaphoreTake(g_eventsBufferMutex, portMAX_DELAY);
if (!g_eventsBuffer.empty()) {
Serial.println("Found events");
for(std::vector<Event*>::reverse_iterator i = g_eventsBuffer.rbegin(); i != g_eventsBuffer.rend(); ++i) {
Event* e = *i;
std::string address = e->getDeviceAddress();
Serial.printf("Trying to add event for address %s\n", address.c_str());
bool found = false;
if (!g_eventsSendBuffer.empty()) {
for(std::vector<Event*>::iterator i = g_eventsSendBuffer.begin(); i != g_eventsSendBuffer.end(); ++i) {
if ((*i)->getDeviceAddress() == address && (*i)->getEventType() == e->getEventType()) {
found = true;
break;
}
}
}
if (!found) {
g_eventsSendBuffer.push_back(e);
Serial.println("Event added");
} else {
delete e;
}
}
}
g_eventsBuffer.clear();
xSemaphoreGive(g_eventsBufferMutex);
Serial.println("Send buffer prepared");
}
, ( ). . .
publishEvents()
, :
void publishEvents() {
Serial.println("Publish events started");
const int bufferSize = 1000;
char* topicStringBuffer = new char[bufferSize];
char* payloadStringBuffer = new char[bufferSize];
if (!g_eventsSendBuffer.empty()) {
for(std::vector<Event*>::iterator i = g_eventsSendBuffer.begin(); i != g_eventsSendBuffer.end(); ++i) {
Event* e = *i;
std::string address = e->getDeviceAddress();
Serial.printf("Publishing event for %s\n", address.c_str());
switch (e->getEventType())
{
case EventType::Temperature:
snprintf(topicStringBuffer, bufferSize, "sensor/%s/temperature", address.c_str());
break;
case EventType::Humidity:
snprintf(topicStringBuffer, bufferSize, "sensor/%s/humidity", address.c_str());
break;
case EventType::Battery:
snprintf(topicStringBuffer, bufferSize, "sensor/%s/battery", address.c_str());
break;
case EventType::VisibleDevices:
snprintf(topicStringBuffer, bufferSize, "sensor/devices");
break;
case EventType::SensorDevices:
snprintf(topicStringBuffer, bufferSize, "sensor/sensors");
break;
default:
continue;
break;
}
snprintf(payloadStringBuffer, bufferSize, "%f", e->getValue());
Serial.printf("Event: %s %s\n", topicStringBuffer, payloadStringBuffer);
delete e;
g_pubsubClient.publish(topicStringBuffer, payloadStringBuffer);
}
}
Serial.println("Publish events DONE");
g_eventsSendBuffer.clear();
delete[] topicStringBuffer;
delete[] payloadStringBuffer;
}
, MQTT . , .
, Xiaomi MQTT-. - :

MQTT- (, - MQTT). , .
Github.
UPD: I was corrected a little on Github (thanks, MikalaiR). A FreeRTOS task function must not simply return; it has to call vTaskDelete() before it ends. Here is more about it. I have corrected the code both on Github and here.