shgx_tz_mom/ZR.Service/mqtt/MqttService.cs
赵正易 657d1b7ab6 refactor(mqtt): 优化MQTT服务代码格式和数据处理逻辑
- 调整using语句的顺序以符合规范
- 格式化日志输出和代码缩进
- 在设备数据处理中添加时间间隔检查,每小时上传一次数据
- 优化取消订阅逻辑的代码结构
2025-09-21 14:44:10 +08:00

530 lines
20 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Infrastructure.Extensions;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using ZR.Common.MqttHelper;
using ZR.Model.dc;
using ZR.Model.Dto;
using ZR.Model.mes.md;
namespace ZR.Service.mqtt
{
public class MqttService : BaseService<DeviceUploadData>, IHostedService, IDisposable
{
// Logger used for all diagnostics emitted by this service.
private readonly ILogger<MqttService> _logger;
// Application configuration; stored by the constructor.
private readonly IConfiguration _configuration;
// MQTT client created in the constructor and used for every broker operation.
private readonly IMqttClient _mqttClient;
// Injected configuration object; supplies the client options and the subscription options.
private readonly MyMqttConfig _mqttConfig;
// Serializes connect and publish operations (taken in ConnectAsync and PublishAsync).
private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(1, 1);
// Cancelled by StopAsync / Dispose; its token is passed to reconnect and publish operations.
private readonly CancellationTokenSource _disposeCts = new CancellationTokenSource();
// One-shot timer created by ScheduleReconnect; replaced each time a reconnect is scheduled.
private Timer _reconnectTimer;
// Set once Dispose(bool) has run.
private bool _disposed = false;
// Number of consecutive failed connection attempts; reset to 0 after a successful connect.
private int _reconnectAttempts = 0;
// Configuration constants
// NOTE(review): MqttConfigSection and TopicsConfigKey are not referenced by the methods visible in this class.
private const string MqttConfigSection = "MqttConfig";
private const string TopicsConfigKey = "Topics";
// Base delay, in seconds, of the reconnect back-off.
private const int DefaultReconnectDelaySeconds = 5;
// After this many consecutive failures ConnectAsync stops scheduling further reconnects.
private const int MaxReconnectAttempts = 10;
// Upper bound, in seconds, of the reconnect back-off delay.
private const int MaxReconnectDelaySeconds = 60;
/// <summary>
/// Creates the service, builds the underlying MQTT client and wires up its
/// connected / disconnected / message-received handlers.
/// </summary>
/// <param name="logger">Logger for service diagnostics.</param>
/// <param name="configuration">Application configuration.</param>
/// <param name="mqttConfig">Injected MQTT configuration object.</param>
/// <exception cref="ArgumentNullException">Any argument is null.</exception>
public MqttService(
    ILogger<MqttService> logger,
    IConfiguration configuration,
    MyMqttConfig mqttConfig
)
{
    // Validate in declaration order so the first missing dependency is the one reported.
    if (logger is null)
    {
        throw new ArgumentNullException(nameof(logger));
    }
    if (configuration is null)
    {
        throw new ArgumentNullException(nameof(configuration));
    }
    if (mqttConfig is null)
    {
        throw new ArgumentNullException(nameof(mqttConfig));
    }

    _logger = logger;
    _configuration = configuration;
    _mqttConfig = mqttConfig;

    // Create the MQTT client.
    _mqttClient = new MqttFactory().CreateMqttClient();

    // Register the event handlers.
    _mqttClient.ConnectedAsync += OnConnectedAsync;
    _mqttClient.DisconnectedAsync += OnDisconnectedAsync;
    _mqttClient.ApplicationMessageReceivedAsync += OnMessageReceivedAsync;

    _logger.LogInformation($"MqttService 实例已创建,哈希值: {GetHashCode()}");
}
/// <summary>
/// Hosted-service start hook: attempts the initial broker connection.
/// A failure is logged and a background reconnect is scheduled instead of
/// propagating the exception to the host.
/// </summary>
/// <param name="cancellationToken">Token passed through to the connect attempt.</param>
public async Task StartAsync(CancellationToken cancellationToken)
{
    _logger.LogInformation("MQTT服务正在启动...");

    try
    {
        await ConnectAsync(cancellationToken);
        _logger.LogInformation("MQTT服务已成功启动");
    }
    catch (Exception startupError)
    {
        _logger.LogCritical(startupError, "MQTT服务启动失败将在后台尝试重连");

        // Even though start-up failed, keep trying to reach the broker in the background.
        ScheduleReconnect();
    }
}
/// <summary>
/// Hosted-service stop hook: cancels pending reconnect work, disposes the
/// reconnect timer and disconnects from the broker.
/// </summary>
/// <param name="cancellationToken">Token passed through to the disconnect call.</param>
public async Task StopAsync(CancellationToken cancellationToken)
{
    _logger.LogInformation("MQTT服务正在停止...");

    // Cancel every scheduled reconnect before tearing the connection down.
    _disposeCts.Cancel();

    Timer pendingReconnect = _reconnectTimer;
    if (pendingReconnect != null)
    {
        pendingReconnect.Dispose();
    }

    await DisconnectAsync(cancellationToken);

    _logger.LogInformation("MQTT服务已停止");
}
/// <summary>
/// Connects the MQTT client to the broker while holding the connection lock.
/// Does nothing when the client is already connected.
/// </summary>
/// <remarks>
/// On a failed attempt the failure counter is incremented; below
/// <c>MaxReconnectAttempts</c> a reconnect is scheduled and the exception is
/// rethrown, otherwise a critical message is logged and the method returns
/// normally without scheduling anything. Cancellation is logged and rethrown
/// without touching the counter.
/// </remarks>
/// <param name="cancellationToken">Cancels both the lock wait and the connect call.</param>
private async Task ConnectAsync(CancellationToken cancellationToken)
{
    await _connectionLock.WaitAsync(cancellationToken);
    try
    {
        if (!_mqttClient.IsConnected)
        {
            // Client options come straight from the injected MyMqttConfig.
            var clientOptions = _mqttConfig.BuildMqttClientOptions();
            _logger.LogInformation($"正在连接到MQTT代理服务器 {clientOptions.ChannelOptions}...");

            try
            {
                await _mqttClient.ConnectAsync(clientOptions, cancellationToken);

                // A successful connect resets the back-off counter.
                _reconnectAttempts = 0;
                _logger.LogInformation("已成功连接到MQTT代理服务器");
            }
            catch (OperationCanceledException)
            {
                _logger.LogWarning("MQTT连接操作被取消");
                throw;
            }
            catch (Exception connectError)
            {
                _reconnectAttempts++;
                _logger.LogError(
                    connectError,
                    $"连接MQTT代理服务器失败 (尝试次数: {_reconnectAttempts}/{MaxReconnectAttempts})"
                );

                if (_reconnectAttempts < MaxReconnectAttempts)
                {
                    ScheduleReconnect();
                    throw;
                }

                // Retry budget exhausted: give up quietly, the caller sees a normal return.
                _logger.LogCritical("达到最大重连次数,停止尝试");
            }
        }
        else
        {
            _logger.LogDebug("MQTT客户端已连接跳过连接操作");
        }
    }
    finally
    {
        _connectionLock.Release();
    }
}
/// <summary>
/// Builds TCP client options from a configuration section.
/// </summary>
/// <remarks>
/// Reads <c>ClientId</c> (a new GUID string is used when absent), <c>Server</c>
/// and <c>Port</c> (1883 when absent) and requests a clean session.
/// </remarks>
/// <param name="config">Configuration section holding the MQTT settings.</param>
/// <returns>The built client options.</returns>
private MqttClientOptions BuildMqttClientOptions(IConfigurationSection config)
{
    string clientId = config["ClientId"] ?? Guid.NewGuid().ToString();
    string server = config["Server"];
    int port = config.GetValue<int>("Port", 1883);

    return new MqttClientOptionsBuilder()
        .WithClientId(clientId)
        .WithTcpServer(server, port)
        .WithCleanSession()
        .Build();
}
/// <summary>
/// Disconnects from the broker when currently connected.
/// Cancellation is logged and rethrown; any other failure is logged and the
/// client resources are cleaned up instead.
/// </summary>
/// <param name="cancellationToken">Token passed to the disconnect call.</param>
private async Task DisconnectAsync(CancellationToken cancellationToken)
{
    bool connected = _mqttClient.IsConnected;
    if (connected == false)
    {
        _logger.LogDebug("MQTT客户端未连接跳过断开操作");
        return;
    }

    try
    {
        await _mqttClient.DisconnectAsync(cancellationToken: cancellationToken);
        _logger.LogInformation("已成功断开与MQTT代理服务器的连接");
    }
    catch (OperationCanceledException)
    {
        _logger.LogWarning("MQTT断开操作被取消");
        throw;
    }
    catch (Exception disconnectError)
    {
        _logger.LogError(disconnectError, "断开MQTT连接时出错");

        // Release the client resources even though the disconnect itself failed.
        CleanupClientResources();
    }
}
/// <summary>
/// Runs after every successful connect and makes sure the configured topics
/// are subscribed on the broker.
/// </summary>
/// <param name="e">Connect event data; <c>ConnectResult.IsSessionPresent</c> tells whether the broker resumed a session.</param>
private async Task OnConnectedAsync(MqttClientConnectedEventArgs e)
{
    bool sessionPresent = e.ConnectResult.IsSessionPresent;
    _logger.LogInformation($"MQTT连接已建立会话是否存在: {sessionPresent}");

    if (!sessionPresent)
    {
        // Without a resumed session the broker holds no subscriptions for this client.
        // Forget the locally cached topic list, otherwise SubscribeToTopicsAsync would
        // see "nothing changed" after a reconnect and the client would stop receiving messages.
        lock (_subscribedTopics)
        {
            _subscribedTopics.Clear();
        }
    }

    // Whether or not a session exists, reconcile the subscriptions with the configuration,
    // so topics added to the configuration are picked up after a restart.
    await SubscribeToTopicsAsync();
}
/// <summary>
/// Runs when the client reports a disconnect. Schedules a reconnect when an
/// established connection was lost and the service is not shutting down.
/// </summary>
/// <param name="e">Disconnect event data; <c>ClientWasConnected</c> tells whether a connection existed before.</param>
private async Task OnDisconnectedAsync(MqttClientDisconnectedEventArgs e)
{
    _logger.LogWarning($"MQTT连接已断开 - 客户端是否之前已连接: {e.ClientWasConnected}");

    bool shuttingDown = _disposed || _disposeCts.IsCancellationRequested;
    if (shuttingDown)
    {
        _logger.LogDebug("MQTT断开连接是预期行为正在关闭服务");
        return;
    }

    // Only a lost, previously established connection triggers a reconnect from here.
    if (!e.ClientWasConnected)
    {
        return;
    }

    ScheduleReconnect();
}
/// <summary>
/// Runs for every received application message: decodes the payload as UTF-8
/// text and hands it to <c>ProcessMessageAsync</c>. Any exception is logged
/// and not propagated to the client.
/// </summary>
/// <param name="e">Event data carrying the received application message.</param>
private async Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
{
    try
    {
        var message = e.ApplicationMessage;
        string text = Encoding.UTF8.GetString(message.PayloadSegment);

        // Routing and handling are delegated to the dedicated processor.
        await ProcessMessageAsync(message.Topic, text);
    }
    catch (Exception handlerError)
    {
        _logger.LogError(handlerError, "处理MQTT消息时出错");
    }
}
/// <summary>
/// Routes a decoded message to its handler based on the topic.
/// Topics under <c>shgx_tz/device/</c> go to the device handler,
/// <c>system/alert</c> goes to the alert handler, everything else is logged
/// at debug level. Handler exceptions are logged and swallowed.
/// </summary>
/// <param name="topic">Topic the message was published on.</param>
/// <param name="payload">Message payload decoded as text.</param>
private async Task ProcessMessageAsync(string topic, string payload)
{
    try
    {
        switch (topic)
        {
            // Topics are exact identifiers, so the prefix is matched ordinally rather
            // than with the culture-sensitive default of StartsWith(string).
            case string t when t.StartsWith("shgx_tz/device/", StringComparison.Ordinal):
                await HandleDeviceMessage(topic, payload);
                break;
            case "system/alert":
                await HandleSystemAlert(payload);
                break;
            default:
                _logger.LogDebug($"未处理的MQTT主题: {topic}");
                break;
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, $"处理主题 '{topic}' 的消息时出错");
    }
}
// Topics this client has recorded as subscribed; every access takes lock (_subscribedTopics).
private readonly HashSet<string> _subscribedTopics = new HashSet<string>();

/// <summary>
/// Reconciles the broker subscriptions with the configured topic filters:
/// subscribes to topics that are configured but not yet recorded, and
/// unsubscribes from recorded topics that are no longer configured.
/// </summary>
/// <remarks>
/// Only topics the broker actually granted are recorded as subscribed, so a
/// rejected subscription is attempted again on the next reconciliation.
/// Does nothing when the client is not connected or no topics are configured.
/// </remarks>
private async Task SubscribeToTopicsAsync()
{
    if (!_mqttClient.IsConnected)
    {
        _logger.LogWarning("无法订阅:客户端未连接");
        return;
    }

    var subscribeOptions = _mqttConfig.GetSubscribeToTopicsAsync();
    if (subscribeOptions == null || !subscribeOptions.TopicFilters.Any())
    {
        _logger.LogInformation("无订阅主题配置");
        return;
    }

    // Topics requested by the configuration.
    var topicFilters = subscribeOptions.TopicFilters.ToList();
    var topicsToSubscribe = topicFilters.Select(f => f.Topic).ToList();

    // Snapshot of what is currently recorded as subscribed.
    HashSet<string> currentlySubscribedTopics;
    lock (_subscribedTopics)
    {
        currentlySubscribedTopics = new HashSet<string>(_subscribedTopics);
    }

    var newTopics = topicsToSubscribe.Except(currentlySubscribedTopics).ToList();
    var removedTopics = currentlySubscribedTopics.Except(topicsToSubscribe).ToList();
    if (newTopics.Count == 0 && removedTopics.Count == 0)
    {
        _logger.LogInformation("所有主题已正确订阅,无需变更");
        return;
    }

    // Start from the subscriptions that are already active and still configured;
    // newly granted topics are added below.
    var confirmedTopics = new HashSet<string>(currentlySubscribedTopics);
    confirmedTopics.IntersectWith(topicsToSubscribe);

    // Subscribe to the topics that were added to the configuration.
    if (newTopics.Count > 0)
    {
        var newTopicSet = new HashSet<string>(newTopics);
        var subscribeBuilder = new MqttClientSubscribeOptionsBuilder();
        foreach (var topicFilter in topicFilters.Where(f => newTopicSet.Contains(f.Topic)))
        {
            subscribeBuilder.WithTopicFilter(topicFilter);
        }

        _logger.LogInformation($"订阅新主题: {string.Join(", ", newTopics)}");
        var result = await _mqttClient.SubscribeAsync(subscribeBuilder.Build());
        foreach (var item in result.Items)
        {
            _logger.LogInformation(
                $"订阅结果:{item.TopicFilter.Topic} -> {item.ResultCode}"
            );

            bool granted = item.ResultCode
                is MqttClientSubscribeResultCode.GrantedQoS0
                    or MqttClientSubscribeResultCode.GrantedQoS1
                    or MqttClientSubscribeResultCode.GrantedQoS2;
            if (granted)
            {
                confirmedTopics.Add(item.TopicFilter.Topic);
            }
            else
            {
                // Not recorded as subscribed, so the next reconciliation tries again.
                _logger.LogWarning(
                    $"主题订阅未被代理接受: {item.TopicFilter.Topic} -> {item.ResultCode}"
                );
            }
        }
    }

    // Unsubscribe from topics that were removed from the configuration.
    if (removedTopics.Count > 0)
    {
        _logger.LogInformation($"配置中移除的主题: {string.Join(", ", removedTopics)}");

        var unsubscribeBuilder = new MqttClientUnsubscribeOptionsBuilder();
        foreach (var topic in removedTopics)
        {
            unsubscribeBuilder.WithTopicFilter(topic);
        }

        await _mqttClient.UnsubscribeAsync(unsubscribeBuilder.Build());
        _logger.LogInformation($"已取消订阅: {string.Join(", ", removedTopics)}");
    }

    // Publish the reconciled state as the new record of subscribed topics.
    lock (_subscribedTopics)
    {
        _subscribedTopics.Clear();
        _subscribedTopics.UnionWith(confirmedTopics);
    }
}
/// <summary>
/// Schedules a single reconnect attempt on a one-shot timer, replacing any
/// previously scheduled one.
/// </summary>
/// <remarks>
/// The delay is <c>DefaultReconnectDelaySeconds * 2^min(attempts, 5)</c>,
/// capped at <c>MaxReconnectDelaySeconds</c>. The timer callback skips the
/// attempt when the service is shutting down and logs, rather than throws,
/// any connect failure.
/// </remarks>
private void ScheduleReconnect()
{
    // Exponential back-off with a bounded exponent and a hard upper limit.
    int exponent = Math.Min(_reconnectAttempts, 5);
    int backoffSeconds = DefaultReconnectDelaySeconds * (int)Math.Pow(2, exponent);
    int delaySeconds = Math.Min(backoffSeconds, MaxReconnectDelaySeconds);

    _logger.LogInformation($"计划在 {delaySeconds} 秒后尝试重新连接MQTT代理服务器");

    TimerCallback attemptReconnect = async _ =>
    {
        bool shuttingDown = _disposed || _disposeCts.IsCancellationRequested;
        if (shuttingDown)
        {
            _logger.LogDebug("跳过重连:服务正在关闭");
            return;
        }

        try
        {
            await ConnectAsync(_disposeCts.Token);
        }
        catch (Exception reconnectError)
        {
            _logger.LogError(reconnectError, "计划的MQTT重连失败");
        }
    };

    // A timer (rather than Task.Run) keeps exactly one pending attempt alive:
    // drop the previous timer, then arm a new one that fires once.
    _reconnectTimer?.Dispose();
    _reconnectTimer = new Timer(
        attemptReconnect,
        null,
        TimeSpan.FromSeconds(delaySeconds),
        Timeout.InfiniteTimeSpan
    );
}
// Minimum time between two stored uploads for the same device (one upload per hour).
private static readonly TimeSpan DeviceUploadInterval = TimeSpan.FromHours(1);

/// <summary>
/// Handles a device message. For PLC-gateway data pushes (topic containing
/// <c>device/data/push</c>) the payload is deserialized and stored as a
/// <c>DeviceUploadData</c> row, at most once per hour per device.
/// Other device topics are ignored.
/// </summary>
/// <remarks>
/// The device code is taken from the third '/'-separated topic segment.
/// A device with no stored row yet is always accepted; previously the missing
/// row caused a NullReferenceException, so a new device never got a first row.
/// </remarks>
/// <param name="topic">Topic the message was published on.</param>
/// <param name="payload">JSON payload sent by the gateway.</param>
/// <returns>A completed task; the work is done synchronously.</returns>
private Task HandleDeviceMessage(string topic, string payload)
{
    // Only data pushed by the PLC gateway is stored.
    if (!topic.Contains("device/data/push"))
    {
        return Task.CompletedTask;
    }

    DeviceUploadDataGatWayDto gatewayData =
        JsonSerializer.Deserialize<DeviceUploadDataGatWayDto>(payload);
    if (gatewayData?.DeviceParams is null)
    {
        // A JSON "null" payload or a message without parameters carries nothing to store.
        _logger.LogWarning($"设备数据消息内容为空已忽略: {topic}");
        return Task.CompletedTask;
    }

    string deviceCode = topic.Split("/")[2];

    // Latest stored upload for this device; null when the device has no rows yet.
    DeviceUploadData lastDeviceUploadData = Context
        .Queryable<DeviceUploadData>()
        .Where(it => it.DeviceCode == deviceCode)
        .OrderByDescending(it => it.UploadTime)
        .First();

    DateTime now = DateTime.Now;
    if (lastDeviceUploadData != null)
    {
        // Enforce the hourly upload interval: skip messages that arrive too soon
        // after the last stored row.
        DateTime lastTime = lastDeviceUploadData.UploadTime;
        if (now - lastTime < DeviceUploadInterval)
        {
            return Task.CompletedTask;
        }
    }

    var deviceParams = gatewayData.DeviceParams;
    DeviceUploadData deviceUploadData =
        new()
        {
            FactoryCode = "上海干巷",
            WorkshopCode = "涂装车间",
            LineCode = "涂装生产线",
            DeviceCode = deviceCode,
            DictCode = "device_dict_plc_001",
            Remark = "网关采集设备数据",
            UploadTime = now,
            // The gateway timestamp is read as Unix milliseconds and stored as local time.
            CollectionTime = DateTimeOffset
                .FromUnixTimeMilliseconds(gatewayData.Time)
                .LocalDateTime,
            Value01 = deviceParams.Value01.ToString(),
            Value02 = deviceParams.Value02.ToString(),
            Value03 = deviceParams.Value03.ToString(),
            Value04 = deviceParams.Value04.ToString(),
            Value05 = deviceParams.Value05.ToString(),
            Value06 = deviceParams.Value06.ToString(),
            Value07 = deviceParams.Value07.ToString(),
            Value08 = deviceParams.Value08.ToString(),
            Value09 = deviceParams.Value09.ToString(),
            Value10 = deviceParams.Value10.ToString()
        };
    Context.Insertable(deviceUploadData).ExecuteCommand();

    return Task.CompletedTask;
}
/// <summary>
/// Handles a system alert message. Currently the alert is only written to the
/// log at warning level.
/// </summary>
/// <param name="payload">Alert text received on the alert topic.</param>
/// <returns>A completed task.</returns>
private Task HandleSystemAlert(string payload)
{
    string alertText = $"系统警报: {payload}";
    _logger.LogWarning(alertText);

    // Further alert handling goes here.
    return Task.CompletedTask;
}
/// <summary>
/// Publishes a text message to the broker while holding the connection lock.
/// </summary>
/// <param name="topic">Topic to publish to; must not be null or empty.</param>
/// <param name="payload">Message content; must not be null.</param>
/// <param name="qos">Quality-of-service level for the message.</param>
/// <param name="retain">Whether the retain flag is set on the message.</param>
/// <exception cref="ArgumentNullException"><paramref name="topic"/> is null or empty, or <paramref name="payload"/> is null.</exception>
/// <exception cref="ObjectDisposedException">The service has been disposed.</exception>
/// <exception cref="InvalidOperationException">The client is not connected to the broker.</exception>
/// <exception cref="OperationCanceledException">The service is shutting down.</exception>
public async Task PublishAsync(
    string topic,
    string payload,
    MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce,
    bool retain = false
)
{
    if (string.IsNullOrEmpty(topic))
    {
        throw new ArgumentNullException(nameof(topic), "MQTT主题不能为空");
    }
    if (payload == null)
    {
        throw new ArgumentNullException(nameof(payload), "MQTT消息内容不能为空");
    }
    if (_disposed)
    {
        // Fail with a clear message instead of relying on the disposed lock/token to throw.
        throw new ObjectDisposedException(nameof(MqttService));
    }

    await _connectionLock.WaitAsync(_disposeCts.Token);
    try
    {
        if (!_mqttClient.IsConnected)
        {
            // Publishing never connects implicitly; the caller has to retry once connected.
            _logger.LogWarning("发布消息前需要连接MQTT代理服务器");
            throw new InvalidOperationException("发布消息前需要连接MQTT代理服务器");
        }

        var message = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(payload)
            .WithQualityOfServiceLevel(qos)
            .WithRetainFlag(retain)
            .Build();
        _logger.LogDebug($"准备发布MQTT消息 - 主题: {topic}, QoS: {qos}, 保留: {retain}");

        var result = await _mqttClient.PublishAsync(message, _disposeCts.Token);
        _logger.LogInformation($"已发布MQTT消息 - 主题: {topic}, 结果: {result.ReasonCode}");
    }
    catch (OperationCanceledException)
    {
        // Cancellation is expected during shutdown: log it once as a warning, not as an error.
        _logger.LogWarning("MQTT发布操作被取消");
        throw;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, $"发布MQTT消息失败 - 主题: {topic}");
        throw;
    }
    finally
    {
        _connectionLock.Release();
    }
}
/// <summary>
/// Detaches the event handlers from the MQTT client, disconnects it when
/// still connected and disposes it. Any failure is logged and not rethrown.
/// </summary>
private void CleanupClientResources()
{
    try
    {
        // Detach the handlers first so the teardown below raises no callbacks into this service.
        _mqttClient.ApplicationMessageReceivedAsync -= OnMessageReceivedAsync;
        _mqttClient.ConnectedAsync -= OnConnectedAsync;
        _mqttClient.DisconnectedAsync -= OnDisconnectedAsync;

        bool stillConnected = _mqttClient.IsConnected;
        if (stillConnected)
        {
            // Synchronous wait: this method is also reached from Dispose.
            var pendingDisconnect = _mqttClient.DisconnectAsync();
            pendingDisconnect.GetAwaiter().GetResult();
        }

        _mqttClient.Dispose();
    }
    catch (Exception cleanupError)
    {
        _logger.LogError(cleanupError, "清理MQTT客户端资源时出错");
    }
}
/// <summary>
/// Releases the resources held by the service and suppresses finalization.
/// </summary>
public void Dispose()
{
    Dispose(disposing: true);
    GC.SuppressFinalize(this);
}

/// <summary>
/// Core of the dispose pattern. The first call cancels pending work, disposes
/// the reconnect timer, the connection lock, the MQTT client and the
/// cancellation source; later calls do nothing.
/// </summary>
/// <param name="disposing">True when called from <see cref="Dispose()"/>; managed resources are released only then.</param>
protected virtual void Dispose(bool disposing)
{
    if (_disposed)
    {
        return;
    }

    if (disposing)
    {
        // Stop scheduled reconnects before tearing down what they would use.
        _disposeCts.Cancel();
        _reconnectTimer?.Dispose();
        _connectionLock.Dispose();
        CleanupClientResources();
        _disposeCts.Dispose();
    }

    _disposed = true;
    _logger.LogDebug("MQTT服务已释放所有资源");
}
}
}