Skip to content

Commit

Permalink
[FIXED] MQTT PUBREL header incompatibility (#4616)
Browse files Browse the repository at this point in the history
https://hivemq.github.io/mqtt-cli/docs/test/ pointed out the
incompatibility.
  • Loading branch information
levb authored Oct 5, 2023
1 parent 4e414f1 commit beee6fc
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 0 deletions.
37 changes: 37 additions & 0 deletions .github/workflows/MQTT test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
name: MQTT Compliance
on: [push, pull_request]

jobs:
test:
strategy:
matrix:
go: ["1.21"]
env:
GOPATH: /home/runner/work/nats-server
GO111MODULE: "on"

runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v3
with:
path: src/github.com/nats-io/nats-server

- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: ${{matrix.go}}

- name: Install deps
shell: bash --noprofile --norc -x -eo pipefail {0}
run: |
wget https://github.com/hivemq/mqtt-cli/releases/download/v4.20.0/mqtt-cli-4.20.0.deb
sudo apt install ./mqtt-cli-4.20.0.deb
- name: Run tests
shell: bash --noprofile --norc -x -eo pipefail {0}
run: |
set -e
cd src/github.com/nats-io/nats-server/server
go test -v -vet=off --run=TestMQTTCLICompliance
set +e
8 changes: 8 additions & 0 deletions server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -3915,6 +3915,14 @@ func (c *client) mqttEnqueuePubResponse(packetType byte, pi uint16, trace bool)
proto := [4]byte{packetType, 0x2, 0, 0}
proto[2] = byte(pi >> 8)
proto[3] = byte(pi)

// Bits 3,2,1 and 0 of the fixed header in the PUBREL Control Packet are
// reserved and MUST be set to 0,0,1 and 0 respectively. The Server MUST treat
// any other value as malformed and close the Network Connection [MQTT-3.6.1-1].
if packetType == mqttPacketPubRel {
proto[0] |= 0x2
}

c.mu.Lock()
c.enqueueProto(proto[:4])
c.mu.Unlock()
Expand Down
37 changes: 37 additions & 0 deletions server/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
"math/rand"
"net"
"os"
"os/exec"
"reflect"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -6997,6 +6999,41 @@ func TestMQTTJetStreamRepublishAndQoS0Subscribers(t *testing.T) {
testMQTTExpectNothing(t, r)
}

func TestMQTTCLICompliance(t *testing.T) {
mqttPath := os.Getenv("MQTT_CLI")
if mqttPath == "" {
if p, err := exec.LookPath("mqtt"); err == nil {
mqttPath = p
}
}
if mqttPath == "" {
t.Skip(`"mqtt" command is not found in $PATH nor $MQTT_CLI. See https://hivemq.github.io/mqtt-cli/docs/installation/#debian-package for installation instructions`)
}

conf := createConfFile(t, []byte(fmt.Sprintf(`
listen: 127.0.0.1:-1
server_name: mqtt
jetstream {
store_dir = %q
}
mqtt {
listen: 127.0.0.1:-1
}
`, t.TempDir())))
s, o := RunServerWithConfig(conf)
defer testMQTTShutdownServer(s)

cmd := exec.Command(mqttPath, "test", "-V", "3", "-p", strconv.Itoa(o.MQTT.Port))

output, err := cmd.CombinedOutput()
t.Log(string(output))
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
t.Fatalf("mqtt cli exited with error: %v", exitError)
}
}
}

//////////////////////////////////////////////////////////////////////////
//
// Benchmarks
Expand Down

0 comments on commit beee6fc

Please sign in to comment.