рдлрд┐рд░ рд╕реЗ BLE, рддрд╛рдкрдорд╛рди рдФрд░ Xiaomi рд╕реЗрдВрд╕рд░ рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ - рднрд╛рдЧ 2 - MQTT

рдордИ рдХреА рдЫреБрдЯреНрдЯрд┐рдпреЛрдВ рдореЗрдВ, рдмрд╣реБрдд рдЦрд╛рд▓реА рд╕рдордп рдерд╛, рдЗрд╕рд▓рд┐рдП рдкрд┐рдЫрд▓реЗ рд▓реЗрдЦ рдореЗрдВ рд╢реБрд░реВ рдХрд┐рдП рдЧрдП рдкреНрд░рдпреЛрдЧ рдХреЛ рдЬрд╛рд░реА рд░рдЦрдиреЗ рдХреЗ рд▓рд┐рдП рдмрд╕ рдЖрд╡рд╢реНрдпрдХ рдерд╛ ред рдкрд╣рд▓реЗ, рдореИрдВ Xiaomi рд╕реЗрдВрд╕рд░ рд╕реЗ рддрд╛рдкрдорд╛рди рдФрд░ рдЖрд░реНрджреНрд░рддрд╛ рдбреЗрдЯрд╛ рдкреНрд░рд╛рдкреНрдд рдХрд░рдиреЗ рдореЗрдВ рдХрд╛рдордпрд╛рдм рд░рд╣рд╛, рд▓реЗрдХрд┐рди рдЕрдм рдпрд╣ рдХрд╛рд░реНрдп рдпрд╣ рдЬрд╛рдирдиреЗ рдХреЗ рд▓рд┐рдП рдирд┐рд░реНрдзрд╛рд░рд┐рдд рдХрд┐рдпрд╛ рдЧрдпрд╛ рдерд╛ рдХрд┐ рдЗрд╕ рдбреЗрдЯрд╛ рдХреЛ MQTT рдмреНрд░реЛрдХрд░ рдХреЛ рдХреИрд╕реЗ рднреЗрдЬрд╛ рдЬрд╛рдПред


рдбреЗрдЯрд╛ рдЕрдзрд┐рдЧреНрд░рд╣рдг рдХреЛрдб рдореЗрдВ рдкрд░рд┐рд╡рд░реНрддрди


рд╢реБрд░реВ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП, рдпрд╣ рдзреНрдпрд╛рди рджреЗрдиреЗ рдпреЛрдЧреНрдп рд╣реИ рдХрд┐ рд╕реЗрдВрд╕рд░ рд╕реЗ рдбреЗрдЯрд╛ рдкреНрд░рд╛рдкреНрдд рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдХреЛрдб рдореЗрдВ рдХрдИ рдмрджрд▓рд╛рд╡ рдХрд┐рдП рдЬрд╛рдиреЗ рдереЗ ред


