refactor(mqtt): 优化MQTT服务代码格式和数据处理逻辑

- 调整using语句的顺序以符合规范
- 格式化日志输出和代码缩进
- 在设备数据处理中添加时间间隔检查,每小时上传一次数据
- 优化取消订阅逻辑的代码结构
This commit is contained in:
赵正易 2025-09-21 14:44:10 +08:00
parent 3e30d72ef2
commit 657d1b7ab6

View File

@ -1,16 +1,16 @@
using Infrastructure.Extensions; using System;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Infrastructure.Extensions;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using MQTTnet; using MQTTnet;
using MQTTnet.Client; using MQTTnet.Client;
using MQTTnet.Protocol; using MQTTnet.Protocol;
using System;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using ZR.Common.MqttHelper; using ZR.Common.MqttHelper;
using ZR.Model.dc; using ZR.Model.dc;
using ZR.Model.Dto; using ZR.Model.Dto;
@ -18,7 +18,7 @@ using ZR.Model.mes.md;
namespace ZR.Service.mqtt namespace ZR.Service.mqtt
{ {
public class MqttService :BaseService<DeviceUploadData>, IHostedService, IDisposable public class MqttService : BaseService<DeviceUploadData>, IHostedService, IDisposable
{ {
private readonly ILogger<MqttService> _logger; private readonly ILogger<MqttService> _logger;
private readonly IConfiguration _configuration; private readonly IConfiguration _configuration;
@ -281,20 +281,22 @@ namespace ZR.Service.mqtt
if (newTopicFilters.Any()) if (newTopicFilters.Any())
{ {
var newSubscribeOptionsBuilder = new MqttClientSubscribeOptionsBuilder(); var newSubscribeOptionsBuilder = new MqttClientSubscribeOptionsBuilder();
foreach (var topicFilter in newTopicFilters) foreach (var topicFilter in newTopicFilters)
{ {
newSubscribeOptionsBuilder.WithTopicFilter(topicFilter); newSubscribeOptionsBuilder.WithTopicFilter(topicFilter);
} }
var newSubscribeOptions = newSubscribeOptionsBuilder.Build(); var newSubscribeOptions = newSubscribeOptionsBuilder.Build();
_logger.LogInformation($"订阅新主题: {string.Join(", ", newTopics)}"); _logger.LogInformation($"订阅新主题: {string.Join(", ", newTopics)}");
var result = await _mqttClient.SubscribeAsync(newSubscribeOptions); var result = await _mqttClient.SubscribeAsync(newSubscribeOptions);
foreach (var item in result.Items) foreach (var item in result.Items)
{ {
_logger.LogInformation($"订阅结果:{item.TopicFilter.Topic} -> {item.ResultCode}"); _logger.LogInformation(
$"订阅结果:{item.TopicFilter.Topic} -> {item.ResultCode}"
);
} }
} }
} }
@ -305,20 +307,19 @@ namespace ZR.Service.mqtt
_logger.LogInformation($"配置中移除的主题: {string.Join(", ", removedTopics)}"); _logger.LogInformation($"配置中移除的主题: {string.Join(", ", removedTopics)}");
// 注意:这里没有执行取消订阅操作,因为有些场景下可能希望保留历史订阅 // 注意:这里没有执行取消订阅操作,因为有些场景下可能希望保留历史订阅
// 如果需要取消订阅,请取消注释下面的代码 // 如果需要取消订阅,请取消注释下面的代码
// 使用构建器模式创建取消订阅选项 // 使用构建器模式创建取消订阅选项
var unsubscribeOptionsBuilder = new MqttClientUnsubscribeOptionsBuilder(); var unsubscribeOptionsBuilder = new MqttClientUnsubscribeOptionsBuilder();
// 为每个要取消订阅的主题调用WithTopic方法 // 为每个要取消订阅的主题调用WithTopic方法
foreach (var topic in removedTopics) foreach (var topic in removedTopics)
{ {
unsubscribeOptionsBuilder.WithTopicFilter(topic); unsubscribeOptionsBuilder.WithTopicFilter(topic);
} }
var unsubscribeOptions = unsubscribeOptionsBuilder.Build(); var unsubscribeOptions = unsubscribeOptionsBuilder.Build();
await _mqttClient.UnsubscribeAsync(unsubscribeOptions); await _mqttClient.UnsubscribeAsync(unsubscribeOptions);
_logger.LogInformation($"已取消订阅: {string.Join(", ", removedTopics)}"); _logger.LogInformation($"已取消订阅: {string.Join(", ", removedTopics)}");
} }
// 更新本地已订阅主题列表 // 更新本地已订阅主题列表
@ -371,30 +372,44 @@ namespace ZR.Service.mqtt
// plc网关抓取数据上传 // plc网关抓取数据上传
if (topic.Contains("device/data/push")) if (topic.Contains("device/data/push"))
{ {
DeviceUploadDataGatWayDto deviceUploadDataGatWayDto = JsonSerializer.Deserialize<DeviceUploadDataGatWayDto>(payload); DeviceUploadDataGatWayDto deviceUploadDataGatWayDto =
JsonSerializer.Deserialize<DeviceUploadDataGatWayDto>(payload);
// 这里添加设备消息处理逻辑 // 这里添加设备消息处理逻辑
string deviceCode = topic.Split("/")[2]; string deviceCode = topic.Split("/")[2];
DeviceUploadData deviceUploadData = new()
{ DeviceUploadData lastDeviceUploadData = Context
FactoryCode = "上海干巷", .Queryable<DeviceUploadData>()
WorkshopCode = "涂装车间", .Where(it => it.DeviceCode == deviceCode)
LineCode = "涂装生产线", .OrderByDescending(it => it.UploadTime)
DeviceCode = deviceCode, .First();
DictCode = "device_dict_plc_001", DateTime lastTime = lastDeviceUploadData.UploadTime;
Remark = "网关采集设备数据",
UploadTime = DateTime.Now, // 每一小时上传一下
CollectionTime = DateTimeOffset.FromUnixTimeMilliseconds(deviceUploadDataGatWayDto.Time).LocalDateTime,
Value01 = deviceUploadDataGatWayDto.DeviceParams.Value01.ToString(), DeviceUploadData deviceUploadData =
Value02 = deviceUploadDataGatWayDto.DeviceParams.Value02.ToString(), new()
Value03 = deviceUploadDataGatWayDto.DeviceParams.Value03.ToString(), {
Value04 = deviceUploadDataGatWayDto.DeviceParams.Value04.ToString(), FactoryCode = "上海干巷",
Value05 = deviceUploadDataGatWayDto.DeviceParams.Value05.ToString(), WorkshopCode = "涂装车间",
Value06 = deviceUploadDataGatWayDto.DeviceParams.Value06.ToString(), LineCode = "涂装生产线",
Value07 = deviceUploadDataGatWayDto.DeviceParams.Value07.ToString(), DeviceCode = deviceCode,
Value08 = deviceUploadDataGatWayDto.DeviceParams.Value08.ToString(), DictCode = "device_dict_plc_001",
Value09 = deviceUploadDataGatWayDto.DeviceParams.Value09.ToString(), Remark = "网关采集设备数据",
Value10 = deviceUploadDataGatWayDto.DeviceParams.Value10.ToString() UploadTime = DateTime.Now,
}; CollectionTime = DateTimeOffset
.FromUnixTimeMilliseconds(deviceUploadDataGatWayDto.Time)
.LocalDateTime,
Value01 = deviceUploadDataGatWayDto.DeviceParams.Value01.ToString(),
Value02 = deviceUploadDataGatWayDto.DeviceParams.Value02.ToString(),
Value03 = deviceUploadDataGatWayDto.DeviceParams.Value03.ToString(),
Value04 = deviceUploadDataGatWayDto.DeviceParams.Value04.ToString(),
Value05 = deviceUploadDataGatWayDto.DeviceParams.Value05.ToString(),
Value06 = deviceUploadDataGatWayDto.DeviceParams.Value06.ToString(),
Value07 = deviceUploadDataGatWayDto.DeviceParams.Value07.ToString(),
Value08 = deviceUploadDataGatWayDto.DeviceParams.Value08.ToString(),
Value09 = deviceUploadDataGatWayDto.DeviceParams.Value09.ToString(),
Value10 = deviceUploadDataGatWayDto.DeviceParams.Value10.ToString()
};
Context.Insertable(deviceUploadData).ExecuteCommand(); Context.Insertable(deviceUploadData).ExecuteCommand();
} }
@ -486,6 +501,7 @@ namespace ZR.Service.mqtt
_logger.LogError(ex, "清理MQTT客户端资源时出错"); _logger.LogError(ex, "清理MQTT客户端资源时出错");
} }
} }
public void Dispose() public void Dispose()
{ {
Dispose(true); Dispose(true);