Black magic on top of MQTT.js
Provides MQTT topic listeners with wildcards!
Also, `Promise`s.
This is my MQTT.js wrapper. There are many like it, but this one is mine.
MQTT.js is an excellent MQTT client for Node.js & the browser. It is somewhat of a low-level module, however.
There are two (2) main issues which users of MQTT.js will quickly encounter:
- You can't just set a listener function for a subscribed topic; you have to stuff logic into a listener on the `message` event, and figure out what to do with the topic.
- Received messages are all `Buffer` objects, and you can only publish a `Buffer` or `string`.
- (BONUS ISSUE) It doesn't use `Promise`s, which some people prefer over callbacks.
`mqttletoad` solves the above problems. These are problems I have, and likely many others have as well.
When subscribing to an MQTT topic, you can use wildcards (`+` and `#`). If you do, then you have to match an incoming message's topic. Say we subscribed to `foo/+/baz` with a listener function `quux()`, and we receive a message with topic `foo/bar/baz`. How do we know to execute `quux()`?
We need something like a router (think express) to be able to match the topic to the proper listener.
EventEmitter2 does exactly this by supporting wildcards in event names. It's flexible, so you don't need to use Express-style routes (`foo/:param/baz`), which is what several other libraries tackling the same problem have done.
What's better is that `EventEmitter`s are standardized. They are easy to consume. Think RxJS's `Observable.fromEvent()`. This should help those using a "reactive" programming model.
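To make the matching problem concrete, here is a rough sketch of what checking a topic against a wildcard filter involves if you do it by hand. This is not how `mqttletoad` does it (it leans on EventEmitter2 for that); `topicMatches` is a hypothetical helper, and it ignores edge cases such as `$`-prefixed system topics.

```javascript
// Hypothetical helper for illustration; not part of mqttletoad or MQTT.js.
// Returns true if `topic` matches the subscription `filter`.
function topicMatches(filter, topic) {
  const filterLevels = filter.split('/');
  const topicLevels = topic.split('/');

  for (let i = 0; i < filterLevels.length; i++) {
    const level = filterLevels[i];
    if (level === '#') {
      // multi-level wildcard: matches the parent level and everything below it
      return true;
    }
    if (i >= topicLevels.length) {
      // the filter is longer than the topic
      return false;
    }
    if (level !== '+' && level !== topicLevels[i]) {
      // neither a single-level wildcard nor a literal match
      return false;
    }
  }
  // no `#` was seen, so the topic must not have extra levels
  return filterLevels.length === topicLevels.length;
}

console.log(topicMatches('foo/+/baz', 'foo/bar/baz')); // true
console.log(topicMatches('foo/+/baz', 'foo/bar/qux')); // false
console.log(topicMatches('foo/#', 'foo/bar/baz')); // true
```

A real router also has to decide which listeners run, and in what order, once several filters match the same topic.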
MQTT makes no prescriptions about what a message looks like. It's just a blob.
But as a developer, you know your data. Maybe that message is JSON, maybe it's base64-encoded, or maybe it's just a string. You are unlikely to be surprised about what you get.
Out-of-the-box, `mqttletoad` provides several common decoders for received messages, and encoders for publishing messages.
These are:
- `json` - Convert to/from a JSON representation of an object
- `text` - Convert to/from a UTF-8 encoded string
- `binary` - Convert to a `Buffer` (all received messages are `Buffer`s, so no decoding necessary)
- `base64` - Convert to/from a base64 (string) representation of just about anything
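For a feel of what each format does to a payload, here is a minimal sketch using plain Node.js `Buffer` calls. The `codecs` object and `overTheWire` function are made up for illustration and are not `mqttletoad`'s source; in particular, what the real `base64` decoder returns (string or `Buffer`) is an assumption here.

```javascript
// Hypothetical stand-ins for the four formats; names and shapes are
// assumptions for illustration only.
const codecs = {
  json: {
    encode: value => JSON.stringify(value),
    decode: buf => JSON.parse(buf.toString('utf8'))
  },
  text: {
    encode: value => String(value),
    decode: buf => buf.toString('utf8')
  },
  binary: {
    encode: value => Buffer.from(value),
    decode: buf => buf // already a Buffer; nothing to do
  },
  base64: {
    encode: value => Buffer.from(value).toString('base64'),
    // assumption: decode back to a UTF-8 string
    decode: buf => Buffer.from(buf.toString('utf8'), 'base64').toString('utf8')
  }
};

// Simulate the wire: whatever gets published arrives as a Buffer.
const overTheWire = payload => Buffer.from(payload);

const received = overTheWire(codecs.json.encode({baz: 'quux'}));
console.log(Buffer.isBuffer(received)); // true
console.log(codecs.json.decode(received).baz); // quux
```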
To use these, you can specify a default encoder and/or decoder when connecting:
const toad = require('mqttletoad');

(async function () {
  const client = await toad.connect('wss://test.mosquitto.org', {
    encoder: 'json',
    decoder: 'json'
  });

  await client.subscribe('foo/bar', message => {
    console.log(message.baz); // quux
  });

  // see listener above
  await client.publish('foo/bar', {baz: 'quux'});
}());
Or you can do this on a per-subscription/publish basis:
const toad = require('mqttletoad');

(async function () {
  // "text" is the default encoder/decoder
  const client = await toad.connect('wss://test.mosquitto.org', {
    encoder: 'text',
    decoder: 'text'
  });

  await client.subscribe('foo/bar', message => {
    console.log(message.baz); // quux
  }, {decoder: 'json'});

  // see listener above
  await client.publish('foo/bar', {baz: 'quux'}, {encoder: 'json'});
}());
You can also provide your own either way:
const toad = require('mqttletoad');

(async function () {
  const client = await toad.connect('wss://test.mosquitto.org', {
    decoder: parseFloat
  });

  await client.subscribe('foo/bar', message => {
    console.log(message); // 123.4
  });

  await client.subscribe('foo/bar', message => {
    console.log(message); // '123.4'
  }, {decoder: 'text'});

  // see listeners above
  await client.publish('foo/bar', 123.4); // default encoder is "text"
}());
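Why does a bare `parseFloat` work as a decoder? Assuming a custom decoder is simply called with the raw `Buffer` payload (which is what the examples here suggest), the following sketch shows the coercion at work, plus a slightly fancier decoder. `csvDecoder` is a made-up name for illustration.

```javascript
// Assumption: a custom decoder receives the raw Buffer payload.
const payload = Buffer.from('123.4');

// parseFloat() coerces its argument to a string first, and a Buffer's
// toString() defaults to UTF-8, so this yields a number.
console.log(parseFloat(payload)); // 123.4

// A hypothetical custom decoder: comma-separated numbers to an array.
const csvDecoder = buf => buf.toString('utf8').split(',').map(Number);
console.log(csvDecoder(Buffer.from('1,2,3'))); // [ 1, 2, 3 ]
```

Any function from `Buffer` to "the thing you actually want" should do under that assumption.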
async-mqtt does the same thing here--more or less.
The following functions are promisified:
- `MqttClient#publish`
- `MqttClient#subscribe`
- `MqttClient#unsubscribe`
- `MqttClient#end`
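If "promisified" is unfamiliar, the idea is roughly the following. This is a generic sketch, not the code `mqttletoad` uses; `promisify` and `fakePublish` are stand-ins invented for the example, and `fakePublish` only imitates a Node-style callback API.

```javascript
// Hypothetical wrapper: turns fn(...args, callback) into a function
// that returns a Promise instead.
const promisify = fn => (...args) =>
  new Promise((resolve, reject) => {
    fn(...args, (err, result) => {
      if (err) {
        reject(err);
      } else {
        resolve(result);
      }
    });
  });

// A stand-in for a callback-style client method (not MQTT.js itself).
const fakePublish = (topic, message, callback) => {
  setImmediate(() => {
    if (typeof topic !== 'string') {
      callback(new TypeError('topic must be a string'));
    } else {
      callback(null, {topic, message});
    }
  });
};

const publish = promisify(fakePublish);

publish('foo/bar', 'baz').then(packet => {
  console.log(packet.topic); // foo/bar
});
```

Note that this sketch drops `this`; wrapping real client methods would also need to bind them to the client.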
`mqttletoad` supports connecting to an MQTT broker running on a named pipe.
Node.js v7.0.0 or greater required.
$ npm install mqttletoad
This is a fancypants wrapper around MQTT.js, so most everything there applies here, except the differences noted above.
const toad = require('mqttletoad');

const myfunc = async () => {
  const client = await toad.connect('wss://test.mosquitto.org');

  client
    .on('disconnect', () => {
      console.warn('client disconnected');
    })
    .on('offline', () => {
      console.warn('client offline; reconnecting...');
    });

  // uses default "text" decoder
  const suback = await client.subscribe('winken/+/nod', (str, packet) => {
    console.log(`topic: "${packet.topic}", message: "${str}"`);
  }, {qos: 1});

  console.log(`subscribed to ${suback.topic} w/ QoS ${suback.qos}`);

  // uses default "text" encoder
  await client.publish('winken/blinken/nod', 'foo');

  const someOtherListener = (message, packet) => {
    // does stuff with MESSAGE
  };

  // a custom decoder
  await client.subscribe('winken/+/nod', someOtherListener, {
    decoder: value => String(value).toUpperCase()
  });

  // remove only this particular listener for this topic;
  // no actual unsubscription occurs because this isn't the only listener
  // on the topic.
  await client.unsubscribe('winken/+/nod', someOtherListener);

  // remove ALL listeners from this topic and unsubscribe
  await client.unsubscribe('winken/+/nod');

  // disconnect
  await client.end();

  // IPC support (mqtt only; not ws)
  // (named differently because `client` is already declared in this scope)
  const ipcClient = await toad.connect({path: '/path/to/my/named/pipe'});
};
- Use `client.subscribe(topic, listener, [opts])` to register a listener for the topic. `opts` are the standard options `MqttClient#subscribe()` supports, plus `decoder`.
  - While `MqttClient#subscribe()` supports an `Array` of topics, our `topic` is singular, and must be a string.
  - Standard MQTT topic wildcards are supported, and listeners are executed in order of specificity; i.e. `foo/bar` will take precedence over `foo/+`, and `foo/+` will take precedence over `foo/#`.
- Use `client.unsubscribe(topic, listener)` to remove the listener for the topic.
  - This will not necessarily unsubscribe from the topic (at the broker level), because there may be other listeners, but it will remove the listener.
  - If `listener` is omitted, all listeners are removed, which forces unsubscription.
- Use `client.end(force=false)` to disconnect.
- Use `client.publish(topic, message, [opts])` with standard `MqttClient#publish()` options, plus `encoder`.
- Use `connect(url, [opts])` to connect; `url` is a `string`, or you could just pass an `opts` object. Includes `encoder` and `decoder` options, which set the default encoder and decoder, respectively. The default is `text` in both cases.
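The unsubscribe rules above amount to keeping a set of listeners per topic and only talking to the broker when that set becomes empty. A minimal sketch of that bookkeeping follows; `ListenerRegistry` and its methods are hypothetical names, not `mqttletoad` internals.

```javascript
// Hypothetical bookkeeping for illustration only.
class ListenerRegistry {
  constructor() {
    this.listeners = new Map(); // topic -> Set of listener functions
  }

  // Returns true if this is the first listener for the topic, i.e. a
  // broker-level subscription would be needed.
  add(topic, listener) {
    const isFirst = !this.listeners.has(topic);
    if (isFirst) {
      this.listeners.set(topic, new Set());
    }
    this.listeners.get(topic).add(listener);
    return isFirst;
  }

  // Returns true if no listeners remain for the topic, i.e. a
  // broker-level unsubscription would be needed.
  remove(topic, listener) {
    const set = this.listeners.get(topic);
    if (!set) {
      return false;
    }
    if (listener === undefined) {
      set.clear(); // no listener given: drop them all
    } else {
      set.delete(listener);
    }
    if (set.size === 0) {
      this.listeners.delete(topic);
      return true;
    }
    return false;
  }
}

const registry = new ListenerRegistry();
const first = () => {};
const second = () => {};
registry.add('winken/+/nod', first);
registry.add('winken/+/nod', second);
console.log(registry.remove('winken/+/nod', second)); // false
console.log(registry.remove('winken/+/nod')); // true
```

The first `remove()` returns `false` because `first` is still listening; the second clears everything, which is the point at which an actual unsubscription would go out.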
- Something something Rollup?
Is this module useful for you? Please let me know. PRs accepted!
Apache-2.0 © 2017 Christopher Hiller