06月03, 2020

生产实践-MQTT通讯连接异常排查记录

MQTT协议以其轻量级端对端消息传输的特点多用于IoT设备的通信协议,本文记录了在生产环境使用中无规律高频率断连问题排查经过。

Bug起因

因税件服务需在运行环境下动态与不同税控设备交互,为实现端对端双向通信的需求,故选择了IoT通讯协议MQTT。

在实际生产环境下,每个客户税控设备绑定一个唯一的ClientId并注册至RocketMQ Broker后通过RocketMQ向税件服务特定Topic报告设备上线状态与ClientId完成设备税号于MQTT客户端的绑定关系。

alt

So far so good.

直到观察到一天内的MQTT连接情况出现40+次断开连接: alt

客户端日志分析得出断连规律:

  1. 日常空闲时间运行时连接正常
  2. 越是用户批量操作情况下断连频次越高

错误分析

  • 从控制台连接情况看来:

  • 断连与重连的时间间隔较为规律为100s以上

  • 断连的唯一原因是"no heart.",即心跳停止导致的Broker强制断开连接

  • 起初我们根据提示直觉上认为是心跳包过于频繁导致Broker对心跳校验过于敏感,于是尝试调整客户端的心跳间隔即MQTT的KeepAliveIntervel。后发现在调整后依然出现频繁断连且连接时间呈规律性

  • 查询RocketMQ社区与相关资源鲜有相似问题,仅有的一例作者通过对Broker进行代理假装心跳包返回去欺骗客户端。无论从优雅性上和可实时性(我们使用的是公有云版本的MQ,无法轻易Proxy broker)上都无法参考。 https://www.eclipse.org/forums/index.php/t/1083440/

  • 于是决定尝试研究MQTT原生协议 + 模拟抓包的方式解决问题

IoT通信协议MQTT概述

以下内容提炼自MQTT v3.11版本协议内容,与问题相关部分内容添加了注释,具体协议可参考: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html#_Toc442180832

MQTT协议是一种基于TCP/IP协议之上的发布/订阅为模型的应用层通信协议,有着轻量级、开放、简单和可订制等优秀特性,因此适合在设备到设备(M2M)及物联网设备(IoT)间对网络环境较敏感的通信场景。

该协议继承了TCP的数据传输有序、防丢包、单连接双向通信等特点:

  • 发布/订阅模型支持一对多消息分发及设备间结偶
  • 消息支持质量等级(QoS),可根据业务场景控制消息传输的完成等级,如在对消息传递不敏感场景使用"At most once"(似UDP)让消息至多只发送一次。

    Three qualities of service for message delivery: "At most once", where messages are delivered according to the best efforts of the operating environment. Message loss can occur. This level could be used, for example, with ambient sensor data where it does not matter if an individual reading is lost as the next one will be published soon after.

    "At least once", where messages are assured to arrive but duplicates can occur.

    "Exactly once", where message are assured to arrive exactly once. This level could be used, for example, with billing systems where duplicate or lost messages could lead to incorrect charges being applied.

  • 轻量级报文体减少网络传输成本

  • 支持设备的连接状态的事件化通知

控制报文(MQTT Control Packets)

控制报文为MQTT的信息传输载体,主要由3部分组成:

  • 固定头信息 (Fixed Header): 存在于所有传输报文中
  • 可变头信息 (Variable Header)
  • 数据体 (Payload)

固定头(Fixed header)

Fixed header 由2个字节构成 alt

第一个字节: 0-3位储存控制报文类型具体数值(Flags specific to each MQTT Control Packet type) 4-7位储存控制报文类型(MQTT Control Packet Type)

第二个字节: 存储剩余报文长度,用于分割数据包(网络传输协议通用做法,类似TCP粘包拆包的解决方案)

控制报文类型(MQTT Control Packet type)

其中主要关注的四个部分(C=客户端 B=MQ Broker):

  • 连接类:

CONNECT C ---> B 连接请求 CONNACK B ---> C 连接确认

  • 消息收发类:

PUBLISH C <---> B 消息发送 PUBACK C <---> B 消息收取确认

  • 消息确认类:

PUBREC PUBREL PUBCOMP C <---> B 消息收取确认,依据消息QoS等级收发不同消息包

  • 心跳类:

PINGREQ C---> B 心跳请求 PINGRESP B ---> C 心跳确认

固定头标志位(Fixed header flags)

alt

在首个字节的0-3位为保留参数,用于对控制报文类型做为补充 目前使用的版本v3.1.1中,只有消息发送该参数有值,如QoS标识其信息传输质量。

巧妙的设计实现单位字节分位储存以节省数据空间

可变头信息(Variable header)

在固定头与数据体之间会存在一个随控制报文类型不同而动态变化的可变头信息。 如可变头:2字节的报文标识符(Packet Identifier)

alt

在官方协议里将标识符称为“高位报文标识符”(Packet Identifier MSB)与“低位报文标识符“(Packet Identifier LSB)

这里的MSB与LSB即Most Significant Bit 和 Least Significant Bit ,在二进制标识最高位与最低位数据。 参考: https://en-support.renesas.com/knowledgeBase/16978449

alt

Packet Identifier可当作是通讯来回的ID且客户端与Broker两端独立维护,如:

C ---> B PUBLISH 0x9527

可能会收到对端返回:

B ---> C PUBLISH 0x9527 B ---> C PUBACK 0x9527

故MQTT协议规定对请求ACK的处理需要验证其PacketIdentifier。

数据体(Payload)

该部分即传输的数据主体,在某些控制报文类型下为必传字段。 如 PUBLISH 下的 Payload 即为发送的信息数据流。

连接关键控制报文——Kepp Alive

KeepAlive报文即MQTT客户端的心跳包相关数据包,协议规定客户端须在特定单位(s)时间内(参数keep-alive-intervel)向Broker周期性发送PINGREQ标识汇报客户端连接正常。