рдПрдХ рдирд┐рд╢реНрдЪрд┐рдд рд╕рдВрдЦреНрдпрд╛ рдореЗрдВ рдкрд░реАрдХреНрд╖рдгреЛрдВ рдХреЗ рдмрд╛рдж, рдпрд╣ рдкрддрд╛ рдЪрд▓рд╛ рдХрд┐ рдЕрдЬреНрдЮрд╛рдд рдХреЗ рдХрд╛рд░рдг (рдореИрдВ рдЕрд╕реНрдкрд╖реНрдЯ рдмреЛрд▓рдирд╛ рдЪрд╛рд╣рддрд╛ рдерд╛, рд▓реЗрдХрд┐рди рдпрд╣ рдХрдо рдкреНрд░рд╕рд┐рджреНрдз рдирд╣реАрдВ рд╣реИ, рдЗрд╕рд▓рд┐рдП рдЗрд╕реЗ рдРрд╕реЗ рд╣реА рдЫреЛрдбрд╝ рджреЗрдВ) рд╡рд┐рдЬреНрдЮрд╛рдкрди BLE рдкреИрдХреЗрдЯ рдХрд╛ рдХрд╛рд░рдг рдЯреВрдЯрдирд╛ рд╢реБрд░реВ рд╣реЛ рдЬрд╛рддрд╛ рд╣реИред рдЙрдиред рдпрджрд┐ рдкрд╣рд▓реЗ рд╕реЗрдВрд╕рд░ рдФрд░ рдЙрд╕рдХреЗ рдбреЗрдЯрд╛ рдХрд╛ рдирд╛рдо рдПрдХ рдкреИрдХреЗрдЯ рдореЗрдВ рдЖрддрд╛ рдерд╛, рддреЛ рдЕрдм рдбреЗрдЯрд╛ рдЕрд▓рдЧ-рдЕрд▓рдЧ рдкреИрдХреЗрдЯ рдореЗрдВ рдЖрддрд╛ рд╣реИред рдЗрд╕ рдХрд╛рд░рдг рд╕реЗ, рдореБрдЭреЗ рдбрд┐рд╡рд╛рдЗрд╕ рдХрд╛ рдирд╛рдо рдЬрд╛рдБрдЪрдиреЗ рд╕реЗ рдЗрдВрдХрд╛рд░ рдХрд░рдирд╛ рдкрдбрд╝рд╛:


if (!advertisedDevice.haveName() || advertisedDevice.getName().compare("MJ_HT_V1"))
    return; 

, . 10 , Bluetooth, , . , . . , 0xfe95. , , . 0xfe95 (data+4):


if (blockType == 0x16 && serviceType == 0xfe95 && *(data + 4) == 0x50) {
    *foundBlockLength = blockLength-3;
    return data+4;
}

, .


MQTT


MQTT-. Eclipse Mosquitto. , docker-. Linux- ( Windows ), :


docker run -it -p 1883:1883 -p 9001:9001 -v /mosquitto/data:/var/mosquito/data -v /mosquitto/log:/var/mosquito/logs eclipse-mosquitto

, . , ( ).


Windows тАФ MQTT Explorer.


MQTT- ESP32 (, ). PubSubClient, Arduino IDE Library Manager. . Wifi ( , ), :


WiFiClient wifiClient; // wifi client object 
PubSubClient pubsubClient(wifiClient); // MQTT server client object

pubsubClient.setServer(MqttServer, 1883);
pubsubClient.connect(MqttClientName);
pubsubClient.publish(topic, payload);

, , .



, , . - , - (, MQTT). 2- ( ).


. ESP32 FreeRTOS . тАФ xTaskCreate(). 2 : BLE Wifi. :


TaskHandle_t bleScanHandle = nullptr;
xTaskCreate(taskScanBleDevices, "ble-scan", 2048, nullptr, tskIDLE_PRIORITY, &bleScanHandle);

TaskHandle_t wifiActivityHandle = nullptr;
xTaskCreate(taskWifiActivity, "wifi-activity", 2048*8, nullptr, tskIDLE_PRIORITY, &wifiActivityHandle);

BLE тАФ :


void taskScanBleDevices(void* pvParameters) {
    while (true) {
        BLEScan * pBLEScan = BLEDevice::getScan();
        pBLEScan->start(g_scanTime, false);
        Serial.println("Scan done!");
        vTaskDelay(2000 / portTICK_RATE_MS);
    }
}

Wifi MQTT .



, , - . WiFi.begin(), Wifi WiFi.status() (). Wifi- , . . Arduino (, ), . switch.


:


  • WifiReconnect тАФ Wifi ;
  • WifiConnecting тАФ Wifi ;
  • WifiConnected тАФ Wifi ;
  • MqttReconnect тАФ MQTT-;
  • MqttConnecting тАФ MQTT-;
  • MqttConnected тАФ MQTT- , ;
  • Error тАФ .

:


, . switch:


