c# – MQTT: subscribing from a Windows host to data on a Mosquitto broker in an Ubuntu guest

Background: I am working on a project in Unity and I have the plugins BestMQTT, [BestHTTP][1] and [BestHTTP2][2]. The MQTT script I have in Unity needs to subscribe to the broker in an Ubuntu virtual machine on which I am running ThingsBoard and Mosquitto. I have edited the default mosquitto.conf file in Ubuntu as follows:

 

    # Place your local configuration in /etc/mosquitto/conf.d/
    #
    # A full description of the configuration file is at
    # /usr/share/doc/mosquitto/examples/mosquitto.conf.example
    
    listener 1883
    password_file /etc/mosquitto/passwd
    allow_anonymous false
    
    listener 9001
    protocol websockets
    
    pid_file /var/run/mosquitto.pid
    
    persistence true
    persistence_location /var/lib/mosquitto/
    
    log_dest file /var/log/mosquitto/mosquitto.log
    
    include_dir /etc/mosquitto/conf.d
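One way to check, from the Windows host, that the two listeners configured above (1883 and 9001) are reachable at all is a plain TCP probe. The following is a minimal sketch and not part of the original script: the class name `BrokerPortProbe`, the method `CanConnect` and the 2-second default timeout are assumptions made for illustration. A successful probe only shows that something accepts connections on that port, not which service it is.

```csharp
using System;
using System.Net.Sockets;
using System.Threading.Tasks;

// Hypothetical helper (not part of BestMQTT): checks plain TCP reachability.
public static class BrokerPortProbe
{
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    public static bool CanConnect(string host, int port, int timeoutMs = 2000)
    {
        try
        {
            using (var tcp = new TcpClient())
            {
                Task connect = tcp.ConnectAsync(host, port);
                // Wait returns false on timeout and throws if the connect failed.
                return connect.Wait(timeoutMs) && tcp.Connected;
            }
        }
        catch (Exception)
        {
            // Connection refused, host unreachable, name resolution failure, etc.
            return false;
        }
    }
}
```

From a Unity script this could be called as `Debug.Log(BrokerPortProbe.CanConnect("IP OF UBUNTU VM", 1883));` and again with port 9001. If either call returns false, the problem is likely at the network level (VM network mode, firewall) rather than in the MQTT client code.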

This is the script I have written in Unity:

    using System;
    using System.Collections;
    using System.Collections.Generic;
    using System.Text;
    using UnityEngine;
    using BestMQTT;
    using BestMQTT.Packets;
    using BestMQTT.Packets.Builders;

    public class MQTT : MonoBehaviour
    {
        // Location of the data (verbatim string, so the backslashes need no escaping).
        private String networkVariablePath = @"\\localhost\RT Shared Variables\";

        // Topic name for subscription.
        private static readonly String baseTopic = "v1/devices/me/telemetry";

        // Device ID.
        private static readonly String clientId = "Jupiter-MQTT-Pub";

        MQTTClient client;

        // Start is called before the first frame update.
        void Start()
        {
            client = new MQTTClientBuilder()
                .WithOptions(new ConnectionOptionsBuilder().WithTCP("IP OF UBUNTU VM", 1883))
                .WithEventHandler(OnConnected)
                .WithEventHandler(OnDisconnected)
                .WithEventHandler(OnStateChanged)
                .WithEventHandler(OnError)
                .CreateClient();

            client.BeginConnect(ConnectPacketBuilderCallback);

            var session = SessionHelper.Get("IP OF UBUNTU VM");
            Debug.Log(session.ClientId);
        }

        private void OnConnected(MQTTClient client)
        {
            // Buffer the packets created in this block so they are sent together.
            using (new PacketBufferHelper(client))
            {
                client.AddTopicAlias(baseTopic);

                // Subscribes the client to the topic to get updates.
                client.CreateSubscriptionBuilder(baseTopic)
                    .WithMessageCallback(OnMessage)
                    .WithMaximumQoS(QoSLevels.ExactlyOnceDelivery)
                    .BeginSubscribe();
            }

            /*    client.CreateApplicationMessageBuilder("best_mqtt/test_topic")
                .WithPayload("Hello MQTT World!")
                .WithQoS(BestMQTT.Packets.QoSLevels.ExactlyOnceDelivery)
                .WithContentType("text/plain; charset=UTF-8")
                .BeginPublish();*/
        }

        private void OnDestroy()
        {
            client?.CreateDisconnectPacketBuilder()
                .WithReasonCode(DisconnectReasonCodes.NormalDisconnection)
                .WithReasonString("Bye")
                .BeginDisconnect();
        }

        private ConnectPacketBuilder ConnectPacketBuilderCallback(MQTTClient client, ConnectPacketBuilder builder)
        {
            // Adds the username and password needed for authentication.
            return builder.WithUserNameAndPassword("Jupiter-MQTT-Pub", "");
        }

        private void OnMessage(MQTTClient client, SubscriptionTopic topic, string topicName, ApplicationMessage message)
        {
            // Convert the raw payload to a string.
            var payload = Encoding.UTF8.GetString(message.Payload.Data, message.Payload.Offset, message.Payload.Count);
            Debug.Log("Payload: " + payload);
            Debug.Log($"Content-Type: '{message.ContentType}' Payload: '{payload}'");

            // Note: this topic is never subscribed to above.
            client.CreateUnsubscribePacketBuilder("best_mqtt/test_topic")
                .BeginUnsubscribe();
        }

        // Called when the MQTTClient transferred to a new internal state.
        private void OnStateChanged(MQTTClient client, ClientStates oldState, ClientStates newState)
        {
            Debug.Log($"{oldState} => {newState}");
        }

        // Called when the client disconnects from the server. The disconnection can be client or server initiated or because of an error.
        private void OnDisconnected(MQTTClient client, DisconnectReasonCodes code, string reason)
        {
            Debug.Log($"OnDisconnected - code: {code}, reason: '{reason}'");
        }

        // Called when an error happens that the plugin can't recover from. After this event an OnDisconnected event is raised too.
        private void OnError(MQTTClient client, string reason)
        {
            Debug.Log($"OnError reason: '{reason}'");
        }
    }
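The `OnMessage` handler in the script decodes the payload with an explicit offset and count instead of decoding the whole array. The sketch below illustrates why that matters, assuming (as the `Data`/`Offset`/`Count` triple suggests) that the payload can be a slice of a larger buffer. The class `PayloadDecoding`, the method `DecodeUtf8` and the sample JSON are hypothetical and only serve the illustration.

```csharp
using System;
using System.Text;

// Hypothetical helper mirroring the decoding step in OnMessage.
public static class PayloadDecoding
{
    // Decodes only the slice [offset, offset + count) of the buffer as UTF-8.
    public static string DecodeUtf8(byte[] data, int offset, int count)
    {
        return Encoding.UTF8.GetString(data, offset, count);
    }

    // Builds a buffer with unrelated bytes around the payload and decodes
    // just the payload slice, as OnMessage does with message.Payload.
    public static string Demo()
    {
        string json = "{\"temperature\":25}";            // sample telemetry, made up
        byte[] payload = Encoding.UTF8.GetBytes(json);
        byte[] buffer = new byte[payload.Length + 8];    // 4 spare bytes on each side
        Array.Copy(payload, 0, buffer, 4, payload.Length);
        return DecodeUtf8(buffer, 4, payload.Length);
    }
}
```

Decoding the full `buffer` here would include the surrounding zero bytes in the string, which is why the offset and count of the segment are passed along.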

Issue: When I try to connect to the broker with this C# script, two errors occur: 1)


    java.lang.ClassCastException: io.netty.handler.codec.mqtt.MqttMessage cannot be cast to io.netty.handler.codec.mqtt.MqttConnectMessage
        at org.thingsboard.server.transport.mqtt.MqttTransportHandler.processMqttMsg(MqttTransportHandler.java:137)
        at org.thingsboard.server.transport.mqtt.MqttTransportHandler.channelRead(MqttTransportHandler.java:118)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
        at io.netty.handler.codec.ReplayingDecoder.callDecode(ReplayingDecoder.java:349)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

Unity Debug for 1):

and 2) (when using WebSocket):


    2022-05-04 14:22:10,789 [nioEventLoopGroup-4-3] INFO  o.t.s.t.mqtt.MqttTransportHandler - [192.168.22.171:53794] Invalid message received
    2022-05-04 14:22:10,789 [nioEventLoopGroup-4-3] INFO  o.t.s.t.mqtt.MqttTransportHandler - [553b7958-b42c-4fa2-a0b9-abb52a3b444a] Client disconnected!


I found these two links while troubleshooting:

[Link 1 (Stack Overflow)][3]: They discuss using an MQTT bridge to connect the two, but I am not sure whether this would resolve my issue.

[Link 2 (GitHub bug)][4]: They mention that the exception is caused by the payload being too large and that one needs to edit the .java config to allow a larger value, but I cannot find this file on my Ubuntu VM.

So I am confused about what the issue actually is, and this leaves me unable to debug it. May I please have some clarification on what is happening here so I can try to resolve it? I understand that the way I am trying to subscribe and get the data may seem strange, but I have to do it like this due to constraints on the project I am working on.

Any and all assistance will be deeply appreciated. 

Thank you for your time.


  [1]: https://assetstore.unity.com/packages/tools/network/best-mqtt-209238
  [2]: https://assetstore.unity.com/packages/tools/network/best-http-2-155981
  [3]: https://stackoverflow.com/questions/68354383/unable-to-connect-moquitto-broker-from-vm-to-host-machine
  [4]: https://github.com/thingsboard/thingsboard/issues/2565
