Paho

81

项目用到了PAHO,今天来记录学习一下

PAHO-CLIENT

引入需要的依赖

<dependencies>
    <!-- Eclipse Paho MQTT v3 client library -->
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.2</version>
    </dependency>
</dependencies>

MQTT的配置类

@Configuration
public class MqttConfig {
    // 成员变量省略
    @Bean
    public MqttClient mqttClient() throws MqttException {
        // 字符串拼接工具,协议,域名,端口,路径
        String broker = TextKit.format(
                "{0}://{1}:{2}{3}",
                this.protocol,
                this.host,
                this.port,
                this.path
        );

        String clientId = "APP:" + this.username;
        MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());
        // mqtt连接的配置
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(this.username);
        // 密码需要md5加密后转换为字符数组
        options.setPassword(DigestKit.md5(this.password).toCharArray());
        // 一系列可选的参数
        options.setMqttVersion(this.version);
        options.setKeepAliveInterval(this.keepAliveInterval);
        options.setCleanSession(this.cleanSession);
        options.setConnectionTimeout(this.connectionTimeout);
        // 断线重连
        options.setAutomaticReconnect(true);
        client.connect(options);
        // mqtt的回调
        client.setCallback(new MqttCallbackImpl());
        return client;
    }
}

实现MqttCallback接口

在回调中定义一系列事件的处理逻辑;也可以选择实现 MqttCallbackExtended,它额外提供了连接完成时触发的 connectComplete 回调。

public class MqttCallbackImpl implements MqttCallback {
    @Override
    // 连接失败后调用
    public void connectionLost(Throwable cause) {
        cause.printStackTrace();
    }

    @Override
    // 来自服务器的消息到达时调用
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println(topic);
        byte[] bytes = message.getPayload();
        System.out.println(HexKit.hex(bytes));
    }

    @Override
    // 收到消息令牌时调用
    public void deliveryComplete(IMqttDeliveryToken token) {
        try {
            int mid = token.getMessageId();
            MqttWireMessage resp = token.getResponse();
            System.out.println(mid);
            System.out.println(resp);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

调用

@Component
public class MQTTDeviceTask {
    @Autowired
    private MqttClient client;
    @Autowired
    private DeviceService deviceService;
    @Value("${mqtt.subscribeUrl}")
    private String subscribe;
    @Value("${mqtt.sendUrl}")
    private String send;

    @Bean
    // 注册,按照业务需求从service拿到所有设备IMEI并注册
    // 监听器会持续接受消息,topic为设备号,bytes收到的消息,可以按照业务逻辑将他们存入数据库
    public void subscribe() throws MqttException {
        List<Device> allDevice = this.deviceService.getAllDevice();
        for (Device d : allDevice) {
            client.subscribe("$" + this.subscribe + "/" + d.getNbid(), 0, (topic, message) -> {
                System.out.println("subscribe ========");
                System.out.println(topic);
                byte[] bytes = message.getPayload();
                System.out.println(this.bestHex(bytes));
            });
        }
    }
    // 发送调用这个方法
    public void send(String nbid, byte[] body) throws MqttException {
        client.publish("$" + this.send + "/" + nbid, body, 1, false);
    }

    private String bestHex(byte[] bytes) {
        String text = HexKit.hex(bytes);
        int len = text.length();
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < len; i++) {
            builder.append(text.charAt(i));
            if (i > 0 && (i + 1) % 2 == 0) {
                builder.append(' ');
            }
        }
        return builder.toString();
    }
}