Skip to content

artcom/mqtt-topping

Repository files navigation

mqtt-topping

Wraps the MQTT.js client to multiplex incoming messages to the subscribed handlers and supports querying retained topics via HTTP.

MQTT.js events can also be registered on the mqtt-topping client.

Expects that the default MQTT message payload is formatted as JSON.

MQTT Client

Features

  • Subscribe and unsubscribe handler callbacks to individual (wildcard) topics
  • JSON.stringify all published payloads
  • JSON.parse all incoming payloads
  • Ignore non-JSON payloads
  • Decide whether to retain a message or not depending on the topic name (retained unless topic is prefixed with on or do)
  • Publishes and subscriptions are sent with quality-of-service 2

Connect, Subscribe, Publish, Unpublish and Register Event "offline"

const { connectAsync } = require("@artcom/mqtt-topping")

async function main() {
  const client = await connectAsync("tcp://broker.example.com")

  client.on("offline", () => console.error("Client is offline. Trying to reconnect."))

  await client.subscribe("my/topic", (payload, topic, packet) => {
    console.log(
      "Received Payload " + payload + " for Topic " + topic + " (retained = " + packet.retain + ")"
    )
  })

  await client.publish("my/topic", "myPayload")

  await client.unpublish("my/topic")
}

HTTP Client

Features

  • Works with the broker plugin "HiveMQ Retained Message Query Plugin"
  • Supports single and batch queries including wildcard topics, additional options are:
    • parseJson: Parse the result.payload as JSON. Default is true.
    • depth: Specifies the recursive depth of the query. A depth > 0 returns subtopics in result.children. Default is 0.
    • flatten: Flattens all topics into a flat array. Default is false.
  • Supports single and batch json queries which:
    • return entire topic trees (topics with subtopics) as one JSON object
    • ignore topic payloads if subtopics exist

Single Query

const { connectAsync, HttpClient } = require("@artcom/mqtt-topping")

async function main() {
  const client = await connectAsync("tcp://broker.example.com")
  const httpClient = new HttpClient("http://broker.example.com/query")

  await client.publish("my/topic", "myPayload")

  // wait a few milliseconds to ensure the data is processed on the server

  const result = await httpClient.query({ topic: "my", depth: 1 })

  // {
  // "topic": "my",
  // "children": [
  //     {
  //         "topic": "my/topic",
  //         "payload": "myPayload"
  //     }
  //   ]
  // }
}

Batch Query

const { connectAsync, HttpClient } = require("@artcom/mqtt-topping")

async function main() {
  const client = await connectAsync("tcp://broker.example.com")
  const httpClient = new HttpClient("http://broker.example.com/query")

  await client.publish("my/topic1", "myPayload1")
  await client.publish("my/topic2", "myPayload2")

  // wait a few milliseconds to ensure the data is processed on the server

  const result = await httpClient.queryBatch([{ topic: "my/topic1" }, { topic: "my/topic2" }])

  // [
  //   {
  //       "topic": "my/topic1",
  //       "payload": "myPayload1"
  //   },
  //   {
  //       "topic": "my/topic2",
  //       "payload": "myPayload2"
  //   }
  // ]
}

QueryJson

const { connectAsync, HttpClient } = require("@artcom/mqtt-topping")

async function main() {
  const client = await connectAsync("tcp://broker.example.com")
  const httpClient = new HttpClient("http://broker.example.com/query")

  await client.publish("my/topic", "myPayload")

  // wait a few milliseconds to ensure the data is processed on the server

  const result = await httpClient.queryJson("my")

  // {
  //   "topic": "myPayload"
  // }
}

QueryJsonBatch

const { connectAsync, HttpClient } = require("@artcom/mqtt-topping")

async function main() {
  const client = await connectAsync("tcp://broker.example.com")
  const httpClient = new HttpClient("http://broker.example.com/query")

  await client.publish("january/first", "eat")
  await client.publish("january/second", "sleep")
  await client.publish("february/first", "work")
  await client.publish("february/second", "repeat")

  // wait a few milliseconds to ensure the data is processed on the server

  const result = await httpClient.queryJsonBatch(["january", "february"])

  // [
  //   {
  //     "first": "eat"
  //     "second": "sleep"
  //   },
  //   {
  //     "first": "work"
  //     "second": "repeat"
  //   }
  // ]
}

Unpublish Recursively

const { connectAsync, HttpClient, unpublishRecursively } = require("@artcom/mqtt-topping")

async function main() {
  const client = await connectAsync("tcp://broker.example.com")
  const httpClient = new HttpClient("http://broker.example.com/query")

  await client.publish("january/first", "eat")
  await client.publish("january/second", "sleep")
  await client.publish("february/first", "work")
  await client.publish("february/second", "repeat")

  // wait a few milliseconds to ensure the data is processed on the server

  const result = await unpublishRecursively(mqttClient, httpClient, "february")

  // remaining published topics on the broker
  // january/first: "eat"
  // january/second: "sleep"
}

Development

Build

npm install
npm run build

Test

The tests require a running MQTT broker instance with the "HiveMQ Retained Message Query Plugin".

npm install
npm run build
npm run test