Encore une fois sur les capteurs BLE, de température et Xiaomi - partie 2 - MQTT

Aux vacances de mai, il y avait beaucoup de temps libre, il fallait donc simplement continuer l'expérience commencée dans l' article précédent . Auparavant, j'ai réussi à obtenir des données de température et d'humidité à partir d'un capteur Xiaomi, mais maintenant la tùche était définie pour apprendre à envoyer ces données à un courtier MQTT.


Modifications du code d'acquisition de données


Pour commencer, il convient de noter que plusieurs modifications ont dĂ» ĂȘtre apportĂ©es au code pour recevoir les donnĂ©es des capteurs.


AprÚs un certain nombre de tests, il s'est avéré qu'en raison de l'inconnu (je voulais dire inexploré, mais cela ne devient pas moins connu, alors laissons-le comme ça) la raison de la publicité pour les paquets BLE commence à casser. Ceux. si auparavant le nom du capteur et ses données venaient dans un seul paquet, maintenant les données viennent dans des paquets séparés. Pour cette raison, j'ai dû refuser de vérifier le nom de l'appareil:


if (!advertisedDevice.haveName() || advertisedDevice.getName().compare("MJ_HT_V1"))
    return; 

, . 10 , Bluetooth, , . , . . , 0xfe95. , , . 0xfe95 (data+4):


if (blockType == 0x16 && serviceType == 0xfe95 && *(data + 4) == 0x50) {
    *foundBlockLength = blockLength-3;
    return data+4;
}

, .


MQTT


MQTT-. Eclipse Mosquitto. , docker-. Linux- ( Windows ), :


docker run -it -p 1883:1883 -p 9001:9001 -v /mosquitto/data:/var/mosquito/data -v /mosquitto/log:/var/mosquito/logs eclipse-mosquitto

, . , ( ).


Windows — MQTT Explorer.


MQTT- ESP32 (, ). PubSubClient, Arduino IDE Library Manager. . Wifi ( , ), :


WiFiClient wifiClient; // wifi client object 
PubSubClient pubsubClient(wifiClient); // MQTT server client object

pubsubClient.setServer(MqttServer, 1883);
pubsubClient.connect(MqttClientName);
pubsubClient.publish(topic, payload);

, , .



, , . - , - (, MQTT). 2- ( ).


. ESP32 FreeRTOS . — xTaskCreate(). 2 : BLE Wifi. :


TaskHandle_t bleScanHandle = nullptr;
xTaskCreate(taskScanBleDevices, "ble-scan", 2048, nullptr, tskIDLE_PRIORITY, &bleScanHandle);

TaskHandle_t wifiActivityHandle = nullptr;
xTaskCreate(taskWifiActivity, "wifi-activity", 2048*8, nullptr, tskIDLE_PRIORITY, &wifiActivityHandle);

BLE — :


void taskScanBleDevices(void* pvParameters) {
    while (true) {
        BLEScan * pBLEScan = BLEDevice::getScan();
        pBLEScan->start(g_scanTime, false);
        Serial.println("Scan done!");
        vTaskDelay(2000 / portTICK_RATE_MS);
    }
}

Wifi MQTT .



, , - . WiFi.begin(), Wifi WiFi.status() (). Wifi- , . . Arduino (, ), . switch.


:


  • WifiReconnect — Wifi ;
  • WifiConnecting — Wifi ;
  • WifiConnected — Wifi ;
  • MqttReconnect — MQTT-;
  • MqttConnecting — MQTT-;
  • MqttConnected — MQTT- , ;
  • Error — .

:


, . switch:


void taskWifiActivity(void* pvParameters) {
    Serial.println("Wifi Task Started");
    WifiActivityState state = WifiActivityState::WifiReconnect;
    bool shouldStop = false;
    while (!shouldStop)
    {
        wl_status_t wifiStatus = WiFi.status();
        Serial.printf("Wifi status: %d State: %d\n", wifiStatus, state);
        switch (state)
        {
            case WifiActivityState::WifiReconnect:
                vTaskDelay(5000 / portTICK_RATE_MS); // reconnect delay, just in case
                WiFi.begin(WifiSsid, WifiPassword);
                state = WifiActivityState::WifiConnecting;
                Serial.println("Connecting...");
            break;

            case WifiActivityState::WifiConnecting:
                switch (wifiStatus)
                {
                    case WL_CONNECTED:
                        state = WifiActivityState::WifiConnected;
                        Serial.println("Wifi Connected");
                        break;
                    case WL_CONNECT_FAILED:
                        state = WifiActivityState::Error;
                        break;
                    default:
                        vTaskDelay(1000 / portTICK_RATE_MS); // active waiting delay
                        break;
                }
                break;

            case WifiActivityState::WifiConnected:
                if (wifiStatus == WL_CONNECTED) {
                    Serial.println(WiFi.localIP());
                    state = WifiActivityState::MqttReconnect;
                } else {
                    state = WifiActivityState::WifiReconnect;
                }
                break;

            case WifiActivityState::MqttReconnect:
                if (wifiStatus == WL_CONNECTED) {
                    Serial.println("Mqtt server connecting");
                    g_pubsubClient.setServer(MqttServer, 1883);
                    g_pubsubClient.connect(MqttClientName);
                    state = WifiActivityState::MqttConnecting;
                } else {
                    state = WifiActivityState::WifiReconnect;
                }
                break;

            case WifiActivityState::MqttConnecting:
                if (wifiStatus == WL_CONNECTED) {
                    if (g_pubsubClient.connected()) {
                        Serial.println("Mqtt server connected");
                        state = WifiActivityState::MqttConnected;
                    }
                    vTaskDelay(1000 / portTICK_RATE_MS); // active waiting delay
                } else {
                    state = WifiActivityState::WifiReconnect;
                }
                break;

            case WifiActivityState::MqttConnected:
                if (wifiStatus == WL_CONNECTED) {
                    Serial.println("...Activity...");
                    if (g_pubsubClient.connected()) {
                        prepareSendBuffer();
                        publishEvents();
                        vTaskDelay(g_eventsDeliverInterval * 1000 / portTICK_RATE_MS);
                    } else {
                        Serial.println("Client Disconnected");
                        state = WifiActivityState::MqttReconnect;
                        vTaskDelay(5000/portTICK_RATE_MS);
                    }
                } else {
                    state = WifiActivityState::WifiReconnect;
                }
                break;

            case WifiActivityState::Error:
                Serial.println("Connection error");
                shouldStop = true; // end task
                // TODO add code for connection retry with increasing time intervals
                break;
            default:
                break;
        }
    }
    vTaskDelete(NULL);
}

