C# MQTTNET 服务端+客户端 实现 源码示例

目录

1.演示效果

2.源码下载

3.服务端介绍

4.客户端介绍


1.演示效果

2.源码下载

下载地址:https://download.csdn.net/download/rotion135/89385802

3.服务端介绍

服务端用的控制台程序进行设计,实际使用可以套一层Windows服务的皮,进行服务部署。

调试用控制台显示收发的消息,便于直观

首先安装的MQTTNET 版本是4.3.6.1152 :

自定义了服务的客户端列表数据模型:

    public class MqttClientInfo
    {
        /// <summary>
        /// ID
        /// </summary>
        public string ClientId { get; set; }
        /// <summary>
        /// 客户端名称
        /// </summary>
        public string ClientName { get; set; }
        /// <summary>
        /// 订阅列表
        /// </summary>
        public List<MqttSubscription> Subscriptions { get; set; } = new List<MqttSubscription>();
    }

    public class MqttSubscription
    {
        /// <summary>
        /// 所属客户端
        /// </summary>
        public MqttClientInfo Parent { get; set; }
        /// <summary>
        /// 订阅消息
        /// </summary>
        public string Topic { get; set; }
    }

再对服务端的代码进行封装,添加响应的事件,做一些消息显示到控制台

服务端的代码就这么简单

    public class LSMQTTServer
    {
        MqttServer mqttServer;
        List<MqttClientInfo> MqttClients = new List<MqttClientInfo>();

        /// <summary>
        /// 初始化Mqtt服务并启动服务
        /// </summary>
        /// <param name="ip"></param>
        /// <param name="port"></param>
        public virtual void InitMqttServer(string ip, int port)
        {
            var mqttServerOptions =
                    new MqttServerOptionsBuilder()
                    .WithDefaultEndpoint()
                    .WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip))//set the ip of the server
                    .WithDefaultEndpointPort(port)//set the port of the server                    
                    .Build();
            mqttServer = new MqttFactory().CreateMqttServer(mqttServerOptions); // create MQTT service object
            mqttServer.ValidatingConnectionAsync += MqttServer_ValidatingConnectionAsync;
            mqttServer.ClientConnectedAsync += MqttServer_ClientConnectedAsync;
            mqttServer.ClientDisconnectedAsync += MqttServer_ClientDisconnectedAsync;
            mqttServer.ClientSubscribedTopicAsync += MqttServer_ClientSubscribedTopicAsync;
            mqttServer.ClientUnsubscribedTopicAsync += MqttServer_ClientUnsubscribedTopicAsync;
            mqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;
            mqttServer.ClientAcknowledgedPublishPacketAsync += MqttServer_ClientAcknowledgedPublishPacketAsync;
            mqttServer.InterceptingClientEnqueueAsync += MqttServer_InterceptingClientEnqueueAsync;
            mqttServer.ApplicationMessageNotConsumedAsync += MqttServer_ApplicationMessageNotConsumedAsync;

            mqttServer.StartAsync();
        }       

        private Task MqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg)
        {
            try
            {
            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_ApplicationMessageNotConsumedAsync", ex);
            }
            return Task.CompletedTask;
        }

        private Task MqttServer_InterceptingClientEnqueueAsync(InterceptingClientApplicationMessageEnqueueEventArgs arg)
        {
            try
            {
            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_InterceptingClientEnqueueAsync", ex);
            }
            return Task.CompletedTask;
        }

        private Task MqttServer_ClientAcknowledgedPublishPacketAsync(ClientAcknowledgedPublishPacketEventArgs arg)
        {
            try
            {

            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_ClientAcknowledgedPublishPacketAsync", ex);
            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// 消息接收
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
        {
            try
            {
                var client = arg.ClientId;
                var topic = arg.ApplicationMessage.Topic;
                var content = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment);
                GlobalEvents.OnMessage($"接收到消息:Client[{client}] Topic[{topic}] Message[{content}]");
            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_InterceptingPublishAsync", ex);
            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// 关闭Mqtt服务
        /// </summary>
        public async virtual Task StopMqttServer()
        {
            if (mqttServer != null)
            {
                if (mqttServer.IsStarted)
                {
                    await mqttServer.StopAsync();
                    mqttServer.Dispose();
                }
            }
        }

        /// <summary>
        /// 对客户端的连接进行验证
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        public virtual Task MqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
        {
            try
            {
                //验证ClientId
                if (string.IsNullOrWhiteSpace(arg.ClientId))
                {
                    arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                    return Task.CompletedTask;
                }

                //验证用户名和密码
                bool acceptflag = !(string.IsNullOrWhiteSpace(arg.UserName) || string.IsNullOrWhiteSpace(arg.Password));

                if (!acceptflag)
                {
                    //验证失败
                    arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                    return Task.CompletedTask;
                }
                arg.ReasonCode = MqttConnectReasonCode.Success;
            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_ValidatingConnectionAsync", ex);
            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// 客户端连接成功
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        public virtual Task MqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
        {
            try
            {
                MqttClients.Add(new MqttClientInfo() { ClientId = arg.ClientId, ClientName = arg.UserName });
                GlobalEvents.OnMessage($"客户端上线- ID:【{arg.ClientId}】 Name:【{arg.UserName}】");
            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_ClientConnectedAsync", ex);
            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// 客户端断开连接
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        /// <exception cref="NotImplementedException"></exception>
        public virtual Task MqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
        {
            try
            {
                MqttClientInfo? mqttUser = MqttClients.FirstOrDefault(t => t.ClientId == arg.ClientId);
                if (mqttUser != null)
                {
                    MqttClients.Remove(mqttUser);
                    GlobalEvents.OnMessage($"客户端离线- ID:【{mqttUser.ClientId}】");
                }

            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_ClientDisconnectedAsync", ex);
            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// 客户端发布订阅
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        public virtual Task MqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
        {
            try
            {
                if (arg == null)
                    return Task.CompletedTask;
                MqttClientInfo? mqttUser = MqttClients.FirstOrDefault(t => t.ClientId == arg.ClientId);
                if (mqttUser != null)
                {
                    mqttUser.Subscriptions.Add(new MqttSubscription() { Parent = mqttUser, Topic = arg.TopicFilter.Topic });
                    GlobalEvents.OnMessage($"客户端发布订阅- Topic:【{arg.TopicFilter.Topic}】");
                }
            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_ClientSubscribedTopicAsync", ex);
            }
            return Task.CompletedTask;
        }


        /// <summary>
        /// 客户端取消订阅
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        public virtual Task MqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
        {
            try
            {
                if (arg == null)
                    return Task.CompletedTask;
                MqttClientInfo? mqttUser = MqttClients.FirstOrDefault(t => t.ClientId == arg.ClientId);
                if (mqttUser != null)
                {
                    MqttSubscription? mqttSubedTopic = mqttUser.Subscriptions.FirstOrDefault(t => t.Topic == arg.TopicFilter);
                    if (mqttSubedTopic != null)
                    {
                        mqttUser.Subscriptions.Remove(mqttSubedTopic);
                        GlobalEvents.OnMessage($"客户端取消订阅- Topic:【{mqttSubedTopic.Topic}】");
                    }
                }
            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_ClientUnsubscribedTopicAsync", ex);
            }
            return Task.CompletedTask;
        }


    }

然后在控制台运行的时候,对服务进行实例化

var ip = IPHelper.GetLocalIP();
int port = 3303;
LSMQTTServer server = new LSMQTTServer();
server.InitMqttServer(ip,port);
GlobalEvents.OnMessage($"MQTT服务启动,IP:{ip},Port{port}");

4.客户端介绍

客户端设计,用的WPF,将每个客户端连接,用自定义控件进行封装,界面添加多个控件即表示多个客户端,添加订阅后,即可以显示收到的消息,便于多个客户端之间的消息调试

首先对连接的客户端也进行了类封装,包括连接,订阅,取消订阅和消息接收等

    public class LSMQTTClient
    {
        MqttClient mqttClient;
        public delegate void DelegateOutMessage(string message);
        public event DelegateOutMessage OnOutMessage;

        public void InitMqttClient(string serverIp, int serverPort, string clientId, string userName, string password)
        {
            try
            {
                var options = new MqttClientOptionsBuilder()
                     .WithCleanSession(true)
                     .WithCredentials(userName, password)
                     .WithClientId(clientId)
                     .WithTcpServer(serverIp,serverPort)
                     .Build();

                ConnectMQTTServer(options);
            }
            catch (Exception ex)
            {
                LogOperate.Error("InitMqttClient 发生异常", ex);
            }
        }

        /// <summary>
        /// 判断是否已连接服务
        /// </summary>
        /// <returns></returns>
        public bool IsConnect()
        {
            if (mqttClient == null)
                return false;
            if(!mqttClient.IsConnected)
                return false;
            return true;
        }


        public async void ConnectMQTTServer(MqttClientOptions options)
        {
            MqttFactory factory = new MqttFactory();
            if (mqttClient == null)
            {
                mqttClient = (MqttClient)factory.CreateMqttClient();
                mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync; ;
                mqttClient.ConnectedAsync += MqttClient_ConnectedAsync; ;
                mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync; ;
            }

            await mqttClient.ConnectAsync(options);
        }


        /// <summary>
        /// 断开服务连接
        /// </summary>
        public void DisConnectMQTTServer()
        {
            if(mqttClient!=null && mqttClient.IsConnected)
            {
                mqttClient.DisconnectAsync();
            }
        }

        /// <summary>
        /// 添加订阅
        /// </summary>
        /// <param name="topic"></param>
        /// <returns></returns>
        public async Task<BaseResult> AddSubscription(string topic)
        {
            if(!IsConnect())
            {
                return new BaseResult(false, "请先连接服务");
            }

            await mqttClient.SubscribeAsync(topic);
            return BaseResult.Successed;
        }

        /// <summary>
        /// 取消订阅
        /// </summary>
        /// <param name="topic"></param>
        /// <returns></returns>
        public async Task<BaseResult> UnSubscription(string topic)
        {
            if (!IsConnect())
            {
                return new BaseResult(false, "请先连接服务");
            }

            await mqttClient.UnsubscribeAsync(topic);
            return BaseResult.Successed;
        }

        /// <summary>
        /// 发送消息
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="content"></param>
        /// <returns></returns>
        public async Task<BaseResult> SendMessage(string topic,string content)
        {
            if (!IsConnect())
            {
                return new BaseResult(false, "请先连接服务");
            }

            await mqttClient.PublishStringAsync(topic, content);
            return BaseResult.Successed;
        }

        private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
        {
            OnOutMessage?.Invoke("已断开服务连接");
            return Task.CompletedTask;
        }

        private Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
        {
            if (arg.ConnectResult.ResultCode == MqttClientConnectResultCode.Success)
                OnOutMessage?.Invoke("已连接到服务");
            else
                OnOutMessage?.Invoke($"连接服务失败【{arg.ConnectResult.ReasonString}】");
            return Task.CompletedTask;
        }

        private Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
        {
            if (arg.ApplicationMessage.PayloadSegment.Array != null)
            {
                var content = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment.Array);// BitConverter.ToString(arg.ApplicationMessage.PayloadSegment.Array,0, arg.ApplicationMessage.PayloadSegment.Count);
                OnOutMessage?.Invoke($"[{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}]-收到消息>>{content}");
            }
            return Task.CompletedTask;
        }
    }

再设计自定义控件,将连接属性单独实例化

自定义控件包括XAML和ViewModel的设计,详细的可以下载源码进行查看,此处不展示太多了,代码量也确实有一些些,无非就是 连接的各个参数,如IP、端口,客户端ID,用户名、密码等等

然后再主界面设计两个按钮,添加自定义控件和清理自定义控件

界面设计的东西不介绍太多了,因为客户端可以有很多种设计的方式,但通讯那一块就已经在上边展示的代码里边了;

到此服务端+客户端就已经实现了,是不是没有想象中那么复杂。

最近更新

  1. TCP协议是安全的吗?

    2024-06-06 18:04:05       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-06 18:04:05       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-06 18:04:05       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-06 18:04:05       20 阅读

热门阅读

  1. 糖尿病相关的数据集

    2024-06-06 18:04:05       10 阅读
  2. ActiViz中的纹理映射

    2024-06-06 18:04:05       9 阅读
  3. 搭建python环境以及pip

    2024-06-06 18:04:05       8 阅读
  4. 什么是CSTP测试认证,如何通过CSTP认证?

    2024-06-06 18:04:05       7 阅读
  5. 全文检索&ElasticSearch简介

    2024-06-06 18:04:05       8 阅读