Nuevamente sobre los sensores BLE, temperatura y Xiaomi - parte 2 - MQTT

En las vacaciones de mayo, había mucho tiempo libre, por lo que era simplemente necesario continuar con el experimento iniciado en el artículo anterior . Anteriormente, pude obtener datos de temperatura y humedad de un sensor Xiaomi, pero ahora la tarea estaba configurada para aprender cómo enviar estos datos a un agente de MQTT.


Cambios en el código de adquisición de datos.


Para empezar, vale la pena señalar que se tuvieron que hacer varios cambios en el código para recibir datos de los sensores.


Después de un cierto número de pruebas, resultó que debido a lo desconocido (quería decir que no estaba explorado, pero no se vuelve menos conocido, así que dejémoslo así), la razón para anunciar paquetes BLE comienza a romperse. Aquellos. si antes el nombre del sensor y sus datos venían en un paquete, ahora los datos vienen en paquetes separados. Por esta razón, tuve que negarme a verificar el nombre del dispositivo:


if (!advertisedDevice.haveName() || advertisedDevice.getName().compare("MJ_HT_V1"))
    return; 

, . 10 , Bluetooth, , . , . . , 0xfe95. , , . 0xfe95 (data+4):


if (blockType == 0x16 && serviceType == 0xfe95 && *(data + 4) == 0x50) {
    *foundBlockLength = blockLength-3;
    return data+4;
}

, .


MQTT


MQTT-. Eclipse Mosquitto. , docker-. Linux- ( Windows ), :


docker run -it -p 1883:1883 -p 9001:9001 -v /mosquitto/data:/var/mosquito/data -v /mosquitto/log:/var/mosquito/logs eclipse-mosquitto

, . , ( ).


Windows — MQTT Explorer.


MQTT- ESP32 (, ). PubSubClient, Arduino IDE Library Manager. . Wifi ( , ), :


WiFiClient wifiClient; // wifi client object 
PubSubClient pubsubClient(wifiClient); // MQTT server client object

pubsubClient.setServer(MqttServer, 1883);
pubsubClient.connect(MqttClientName);
pubsubClient.publish(topic, payload);

, , .



, , . - , - (, MQTT). 2- ( ).


. ESP32 FreeRTOS . — xTaskCreate(). 2 : BLE Wifi. :


TaskHandle_t bleScanHandle = nullptr;
xTaskCreate(taskScanBleDevices, "ble-scan", 2048, nullptr, tskIDLE_PRIORITY, &bleScanHandle);

TaskHandle_t wifiActivityHandle = nullptr;
xTaskCreate(taskWifiActivity, "wifi-activity", 2048*8, nullptr, tskIDLE_PRIORITY, &wifiActivityHandle);

BLE — :


void taskScanBleDevices(void* pvParameters) {
    while (true) {
        BLEScan * pBLEScan = BLEDevice::getScan();
        pBLEScan->start(g_scanTime, false);
        Serial.println("Scan done!");
        vTaskDelay(2000 / portTICK_RATE_MS);
    }
}

Wifi MQTT .



, , - . WiFi.begin(), Wifi WiFi.status() (). Wifi- , . . Arduino (, ), . switch.


:


  • WifiReconnect — Wifi ;
  • WifiConnecting — Wifi ;
  • WifiConnected — Wifi ;
  • MqttReconnect — MQTT-;
  • MqttConnecting — MQTT-;
  • MqttConnected — MQTT- , ;
  • Error — .

:


, . switch:


void taskWifiActivity(void* pvParameters) {
    Serial.println("Wifi Task Started");
    WifiActivityState state = WifiActivityState::WifiReconnect;
    bool shouldStop = false;
    while (!shouldStop)
    {
        wl_status_t wifiStatus = WiFi.status();
        Serial.printf("Wifi status: %d State: %d\n", wifiStatus, state);
        switch (state)
        {
            case WifiActivityState::WifiReconnect:
                vTaskDelay(5000 / portTICK_RATE_MS); // reconnect delay, just in case
                WiFi.begin(WifiSsid, WifiPassword);
                state = WifiActivityState::WifiConnecting;
                Serial.println("Connecting...");
            break;

            case WifiActivityState::WifiConnecting:
                switch (wifiStatus)
                {
                    case WL_CONNECTED:
                        state = WifiActivityState::WifiConnected;
                        Serial.println("Wifi Connected");
                        break;
                    case WL_CONNECT_FAILED:
                        state = WifiActivityState::Error;
                        break;
                    default:
                        vTaskDelay(1000 / portTICK_RATE_MS); // active waiting delay
                        break;
                }
                break;

            case WifiActivityState::WifiConnected:
                if (wifiStatus == WL_CONNECTED) {
                    Serial.println(WiFi.localIP());
                    state = WifiActivityState::MqttReconnect;
                } else {
                    state = WifiActivityState::WifiReconnect;
                }
                break;

            case WifiActivityState::MqttReconnect:
                if (wifiStatus == WL_CONNECTED) {
                    Serial.println("Mqtt server connecting");
                    g_pubsubClient.setServer(MqttServer, 1883);
                    g_pubsubClient.connect(MqttClientName);
                    state = WifiActivityState::MqttConnecting;
                } else {
                    state = WifiActivityState::WifiReconnect;
                }
                break;

            case WifiActivityState::MqttConnecting:
                if (wifiStatus == WL_CONNECTED) {
                    if (g_pubsubClient.connected()) {
                        Serial.println("Mqtt server connected");
                        state = WifiActivityState::MqttConnected;
                    }
                    vTaskDelay(1000 / portTICK_RATE_MS); // active waiting delay
                } else {
                    state = WifiActivityState::WifiReconnect;
                }
                break;

            case WifiActivityState::MqttConnected:
                if (wifiStatus == WL_CONNECTED) {
                    Serial.println("...Activity...");
                    if (g_pubsubClient.connected()) {
                        prepareSendBuffer();
                        publishEvents();
                        vTaskDelay(g_eventsDeliverInterval * 1000 / portTICK_RATE_MS);
                    } else {
                        Serial.println("Client Disconnected");
                        state = WifiActivityState::MqttReconnect;
                        vTaskDelay(5000/portTICK_RATE_MS);
                    }
                } else {
                    state = WifiActivityState::WifiReconnect;
                }
                break;

            case WifiActivityState::Error:
                Serial.println("Connection error");
                shouldStop = true; // end task
                // TODO add code for connection retry with increasing time intervals
                break;
            default:
                break;
        }
    }
    vTaskDelete(NULL);
}