void taskWifiActivity(void* pvParameters) {
    Serial.println("Wifi Task Started");
    WifiActivityState state = WifiActivityState::WifiReconnect;
    bool shouldStop = false;
    while (!shouldStop)
    {
        wl_status_t wifiStatus = WiFi.status();
        Serial.printf("Wifi status: %d State: %d\n", wifiStatus, state);
        switch (state)
        {
            case WifiActivityState::WifiReconnect:
                vTaskDelay(5000 / portTICK_RATE_MS); // reconnect delay, just in case
                WiFi.begin(WifiSsid, WifiPassword);
                state = WifiActivityState::WifiConnecting;
                Serial.println("Connecting...");
            break;

            case WifiActivityState::WifiConnecting:
                switch (wifiStatus)
                {
                    case WL_CONNECTED:
                        state = WifiActivityState::WifiConnected;
                        Serial.println("Wifi Connected");
                        break;
                    case WL_CONNECT_FAILED:
                        state = WifiActivityState::Error;
                        break;
                    default:
                        vTaskDelay(1000 / portTICK_RATE_MS); // active waiting delay
                        break;
                }
                break;

            case WifiActivityState::WifiConnected:
                if (wifiStatus == WL_CONNECTED) {
                    Serial.println(WiFi.localIP());
                    state = WifiActivityState::MqttReconnect;
                } else {
                    state = WifiActivityState::WifiReconnect;
                }
                break;

            case WifiActivityState::MqttReconnect:
                if (wifiStatus == WL_CONNECTED) {
                    Serial.println("Mqtt server connecting");
                    g_pubsubClient.setServer(MqttServer, 1883);
                    g_pubsubClient.connect(MqttClientName);
                    state = WifiActivityState::MqttConnecting;
                } else {
                    state = WifiActivityState::WifiReconnect;
                }
                break;

            case WifiActivityState::MqttConnecting:
                if (wifiStatus == WL_CONNECTED) {
                    if (g_pubsubClient.connected()) {
                        Serial.println("Mqtt server connected");
                        state = WifiActivityState::MqttConnected;
                    }
                    vTaskDelay(1000 / portTICK_RATE_MS); // active waiting delay
                } else {
                    state = WifiActivityState::WifiReconnect;
                }
                break;

            case WifiActivityState::MqttConnected:
                if (wifiStatus == WL_CONNECTED) {
                    Serial.println("...Activity...");
                    if (g_pubsubClient.connected()) {
                        prepareSendBuffer();
                        publishEvents();
                        vTaskDelay(g_eventsDeliverInterval * 1000 / portTICK_RATE_MS);
                    } else {
                        Serial.println("Client Disconnected");
                        state = WifiActivityState::MqttReconnect;
                        vTaskDelay(5000/portTICK_RATE_MS);
                    }
                } else {
                    state = WifiActivityState::WifiReconnect;
                }
                break;

            case WifiActivityState::Error:
                Serial.println("Connection error");
                shouldStop = true; // end task
                // TODO add code for connection retry with increasing time intervals
                break;
            default:
                break;
        }
    }
    vTaskDelete(NULL);
}

state , . Wifi- . , .


, MQTT- prepareSendBuffer() publishEvents(). .



, Bluetooth . Event, ( , , ). vector Event.


std::vector<Event*> g_eventsBuffer;

Bluetooth :


g_eventsBuffer.push_back(new Event(deviceAddress, EventType::Humidity, humidity));

MQTT- . , , . FreeRTOS тАФ .


. xSemaphoreCreateMutex(), 2 xSemaphoreTake() xSemaphoreGive().


, .


, ( ), , . . , prepareSendBuffer():


void prepareSendBuffer() {
    Serial.println("Send buffer prepare started");
    xSemaphoreTake(g_eventsBufferMutex, portMAX_DELAY);
    if (!g_eventsBuffer.empty()) {
        Serial.println("Found events");
        for(std::vector<Event*>::reverse_iterator i = g_eventsBuffer.rbegin(); i != g_eventsBuffer.rend(); ++i) {
            Event* e = *i;
            std::string address = e->getDeviceAddress();
            Serial.printf("Trying to add event for address %s\n", address.c_str());

            // we should check if we already added that event type for that deviceAddress
            bool found = false;
            if (!g_eventsSendBuffer.empty()) {

                for(std::vector<Event*>::iterator i = g_eventsSendBuffer.begin(); i != g_eventsSendBuffer.end(); ++i) { 
                    if ((*i)->getDeviceAddress() == address && (*i)->getEventType() == e->getEventType()) {
                        found = true;
                        break;
                    }
                }
            }
            if (!found) {
                g_eventsSendBuffer.push_back(e);
                Serial.println("Event added");
            } else {
                delete e; // we don't need this event anymore
            }
        }
    }
    g_eventsBuffer.clear();
    xSemaphoreGive(g_eventsBufferMutex);
    Serial.println("Send buffer prepared");
}

