Lagi tentang BLE, sensor suhu dan Xiaomi - bagian 2 - MQTT

Selama liburan Mei saya punya banyak waktu luang, jadi saya melanjutkan percobaan yang dimulai pada artikel sebelumnya. Sebelumnya, saya berhasil mendapatkan data suhu dan kelembapan dari sensor Xiaomi; sekarang tugasnya adalah mempelajari cara mengirim data tersebut ke broker MQTT.


Perubahan pada kode akuisisi data


Pertama-tama, perlu dicatat bahwa beberapa perubahan harus dilakukan pada kode yang menerima data dari sensor.


Setelah sejumlah pengujian, ternyata karena alasan yang tidak diketahui (saya ingin mengatakan "belum diselidiki", tetapi itu tidak membuatnya lebih diketahui, jadi biarkan saja begitu) paket advertising BLE mulai terpecah. Artinya, jika sebelumnya nama sensor dan datanya datang dalam satu paket, sekarang datanya datang dalam paket terpisah. Karena alasan ini, saya harus berhenti memeriksa nama perangkat:


// Name filter that, per the surrounding text, had to be dropped: it bails out when the
// advertisement has no name or when compare() reports a non-zero result against
// "MJ_HT_V1" (presumably a mismatch, as with std::string::compare — confirm).
if (!advertisedDevice.haveName() || advertisedDevice.getName().compare("MJ_HT_V1"))
    return; 

, . 10 , Bluetooth, , . , . . , 0xfe95. , , . 0xfe95 (data+4):


// Accept only blocks of type 0x16 for service 0xfe95 whose byte at offset 4 is 0x50.
// On a match, report the block length minus 3 through foundBlockLength and return a
// pointer 4 bytes into the block.
if (blockType == 0x16 && serviceType == 0xfe95 && *(data + 4) == 0x50) {
    *foundBlockLength = blockLength-3;
    return data+4;
}

, .


MQTT


MQTT-. Eclipse Mosquitto. , docker-. Linux- ( Windows ), :


# Mount the host directories onto the image's own data and log directories
# (/mosquitto/data and /mosquitto/log) so broker state and logs persist on the host.
docker run -it -p 1883:1883 -p 9001:9001 -v /mosquitto/data:/mosquitto/data -v /mosquitto/log:/mosquitto/log eclipse-mosquitto

, . , ( ).


Windows — MQTT Explorer.


MQTT- ESP32 (, ). PubSubClient, Arduino IDE Library Manager. . Wifi ( , ), :


// Minimal PubSubClient usage: the MQTT client is built on top of a WiFiClient object.
WiFiClient wifiClient; // wifi client object 
PubSubClient pubsubClient(wifiClient); // MQTT server client object

// Point the client at the broker on port 1883, connect under the name MqttClientName,
// then publish one payload to one topic. Return values are ignored in this sketch.
pubsubClient.setServer(MqttServer, 1883);
pubsubClient.connect(MqttClientName);
pubsubClient.publish(topic, payload);

, , .



, , . - , - (, MQTT). 2- ( ).


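ESP32 menjalankan FreeRTOS, yang mendukung tugas (task). Sebuah tugas dibuat dengan xTaskCreate(). Diperlukan 2 tugas: satu untuk BLE dan satu untuk Wifi. Keduanya dibuat seperti berikut: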


// Create the BLE scanning task: entry point taskScanBleDevices, name "ble-scan",
// stack depth 2048, no parameter, idle priority; the handle is stored in bleScanHandle.
TaskHandle_t bleScanHandle = nullptr;
xTaskCreate(taskScanBleDevices, "ble-scan", 2048, nullptr, tskIDLE_PRIORITY, &bleScanHandle);

// Create the Wifi/MQTT task the same way, but with a stack depth eight times larger (2048*8).
TaskHandle_t wifiActivityHandle = nullptr;
xTaskCreate(taskWifiActivity, "wifi-activity", 2048*8, nullptr, tskIDLE_PRIORITY, &wifiActivityHandle);

BLE — :


void taskScanBleDevices(void* pvParameters) {
    while (true) {
        BLEScan * pBLEScan = BLEDevice::getScan();
        pBLEScan->start(g_scanTime, false);
        Serial.println("Scan done!");
        vTaskDelay(2000 / portTICK_RATE_MS);
    }
}

Wifi MQTT .



, , - . WiFi.begin(), Wifi WiFi.status() (). Wifi- , . . Arduino (, ), . switch.


:


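  • WifiReconnect — memulai (ulang) koneksi Wifi;
  • WifiConnecting — menunggu koneksi Wifi terbentuk;
  • WifiConnected — koneksi Wifi berhasil terbentuk;
  • MqttReconnect — memulai (ulang) koneksi ke server MQTT;
  • MqttConnecting — menunggu koneksi ke server MQTT;
  • MqttConnected — terhubung ke server MQTT, data dapat dikirim;
  • Error — keadaan galat.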

:


, . switch:


/**
 * FreeRTOS task body that brings up Wifi, connects to the MQTT server and
 * periodically publishes the collected events.
 *
 * Implemented as a state machine over WifiActivityState. Every iteration reads
 * WiFi.status() once and handles the current state:
 *   - WifiReconnect:  waits 5 s, calls WiFi.begin() and moves to WifiConnecting.
 *   - WifiConnecting: polls the Wifi status once per second; WL_CONNECT_FAILED
 *                     leads to Error.
 *   - WifiConnected:  prints the local IP and moves on to MqttReconnect.
 *   - MqttReconnect:  configures the server (port 1883) and calls connect().
 *   - MqttConnecting: checks the result of that connect(); on failure it goes
 *                     back to MqttReconnect after a 5 s back-off.
 *   - MqttConnected:  prepares and publishes events, then sleeps
 *                     g_eventsDeliverInterval seconds.
 *   - Error:          logs and ends the loop.
 * Any MQTT state falls back to WifiReconnect when Wifi is no longer connected.
 *
 * The task deletes itself with vTaskDelete() instead of returning.
 *
 * @param pvParameters Task argument; not used.
 */
void taskWifiActivity(void* pvParameters) {
    Serial.println("Wifi Task Started");
    WifiActivityState state = WifiActivityState::WifiReconnect;
    bool shouldStop = false;
    while (!shouldStop)
    {
        wl_status_t wifiStatus = WiFi.status();
        // Cast explicitly: a scoped enum is not promoted to int when passed through
        // varargs, so handing it to "%d" unconverted is not portable.
        Serial.printf("Wifi status: %d State: %d\n", static_cast<int>(wifiStatus), static_cast<int>(state));
        switch (state)
        {
            case WifiActivityState::WifiReconnect:
                vTaskDelay(5000 / portTICK_RATE_MS); // reconnect delay, just in case
                WiFi.begin(WifiSsid, WifiPassword);
                state = WifiActivityState::WifiConnecting;
                Serial.println("Connecting...");
            break;

            case WifiActivityState::WifiConnecting:
                switch (wifiStatus)
                {
                    case WL_CONNECTED:
                        state = WifiActivityState::WifiConnected;
                        Serial.println("Wifi Connected");
                        break;
                    case WL_CONNECT_FAILED:
                        state = WifiActivityState::Error;
                        break;
                    default:
                        vTaskDelay(1000 / portTICK_RATE_MS); // active waiting delay
                        break;
                }
                break;

            case WifiActivityState::WifiConnected:
                if (wifiStatus == WL_CONNECTED) {
                    Serial.println(WiFi.localIP());
                    state = WifiActivityState::MqttReconnect;
                } else {
                    state = WifiActivityState::WifiReconnect;
                }
                break;

            case WifiActivityState::MqttReconnect:
                if (wifiStatus == WL_CONNECTED) {
                    Serial.println("Mqtt server connecting");
                    g_pubsubClient.setServer(MqttServer, 1883);
                    g_pubsubClient.connect(MqttClientName);
                    state = WifiActivityState::MqttConnecting;
                } else {
                    state = WifiActivityState::WifiReconnect;
                }
                break;

            case WifiActivityState::MqttConnecting:
                if (wifiStatus == WL_CONNECTED) {
                    if (g_pubsubClient.connected()) {
                        Serial.println("Mqtt server connected");
                        state = WifiActivityState::MqttConnected;
                        vTaskDelay(1000 / portTICK_RATE_MS); // settle delay before first publish
                    } else {
                        // Nothing else in this task calls connect(), so waiting here would
                        // never succeed after a failed attempt: go back and try again.
                        Serial.println("Mqtt server connect failed, retrying");
                        state = WifiActivityState::MqttReconnect;
                        vTaskDelay(5000 / portTICK_RATE_MS); // retry back-off
                    }
                } else {
                    state = WifiActivityState::WifiReconnect;
                }
                break;

            case WifiActivityState::MqttConnected:
                if (wifiStatus == WL_CONNECTED) {
                    Serial.println("...Activity...");
                    if (g_pubsubClient.connected()) {
                        // NOTE(review): the client's loop() is never called from this task;
                        // confirm the publish cadence alone keeps the MQTT session alive.
                        prepareSendBuffer();
                        publishEvents();
                        vTaskDelay(g_eventsDeliverInterval * 1000 / portTICK_RATE_MS);
                    } else {
                        Serial.println("Client Disconnected");
                        state = WifiActivityState::MqttReconnect;
                        vTaskDelay(5000/portTICK_RATE_MS);
                    }
                } else {
                    state = WifiActivityState::WifiReconnect;
                }
                break;

            case WifiActivityState::Error:
                Serial.println("Connection error");
                shouldStop = true; // end task
                // TODO add code for connection retry with increasing time intervals
                break;
            default:
                break;
        }
    }
    // A FreeRTOS task function must not simply return; delete the calling task instead.
    vTaskDelete(nullptr);
}

state , . Wifi- . , .


, MQTT- prepareSendBuffer() publishEvents(). .



, Bluetooth . Event, ( , , ). vector Event.


// Events collected from Bluetooth, stored as owning raw pointers until
// prepareSendBuffer() moves them on or deletes them.
std::vector<Event*> g_eventsBuffer;

Bluetooth :


// Queue a heap-allocated humidity event for deviceAddress; ownership passes to the buffer.
g_eventsBuffer.push_back(new Event(deviceAddress, EventType::Humidity, humidity));

MQTT- . , , . FreeRTOS — .


Mutex dibuat dengan xSemaphoreCreateMutex(), lalu digunakan melalui 2 fungsi: xSemaphoreTake() dan xSemaphoreGive().


, .


, ( ), , . . , prepareSendBuffer():


void prepareSendBuffer() {
    Serial.println("Send buffer prepare started");
    xSemaphoreTake(g_eventsBufferMutex, portMAX_DELAY);
    if (!g_eventsBuffer.empty()) {
        Serial.println("Found events");
        for(std::vector<Event*>::reverse_iterator i = g_eventsBuffer.rbegin(); i != g_eventsBuffer.rend(); ++i) {
            Event* e = *i;
            std::string address = e->getDeviceAddress();
            Serial.printf("Trying to add event for address %s\n", address.c_str());

            // we should check if we already added that event type for that deviceAddress
            bool found = false;
            if (!g_eventsSendBuffer.empty()) {

                for(std::vector<Event*>::iterator i = g_eventsSendBuffer.begin(); i != g_eventsSendBuffer.end(); ++i) { 
                    if ((*i)->getDeviceAddress() == address && (*i)->getEventType() == e->getEventType()) {
                        found = true;
                        break;
                    }
                }
            }
            if (!found) {
                g_eventsSendBuffer.push_back(e);
                Serial.println("Event added");
            } else {
                delete e; // we don't need this event anymore
            }
        }
    }
    g_eventsBuffer.clear();
    xSemaphoreGive(g_eventsBufferMutex);
    Serial.println("Send buffer prepared");
}

, ( ). . .


publishEvents(), :


void publishEvents() {
    Serial.println("Publish events started");
    const int bufferSize = 1000;
    char* topicStringBuffer = new char[bufferSize];
    char* payloadStringBuffer = new char[bufferSize];

    if (!g_eventsSendBuffer.empty()) {
        for(std::vector<Event*>::iterator i = g_eventsSendBuffer.begin(); i != g_eventsSendBuffer.end(); ++i) { 
            Event* e = *i;
            std::string address = e->getDeviceAddress();
            Serial.printf("Publishing event for %s\n", address.c_str());
            switch (e->getEventType())
            {
                case EventType::Temperature:
                    snprintf(topicStringBuffer, bufferSize, "sensor/%s/temperature", address.c_str());
                    break;
                case EventType::Humidity:
                    snprintf(topicStringBuffer, bufferSize, "sensor/%s/humidity", address.c_str());
                    break;
                case EventType::Battery:
                    snprintf(topicStringBuffer, bufferSize, "sensor/%s/battery", address.c_str());
                    break;
                case EventType::VisibleDevices:
                    snprintf(topicStringBuffer, bufferSize, "sensor/devices");
                    break;
                case EventType::SensorDevices:
                    snprintf(topicStringBuffer, bufferSize, "sensor/sensors");
                    break;
                default:
                    continue;
                    break;
            }
            snprintf(payloadStringBuffer, bufferSize, "%f", e->getValue());
            Serial.printf("Event: %s %s\n", topicStringBuffer, payloadStringBuffer);

            delete e;
            g_pubsubClient.publish(topicStringBuffer, payloadStringBuffer);
        }
    }
    Serial.println("Publish events DONE");
    g_eventsSendBuffer.clear();

    delete[] topicStringBuffer;
    delete[] payloadStringBuffer;
}

, MQTT . , .



, Xiaomi MQTT-. - :



MQTT- (, - MQTT). , .


Github.


UPD: Di Github saya mendapat sedikit koreksi (terima kasih MikalaiR). Dalam tugas FreeRTOS, Anda tidak boleh sekadar keluar (return) dari fungsi tugas; Anda perlu memanggil vTaskDelete(). Penjelasan lebih lanjut tentang hal itu ada di sini. Kode di Github dan di artikel ini sudah saya perbaiki.


All Articles