state , . Wifi- . , .


, MQTT- prepareSendBuffer() publishEvents(). .



, Bluetooth . Event, ( , , ). vector Event.


std::vector<Event*> g_eventsBuffer;

Bluetooth :


g_eventsBuffer.push_back(new Event(deviceAddress, EventType::Humidity, humidity));

MQTT- . , , . FreeRTOS — .


. xSemaphoreCreateMutex(), 2 xSemaphoreTake() xSemaphoreGive().


, .


, ( ), , . . , prepareSendBuffer():


void prepareSendBuffer() {
    Serial.println("Send buffer prepare started");
    xSemaphoreTake(g_eventsBufferMutex, portMAX_DELAY);
    if (!g_eventsBuffer.empty()) {
        Serial.println("Found events");
        for(std::vector<Event*>::reverse_iterator i = g_eventsBuffer.rbegin(); i != g_eventsBuffer.rend(); ++i) {
            Event* e = *i;
            std::string address = e->getDeviceAddress();
            Serial.printf("Trying to add event for address %s\n", address.c_str());

            // we should check if we already added that event type for that deviceAddress
            bool found = false;
            if (!g_eventsSendBuffer.empty()) {

                for(std::vector<Event*>::iterator i = g_eventsSendBuffer.begin(); i != g_eventsSendBuffer.end(); ++i) { 
                    if ((*i)->getDeviceAddress() == address && (*i)->getEventType() == e->getEventType()) {
                        found = true;
                        break;
                    }
                }
            }
            if (!found) {
                g_eventsSendBuffer.push_back(e);
                Serial.println("Event added");
            } else {
                delete e; // we don't need this event anymore
            }
        }
    }
    g_eventsBuffer.clear();
    xSemaphoreGive(g_eventsBufferMutex);
    Serial.println("Send buffer prepared");
}

, ( ). . .


publishEvents(), :


void publishEvents() {
    Serial.println("Publish events started");
    const int bufferSize = 1000;
    char* topicStringBuffer = new char[bufferSize];
    char* payloadStringBuffer = new char[bufferSize];

    if (!g_eventsSendBuffer.empty()) {
        for(std::vector<Event*>::iterator i = g_eventsSendBuffer.begin(); i != g_eventsSendBuffer.end(); ++i) { 
            Event* e = *i;
            std::string address = e->getDeviceAddress();
            Serial.printf("Publishing event for %s\n", address.c_str());
            switch (e->getEventType())
            {
                case EventType::Temperature:
                    snprintf(topicStringBuffer, bufferSize, "sensor/%s/temperature", address.c_str());
                    break;
                case EventType::Humidity:
                    snprintf(topicStringBuffer, bufferSize, "sensor/%s/humidity", address.c_str());
                    break;
                case EventType::Battery:
                    snprintf(topicStringBuffer, bufferSize, "sensor/%s/battery", address.c_str());
                    break;
                case EventType::VisibleDevices:
                    snprintf(topicStringBuffer, bufferSize, "sensor/devices");
                    break;
                case EventType::SensorDevices:
                    snprintf(topicStringBuffer, bufferSize, "sensor/sensors");
                    break;
                default:
                    continue;
                    break;
            }
            snprintf(payloadStringBuffer, bufferSize, "%f", e->getValue());
            Serial.printf("Event: %s %s\n", topicStringBuffer, payloadStringBuffer);

            delete e;
            g_pubsubClient.publish(topicStringBuffer, payloadStringBuffer);
        }
    }
    Serial.println("Publish events DONE");
    g_eventsSendBuffer.clear();

    delete[] topicStringBuffer;
    delete[] payloadStringBuffer;
}

, MQTT . , .



, Xiaomi MQTT-. - :



MQTT- (, - MQTT). , .


Github.


UPD: En Github, me corrigieron un poco (gracias MikalaiR ). En las tareas de FreeRTOS, no debe intentar salir de la función solo porque necesita llamarla vTaskDelete(). Aquí hay más al respecto. El código en Github y aquí lo corregí.


All Articles