state , . Wifi- . , .


, MQTT- prepareSendBuffer() publishEvents(). .



, Bluetooth . Event, ( , , ). vector Event.


std::vector<Event*> g_eventsBuffer;

Bluetooth :


g_eventsBuffer.push_back(new Event(deviceAddress, EventType::Humidity, humidity));

MQTT- . , , . FreeRTOS — .


. xSemaphoreCreateMutex(), 2 xSemaphoreTake() xSemaphoreGive().


, .


, ( ), , . . , prepareSendBuffer():


void prepareSendBuffer() {
    Serial.println("Send buffer prepare started");
    xSemaphoreTake(g_eventsBufferMutex, portMAX_DELAY);
    if (!g_eventsBuffer.empty()) {
        Serial.println("Found events");
        for(std::vector<Event*>::reverse_iterator i = g_eventsBuffer.rbegin(); i != g_eventsBuffer.rend(); ++i) {
            Event* e = *i;
            std::string address = e->getDeviceAddress();
            Serial.printf("Trying to add event for address %s\n", address.c_str());

            // we should check if we already added that event type for that deviceAddress
            bool found = false;
            if (!g_eventsSendBuffer.empty()) {

                for(std::vector<Event*>::iterator i = g_eventsSendBuffer.begin(); i != g_eventsSendBuffer.end(); ++i) { 
                    if ((*i)->getDeviceAddress() == address && (*i)->getEventType() == e->getEventType()) {
                        found = true;
                        break;
                    }
                }
            }
            if (!found) {
                g_eventsSendBuffer.push_back(e);
                Serial.println("Event added");
            } else {
                delete e; // we don't need this event anymore
            }
        }
    }
    g_eventsBuffer.clear();
    xSemaphoreGive(g_eventsBufferMutex);
    Serial.println("Send buffer prepared");
}

, ( ). . .


publishEvents(), :


void publishEvents() {
    Serial.println("Publish events started");
    const int bufferSize = 1000;
    char* topicStringBuffer = new char[bufferSize];
    char* payloadStringBuffer = new char[bufferSize];

    if (!g_eventsSendBuffer.empty()) {
        for(std::vector<Event*>::iterator i = g_eventsSendBuffer.begin(); i != g_eventsSendBuffer.end(); ++i) { 
            Event* e = *i;
            std::string address = e->getDeviceAddress();
            Serial.printf("Publishing event for %s\n", address.c_str());
            switch (e->getEventType())
            {
                case EventType::Temperature:
                    snprintf(topicStringBuffer, bufferSize, "sensor/%s/temperature", address.c_str());
                    break;
                case EventType::Humidity:
                    snprintf(topicStringBuffer, bufferSize, "sensor/%s/humidity", address.c_str());
                    break;
                case EventType::Battery:
                    snprintf(topicStringBuffer, bufferSize, "sensor/%s/battery", address.c_str());
                    break;
                case EventType::VisibleDevices:
                    snprintf(topicStringBuffer, bufferSize, "sensor/devices");
                    break;
                case EventType::SensorDevices:
                    snprintf(topicStringBuffer, bufferSize, "sensor/sensors");
                    break;
                default:
                    continue;
                    break;
            }
            snprintf(payloadStringBuffer, bufferSize, "%f", e->getValue());
            Serial.printf("Event: %s %s\n", topicStringBuffer, payloadStringBuffer);

            delete e;
            g_pubsubClient.publish(topicStringBuffer, payloadStringBuffer);
        }
    }
    Serial.println("Publish events DONE");
    g_eventsSendBuffer.clear();

    delete[] topicStringBuffer;
    delete[] payloadStringBuffer;
}

, MQTT . , .



, Xiaomi MQTT-. - :



MQTT- (, - MQTT). , .


Github.


UPD: Sur Github, ils m'ont un peu corrigé (merci MikalaiR ). Dans les tùches FreeRTOS, vous ne devez pas simplement essayer de quitter la fonction car vous devez l'appeler vTaskDelete(). Ici est plus. Le code sur Github et ici j'ai corrigé.


All Articles