Nochmals zu BLE-, Temperatur- und Xiaomi-Sensoren - Teil 2 - MQTT

In den Maiferien gab es viel Freizeit, so dass es einfach notwendig war, das im vorherigen Artikel begonnene Experiment fortzusetzen. Zuvor war es mir gelungen, Temperatur- und Feuchtigkeitsdaten von einem Xiaomi-Sensor abzurufen; jetzt bestand die Aufgabe darin, zu lernen, wie diese Daten an einen MQTT-Broker gesendet werden.


Änderungen am Datenerfassungscode


Zunächst ist anzumerken, dass einige Änderungen am Code für den Empfang der Sensordaten vorgenommen werden mussten.


Nach einer gewissen Anzahl von Tests stellte sich heraus, dass die BLE-Advertising-Pakete aus einem unbekannten Grund (ich wollte „unerforschten“ sagen, aber bekannter wird er dadurch nicht, also lassen wir es so) aufgeteilt werden. D. h.: Waren früher der Name des Sensors und seine Daten in einem Paket enthalten, so werden sie jetzt in separaten Paketen geliefert. Aus diesem Grund musste ich auf die Überprüfung des Gerätenamens verzichten:


// Name filter that had to be dropped: bail out when the advertisement carries
// no device name, or when compare() against "MJ_HT_V1" yields a non-zero
// result (i.e. the name is treated as not matching).
if (!advertisedDevice.haveName() || advertisedDevice.getName().compare("MJ_HT_V1"))
    return; 

, . 10 , Bluetooth, , . , . . , 0xfe95. , , . 0xfe95 (data+4):


// Accept only blocks of type 0x16 whose service type is 0xfe95 and whose byte
// at offset 4 equals 0x50. On a match, report the remaining length
// (blockLength - 3) through foundBlockLength and return a pointer to offset 4.
// NOTE(review): 0x16 is presumably the BLE "Service Data" AD type — confirm.
if (blockType == 0x16 && serviceType == 0xfe95 && *(data + 4) == 0x50) {
    *foundBlockLength = blockLength-3;
    return data+4;
}

, .


MQTT


MQTT-. Eclipse Mosquitto. , docker-. Linux- ( Windows ), :


# Start the eclipse-mosquitto image interactively, publishing ports 1883 and 9001
# and bind-mounting host directories for data and logs.
# NOTE(review): the container-side paths are spelled /var/mosquito/... here; the
# official image documentation uses /mosquitto/data and /mosquitto/log — confirm.
docker run -it -p 1883:1883 -p 9001:9001 -v /mosquitto/data:/var/mosquito/data -v /mosquitto/log:/var/mosquito/logs eclipse-mosquitto

, . , ( ).


Windows — MQTT Explorer.


MQTT- ESP32 (, ). PubSubClient, Arduino IDE Library Manager. . Wifi ( , ), :


// Minimal publish sequence: the MQTT client is constructed on top of a Wifi
// client object, pointed at the broker on port 1883, connected under the given
// client name, and then used to publish one payload to one topic.
WiFiClient wifiClient; // wifi client object 
PubSubClient pubsubClient(wifiClient); // MQTT server client object

pubsubClient.setServer(MqttServer, 1883);
// NOTE(review): the results of connect() and publish() are not checked here.
pubsubClient.connect(MqttClientName);
pubsubClient.publish(topic, payload);

, , .



, , . - , - (, MQTT). 2- ( ).


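Auf dem ESP32 läuft FreeRTOS. Tasks werden dort mit xTaskCreate() erzeugt. Wir brauchen 2 Tasks: eine für BLE und eine für Wifi. Sie werden so erzeugt: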


// Create the BLE scanning task: entry point taskScanBleDevices, name "ble-scan",
// stack depth 2048, no parameter, idle priority; the handle is stored in bleScanHandle.
TaskHandle_t bleScanHandle = nullptr;
xTaskCreate(taskScanBleDevices, "ble-scan", 2048, nullptr, tskIDLE_PRIORITY, &bleScanHandle);

// Create the Wifi/MQTT task the same way, but with an 8x larger stack depth (2048*8).
TaskHandle_t wifiActivityHandle = nullptr;
xTaskCreate(taskWifiActivity, "wifi-activity", 2048*8, nullptr, tskIDLE_PRIORITY, &wifiActivityHandle);

Die BLE-Task sieht so aus:


void taskScanBleDevices(void* pvParameters) {
    while (true) {
        BLEScan * pBLEScan = BLEDevice::getScan();
        pBLEScan->start(g_scanTime, false);
        Serial.println("Scan done!");
        vTaskDelay(2000 / portTICK_RATE_MS);
    }
}

Wifi MQTT .



, , - . WiFi.begin(), Wifi WiFi.status() (). Wifi- , . . Arduino (, ), . switch.


:


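  • WifiReconnect — Wifi-Verbindung (neu) aufbauen;
  • WifiConnecting — Warten auf die Wifi-Verbindung;
  • WifiConnected — Wifi-Verbindung hergestellt;
  • MqttReconnect — Verbindung zum MQTT-Broker (neu) aufbauen;
  • MqttConnecting — Warten auf die Verbindung zum MQTT-Broker;
  • MqttConnected — Verbindung zum MQTT-Broker hergestellt, Daten können gesendet werden;
  • Error — Fehlerzustand.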

:


, . switch:


void taskWifiActivity(void* pvParameters) {
    Serial.println("Wifi Task Started");
    WifiActivityState state = WifiActivityState::WifiReconnect;
    bool shouldStop = false;
    while (!shouldStop)
    {
        wl_status_t wifiStatus = WiFi.status();
        Serial.printf("Wifi status: %d State: %d\n", wifiStatus, state);
        switch (state)
        {
            case WifiActivityState::WifiReconnect:
                vTaskDelay(5000 / portTICK_RATE_MS); // reconnect delay, just in case
                WiFi.begin(WifiSsid, WifiPassword);
                state = WifiActivityState::WifiConnecting;
                Serial.println("Connecting...");
            break;

            case WifiActivityState::WifiConnecting:
                switch (wifiStatus)
                {
                    case WL_CONNECTED:
                        state = WifiActivityState::WifiConnected;
                        Serial.println("Wifi Connected");
                        break;
                    case WL_CONNECT_FAILED:
                        state = WifiActivityState::Error;
                        break;
                    default:
                        vTaskDelay(1000 / portTICK_RATE_MS); // active waiting delay
                        break;
                }
                break;

            case WifiActivityState::WifiConnected:
                if (wifiStatus == WL_CONNECTED) {
                    Serial.println(WiFi.localIP());
                    state = WifiActivityState::MqttReconnect;
                } else {
                    state = WifiActivityState::WifiReconnect;
                }
                break;

            case WifiActivityState::MqttReconnect:
                if (wifiStatus == WL_CONNECTED) {
                    Serial.println("Mqtt server connecting");
                    g_pubsubClient.setServer(MqttServer, 1883);
                    g_pubsubClient.connect(MqttClientName);
                    state = WifiActivityState::MqttConnecting;
                } else {
                    state = WifiActivityState::WifiReconnect;
                }
                break;

            case WifiActivityState::MqttConnecting:
                if (wifiStatus == WL_CONNECTED) {
                    if (g_pubsubClient.connected()) {
                        Serial.println("Mqtt server connected");
                        state = WifiActivityState::MqttConnected;
                    }
                    vTaskDelay(1000 / portTICK_RATE_MS); // active waiting delay
                } else {
                    state = WifiActivityState::WifiReconnect;
                }
                break;

            case WifiActivityState::MqttConnected:
                if (wifiStatus == WL_CONNECTED) {
                    Serial.println("...Activity...");
                    if (g_pubsubClient.connected()) {
                        prepareSendBuffer();
                        publishEvents();
                        vTaskDelay(g_eventsDeliverInterval * 1000 / portTICK_RATE_MS);
                    } else {
                        Serial.println("Client Disconnected");
                        state = WifiActivityState::MqttReconnect;
                        vTaskDelay(5000/portTICK_RATE_MS);
                    }
                } else {
                    state = WifiActivityState::WifiReconnect;
                }
                break;

            case WifiActivityState::Error:
                Serial.println("Connection error");
                shouldStop = true; // end task
                // TODO add code for connection retry with increasing time intervals
                break;
            default:
                break;
        }
    }
    vTaskDelete(NULL);
}

state , . Wifi- . , .


, MQTT- prepareSendBuffer() publishEvents(). .



, Bluetooth . Event, ( , , ). vector Event.


// Buffer of heap-allocated events collected from the Bluetooth side; the
// stored pointers are owned by whoever drains the buffer.
std::vector<Event*> g_eventsBuffer;

Bluetooth :


// Append a newly allocated humidity event for this device to the shared buffer.
// NOTE(review): prepareSendBuffer() accesses g_eventsBuffer under g_eventsBufferMutex;
// presumably this push must be guarded by the same mutex — verify at the call site.
g_eventsBuffer.push_back(new Event(deviceAddress, EventType::Humidity, humidity));

MQTT- . , , . FreeRTOS — .


Ein Mutex wird mit xSemaphoreCreateMutex() erzeugt; für die Arbeit damit gibt es 2 Funktionen: xSemaphoreTake() und xSemaphoreGive().


, .


, ( ), , . . , prepareSendBuffer():


void prepareSendBuffer() {
    Serial.println("Send buffer prepare started");
    xSemaphoreTake(g_eventsBufferMutex, portMAX_DELAY);
    if (!g_eventsBuffer.empty()) {
        Serial.println("Found events");
        for(std::vector<Event*>::reverse_iterator i = g_eventsBuffer.rbegin(); i != g_eventsBuffer.rend(); ++i) {
            Event* e = *i;
            std::string address = e->getDeviceAddress();
            Serial.printf("Trying to add event for address %s\n", address.c_str());

            // we should check if we already added that event type for that deviceAddress
            bool found = false;
            if (!g_eventsSendBuffer.empty()) {

                for(std::vector<Event*>::iterator i = g_eventsSendBuffer.begin(); i != g_eventsSendBuffer.end(); ++i) { 
                    if ((*i)->getDeviceAddress() == address && (*i)->getEventType() == e->getEventType()) {
                        found = true;
                        break;
                    }
                }
            }
            if (!found) {
                g_eventsSendBuffer.push_back(e);
                Serial.println("Event added");
            } else {
                delete e; // we don't need this event anymore
            }
        }
    }
    g_eventsBuffer.clear();
    xSemaphoreGive(g_eventsBufferMutex);
    Serial.println("Send buffer prepared");
}

, ( ). . .


Die Funktion publishEvents() sieht so aus:


void publishEvents() {
    Serial.println("Publish events started");
    const int bufferSize = 1000;
    char* topicStringBuffer = new char[bufferSize];
    char* payloadStringBuffer = new char[bufferSize];

    if (!g_eventsSendBuffer.empty()) {
        for(std::vector<Event*>::iterator i = g_eventsSendBuffer.begin(); i != g_eventsSendBuffer.end(); ++i) { 
            Event* e = *i;
            std::string address = e->getDeviceAddress();
            Serial.printf("Publishing event for %s\n", address.c_str());
            switch (e->getEventType())
            {
                case EventType::Temperature:
                    snprintf(topicStringBuffer, bufferSize, "sensor/%s/temperature", address.c_str());
                    break;
                case EventType::Humidity:
                    snprintf(topicStringBuffer, bufferSize, "sensor/%s/humidity", address.c_str());
                    break;
                case EventType::Battery:
                    snprintf(topicStringBuffer, bufferSize, "sensor/%s/battery", address.c_str());
                    break;
                case EventType::VisibleDevices:
                    snprintf(topicStringBuffer, bufferSize, "sensor/devices");
                    break;
                case EventType::SensorDevices:
                    snprintf(topicStringBuffer, bufferSize, "sensor/sensors");
                    break;
                default:
                    continue;
                    break;
            }
            snprintf(payloadStringBuffer, bufferSize, "%f", e->getValue());
            Serial.printf("Event: %s %s\n", topicStringBuffer, payloadStringBuffer);

            delete e;
            g_pubsubClient.publish(topicStringBuffer, payloadStringBuffer);
        }
    }
    Serial.println("Publish events DONE");
    g_eventsSendBuffer.clear();

    delete[] topicStringBuffer;
    delete[] payloadStringBuffer;
}

, MQTT . , .



, Xiaomi MQTT-. - :



MQTT- (, - MQTT). , .


Github.


UPD: Auf GitHub wurde ich ein wenig korrigiert (danke, MikalaiR). In FreeRTOS-Tasks darf man die Task-Funktion nicht einfach verlassen; stattdessen muss vTaskDelete() aufgerufen werden. Hier steht mehr darüber. Den Code auf GitHub und hier habe ich korrigiert.


All Articles