using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Infrastructure.Extensions;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using ZR.Common.MqttHelper;
using ZR.Model.dc;
using ZR.Model.Dto;
using ZR.Model.mes.md;

namespace ZR.Service.mqtt
{
    /// <summary>
    /// Hosted service that owns a single MQTT client: it connects on start, re-connects with an
    /// exponential back-off after failures, keeps the subscriptions in sync with
    /// <see cref="MyMqttConfig"/>, routes incoming messages by topic and exposes
    /// <see cref="PublishAsync"/> for outgoing messages.
    /// </summary>
    public class MqttService : BaseService, IHostedService, IDisposable
    {
        private readonly ILogger _logger;
        private readonly IConfiguration _configuration;
        private readonly IMqttClient _mqttClient;
        private readonly MyMqttConfig _mqttConfig; // injected configuration object

        // Serializes connect and publish operations on the shared client.
        private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(1, 1);

        // Cancelled on stop/dispose so pending reconnects and publishes are abandoned.
        private readonly CancellationTokenSource _disposeCts = new CancellationTokenSource();

        // Topics this instance believes are subscribed on the broker; guarded by _subscribedTopicsGate.
        private readonly HashSet<string> _subscribedTopics = new HashSet<string>();
        private readonly object _subscribedTopicsGate = new object();

        private Timer _reconnectTimer;
        private bool _disposed = false;
        private bool _clientResourcesReleased = false;
        private int _reconnectAttempts = 0;

        // Configuration constants
        private const string MqttConfigSection = "MqttConfig";
        private const string TopicsConfigKey = "Topics";
        private const int DefaultReconnectDelaySeconds = 5;
        private const int MaxReconnectAttempts = 10;
        private const int MaxReconnectDelaySeconds = 60;

        /// <summary>
        /// Creates the MQTT client and wires up its connection and message events.
        /// </summary>
        /// <param name="logger">Logger used for all diagnostics of this service.</param>
        /// <param name="configuration">Application configuration.</param>
        /// <param name="mqttConfig">Provides the client options and the topic filters to subscribe to.</param>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public MqttService(
            ILogger logger,
            IConfiguration configuration,
            MyMqttConfig mqttConfig
        )
        {
            // NOTE(review): this takes the non-generic ILogger; confirm the container registers it
            // (Microsoft.Extensions.Logging normally only provides ILogger<T>).
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _configuration =
                configuration ?? throw new ArgumentNullException(nameof(configuration));
            _mqttConfig = mqttConfig ?? throw new ArgumentNullException(nameof(mqttConfig));

            // Create the MQTT client
            var factory = new MqttFactory();
            _mqttClient = factory.CreateMqttClient();

            // Register the event handlers
            _mqttClient.ConnectedAsync += OnConnectedAsync;
            _mqttClient.DisconnectedAsync += OnDisconnectedAsync;
            _mqttClient.ApplicationMessageReceivedAsync += OnMessageReceivedAsync;

            _logger.LogInformation($"MqttService 实例已创建,哈希值: {GetHashCode()}");
        }

