quarkus+mqtt 静态和动态发布订阅mqtt消息 weir 2023-03-20 12:05:07.0 quarkus,mqtt 248 如果你用过spring上面的mqtt,不知道你是否用的是Spring Integration。 我使用的经验是mqtt可以动态发布,删除,订阅,基本都可以用web界面来完成新的mqtt设备的接入和接受消息的全过程。 首先是mqtt的测试,需要服务器和客户端 https://mqtthq.com/client 这是一个国外的可以用来做broker和客户端测试的。  另外一个就是emqx,这个应该用的比较多,我也基本都是用它来开发和测试的。  介绍完工具之后,工具怎么用大家去学习研究,这个也是我的一贯风格。 先放出来程序代码:https://gitee.com/weir_admin/weirblog-quarkus/tree/master/quarkus-mqtt-demo 然后对照我在B站的视频:https://www.bilibili.com/video/BV1fM411x7ht/?spm_id_from=333.999.0.0&vd_source=d758aa5920656132ff1e144934ff449f 有三个 大家去找 还有quarkus实现mqtt的技术文档:https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/3.4/mqtt/mqtt.html#mqtt-installation 关键的注解:@Incoming @Outgoing 我想再给大家解释什么意思就太不专业了。 @Incoming 就是接收消息注解 @Outgoing 就是发布消息注解 配置解释下: mp.messaging.incoming.topic01.connector=smallrye-mqtt #固定 mp.messaging.incoming.topic01.host=localhost #mqtt broker IP mp.messaging.incoming.topic01.port=1883 #mqtt broker 端口 mp.messaging.incoming.topic01.username=admin #mqtt broker 链接用户名 mp.messaging.incoming.topic01.password=public #mqtt broker 链接密码 mp.messaging.incoming.topic01.topic=topic01 # 这个他们视频上么有 可以不写 mp.messaging.incoming.topic01.auto-generated-client-id=true #自动生成 mp.messaging.outgoing.topic01-send.connector=smallrye-mqtt mp.messaging.outgoing.topic01-send.host=localhost mp.messaging.outgoing.topic01-send.port=1883 mp.messaging.outgoing.topic01-send.username=admin mp.messaging.outgoing.topic01-send.password=public mp.messaging.outgoing.topic01-send.topic=topic01 mp.messaging.outgoing.topic01-send.auto-generated-client-id=true 测试代码: ```java @Incoming("topic01") public void consume(byte[] raw) { System.out.println("-------------------------" + new String(raw)); } private Random random = new Random(); // 方式1 // @Outgoing("topic01-send") public Multi generate() { return Multi.createFrom().ticks().every(Duration.ofSeconds(1)) .onOverflow().drop() .map(tick -> { int price = random.nextInt(100); // System.out.println("Sending price: " + price); return price; }); } ``` 这个是使用随机数来测试的,当然实际的mqtt消息可能是json字符串,二进制都有可能。 静态的mqtt消息基本就这样,这显然不符合实际的应用场景。 实际上我们可能使用分组,类型等形式把多个同类的mqtt设备放在一起, 更方便的是能够动态的添加或移除某个设备零件的mqtt,这样是最好的方式。 quarkus 实现mqtt动态有些没想到,我刚看到他们视频的时候就这感觉:还能这样做? 看看配置: mp.messaging.incoming.topic-dynamic.connector=smallrye-mqtt mp.messaging.incoming.topic-dynamic.host=localhost mp.messaging.incoming.topic-dynamic.port=1883 mp.messaging.incoming.topic-dynamic.username=admin mp.messaging.incoming.topic-dynamic.password=public mp.messaging.incoming.topic-dynamic.topic=topic/dynamic/# mp.messaging.incoming.topic-dynamic.auto-generated-client-id=true mp.messaging.outgoing.send-dynamic.connector=smallrye-mqtt mp.messaging.outgoing.send-dynamic.host=localhost mp.messaging.outgoing.send-dynamic.port=1883 mp.messaging.outgoing.send-dynamic.username=admin mp.messaging.outgoing.send-dynamic.password=public 大家能看出来不同吗? 关键就是 topic/dynamic/# 这个# 号,用它来实现一组下的设备动态。 代码: ```java @ApplicationScoped public class SendTopicDynamicBean { @Inject @Channel("send-dynamic") Emitter emitter; public void send(MessageData data) { String topic = "topic/dynamic/" + data.getClientId(); MqttMessage msg = MqttMessage.of(topic, data); emitter.send(msg); } } ``` ```java @ApplicationScoped public class DynamicResource { @Incoming("topic-dynamic") public CompletionStage consume(MqttMessage message) { String topic = message.getTopic(); byte[] payload = message.getPayload(); System.out.println("------------topic---"+topic+"------------" + new String(payload)); return message.ack(); } } ``` 你能看懂么? 有什么想法可以加我微信交流。