1 MQTT简介
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议。
 MQTT官网: http://mqtt.org
2 mqtt工作原理

 mqtt分为3种角色:
 1 发布者(Publish)、
 2 代理(Broker)(服务器)、
 3 订阅者(Subscribe)
 就像以前我们订阅天气预报的短信,移动营业厅就是服务器,
- 当有新的天气预报时,气象局就会通知移动营业厅;
 - 客户到移动营业厅办理天气预报的短信业务,当有新的天气预报时,就会收到移动营业厅推动的信息。
 
3 springboot集成mqtt
这里mqtt Broker用的mosquitto, mqtt客户端测试工具mqttBox。
第1步:POM文件引入:
<dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-mqtt</artifactId>
</dependency>
 
查看spring-integration-mqt的POM文件。
 
 从下面可以看出,spring-integration-mqtt已经引入了org.eclipse.paho.client.mqttv3,所以我们没有必要再引入org.eclipse.paho.client.mqttv3了。
 重要的事情说三遍:
 *** 没有必要再引入org.eclipse.paho.client.mqttv3
 *** 没有必要再引入org.eclipse.paho.client.mqttv3
 *** 没有必要再引入org.eclipse.paho.client.mqttv3
第2步:配置application.properties:
#自定义应用开关
mqtt.enabled=true
# 连接地址:
mqtt.serverURIs=tcp://127.0.0.1:1883
# 用户名
mqtt.username=admin
# 密码
mqtt.password=123456
# 心跳
mqtt.connectTimeout=20
# 心跳
mqtt.keep-alive-interval=20
#  MQTT 生产者
# 连接服务器默认客户端ID
mqtt.outChannel.clientId=mqttPublish
# 默认的推送主题,实际可在调用接口时指定
mqtt.outChannel.defaultTopic=air
#  MQTT 消费者
# 连接服务器默认客户端ID
mqtt.inChannel.clientId=mqttSubscribe
# 默认的接收主题,可以订阅多个Topic,逗号分隔
mqtt.inChannel.defaultTopic=weather
 
第3步:mqtt的注解生成。主要包括以下4个类:
 1) MqttBaseConfig
 配置MQTT客户端工厂类DefaultMqttPahoClientFactory
 2) MqttInConfig
 配置Outbound入站,包括:消息通道MessageChannel、消息适配器
 MqttPahoMessageDrivenChannelAdapter和消息处理器MessageHandler
 3) MqttOutConfig
 配置Outbound出站,包括:出站通道 适配器
 4) MqttPublisher
 mqtt消息发布器。
MqttBaseConfig的代码如下:
@Configuration
public class MqttBaseConfig {
    @Value("${mqtt.serverURIs}")
    private String[] serverURIs;
    @Value("${mqtt.username}")
    private String username;
    @Value("${mqtt.password}")
    private  char[] password;
    @Value("${mqtt.connectTimeout}")
    private int connectTimeout;
    @Value("${mqtt.keep-alive-interval}")
    private int keepAliveInterval;
    @Bean
    public MqttPahoClientFactory mqttClientFactory(){
        DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();
        // connection参数
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setServerURIs(serverURIs);
        connectOptions.setUserName(username);
        connectOptions.setPassword(password);
        connectOptions.setCleanSession(true);
        connectOptions.setConnectionTimeout(connectTimeout);
        connectOptions.setKeepAliveInterval(keepAliveInterval);
        defaultMqttPahoClientFactory.setConnectionOptions(connectOptions);
        return defaultMqttPahoClientFactory;
    }
}
 
MqttInConfig的代码如下:
@Configuration
@ConditionalOnProperty(value = "mqtt.enabled", havingValue = "true")
@Slf4j
public class MqttInConfig {
    @Value("${mqtt.inChannel.clientId}")
    private String inChannelClientId;
    @Value("${mqtt.inChannel.defaultTopic}")
    private String topic;
    /**
     * mqtt消息入站通道
     *
     * @return
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
    @Bean
    public MessageProducer mqttSubscribe(MqttPahoClientFactory factory) {
        String clientId = inChannelClientId + System.currentTimeMillis();
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, factory, topic);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setOutputChannel(mqttInputChannel());
        adapter.setCompletionTimeout(5000);
        adapter.setQos(2);
        return adapter;
    }
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler mqttMessageHandler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
                String payload = String.valueOf(message.getPayload());
                log.info("接收到 mqtt消息,主题:{} 消息:{}", topic, payload);
            }
        };
    }
}
 
MqttOutConfig的代码如下:
@Configuration
@ConditionalOnProperty(value = "mqtt.enabled", havingValue = "true")
public class MqttOutConfig {
    @Value("${mqtt.outChannel.clientId}")
    private String outChannelClientId;
    /**
     * mqtt出站消息,用于发送出站消息
     * @return
     */
    @Bean
    public MessageChannel mqttPublishChannel(){
        return new DirectChannel();
    }
    /**
     * mqtt消息出站通道的设置
     * @param mqttMessageConverter
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutputChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory factory) {
        String clientId = outChannelClientId + System.currentTimeMillis();;
        MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(clientId, factory);
        mqttPahoMessageHandler.setConverter(new DefaultPahoMessageConverter());
        mqttPahoMessageHandler.setCompletionTimeout(50);
        mqttPahoMessageHandler.setAsync(true);
        return mqttPahoMessageHandler;
    }
}
 
MqttPublisher的代码如下:
@Component
@ConditionalOnProperty(value = "mqtt.enabled", havingValue = "true")
@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttPublisher {
    /**
     * 向默认的topic发送mqtt消息
     * @param payload
     */
    void sendMessage(String payload);
    /**
     * 向指定的topic发送mqtt消息
     * @param topic
     * @param payload
     */
    void sendMessage(@Header(MqttHeaders.TOPIC) String topic, String payload);
    /**
     * 向指定的topic发送mqtt消息,并指定服务质量参数
     * @param topic
     * @param qos
     * @param payload
     */
    void sendMessage(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
 
4 测试验证
4.1 订阅验证
MqttInConfig类会定义MessageHandler的类,实现handleMessage接口,会收到消息:
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler mqttMessageHandler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
                String payload = String.valueOf(message.getPayload());
                log.info("接收到 mqtt消息,主题:{} 消息:{}", topic, payload);
            }
        };
    }
 
MqttBox发布一个topic:weather;内容为{‘temprature’:‘7℃’}
 
 可以看到springboot应用程序控制台 输出一条订阅的消息。
 
4.2 发布验证
我们定义一个http api来触发发布一个触pringboot发布topic:air。
    @RequestMapping(value = "/publish", method = RequestMethod.GET)
    public void publish(@RequestParam(value = "topic") String topic,
                      @RequestParam(value = "message") String message) {
        mqttService.sendMqttMessage(topic, message);
    }
 
Postman发送http get请求来
 http://127.0.0.1:8080/mqtt/publish?topic=air&message=PM=2.5
 
MqttBox订阅一个topic:air,可以看到收到订阅的消息。
 
