MQTT.js 简述
简介
MQTT
是一种轻量级的消息传输协议,常用于物联网(IoT)设备间的通信。mqtt.js
是一个 JavaScript 库,支持在 Node.js 和浏览器中使用,用于通过 MQTT 协议与 MQTT Broker 进行通信。
安装
在 Node.js 中,可以通过以下命令安装:
npm install mqtt --save
在浏览器中使用:
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
基础概念
- Broker(代理): 处理客户端消息的中介服务,管理消息的发布和分发。
- Client(客户端): 消息的发布者或订阅者。
- Topic(主题): 标识消息的逻辑地址。客户端发布消息到某个主题,其他客户端可以订阅该主题来接收消息。
- QoS(服务质量): 确定消息交付的可靠性和次数。
完整示例
const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://broker.hivemq.com');
client.on('connect', function () {
console.log('已连接到 MQTT Broker');
client.subscribe('test/topic', { qos: 1 }, function (err, granted) {
if (!err) {
console.log('已成功订阅');
client.publish('test/topic', 'Hello MQTT', { qos: 1, retain: true }, function (err) {
if (!err) {
console.log('消息已发布');
}
});
}
});
});
client.on('message', function (topic, message) {
console.log(`收到消息: ${message.toString()} 来自主题: ${topic}`);
});
client.on('error', function (err) {
console.error('错误:', err);
});
client.end();
MQTT.js API
连接客户端
mqtt.connect()
方法用于连接到 MQTT Broker。此方法可以接受连接 URL 和配置对象作为参数。
const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://broker.hivemq.com');
// 连接成功时的回调
client.on('connect', function () {
console.log('已连接到 MQTT Broker');
});
mqtt.connect(brokerUrl, options)
-
brokerUrl
:string
,指定 MQTT 服务器的地址。支持以下协议:mqtt://
: 无加密的 MQTT 协议。mqtts://
: 加密的 MQTT 协议。ws://
: WebSocket 协议。wss://
: 加密的 WebSocket 协议。
-
options
:object
(可选),配置对象,包含多个配置项。参数 类型 默认值 说明 clientId
string
自动生成 客户端的唯一标识符。默认为 ‘mqttjs_’ + Math.random().toString(16).substr(2, 8),可自定义修改 username
string
无 连接时的用户名。 password
string
无 连接时的密码。 keepalive
number
60 保持连接的心跳间隔,单位为秒。 clean
boolean
true
是否清除会话信息。 false
时,断线重连后可继续接收未到达的消息。reconnectPeriod
number
1000 断开后重连的时间间隔,单位为毫秒。当设置为 0 以后将取消自动重连 connectTimeout
number
30 * 1000 连接超时的时间,单位为毫秒。 will
object
无 客户端的遗嘱消息。当客户端异常断开时,broker 将会发送该消息。 protocolId
string
MQTT
MQTT 协议名称。 protocolVersion
number
4
MQTT 协议版本, 4
表示 MQTT v3.1.1。resubscribe
boolean
true
断线重连后是否自动重新订阅之前的主题。
options.will
对象配置:
topic
:string
,遗嘱消息发布的主题。payload
:string
,遗嘱消息的内容。qos
:number
,遗嘱消息的 QoS 等级。retain
:boolean
,是否保留遗嘱消息。
will: {
topic: 'WillMsg', // 发布消息的主题
payload: 'Connection Closed abnormally!', // 要发布的消息
qos: 0, // 消息等级
retain: false // 保留消息标识
},
订阅主题
subscribe()
方法用于订阅主题,接收该主题下发布的消息。
client.subscribe('test/topic', { qos: 1 }, function (err, granted) {
if (err) {
console.log('订阅失败:', err);
} else {
console.log('已成功订阅主题:', granted);
}
});
client.subscribe(topic, options, callback)
-
topic
:string
或array
,要订阅的主题。可以是单个主题的字符串,也可以是多个主题的数组。 -
options
:object
(可选),订阅配置。qos
:number
,指定服务质量(QoS),取值0
,1
, 或2
。
-
callback
:function (err, granted)
(可选),回调函数。err
:Error
对象,如果订阅失败会返回错误信息。granted
:array
,包含每个订阅的主题及其对应的 QoS 级别。
// granted 示例 [{ topic: 'test/topic', qos: 1 }]
发布消息
publish()
方法用于发布消息到指定的主题。
client.publish('test/topic', 'Hello MQTT', { qos: 1, retain: true }, function (err) {
if (!err) {
console.log('消息已成功发布');
}
});
client.publish(topic, message, options, callback)
topic
:string
,发布消息的目标主题。message
:string
或Buffer
,要发布的消息内容。options
:object
(可选),发布消息的配置选项。qos
:number
,指定服务质量,默认值为0
,可选值为0
,1
,2
。retain
:boolean
,是否保留该消息,默认为false
。如果为true
,此消息会被保留,订阅该主题的客户端在连接后将会收到该消息。
callback
:function (err)
(可选),消息发布后的回调函数。err
:Error
对象,如果消息发送失败,则会返回错误。
断开连接
end()
方法用于断开与 MQTT Broker 的连接。它可以配置是否立即断开或者等待发送完成后再断开。
client.end(false, function () {
console.log('已断开连接');
});
client.end(force, callback)
force
:boolean
(可选),是否强制断开。如果为false
,则客户端会在发送完所有未发出的消息后断开连接。callback
:function (可选)
,断开连接后的回调函数。
处理消息
当客户端收到消息时,message
事件会触发,允许处理接收到的消息。
client.on('message', function (topic, message) {
console.log(`从主题 ${topic} 接收到消息: ${message.toString()}`);
});
client.on('message', callback)
callback
:function (topic, message)
,处理消息的回调函数。topic
:string
,消息所属的主题。message
:Buffer
,消息内容,需要转换为字符串才能正确显示。
处理错误
在客户端通信过程中,如果发生错误,可以通过监听 error
事件来处理错误。
client.on('error', function (err) {
console.error('MQTT 连接出错:', err);
});
client.on('error', callback)
callback
:function (err)
,错误发生时的回调函数。err
:Error
对象,包含错误的详细信息。
高级使用
QoS(服务质量)
QoS
代表消息的传递可靠性:
- QoS 0:最多一次传递,消息可能丢失。
- QoS 1:至少一次传递,消息可能重复。
- QoS 2:仅一次传递,确保消息不会丢失或重复。
保持连接和心跳
通过 keepalive
设置,客户端可以定期向 Broker 发送心跳包,以保持连接。默认的 keepalive
时间为 60 秒。
const client = mqtt.connect('mqtt://broker.hivemq.com', {
keepalive: 120 // 每 120 秒发送一次心跳
});
遗嘱消息
遗嘱消息在客户端非正常断开连接时由 Broker 发送。可以通过 will
参数配置。
const client = mqtt.connect('mqtt://broker.hivemq.com', {
will: {
topic: 'test/lastwill',
payload: 'Client disconnected unexpectedly',
qos: 2,
retain: false
}
});
客户端重连
客户端在断开连接后会根据 reconnectPeriod
参数进行自动重连。默认的重连间隔是 1000 毫秒。
const client = mqtt.connect('mqtt://broker.hivemq.com', {
reconnectPeriod: 5000 // 5 秒后重连
});
完整方法
1. client.subscribe(topic, options, callback)
该方法用于订阅一个或多个主题。
topic
:string
或array
,要订阅的主题。可以是单个主题的字符串或多个主题的数组。options
:object
(可选),订阅选项。qos
:number
,服务质量等级,取值为0
,1
, 或2
。默认为0
。
callback
:function (err, granted)
(可选),订阅完成后的回调函数。err
:Error
对象,表示订阅失败时的错误信息。granted
:array
,表示成功订阅的主题及其qos
级别的数组。
示例:
client.subscribe('test/topic', { qos: 1 }, function (err, granted) {
if (err) {
console.error('订阅失败:', err);
} else {
console.log('已成功订阅:', granted);
}
});
2. client.unsubscribe(topic, callback)
该方法用于取消订阅一个或多个主题。
topic
:string
或array
,要取消订阅的主题。可以是单个主题的字符串或多个主题的数组。callback
:function (err)
(可选),取消订阅后的回调函数。err
:Error
对象,表示取消订阅失败时的错误信息。
示例:
client.unsubscribe('test/topic', function (err) {
if (err) {
console.error('取消订阅失败:', err);
} else {
console.log('已成功取消订阅');
}
});
3. client.publish(topic, message, options, callback)
该方法用于向指定主题发布消息。
topic
:string
,消息发布的主题。message
:string
或Buffer
,要发布的消息内容。options
:object
(可选),发布选项。qos
:number
,服务质量等级,取值为0
,1
, 或2
。默认为0
。retain
:boolean
,是否保留该消息,默认为false
。
callback
:function (err)
(可选),消息发布完成后的回调函数。err
:Error
对象,表示消息发布失败时的错误信息。
示例:
client.publish('test/topic', 'Hello World', { qos: 1, retain: false }, function (err) {
if (err) {
console.error('消息发布失败:', err);
} else {
console.log('消息发布成功');
}
});
4. client.end(force, callback)
该方法用于断开与 MQTT Broker 的连接。
force
:boolean
(可选),是否强制断开连接。默认值为false
,即等待所有消息发送完毕后再断开连接。callback
:function
(可选),连接断开后的回调函数。
示例:
client.end(false, function () {
console.log('已断开连接');
});
5. client.reconnect()
该方法用于手动触发客户端重新连接。此方法通常不需要显式调用,因为客户端默认在断开连接后会自动重连。
示例:
client.reconnect();
6. client.handleMessage(packet, callback)
该方法用于处理接收到的消息。这个方法通常不需要显式调用,除非需要自定义消息处理行为。
packet
:object
,包含消息的原始数据包。callback
:function (err)
,处理完成后的回调函数。
示例:
client.handleMessage(packet, function (err) {
if (err) {
console.error('消息处理失败:', err);
} else {
console.log('消息已处理');
}
});
7. client.removeOutgoingMessage(mid)
该方法用于删除一个正在等待发送的消息(根据消息 ID)。它允许你在消息还未成功发送前将其移除。
mid
:number
,消息的 ID。
示例:
client.removeOutgoingMessage(12345); // 删除消息 ID 为 12345 的未发送消息
8. client.getLastMessageId()
该方法用于获取客户端发送的最后一个消息的 ID。这在调试或管理消息时非常有用。
示例:
const lastMessageId = client.getLastMessageId();
console.log('最后发送的消息 ID:', lastMessageId);
完整事件
1. connect
-
触发时机: 客户端成功连接到 MQTT Broker 时触发。
-
回调参数:
connack
对象,包含有关连接确认的信息。sessionPresent
:boolean
,是否恢复了之前的会话。returnCode
:number
,连接的返回码,0
表示连接成功。
示例:
client.on('connect', function (connack) {
console.log('已连接到 MQTT Broker');
console.log('会话状态:', connack.sessionPresent);
});
2. message
- 触发时机: 当客户端接收到某个订阅主题的消息时触发。
- 回调参数:
topic
:string
,消息所属的主题。message
:Buffer
,消息内容,需通过toString()
转换为字符串。
示例:
client.on('message', function (topic, message) {
console.log(`收到消息: ${message.toString()},来自主题: ${topic}`);
});
3. reconnect
- 触发时机: 当客户端因断开连接而开始重新连接时触发。
- 回调参数: 无。
示例:
client.on('reconnect', function () {
console.log('正在重新连接...');
});
4. close
- 触发时机: 当客户端的连接完全关闭时触发。
- 回调参数: 无。
示例:
client.on('close', function () {
console.log('连接已关闭');
});
5. offline
- 触发时机: 当客户端进入离线状态时触发,通常是因为客户端失去与 Broker 的连接。
- 回调参数: 无。
示例:
client.on('offline', function () {
console.log('客户端处于离线状态');
});
6. error
- 触发时机: 当发生错误时触发。
- 回调参数:
error
对象,包含错误信息。
示例:
client.on('error', function (err) {
console.error('发生错误:', err);
});
7. end
- 触发时机: 当客户端通过调用
end()
方法主动断开连接时触发。 - 回调参数: 无。
示例:
client.on('end', function () {
console.log('客户端连接已断开');
});
8. packetsend
- 触发时机: 当客户端发送数据包时触发。适用于调试网络活动。
- 回调参数:
packet
对象,包含发送的数据包信息。
示例:
client.on('packetsend', function (packet) {
console.log('已发送数据包:', packet);
});
9. packetreceive
- 触发时机: 当客户端接收到数据包时触发。适用于调试网络活动。
- 回调参数:
packet
对象,包含接收到的数据包信息。
示例:
client.on('packetreceive', function (packet) {
console.log('已接收数据包:', packet);
});