MQTT协议规定:

  • 客户端在任意时间都可以发送PINGREQ给Broker汇报心跳,并接收来自Broker的PINGRESP的心跳ACK。
  • 客户端有责任在KeepAlive Intervel内向Broker发送PINGREQ,如果在没有任何控制报文通信的情况下,客户端必须在KeepAlive Intervel内向Broker发送PINGREQ

    It is the responsibility of the Client to ensure that the interval between Control Packets being sent does not exceed the Keep Alive value. In the absence of sending any other Control Packets, the Client MUST send a PINGREQ Packet.

  • 当在KeepAlive Intervel不为0的情况下(即开启心跳) Broker在1.5倍的KeepAlive Intervel里没有收到来自客户端的PINGREQ,必须强制关闭连接

    If the Keep Alive value is non-zero and the Server does not receive a Control Packet from the Client within one and a half times the Keep Alive time period, it MUST disconnect the Network Connection to the Client as if the network had failed.

  • 如果客户端在合理的时间内发送了PINGREQ但是没有收到来自Broker的PINGRESP,客户端应当关闭连接。(协议这里并没有强制客户端对心跳确认的处理,各实现灵活选择,比如我们使用的SDK Eclipse Paho在该种情况下会遵循原则关闭连接

    If a Client does not receive a PINGRESP Packet within a reasonable amount of time after it has sent a PINGREQ, it SHOULD close the Network Connection to the Server.

  • 如果将KeepAlive Intervel设置为0,则Broker将会放弃管理与客户端的连接

    A Keep Alive value of zero (0) has the effect of turning off the keep alive mechanism. This means that, in this case, the Server is not required to disconnect the Client on the grounds of inactivity. Note that a Server is permitted to disconnect a Client that it determines to be inactive or non-responsive at any time, regardless of the Keep Alive value provided by that Client.

完整报文

alt

alt

抓包分析

准备工作

  • 客户端A (模拟税控设备) 模拟税控设备接收消息并定期检查连接状况
   public static void main(String[] args) throws Exception {

        AliYunMQTTConfig config  = new AliYunMQTTConfig() ;
        // ... 省略若干无关设置代码
        config.setQosLevel(1);
        config.setClientId("tax-stream-master-fat-conn-test");

        // ... 省略若干无关连接配置代码
        // 不维护连接session
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setKeepAliveInterval(10); // 心跳10s
        mqttConnectOptions.setMqttVersion(4); // MQTT协议版本 4 -> v3.11
        // 设置事件方法 简单处理收到信息后打印信息
        mqttClient.setCallback(new DefalutMqttCallbackExtended()); 
        mqttClient.connect(mqttConnectOptions);

        // 每2s检查一次连接情况
        while (true){
            if(!mqttClient.isConnected()){
                System.out.println("mqtt connection has disconnected.");
                break;
            }
            Thread.sleep(2000);
        }
       }
  • 客户端B (模拟税件服务)

模拟税件服务每5s向客户端A发送一次消息

    // ... 省略相同配置代码

   for (int i = 0; i < 100; i++) {
            final String s = "test message for [" + i + "]";
            MqttMessage content = new MqttMessage(s.getBytes());
            content.setQos(1); // QoS1 保证对端设备至少收到一次
            mqttClient.publish("msg-bus/p2p/GID-tax-inter-fat@@@tax-stream-master-fat-conn-test-716",content);
            Thread.sleep(5000);
        }

复现Bug

启动客户端A

启动客户端A后并用抓包工具检测本地网卡MQTT协议流量,发现连接创建后,心跳包每10s收发确认正常。

alt

启动客户端B

启动客户端B并开始周期性发送消息给客户端A

  • 客户端A控制台信息

alt

  • MQTT数据包通信信息

alt

至此我们已经复现bug并且得到几个关键信息:

  • 客户端A从16:58:31接收到第一条信息到16:58:45断连共经历14s(算上毫秒值约为15s)
  • 客户端A断连底层异常为java.io.EOFException,该异常是Inputstream意外的读取到结束,在面向连接的IO通信中(如TCP)为典型的连接断开
  • 在通信过程中,客户端A做为消息接收端,数据包的流向都是从 Broker -> 客户端 (数据包信息的source destination关系)且无PINGREQ数据包发出。

尝试修复

综合上面的数据包通信信息与MQTT协议规范,可推断断线原因:

  • 在客户端B未接收消息时,因为PINGREQ的机制可保证客户端的正常连接
  • 在客户端B接收消息时,因控制包的传输导致PINGREQ包不再发送,在KeepAlive Intervel = 10s的配置下于1.5 * 10 = 15s后因心跳停止被Broker断连
  • 在生产环境下,税控设备依靠MQTT接收来自税件服务的报文命令,而向税件服务发送信息走的是RocketMQ,其通信动作本身就是单向的

所以尝试在客户端单向通信的场景下产生发送控制包报文向Broker,可在不破坏当前业务流程的前提下满足MQTT协议的心跳规范。

修改客户端B

在接收到消息后向Broker发送一条消息Ack,在长时间单向接收报文的场景下以插入发送控制报文的动作来规避KeepAlive Intervel的检查。

int i = 0;
    while (true){
        if(!mqttClient.isConnected()){
            System.out.println("mqtt connection has disconnected.");
            break;
        }

        // 插入发送Ack消息的动作
        final String s = "ack message from broker [" + i +"]";
        MqttMessage content = new MqttMessage(s.getBytes());
        content.setQos(0);
        mqttClient.publish("tax-stream-master-fat-conn-publisher-888",content);
        i+=1;

        Thread.sleep(2000);
    }

再次模拟场景,断连问题得以解决

alt

总结

  • MQTT协议能成为流行于IoT设备的通信协议得益于其设计上灵活的协议组装设计,完备的消息控制能力以及轻量级的消息交互流程。
  • 而“轻量级的消息交互流程”却也成为设计上的缺陷之一——对于心跳包在控制报文交互时的省略,MQTT协议本意上是最小化数据包通讯次数,而在面对特殊场景(如 客户端只收不发)控制包的单方流向又于心跳机制相悖,导致不得不刻意发送控制报文去解决问题。
  • 对于顽固Bug的分析我们应当下潜到技术本质上去找出根本原因再去结合业务实践去给出解决方法。

本文链接:https://check321.net/post/mqtt_disconnect.html

-- EOF --

Comments

请在后台配置评论类型和相关的值。