, ( ). . .


publishEvents(), :


void publishEvents() {
    Serial.println("Publish events started");
    const int bufferSize = 1000;
    char* topicStringBuffer = new char[bufferSize];
    char* payloadStringBuffer = new char[bufferSize];

    if (!g_eventsSendBuffer.empty()) {
        for(std::vector<Event*>::iterator i = g_eventsSendBuffer.begin(); i != g_eventsSendBuffer.end(); ++i) { 
            Event* e = *i;
            std::string address = e->getDeviceAddress();
            Serial.printf("Publishing event for %s\n", address.c_str());
            switch (e->getEventType())
            {
                case EventType::Temperature:
                    snprintf(topicStringBuffer, bufferSize, "sensor/%s/temperature", address.c_str());
                    break;
                case EventType::Humidity:
                    snprintf(topicStringBuffer, bufferSize, "sensor/%s/humidity", address.c_str());
                    break;
                case EventType::Battery:
                    snprintf(topicStringBuffer, bufferSize, "sensor/%s/battery", address.c_str());
                    break;
                case EventType::VisibleDevices:
                    snprintf(topicStringBuffer, bufferSize, "sensor/devices");
                    break;
                case EventType::SensorDevices:
                    snprintf(topicStringBuffer, bufferSize, "sensor/sensors");
                    break;
                default:
                    continue;
                    break;
            }
            snprintf(payloadStringBuffer, bufferSize, "%f", e->getValue());
            Serial.printf("Event: %s %s\n", topicStringBuffer, payloadStringBuffer);

            delete e;
            g_pubsubClient.publish(topicStringBuffer, payloadStringBuffer);
        }
    }
    Serial.println("Publish events DONE");
    g_eventsSendBuffer.clear();

    delete[] topicStringBuffer;
    delete[] payloadStringBuffer;
}

, MQTT . , .



, Xiaomi MQTT-. - :



MQTT- (, - MQTT). , .


Github.


UPD: рдЧрд┐рддреБрдм рдкрд░, рдЙрдиреНрд╣реЛрдВрдиреЗ рдореБрдЭреЗ рдереЛрдбрд╝рд╛ рдареАрдХ рдХрд┐рдпрд╛ (рдзрдиреНрдпрд╡рд╛рдж рдорд┐рдХрд╛рд▓рд╛рдИ )ред FreeRTOS рдХрд╛рд░реНрдпреЛрдВ рдореЗрдВ, рдЖрдкрдХреЛ рдХреЗрд╡рд▓ рдлрд╝рдВрдХреНрд╢рди рд╕реЗ рдмрд╛рд╣рд░ рдирд┐рдХрд▓рдиреЗ рдХрд╛ рдкреНрд░рдпрд╛рд╕ рдирд╣реАрдВ рдХрд░рдирд╛ рдЪрд╛рд╣рд┐рдП рдХреНрдпреЛрдВрдХрд┐ рдЖрдкрдХреЛ рдЗрд╕реЗ рдХреЙрд▓ рдХрд░рдиреЗ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИ vTaskDelete()ред рдпрд╣рд╛рдБ рдЗрд╕рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ рдЕрдзрд┐рдХ рд╣реИред Github рдкрд░ рдХреЛрдб рдФрд░ рдпрд╣рд╛рдБ рдореИрдВрдиреЗ рд╕рд╣реА рдХрд┐рдпрд╛ред


All Articles