Spring引入Mqtt消息组件

基础概念

云消息队列 MQTT 版

  • *QoS*

    • QoS(Quality of Service)指消息传输的服务质量。分别可在消息发送端和消息消费端设置。

      • 发送端的QoS设置:影响发送端发送消息到云消息队列 MQTT 版的传输质量。
      • 消费端的QoS设置:影响云消息队列 MQTT 版服务端投递消息到消费端的传输质量。

      QoS包括以下级别:

      • QoS0:代表最多分发一次。
      • QoS1:代表至少达到一次。
      • QoS2:代表仅分发一次。
  • *cleanSession*

    • cleanSession标志是MQTT协议中对一个消费者客户端建立TCP连接后是否关心之前状态的定义,与消息发送端的设置无关。具体语义如下:
      • cleanSession=true:消费者客户端再次上线时,将不再关心之前所有的订阅关系以及离线消息。
      • cleanSession=false:消费者客户端再次上线时,还需要处理之前的离线消息,而之前的订阅关系也会持续生效。

QoS和cleanSession搭配使用时需注意以下几点:

  • MQTT要求每个客户端每次连接时的cleanSession标志必须固定,不允许动态变化,否则会导致离线消息的判断有误。
  • MQTT目前对外QoS2消息不支持非cleanSession,如果客户端以QoS2方式订阅消息,即使设置cleanSession=false也不会生效。
  • P2P消息的cleanSession判断以接收方客户端的配置为准。

消费端QoS和cleanSession的不同组合产生的结果如QoS和cleanSession的组合关系所示。

QoS级别 cleanSession=true cleanSession=false
QoS0 无离线消息,在线消息只尝试推一次。 无离线消息,在线消息只尝试推一次。
QoS1 无离线消息,在线消息保证可达。 有离线消息,所有消息保证可达。
QoS2 无离线消息,在线消息保证可达且只接收一次。 暂不支持。

spring接入mqtt

  1. 添加依赖

    1
    2
    3
    4
    <dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
  2. 添加mq配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @Configuration
    @ConfigurationProperties(prefix = "mqtt")
    @Data
    @FieldDefaults(level = AccessLevel.PRIVATE)
    public class MqttConfigProperties {
    String host;
    String user;
    String password;
    String clientId;
    String topic;
    String sendTopic1;
    String sendTopic2;
    Integer qos;
    Integer timeout;
    Integer keepalive;
    }

    对应的配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    mqtt:
    host: tcp://172.16.1.1:1883
    user: mqtt_user_test
    password: mqtt_user_test
    qos: 0
    clientId: sys_manager
    topic: sys/position
    timeout: 10
    keepalive: 60
  3. 注入配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    @Configuration
    @Slf4j
    public class MqttConfiguration {
    @Resource
    private MqttConfigProperties mqttConfigProperties;
    @Bean
    public MessageChannel mqttInputChannel() {
    return new DirectChannel();
    }
    @Bean
    public MessageChannel mqttOutputChannel() {
    return new DirectChannel();
    }
    /***
    * mqtt连接配置
    */
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory() {
    DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
    mqttConnectOptions.setUserName(mqttConfigProperties.getUser());
    mqttConnectOptions.setPassword(mqttConfigProperties.getPassword().toCharArray());
    mqttConnectOptions.setConnectionTimeout(mqttConfigProperties.getTimeout());
    mqttConnectOptions.setKeepAliveInterval(mqttConfigProperties.getKeepalive());
    mqttConnectOptions.setServerURIs(new String[]{mqttConfigProperties.getHost()});
    mqttConnectOptions.setCleanSession(false);
    defaultMqttPahoClientFactory.setConnectionOptions(mqttConnectOptions);
    return defaultMqttPahoClientFactory;
    }

    /**
    * 出站配置
    */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutputChannel")
    public MessageHandler messageOutHandler() {
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfigProperties.getClientId() + "outChannel", mqttPahoClientFactory());
    messageHandler.setAsync(true);
    messageHandler.setDefaultQos(0);
    messageHandler.setDefaultTopic(mqttConfigProperties.getTopic());
    DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
    defaultPahoMessageConverter.setPayloadAsBytes(true);
    messageHandler.setConverter(defaultPahoMessageConverter);
    return messageHandler;
    }

    /**
    * 入站配置
    */
    @Bean
    public MessageProducer inbound() {
    String uuid = UUID.randomUUID().toString();
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
    mqttConfigProperties.getClientId() + uuid, mqttPahoClientFactory(), mqttConfigProperties.getTopic());
    adapter.setCompletionTimeout(5000);
    adapter.setQos(0);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setOutputChannel(mqttInputChannel());
    return adapter;
    }

    /**
    * 处理入站消息
    */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
    return message -> {
    log.info("message payload {}", String.valueOf(message.getPayload()));
    };
    }

    }
  4. 发送消息

    1
    2
    3
    4
    @MessagingGateway(defaultRequestChannel ="mqttOutputChannel")
    public interface MqttGateway {
    void sentToMqtt(String data);
    }

