基本概念:
MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模式的轻量级消息传输协议,专为低带宽、高延迟或不稳定的网络环境设计。以下是MQTT实现收发消息的基本原理:
- 客户端-服务器模型:
MQTT基于客户端-服务器模型工作。客户端(可以是发布者或订阅者)发起连接请求到服务器(也称为代理Broker),并可以订阅主题、发布消息到主题或从订阅的主题接收消息。 - 发布/订阅模式:
MQTT使用发布/订阅模式进行消息传递。发布者(Publisher)将消息发布到特定的主题(Topic),而订阅者(Subscriber)通过订阅这些主题来接收消息。这种解耦的通信方式使得数据分发变得非常灵活高效。 - 消息结构:
MQTT传输的消息分为两部分:主题(Topic)和负载(Payload)。主题可以理解为消息的类型,而负载是消息的具体内容。订阅者订阅某个主题后,就会收到该主题下的消息内容。 - 服务质量(QoS):
MQTT协议支持三种不同的QoS级别,分别是至多一次(QoS 0)、至少一次(QoS 1)和恰好一次(QoS 2)。这些级别的选择能够保证消息传递的可靠性和效率。例如,QoS 1确保消息至少被传送一次,但可能会导致消息重复;而QoS 2则确保消息只被传送一次,避免了消息的重复或丢失。 - 保留消息:
发布者发布的消息可以被设置为保留消息。当订阅者订阅该主题时,它会接收到最新的保留消息。这确保了订阅者不会错过任何重要的信息,即使在它离线期间也有消息发布。 - 遗嘱消息:
发布者可以设置一个遗嘱消息(Last Will and Testament,LWT)。当发布者意外断开连接时,MQTT代理会自动发布这个遗嘱消息到指定的主题上。这样,即使发布者离线,订阅者也能接收到相关通知。 - 会话保持:
MQTT客户端和代理之间的连接具有会话保持机制。这确保了在网络不稳定或客户端重启的情况下,连接状态和订阅信息能够得到保持,从而保证了消息传递的稳定性和连续性。
综上所述,MQTT通过客户端-服务器模型、发布/订阅模式、主题和负载结构、服务质量选择、保留消息、遗嘱消息以及会话保持等机制实现了高效可靠的消息收发功能。这些特性使得MQTT在物联网(IoT)领域得到了广泛应用,如智能家居、工业自动化等场景。
组件封装(mqttComp.vue)
@vue/cli-plugin-babel 版本<5 —> npm i mqtt@4.3.7 --save
@vue/cli-plugin-babel 版本>=5 —> npm i mqtt@5.6.2 --save
注意: 在mqtt4.x版本的时候,请求路径后面的"/mqtt"是直接在 brokerUrl中配置,即本文的(tailPath)
但是在mqtt5+版本的时候,请求路径后面的"/mqtt"是在opts中配置的,即本文的(mqttOpts.path)
请根据你的版本选择对应的拼接方式
<!--
* @Description: mqtt连接和消息发送组件 页面
* @Author: mhf
* @Date: 2024-05-25 00:49:23
* @params:
-->
<template>
<div></div>
</template>
<script>
import mqtt from "mqtt";
export default {
name: "mqttComp",
props: {
topic: String, // 订阅主题
mqttUrl: {
type: Object,
default: () => {
return {
head: "ws", // 必须是 ws 或 wss (mqtt:// 或 mqtts:// 必须让后端开放websocket服务)
host: "xx.xxx.xx.xxx", // ip地址
port: 8083, // 服务端口
tailPath: "mqtt", // 服务路径
};
},
}, // 服务地址
mqttOpts: {
type: Object,
default: () => {
return {
keepalive: 60,
clientId: "clientId-" + Math.random().toString(16).substr(2, 8),
username: "username",
password: "password",
connectTimeout: 10 * 1000,
path: "/mqtt"
};
},
}, // 连接配置
},
data() {
return {
client: "",
clientCount: 0,
receivedMessage: null, // 用于存储接收到的消息
};
},
watch: {
topic(newTopic) {
if (newTopic && this.client) {
this.client.unsubscribe(this.topic);
this.client.subscribe(newTopic);
}
},
},
methods: {
async initMqtt() {
let opts = JSON.parse(JSON.stringify(this.mqttOpts));
this.client = mqtt.connect(
`${this.mqttUrl.head}://${this.mqttUrl.host}:${this.mqttUrl.port}/${this.mqttUrl.tailPath}`,
opts
);
this.client.on("connect", this.handleConnect);
this.client.on("message", this.handleMessage);
this.client.on("reconnect", this.handleReconnect);
this.client.on("error", this.handleError);
},
handleConnect() {
console.log("mqtt_连接成功");
this.client.subscribe(this.topic);
},
handleMessage(topic, message) {
this.receivedMessage = JSON.parse(message?.toString() || {});
this.$emit("message-received", this.receivedMessage);
},
handleReconnect(error) {
console.log(`正在第${this.clientCount}重连`, error);
this.clientCount++;
if (this.clientCount >= 10) {
this.client.end();
}
},
handleError(error) {
console.log("连接失败", error);
},
/**
* MQTT实现发送消息
* @param: topic: 主题
* @param: message: 消息体
* @author: mhf
* @time: 2024-05-24 14:26:51
**/
mqttPublish(topic, message) {
this.client.publish(topic, JSON.stringify(message));
},
},
async mounted() {
await this.initMqtt();
},
beforeDestroy() {
this.$emit("mqtt-close") // 关闭mqtt链接需要的前置操作
// 使用延迟确保消息发送完成后再关闭连接
setTimeout(() => {
this.client.end(true, {}, () => {
console.log("MQTT连接已成功关闭");
});
}, 100); // 延迟时间根据实际情况调整,确保消息发送完成
// this.client.end();
},
};
</script>
<style lang="scss" scoped></style>
组件使用
<template>
<div>
<mqttComp ref="mqttComp" :mqttUrl="mqttUrl" :mqttOpts="mqttOpts" :topic="parentTopic"
@message-received="getMqttMessage"
@mqtt-close="mqttClose"
/>
<el-button @click="sendMqttMessage">发送MQTT消息</el-button>
</div>
</template>
<script>
import mqttComp from "./mqttComp.vue";
export default {
components: {
mqttComp,
},
data() {
return {
parentTopic: "aaa",
mqttUrl: {}, // todo 填写你的配置信息
mqttOpts: {}, // todo 填写你的配置信息
};
},
methods: {
getMqttMessage(receivedMessage) {
console.log("mqtt接收到的消息是:", receivedMessage);
},
sendMqttMessage() {
this.$refs.mqttComp.mqttPublish("parentTopic", "来自父组件的消息");
},
mqttClose() {
this.$refs.mqttComp.mqttPublish("parentTopic", "关闭mqtt前发送的消息");
}
},
};
</script>