diff --git a/ZR.Service/mqtt/MqttService.cs b/ZR.Service/mqtt/MqttService.cs index d1b5e918..254e0613 100644 --- a/ZR.Service/mqtt/MqttService.cs +++ b/ZR.Service/mqtt/MqttService.cs @@ -1,16 +1,16 @@ -using Infrastructure.Extensions; +using System; +using System.Linq; +using System.Text; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Infrastructure.Extensions; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using MQTTnet; using MQTTnet.Client; using MQTTnet.Protocol; -using System; -using System.Linq; -using System.Text; -using System.Text.Json; -using System.Threading; -using System.Threading.Tasks; using ZR.Common.MqttHelper; using ZR.Model.dc; using ZR.Model.Dto; @@ -18,7 +18,7 @@ using ZR.Model.mes.md; namespace ZR.Service.mqtt { - public class MqttService :BaseService, IHostedService, IDisposable + public class MqttService : BaseService, IHostedService, IDisposable { private readonly ILogger _logger; private readonly IConfiguration _configuration; @@ -281,20 +281,22 @@ namespace ZR.Service.mqtt if (newTopicFilters.Any()) { var newSubscribeOptionsBuilder = new MqttClientSubscribeOptionsBuilder(); - + foreach (var topicFilter in newTopicFilters) { newSubscribeOptionsBuilder.WithTopicFilter(topicFilter); } - + var newSubscribeOptions = newSubscribeOptionsBuilder.Build(); - + _logger.LogInformation($"订阅新主题: {string.Join(", ", newTopics)}"); var result = await _mqttClient.SubscribeAsync(newSubscribeOptions); foreach (var item in result.Items) { - _logger.LogInformation($"订阅结果:{item.TopicFilter.Topic} -> {item.ResultCode}"); + _logger.LogInformation( + $"订阅结果:{item.TopicFilter.Topic} -> {item.ResultCode}" + ); } } } @@ -305,20 +307,19 @@ namespace ZR.Service.mqtt _logger.LogInformation($"配置中移除的主题: {string.Join(", ", removedTopics)}"); // 注意:这里没有执行取消订阅操作,因为有些场景下可能希望保留历史订阅 // 如果需要取消订阅,请取消注释下面的代码 - + // 使用构建器模式创建取消订阅选项 var unsubscribeOptionsBuilder = new MqttClientUnsubscribeOptionsBuilder(); - + // 为每个要取消订阅的主题调用WithTopic方法 foreach (var topic in removedTopics) { unsubscribeOptionsBuilder.WithTopicFilter(topic); } - + var unsubscribeOptions = unsubscribeOptionsBuilder.Build(); await _mqttClient.UnsubscribeAsync(unsubscribeOptions); _logger.LogInformation($"已取消订阅: {string.Join(", ", removedTopics)}"); - } // 更新本地已订阅主题列表 @@ -371,30 +372,44 @@ namespace ZR.Service.mqtt // plc网关抓取数据上传 if (topic.Contains("device/data/push")) { - DeviceUploadDataGatWayDto deviceUploadDataGatWayDto = JsonSerializer.Deserialize(payload); + DeviceUploadDataGatWayDto deviceUploadDataGatWayDto = + JsonSerializer.Deserialize(payload); // 这里添加设备消息处理逻辑 string deviceCode = topic.Split("/")[2]; - DeviceUploadData deviceUploadData = new() - { - FactoryCode = "上海干巷", - WorkshopCode = "涂装车间", - LineCode = "涂装生产线", - DeviceCode = deviceCode, - DictCode = "device_dict_plc_001", - Remark = "网关采集设备数据", - UploadTime = DateTime.Now, - CollectionTime = DateTimeOffset.FromUnixTimeMilliseconds(deviceUploadDataGatWayDto.Time).LocalDateTime, - Value01 = deviceUploadDataGatWayDto.DeviceParams.Value01.ToString(), - Value02 = deviceUploadDataGatWayDto.DeviceParams.Value02.ToString(), - Value03 = deviceUploadDataGatWayDto.DeviceParams.Value03.ToString(), - Value04 = deviceUploadDataGatWayDto.DeviceParams.Value04.ToString(), - Value05 = deviceUploadDataGatWayDto.DeviceParams.Value05.ToString(), - Value06 = deviceUploadDataGatWayDto.DeviceParams.Value06.ToString(), - Value07 = deviceUploadDataGatWayDto.DeviceParams.Value07.ToString(), - Value08 = deviceUploadDataGatWayDto.DeviceParams.Value08.ToString(), - Value09 = deviceUploadDataGatWayDto.DeviceParams.Value09.ToString(), - Value10 = deviceUploadDataGatWayDto.DeviceParams.Value10.ToString() - }; + + DeviceUploadData lastDeviceUploadData = Context + .Queryable() + .Where(it => it.DeviceCode == deviceCode) + .OrderByDescending(it => it.UploadTime) + .First(); + DateTime lastTime = lastDeviceUploadData.UploadTime; + + // 每一小时上传一下 + + DeviceUploadData deviceUploadData = + new() + { + FactoryCode = "上海干巷", + WorkshopCode = "涂装车间", + LineCode = "涂装生产线", + DeviceCode = deviceCode, + DictCode = "device_dict_plc_001", + Remark = "网关采集设备数据", + UploadTime = DateTime.Now, + CollectionTime = DateTimeOffset + .FromUnixTimeMilliseconds(deviceUploadDataGatWayDto.Time) + .LocalDateTime, + Value01 = deviceUploadDataGatWayDto.DeviceParams.Value01.ToString(), + Value02 = deviceUploadDataGatWayDto.DeviceParams.Value02.ToString(), + Value03 = deviceUploadDataGatWayDto.DeviceParams.Value03.ToString(), + Value04 = deviceUploadDataGatWayDto.DeviceParams.Value04.ToString(), + Value05 = deviceUploadDataGatWayDto.DeviceParams.Value05.ToString(), + Value06 = deviceUploadDataGatWayDto.DeviceParams.Value06.ToString(), + Value07 = deviceUploadDataGatWayDto.DeviceParams.Value07.ToString(), + Value08 = deviceUploadDataGatWayDto.DeviceParams.Value08.ToString(), + Value09 = deviceUploadDataGatWayDto.DeviceParams.Value09.ToString(), + Value10 = deviceUploadDataGatWayDto.DeviceParams.Value10.ToString() + }; Context.Insertable(deviceUploadData).ExecuteCommand(); } @@ -486,6 +501,7 @@ namespace ZR.Service.mqtt _logger.LogError(ex, "清理MQTT客户端资源时出错"); } } + public void Dispose() { Dispose(true);