Paho
项目用到了PAHO,今天来记录学习一下
PAHO-CLIENT
引入需要的依赖(Maven)
<dependencies>
<!-- Paho -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
</dependencies>
MQTT的配置类
/**
 * Spring configuration that builds, configures and connects the shared Paho {@link MqttClient}.
 */
@Configuration
public class MqttConfig {
    // Member fields omitted (protocol, host, port, path, username, password, version,
    // keepAliveInterval, cleanSession, connectionTimeout are read below).

    /**
     * Creates the MQTT client, registers the callback and connects to the broker.
     *
     * @return the connected client
     * @throws MqttException if the client cannot be created or the connection attempt fails
     */
    @Bean
    public MqttClient mqttClient() throws MqttException {
        // Broker URI assembled from protocol, host, port and path.
        // NOTE(review): if TextKit.format is MessageFormat-based and port is numeric, the port
        // could be rendered with a grouping separator — confirm against TextKit.
        String broker = TextKit.format(
                "{0}://{1}:{2}{3}",
                this.protocol,
                this.host,
                this.port,
                this.path
        );
        String clientId = "APP:" + this.username;
        MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());

        // Register the callback BEFORE connecting: a connection loss or a message delivered
        // right after the connect would otherwise find no callback registered.
        client.setCallback(new MqttCallbackImpl());

        // Connection options.
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(this.username);
        // The password is MD5-digested and then converted to a char array.
        options.setPassword(DigestKit.md5(this.password).toCharArray());
        // Optional tuning parameters.
        options.setMqttVersion(this.version);
        options.setKeepAliveInterval(this.keepAliveInterval);
        options.setCleanSession(this.cleanSession);
        options.setConnectionTimeout(this.connectionTimeout);
        // Reconnect automatically after the connection drops.
        options.setAutomaticReconnect(true);

        client.connect(options);
        return client;
    }
}
实现MqttCallback接口
在回调中定义各类事件的处理逻辑;如果还需要在连接(或重连)完成时收到通知,也可以选择实现 MqttCallbackExtended
/**
 * {@link MqttCallback} implementation that prints every event it receives to the console.
 */
public class MqttCallbackImpl implements MqttCallback {

    /**
     * Invoked when the connection is lost; prints the stack trace of the cause.
     *
     * @param cause the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable cause) {
        cause.printStackTrace();
    }

    /**
     * Invoked when a message arrives from the server; prints the topic and the payload
     * rendered through {@code HexKit.hex}.
     *
     * @param topic   topic the message was published on
     * @param message the received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println(topic);
        System.out.println(HexKit.hex(message.getPayload()));
    }

    /**
     * Invoked with the delivery token of a message; prints the token's message id and response.
     * Any exception raised while reading the token is printed rather than propagated.
     *
     * @param token delivery token passed in by the client library
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        try {
            // Read both values first, then print, so nothing is printed if either read fails.
            final int messageId = token.getMessageId();
            final MqttWireMessage response = token.getResponse();
            System.out.println(messageId);
            System.out.println(response);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
调用
/**
 * Subscribes to one topic per device and publishes outbound device messages through the
 * injected {@link MqttClient}.
 */
@Component
public class MQTTDeviceTask {
    @Autowired
    private MqttClient client;
    @Autowired
    private DeviceService deviceService;
    // Topic prefix for inbound messages, read from the "mqtt.subscribeUrl" property.
    @Value("${mqtt.subscribeUrl}")
    private String subscribe;
    // Topic prefix for outbound messages, read from the "mqtt.sendUrl" property.
    @Value("${mqtt.sendUrl}")
    private String send;

    /**
     * Subscribes, at QoS 0, to the topic {@code "$" + subscribe + "/" + nbid} of every device
     * returned by the device service. Each listener prints the topic and the payload as
     * space-separated hex pairs; this is the place to persist messages if the business
     * logic requires it.
     *
     * @throws MqttException if a subscription fails
     */
    // NOTE(review): @Bean on a void method defines no bean; it is presumably used only so that
    // Spring invokes this method at startup, and newer Spring Framework versions reportedly
    // reject void @Bean methods. Consider an init callback such as @PostConstruct — TODO confirm.
    // NOTE(review): these subscriptions are made once; if the client reconnects with a clean
    // session they may need to be re-established — verify against the connect options.
    @Bean
    public void subscribe() throws MqttException {
        List<Device> allDevice = this.deviceService.getAllDevice();
        for (Device d : allDevice) {
            client.subscribe("$" + this.subscribe + "/" + d.getNbid(), 0, (topic, message) -> {
                System.out.println("subscribe ========");
                System.out.println(topic);
                byte[] bytes = message.getPayload();
                System.out.println(this.bestHex(bytes));
            });
        }
    }

    /**
     * Publishes {@code body} to the topic {@code "$" + send + "/" + nbid} at QoS 1,
     * not retained.
     *
     * @param nbid device identifier appended to the topic
     * @param body raw payload bytes
     * @throws MqttException if publishing fails
     */
    public void send(String nbid, byte[] body) throws MqttException {
        client.publish("$" + this.send + "/" + nbid, body, 1, false);
    }

    /**
     * Renders the bytes through {@code HexKit.hex} and inserts a single space between
     * every two characters, with no leading or trailing separator.
     */
    private String bestHex(byte[] bytes) {
        String text = HexKit.hex(bytes);
        int len = text.length();
        StringBuilder builder = new StringBuilder(len + len / 2);
        for (int i = 0; i < len; i++) {
            // Put the separator before each pair except the first, so none trails the output.
            if (i > 0 && i % 2 == 0) {
                builder.append(' ');
            }
            builder.append(text.charAt(i));
        }
        return builder.toString();
    }
}