spring 调用emqx的api

点击左侧系统设置菜单下的 API 密钥,可以来到 API 密钥页面。如果需要 API 密钥来创建一些脚本调用 HTTP API,可以在此页面进行创建获取操作。点击页面右上角创建按钮打开创建 API 密钥弹框,填写 API 密钥相关数据,如果到期时间未填写 API 密钥将永不过期,点击确定提交数据,提交成功后页面上将提供此次创建的 API 密钥的 API Key 和 Secret Key,其中 Secret Key 后续将不再显示,用户需立即将 API Key 和 Secret Key 保存至安全的地方;保存数据完毕可点击关闭按钮关闭弹框。

在 API 密钥页面上,您可以按照以下步骤生成用于访问 HTTP API 的 API 密钥和 Secret key。

  1. 单击页面右上角的**+ 创建**按钮,弹出创建 API 密钥的对话框。

  2. 在创建 API 密钥对话框上,配置 API 密钥的详细信息。

    如果到期时间文本框留空,API 密钥将永不过期。

  3. 单击确认按钮,API 密钥和密钥将被创建并显示在创建成功对话框中。

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import cn.com.hcytech.config.MqttConfigProperties;
import com.alibaba.fastjson2.JSONObject;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.*;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import javax.annotation.Resource;
import java.util.Base64;

@Component
@Data
@Slf4j
public class EMqxApiClient {

@Resource
private MqttConfigProperties mqttConfigProperties;

public JSONObject makeApiRequestGet(String apiUrl) {
RestTemplate restTemplate = new RestTemplate();
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.setContentType(MediaType.APPLICATION_JSON);
httpHeaders.setBasicAuth(generateAuthorizationHeader(mqttConfigProperties.getApiKey(), mqttConfigProperties.getApiSecret()));
HttpEntity<String> request = new HttpEntity<>(httpHeaders);
ResponseEntity<String> resp = restTemplate.exchange(mqttConfigProperties.getApiService() + apiUrl, HttpMethod.GET, request, String.class);
if (resp.getStatusCode() == HttpStatus.OK) {
String responseData = resp.getBody();
log.debug("EMqx Response: " + responseData);
return JSONObject.parseObject(responseData);
} else {
log.error("API request failed with status code: " + resp.getStatusCode());
}
return new JSONObject();
}

private String generateAuthorizationHeader(String apiKey, String secretKey) {
String credentials = apiKey + ":" + secretKey;
return Base64.getEncoder().encodeToString(credentials.getBytes());
}

public Boolean getClientStatus(Long tenantId, String deviceSN) {
boolean isConnect = false;
try {
String url = "/clients/" + tenantId + "_" + deviceSN;
JSONObject result = makeApiRequestGet(url);
isConnect = result.getBoolean("connected");
} catch (Exception e) {
log.error("查询设备MQTT连接状态:{}",e.getMessage());
}
return isConnect;
}
}