        /// <summary>
        /// Connects to the broker. A failure is logged and a background reconnect is scheduled,
        /// so a broker outage does not fail host start-up.
        /// </summary>
        /// <param name="cancellationToken">Token supplied by the host for the start operation.</param>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("MQTT服务正在启动...");
            try
            {
                await ConnectAsync(cancellationToken);
                _logger.LogInformation("MQTT服务已成功启动");
            }
            catch (Exception ex)
            {
                _logger.LogCritical(ex, "MQTT服务启动失败,将在后台尝试重连");
                // Catch-all: also covers failures raised before ConnectAsync's own retry logic runs.
                ScheduleReconnect();
            }
        }

        /// <summary>
        /// Cancels any pending reconnect and disconnects from the broker.
        /// </summary>
        /// <param name="cancellationToken">Token supplied by the host for the stop operation.</param>
        public async Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("MQTT服务正在停止...");

            // Cancel every scheduled reconnect before disconnecting
            _disposeCts.Cancel();
            _reconnectTimer?.Dispose();

            await DisconnectAsync(cancellationToken);
            _logger.LogInformation("MQTT服务已停止");
        }

        /// <summary>
        /// Connects the client if it is not connected yet. On failure the attempt counter is
        /// increased and, until <see cref="MaxReconnectAttempts"/> is reached, another attempt is
        /// scheduled before the exception is re-thrown.
        /// </summary>
        private async Task ConnectAsync(CancellationToken cancellationToken)
        {
            await _connectionLock.WaitAsync(cancellationToken);
            try
            {
                if (_mqttClient.IsConnected)
                {
                    _logger.LogDebug("MQTT客户端已连接,跳过连接操作");
                    return;
                }

                // Client options come straight from MyMqttConfig
                var options = _mqttConfig.BuildMqttClientOptions();
                _logger.LogInformation($"正在连接到MQTT代理服务器 {options.ChannelOptions}...");

                try
                {
                    await _mqttClient.ConnectAsync(options, cancellationToken);
                    _reconnectAttempts = 0; // reset the back-off after a successful connect
                    _logger.LogInformation("已成功连接到MQTT代理服务器");
                }
                catch (OperationCanceledException)
                {
                    _logger.LogWarning("MQTT连接操作被取消");
                    throw;
                }
                catch (Exception ex)
                {
                    _reconnectAttempts++;
                    _logger.LogError(
                        ex,
                        $"连接MQTT代理服务器失败 (尝试次数: {_reconnectAttempts}/{MaxReconnectAttempts})"
                    );

                    if (_reconnectAttempts >= MaxReconnectAttempts)
                    {
                        // Give up silently: no further timer is scheduled and nothing is re-thrown.
                        _logger.LogCritical("达到最大重连次数,停止尝试");
                        return;
                    }

                    ScheduleReconnect();
                    throw;
                }
            }
            finally
            {
                _connectionLock.Release();
            }
        }

        /// <summary>
        /// Builds client options (client id, TCP server/port, clean session) from a configuration
        /// section. Not called anywhere in this class; <see cref="ConnectAsync"/> uses
        /// <see cref="MyMqttConfig"/> instead.
        /// </summary>
        private MqttClientOptions BuildMqttClientOptions(IConfigurationSection config)
        {
            var builder = new MqttClientOptionsBuilder()
                .WithClientId(config["ClientId"] ?? Guid.NewGuid().ToString())
                .WithTcpServer(config["Server"], config.GetValue("Port", 1883))
                .WithCleanSession();

            return builder.Build();
        }

        /// <summary>
        /// Disconnects the client if it is connected. If disconnecting fails for a reason other
        /// than cancellation, the client resources are released anyway.
        /// </summary>
        private async Task DisconnectAsync(CancellationToken cancellationToken)
        {
            if (!_mqttClient.IsConnected)
            {
                _logger.LogDebug("MQTT客户端未连接,跳过断开操作");
                return;
            }

            try
            {
                await _mqttClient.DisconnectAsync(cancellationToken: cancellationToken);
                _logger.LogInformation("已成功断开与MQTT代理服务器的连接");
            }
            catch (OperationCanceledException)
            {
                _logger.LogWarning("MQTT断开操作被取消");
                throw;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "断开MQTT连接时出错");
                // Release the resources even though the disconnect itself failed
                CleanupClientResources();
            }
        }

        /// <summary>
        /// Runs after every successful connect and makes sure the configured topics are subscribed.
        /// </summary>
        private async Task OnConnectedAsync(MqttClientConnectedEventArgs e)
        {
            _logger.LogInformation($"MQTT连接已建立,会话是否存在: {e.ConnectResult.IsSessionPresent}");

            if (!e.ConnectResult.IsSessionPresent)
            {
                // The broker holds no session state for this client, so it has no subscriptions
                // either. Forget the local bookkeeping; otherwise a reconnect would conclude that
                // everything is still subscribed and no message would ever arrive again.
                lock (_subscribedTopicsGate)
                {
                    _subscribedTopics.Clear();
                }
            }

            // Always reconcile with the configuration, whether or not a session exists, so that
            // topics added to the configuration are picked up.
            await SubscribeToTopicsAsync();
        }

        /// <summary>
        /// Schedules a reconnect when an established connection is lost unexpectedly.
        /// </summary>
        private Task OnDisconnectedAsync(MqttClientDisconnectedEventArgs e)
        {
            _logger.LogWarning($"MQTT连接已断开 - 客户端是否之前已连接: {e.ClientWasConnected}");

            if (_disposed || _disposeCts.IsCancellationRequested)
            {
                _logger.LogDebug("MQTT断开连接是预期行为,正在关闭服务");
                return Task.CompletedTask;
            }

            // Failed connect attempts (ClientWasConnected == false) are retried by ConnectAsync itself.
            if (e.ClientWasConnected)
            {
                ScheduleReconnect();
            }

            return Task.CompletedTask;
        }

        /// <summary>
        /// Decodes the payload as UTF-8 and hands it to <see cref="ProcessMessageAsync"/>.
        /// Exceptions are logged and never propagate back into the MQTT client.
        /// </summary>
        private async Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
        {
            try
            {
                var payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);

                // Message handling is delegated to the topic router
                await ProcessMessageAsync(e.ApplicationMessage.Topic, payload);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "处理MQTT消息时出错");
            }
        }

        /// <summary>
        /// Routes a message to its handler based on the topic. Handler failures are logged.
        /// </summary>
        private async Task ProcessMessageAsync(string topic, string payload)
        {
            try
            {
                switch (topic)
                {
                    // Topics are protocol identifiers, so compare them ordinally.
                    case string t when t.StartsWith("shgx_tz/device/", StringComparison.Ordinal):
                        await HandleDeviceMessage(topic, payload);
                        break;
                    case "system/alert":
                        await HandleSystemAlert(payload);
                        break;
                    default:
                        _logger.LogDebug($"未处理的MQTT主题: {topic}");
                        break;
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, $"处理主题 '{topic}' 的消息时出错");
            }
        }

        /// <summary>
        /// Reconciles the broker subscriptions with the configured topic filters: subscribes
        /// topics that are new in the configuration and unsubscribes topics that were removed.
        /// </summary>
        private async Task SubscribeToTopicsAsync()
        {
            if (!_mqttClient.IsConnected)
            {
                _logger.LogWarning("无法订阅:客户端未连接");
                return;
            }

            var subscribeOptions = _mqttConfig.GetSubscribeToTopicsAsync();
            if (subscribeOptions == null || !subscribeOptions.TopicFilters.Any())
            {
                _logger.LogInformation("无订阅主题配置");
                return;
            }

            var topicFilters = subscribeOptions.TopicFilters.ToList();
            var topicsToSubscribe = topicFilters.Select(f => f.Topic).ToList();

            // Snapshot the bookkeeping so no lock is held across an await
            HashSet<string> currentlySubscribedTopics;
            lock (_subscribedTopicsGate)
            {
                currentlySubscribedTopics = new HashSet<string>(_subscribedTopics);
            }

            var newTopics = topicsToSubscribe.Except(currentlySubscribedTopics).ToList();
            var removedTopics = currentlySubscribedTopics.Except(topicsToSubscribe).ToList();

            if (!newTopics.Any() && !removedTopics.Any())
            {
                _logger.LogInformation("所有主题已正确订阅,无需变更");
                return;
            }

            // Topics added to the configuration
            if (newTopics.Any())
            {
                var newTopicSet = new HashSet<string>(newTopics);
                var subscribeBuilder = new MqttClientSubscribeOptionsBuilder();
                foreach (var topicFilter in topicFilters.Where(f => newTopicSet.Contains(f.Topic)))
                {
                    subscribeBuilder.WithTopicFilter(topicFilter);
                }

                _logger.LogInformation($"订阅新主题: {string.Join(", ", newTopics)}");
                var result = await _mqttClient.SubscribeAsync(subscribeBuilder.Build());

                var grantedTopics = new List<string>();
                foreach (var item in result.Items)
                {
                    _logger.LogInformation(
                        $"订阅结果:{item.TopicFilter.Topic} -> {item.ResultCode}"
                    );

                    // Only remember topics the broker actually granted, so that a rejected
                    // subscription is retried on the next reconcile instead of being lost.
                    if (IsSubscriptionGranted(item.ResultCode))
                    {
                        grantedTopics.Add(item.TopicFilter.Topic);
                    }
                }

                lock (_subscribedTopicsGate)
                {
                    _subscribedTopics.UnionWith(grantedTopics);
                }
            }

            // Topics removed from the configuration are unsubscribed on the broker
            if (removedTopics.Any())
            {
                _logger.LogInformation($"配置中移除的主题: {string.Join(", ", removedTopics)}");

                var unsubscribeBuilder = new MqttClientUnsubscribeOptionsBuilder();
                foreach (var topic in removedTopics)
                {
                    unsubscribeBuilder.WithTopicFilter(topic);
                }

                await _mqttClient.UnsubscribeAsync(unsubscribeBuilder.Build());
                _logger.LogInformation($"已取消订阅: {string.Join(", ", removedTopics)}");

                lock (_subscribedTopicsGate)
                {
                    _subscribedTopics.ExceptWith(removedTopics);
                }
            }
        }

        /// <summary>
        /// Returns <c>true</c> when the broker accepted the subscription at any QoS level.
        /// </summary>
        private static bool IsSubscriptionGranted(MqttClientSubscribeResultCode resultCode)
        {
            return resultCode
                is MqttClientSubscribeResultCode.GrantedQoS0
                    or MqttClientSubscribeResultCode.GrantedQoS1
                    or MqttClientSubscribeResultCode.GrantedQoS2;
        }

        /// <summary>
        /// Arms a one-shot timer that retries the connection. The delay doubles with every failed
        /// attempt, starting at <see cref="DefaultReconnectDelaySeconds"/> and capped at
        /// <see cref="MaxReconnectDelaySeconds"/>. A previously armed timer is replaced.
        /// </summary>
        private void ScheduleReconnect()
        {
            // Do not arm a new timer once shutdown has started; nothing would dispose it.
            if (_disposed || _disposeCts.IsCancellationRequested)
            {
                _logger.LogDebug("跳过重连:服务正在关闭");
                return;
            }

            // Exponential back-off
            var delaySeconds = Math.Min(
                DefaultReconnectDelaySeconds * (int)Math.Pow(2, Math.Min(_reconnectAttempts, 5)),
                MaxReconnectDelaySeconds
            );

            _logger.LogInformation($"计划在 {delaySeconds} 秒后尝试重新连接MQTT代理服务器");

            _reconnectTimer?.Dispose();
            _reconnectTimer = new Timer(
                // The callback is effectively async void, so every exception must be handled here.
                async _ =>
                {
                    if (_disposed || _disposeCts.IsCancellationRequested)
                    {
                        _logger.LogDebug("跳过重连:服务正在关闭");
                        return;
                    }

                    try
                    {
                        await ConnectAsync(_disposeCts.Token);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "计划的MQTT重连失败");
                    }
                },
                null,
                TimeSpan.FromSeconds(delaySeconds),
                Timeout.InfiniteTimeSpan
            );
        }

        /// <summary>
        /// Handles messages below <c>shgx_tz/device/</c>. Data pushed by the PLC gateway
        /// (topic contains <c>device/data/push</c>) is stored as one <see cref="DeviceUploadData"/> row.
        /// </summary>
        /// <param name="topic">Full topic; its third segment is used as the device code.</param>
        /// <param name="payload">JSON document describing a <see cref="DeviceUploadDataGatWayDto"/>.</param>
        private Task HandleDeviceMessage(string topic, string payload)
        {
            // Only data pushed by the PLC gateway is processed
            if (!topic.Contains("device/data/push"))
            {
                return Task.CompletedTask;
            }

            DeviceUploadDataGatWayDto gatewayData =
                JsonSerializer.Deserialize<DeviceUploadDataGatWayDto>(payload);
            if (gatewayData == null)
            {
                // A JSON "null" payload deserializes to null; ignore it instead of crashing below.
                _logger.LogWarning($"设备消息内容为空,已忽略 - 主题: {topic}");
                return Task.CompletedTask;
            }

            string deviceCode = topic.Split("/")[2];

            // NOTE(review): the previous revision loaded the device's latest row only to read its
            // UploadTime into an unused local (commented "upload once per hour"). With no earlier
            // row that dereference failed and the insert below never ran, so the query was removed.
            // If hourly throttling is wanted it still has to be implemented.
            var deviceParams = gatewayData.DeviceParams;
            DeviceUploadData deviceUploadData = new()
            {
                FactoryCode = "上海干巷",
                WorkshopCode = "涂装车间",
                LineCode = "涂装生产线",
                DeviceCode = deviceCode,
                DictCode = "device_dict_plc_001",
                Remark = "网关采集设备数据",
                UploadTime = DateTime.Now,
                CollectionTime = DateTimeOffset
                    .FromUnixTimeMilliseconds(gatewayData.Time)
                    .LocalDateTime,
                Value01 = deviceParams.Value01.ToString(),
                Value02 = deviceParams.Value02.ToString(),
                Value03 = deviceParams.Value03.ToString(),
                Value04 = deviceParams.Value04.ToString(),
                Value05 = deviceParams.Value05.ToString(),
                Value06 = deviceParams.Value06.ToString(),
                Value07 = deviceParams.Value07.ToString(),
                Value08 = deviceParams.Value08.ToString(),
                Value09 = deviceParams.Value09.ToString(),
                Value10 = deviceParams.Value10.ToString()
            };
            Context.Insertable(deviceUploadData).ExecuteCommand();

            return Task.CompletedTask;
        }

        /// <summary>
        /// Handles <c>system/alert</c> messages; currently the alert is only logged.
        /// </summary>
        private Task HandleSystemAlert(string payload)
        {
            _logger.LogWarning($"系统警报: {payload}");
            return Task.CompletedTask;
        }

        /// <summary>
        /// Publishes a message to the broker. The client must already be connected.
        /// </summary>
        /// <param name="topic">Topic to publish to; must not be null or empty.</param>
        /// <param name="payload">Message body; must not be null.</param>
        /// <param name="qos">Quality-of-service level of the message.</param>
        /// <param name="retain">Whether the broker should retain the message.</param>
        /// <exception cref="ArgumentNullException"><paramref name="topic"/> is null or empty, or <paramref name="payload"/> is null.</exception>
        /// <exception cref="InvalidOperationException">The client is not connected.</exception>
        /// <exception cref="OperationCanceledException">The service is shutting down.</exception>
        public async Task PublishAsync(
            string topic,
            string payload,
            MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce,
            bool retain = false
        )
        {
            if (string.IsNullOrEmpty(topic))
            {
                throw new ArgumentNullException(nameof(topic), "MQTT主题不能为空");
            }

            if (payload == null)
            {
                throw new ArgumentNullException(nameof(payload), "MQTT消息内容不能为空");
            }

            await _connectionLock.WaitAsync(_disposeCts.Token);
            try
            {
                if (!_mqttClient.IsConnected)
                {
                    _logger.LogWarning("发布消息前需要连接MQTT代理服务器");
                    throw new InvalidOperationException("发布消息前需要连接MQTT代理服务器");
                }

                var message = new MqttApplicationMessageBuilder()
                    .WithTopic(topic)
                    .WithPayload(payload)
                    .WithQualityOfServiceLevel(qos)
                    .WithRetainFlag(retain)
                    .Build();

                _logger.LogDebug($"准备发布MQTT消息 - 主题: {topic}, QoS: {qos}, 保留: {retain}");

                try
                {
                    var result = await _mqttClient.PublishAsync(message, _disposeCts.Token);
                    _logger.LogInformation($"已发布MQTT消息 - 主题: {topic}, 结果: {result.ReasonCode}");
                }
                catch (OperationCanceledException)
                {
                    _logger.LogWarning("MQTT发布操作被取消");
                    throw;
                }
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // Cancellation was already reported as a warning above; do not log it as an error too.
                _logger.LogError(ex, $"发布MQTT消息失败 - 主题: {topic}");
                throw;
            }
            finally
            {
                _connectionLock.Release();
            }
        }

        /// <summary>
        /// Detaches the event handlers, disconnects if necessary and disposes the client.
        /// Safe to call more than once; failures are logged and never thrown.
        /// </summary>
        private void CleanupClientResources()
        {
            // Reached from both the DisconnectAsync failure path and Dispose; run only once so a
            // disposed client is never touched again.
            if (_clientResourcesReleased)
            {
                return;
            }

            _clientResourcesReleased = true;

            try
            {
                _mqttClient.ApplicationMessageReceivedAsync -= OnMessageReceivedAsync;
                _mqttClient.ConnectedAsync -= OnConnectedAsync;
                _mqttClient.DisconnectedAsync -= OnDisconnectedAsync;

                if (_mqttClient.IsConnected)
                {
                    // Dispose is synchronous, so the disconnect has to be awaited by blocking.
                    _mqttClient.DisconnectAsync().GetAwaiter().GetResult();
                }

                _mqttClient.Dispose();
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "清理MQTT客户端资源时出错");
            }
        }

        /// <summary>
        /// Releases the timer, the lock, the MQTT client and the cancellation source.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases the resources owned by this instance.
        /// </summary>
        /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
        protected virtual void Dispose(bool disposing)
        {
            if (!_disposed)
            {
                if (disposing)
                {
                    // Cancel first so a timer callback that is already running backs off.
                    _disposeCts.Cancel();
                    _reconnectTimer?.Dispose();
                    _connectionLock.Dispose();
                    CleanupClientResources();
                    _disposeCts.Dispose();
                }

                _disposed = true;
                _logger.LogDebug("MQTT服务已释放所有资源");
            }
        }
    }
}