MQTT测试

张开发
2026/4/9 17:18:39 15 分钟阅读

分享文章

MQTT测试
MQTT笔记ClientId是客户端的唯一标识需要每个客户端都不一样host是IP地址自身IP可以写127.0.0.1端口号默认1883QoS心跳包AspNetCore_Server和服务端互发心跳包AspNetCore_Server是客户端的名字也就是ClientId1775195222: Received PINGREQ from AspNetCore_Server1775195222: Sending PINGRESP to AspNetCore_S在这里插入代码片erver在执行以下命令行(启动mqtt服务器)mosquitto-v报1775189036: Opening ipv4 listen socket on port1883.1775189036: Error: 通常每个套接字地址(协议/网络地址/端口)只允许使用一次。1775189036: Opening ipv6 listen socket on port1883.1775189036: Error: 通常每个套接字地址(协议/网络地址/端口)只允许使用一次。意思是端口 1883 已经被占用了解决命令行输入 找到占用1883的进程最后一列就是PIDnetstat-ano|findstr:1883根据PID杀死线程此处PID为1234taskkill /F /PID1234MQTT服务端测试使用mosiquitto下载安装下载地址https://mosquitto.org/download/默认安装后路径在C:\Program Files\Mosquitto添加用户名密码打开C:\Program Files\Mosquitto\mosquitto.conf添加两行到最后保存# 关闭匿名登录allow_anonymousfalse# 指定密码文件你等下要创建password_file C:\Program Files\Mosquitto\pwfile.txt# 监听所有IP局域网/外部可连listener18830.0.0.03.1. 命名行执行以下命令添加test用户名随后输入密码两次此处密码为123(-c是覆盖文件)mosquitto_passwd-cpwfile.txttest启动配置好的mosquittomosquitto-v-cC:\Program Files\Mosquitto\mosquitto.conf成功后可看到Opening ipv4 listen socket on port1883.▲如果要匿名第三点改为allow_anonymoustruelistener18830.0.0.0MQTT客户端软件测试使用MQTTX配置Host127.0.0.1(本地测试)Client IDmqttx_ad4e1234(随便)UserNametestPassword123新增主题点击New Subscription修改Topic(主题名称)此处为testtopic_1/#其中#为通配符符合的就会发送即testtopic_1/atesttopic_1/b都能发布到这个主题。想主题发送消息C#写MQTT框架.net 6.0asp.net coreNUGET包MQTTNet 4.3.7(最新是5.X此处用4.3.7兼容.net 6.0)MQTTnet.Extensions.ManagedClient代码Program.cs添加让后台服务和控制器 使用 同一个对象// 1. 先注册成单例控制器能找到builder.Services.AddSingletonMqttClientService();// 2. 再让它后台启动builder.Services.AddHostedService(pp.GetRequiredServiceMqttClientService());▲这是 .NET Core 后台服务 控制器共用 唯一标准写法2. MqttClientServiceusingMicrosoft.Extensions.Hosting;usingMicrosoft.Extensions.Logging;usingMQTTnet;usingMQTTnet.Client;usingMQTTnet.Extensions.ManagedClient;usingMQTTnet.Protocol;usingSystem.Threading.Tasks;usingSystem.Threading;usingSystem;namespaceYourNamespace{publicclassMqttClientService:IHostedService{privatereadonlyILoggerMqttClientService_logger;privateIManagedMqttClient_mqttClient;// 【客户对接配置】 privateconststringMqttServer127.0.0.1;privateconstintMqttPort1883;privateconstboolUseSslfalse;privateconststringUserNametest;privateconststringPassword123;// // 客户固定主题格式privatestringPublishTopic$testtopic_1/a;privatestringSubscribeTopic$testtopic_2/a;// 客户端ID唯一privatestringClientId$AspNetCore_Server;publicMqttClientService(ILoggerMqttClientServicelogger){_loggerlogger;}publicasyncTaskStartAsync(CancellationTokencancellationToken){_logger.LogInformation( 启动 MQTT 客户端对接客户IoT平台);varfactorynewMqttFactory();_mqttClientfactory.CreateManagedMqttClient();// 【严格按客户要求构建连接】 varmqttClientOptionsBuildernewMqttClientOptionsBuilder().WithTcpServer(MqttServer,MqttPort).WithCredentials(UserName,Password).WithClientId(ClientId).WithKeepAlivePeriod(TimeSpan.FromSeconds(60))// 心跳60s强制.WithCleanSession(true).WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311);// SSL 端口 8883 自动开启if(UseSsl){mqttClientOptionsBuilder.WithTls();}varoptionsnewManagedMqttClientOptionsBuilder().WithAutoReconnectDelay(TimeSpan.FromSeconds(5)).WithClientOptions(mqttClientOptionsBuilder.Build()).Build();// SubscribeMqttEvents();await_mqttClient.StartAsync(options);awaitSubscribeTopicsAsync();_logger.LogInformation(MQTT 连接成功);_logger.LogInformation(上报主题PublishTopic);_logger.LogInformation(订阅主题SubscribeTopic);}privateasyncTaskSubscribeTopicsAsync(){// 订阅平台下发指令QoS1await_mqttClient.SubscribeAsync(SubscribeTopic,MqttQualityOfServiceLevel.AtLeastOnce);}privatevoidSubscribeMqttEvents(){_mqttClient.ConnectedAsynce{_logger.LogInformation(✅ MQTT 连接成功);returnTask.CompletedTask;};_mqttClient.DisconnectedAsynce{_logger.LogWarning(❌ MQTT 断开e.Reason);returnTask.CompletedTask;};_mqttClient.ApplicationMessageReceivedAsynce{vartopice.ApplicationMessage.Topic;varpayloade.ApplicationMessage.ConvertPayloadToString();_logger.LogInformation( 收到平台指令{Topic} {Payload},topic,payload);returnTask.CompletedTask;};}publicasyncTaskPublishDataAsync(stringjsonData){if(!_mqttClient.IsConnected){_logger.LogWarning(MQTT未连接数据已缓存);}varmessagenewMqttApplicationMessageBuilder().WithTopic(PublishTopic).WithPayload(jsonData).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)// QoS1 强制.WithRetainFlag(false)// Retainfalse 强制.Build();await_mqttClient.EnqueueAsync(message);// 4.3.7 稳定不报错_logger.LogInformation( 上报成功jsonData);}publicasyncTaskStopAsync(CancellationTokencancellationToken){_logger.LogInformation( 停止 MQTT 客户端 );if(_mqttClient!null){await_mqttClient.StopAsync();_mqttClient.Dispose();}}}}发送消息[Route(/param/agvtask/mqtt)][HttpPost]public async TaskActionResultmqtt(){string jsonnew AjaxResult(ResultType.Success,1).ToJson();await _mqtt.PublishDataAsync(json);returnSuccess(mqtt完成);}

更多文章