using MDM.Services.Plant;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RIZO.Admin.WebApi.PLC.Model;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace RIZO.Admin.WebApi.PLC.Service
{
    /// <summary>
    /// Hosted background service that connects to every configured PLC at startup and
    /// then polls production data on a timer whose interval adapts to recent activity.
    /// </summary>
    public class PlcHostedService : BackgroundService, IDisposable
    {
        /// <summary>Upper bound on how long <see cref="StopAsync"/> waits for an in-flight poll.</summary>
        private static readonly TimeSpan StopDrainTimeout = TimeSpan.FromSeconds(1);

        private readonly ILogger<PlcHostedService> _logger;
        private readonly PlcService _plcService;
        private readonly PlantWorkstationService _plantWorkstationService;

        // Guards creation of / changes to the polling timer and _currentPollInterval.
        private readonly object _timerLock = new object();
        private Timer _timer;
        private Timer _configRefreshTimer;
        private TimeSpan _currentPollInterval;
        private volatile bool _isRunning;
        private bool _disposed;

        // 1. One semaphore per PLC IP, so a slow device only throttles itself.
        private readonly ConcurrentDictionary<string, SemaphoreSlim> _plcSemaphores = new();

        // 2. Per-IP connection state (level, consecutive failures, last data time).
        private readonly ConcurrentDictionary<string, PlcConnectionStatus> _connectionStatusCache = new();

        // 3. Prevents overlapping configuration refreshes.
        private readonly SemaphoreSlim _configRefreshLock = new(1, 1);

        // Prevents overlapping poll rounds.
        private readonly SemaphoreSlim _timerAsyncLock = new(1, 1);

        // Polling settings (configuration driven, hot-reloadable).
        private readonly IOptionsMonitor<PlcPollingSettings> _pollingSettingsMonitor;
        private readonly IDisposable _settingsChangeSubscription;
        private PlcPollingSettings _currentSettings;
        private volatile SemaphoreSlim _globalSemaphore;

        // Current PLC list. The list instance is never mutated after publication; a refresh
        // swaps in a new instance, so readers can enumerate it without locking.
        private volatile List<PlcConfig> _plcConfigs = new();

        /// <summary>
        /// Connection state tracked for a single PLC.
        /// </summary>
        private class PlcConnectionStatus
        {
            public bool IsConnected { get; set; }

            /// <summary>UTC time of the last connection test.</summary>
            public DateTime LastCheckTime { get; set; }

            /// <summary>Number of consecutive failures.</summary>
            public int FailCount { get; set; }

            /// <summary>UTC time of the last successful data read.</summary>
            public DateTime LastRequestTime { get; set; }

            public ConnectionLevel Level { get; set; } = ConnectionLevel.Disconnected;
        }

        /// <summary>
        /// Connection quality levels.
        /// </summary>
        private enum ConnectionLevel
        {
            Disconnected = 0, // no connection
            Weak = 1,         // connected, but with a recent failure
            Normal = 2        // healthy
        }

        /// <summary>
        /// Result of polling one PLC during a poll round.
        /// </summary>
        private enum PollOutcome
        {
            Skipped,
            Failed,
            Succeeded,
            SucceededWithData
        }

        /// <summary>
        /// Creates the service.
        /// </summary>
        /// <param name="logger">Logger; required.</param>
        /// <param name="plcService">PLC access service; required.</param>
        /// <param name="pollingSettingsMonitor">
        /// Optional settings monitor. When null, a default <c>PlcPollingSettings</c> instance is used
        /// and no hot-reload subscription is made.
        /// </param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="logger"/> or <paramref name="plcService"/> is null.
        /// </exception>
        public PlcHostedService(
            ILogger<PlcHostedService> logger,
            PlcService plcService,
            IOptionsMonitor<PlcPollingSettings> pollingSettingsMonitor = null)
        {
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _plcService = plcService ?? throw new ArgumentNullException(nameof(plcService));
            _plantWorkstationService = new PlantWorkstationService();

            _pollingSettingsMonitor = pollingSettingsMonitor;
            _currentSettings = pollingSettingsMonitor?.CurrentValue ?? new PlcPollingSettings();
            _globalSemaphore = CreateGlobalSemaphore(_currentSettings);

            // Keep the subscription so it can be released in Dispose.
            _settingsChangeSubscription =
                _pollingSettingsMonitor?.OnChange(newSettings => OnSettingsChanged(newSettings));
        }

        /// <summary>
        /// Builds the global concurrency gate from the settings. The degree is clamped to at
        /// least 1 because <see cref="SemaphoreSlim"/> rejects a non-positive maximum count.
        /// </summary>
        private static SemaphoreSlim CreateGlobalSemaphore(PlcPollingSettings settings)
        {
            var degree = Math.Max(1, settings.GlobalParallelDegree);
            return new SemaphoreSlim(degree, degree);
        }

        /// <summary>
        /// Applies hot-reloaded settings: stores them, re-evaluates the poll interval and
        /// swaps in a global gate sized for the new parallel degree.
        /// </summary>
        private void OnSettingsChanged(PlcPollingSettings newSettings)
        {
            _currentSettings = newSettings;
            _logger.LogInformation("PLC轮询配置已热更新,新配置:{Settings}",
                Newtonsoft.Json.JsonConvert.SerializeObject(newSettings));

            AdjustTimerInterval();

            // The previous gate is intentionally NOT disposed: in-flight operations captured it
            // and will still call Release on it. New operations pick up the new instance.
            _globalSemaphore = CreateGlobalSemaphore(newSettings);
        }

        /// <summary>
        /// Releases a semaphore, ignoring the case where it was disposed concurrently
        /// (shutdown racing with an in-flight operation).
        /// </summary>
        private static void ReleaseQuietly(SemaphoreSlim semaphore)
        {
            try
            {
                semaphore.Release();
            }
            catch (ObjectDisposedException)
            {
                // Service is being disposed; nothing left to protect.
            }
        }

        /// <summary>
        /// Reloads the PLC list from the workstation table and makes sure every PLC has a
        /// semaphore and a status entry. On a query failure, or when the query returns no
        /// rows, the previous configuration is kept. Overlapping calls return immediately.
        /// </summary>
        private async Task RefreshPlcConfigsAsync()
        {
            if (!await _configRefreshLock.WaitAsync(0))
            {
                return;
            }

            try
            {
                List<PlcConfig> newConfigs;
                try
                {
                    newConfigs = _plantWorkstationService.Queryable()
                        .Where(it => it.Status == 1)
                        .Select(it => new PlcConfig
                        {
                            PlcName = it.WorkstationCode,
                            Ip = it.PlcIP,
                            // NOTE(review): an earlier comment described a null fallback for Rack/Slot,
                            // but none is applied here - confirm these columns are non-nullable.
                            Rack = (short)it.Rack,
                            Slot = (short)it.Slot
                        })
                        .ToList();
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "刷新PLC配置异常,使用旧配置");
                    return;
                }

                if (!newConfigs.Any())
                {
                    return;
                }

                var perPlcLimit = Math.Max(1, _currentSettings.MaxConcurrentPerPlc);
                var configuredIps = new HashSet<string>();

                foreach (var config in newConfigs)
                {
                    // ConcurrentDictionary throws on a null key; blank IPs are filtered out by the
                    // pollers anyway, so they need no semaphore or status entry.
                    if (string.IsNullOrWhiteSpace(config.Ip))
                    {
                        continue;
                    }

                    configuredIps.Add(config.Ip);

                    _plcSemaphores.GetOrAdd(config.Ip, _ => new SemaphoreSlim(perPlcLimit, perPlcLimit));
                    _connectionStatusCache.GetOrAdd(config.Ip, _ => new PlcConnectionStatus
                    {
                        IsConnected = false,
                        LastCheckTime = DateTime.MinValue,
                        FailCount = 0,
                        LastRequestTime = DateTime.MinValue,
                        Level = ConnectionLevel.Disconnected
                    });
                }

                // Publish only after the per-IP state exists, so a concurrent poll never sees a
                // configuration without its semaphore / status entry.
                _plcConfigs = newConfigs;

                // Drop state for PLCs that are no longer configured. The semaphores are not
                // disposed because an in-flight poll may still hold one.
                foreach (var ip in _plcSemaphores.Keys)
                {
                    if (!configuredIps.Contains(ip))
                    {
                        _plcSemaphores.TryRemove(ip, out _);
                        _connectionStatusCache.TryRemove(ip, out _);
                    }
                }

                _logger.LogInformation($"刷新PLC配置完成,当前有效配置数:{_plcConfigs.Count}");
            }
            finally
            {
                ReleaseQuietly(_configRefreshLock);
            }
        }

        /// <summary>
        /// Service main loop: loads the PLC list, connects to all PLCs, starts the
        /// configuration-refresh and polling timers, then waits for the stop signal.
        /// </summary>
        /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("PLC后台监听服务启动中...");

            // Initial load (done here rather than fire-and-forget in the constructor).
            await RefreshPlcConfigsAsync();

            // NOTE(review): when nothing is configured at startup the service exits and no refresh
            // timer is started, so PLCs added later are only picked up after a restart.
            if (!_plcConfigs.Any())
            {
                _logger.LogWarning("未配置有效PLC参数,跳过PLC自动连接");
                return;
            }

            _isRunning = true;

            try
            {
                // 1. Connect to all PLCs in parallel.
                await BatchConnectPlcAsync(stoppingToken);
                stoppingToken.ThrowIfCancellationRequested();

                var settings = _currentSettings;

                // 2. Periodic configuration refresh.
                _configRefreshTimer = new Timer(
                    ConfigRefreshCallback,
                    null,
                    TimeSpan.Zero,
                    TimeSpan.FromSeconds(settings.ConfigRefreshInterval));

                // 3. Data polling, starting at the idle rate.
                lock (_timerLock)
                {
                    _currentPollInterval = TimeSpan.FromSeconds(settings.IdlePollInterval);
                    _timer = new Timer(TimerCallback, null, TimeSpan.Zero, _currentPollInterval);
                }

                _logger.LogInformation($"PLC服务启动完成 | 全局并行度:{_currentSettings.GlobalParallelDegree} | 设备总数:{_plcConfigs.Count}");

                // Park until the host requests shutdown.
                await Task.Delay(Timeout.Infinite, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Expected: Task.Delay / the batch connect observe the stop signal by throwing.
            }

            StopTimers();
            _logger.LogInformation("PLC后台监听服务已收到停止信号");
        }

        /// <summary>
        /// Timer entry point for configuration refresh. It is <c>async void</c>, so all work
        /// is guarded: an escaping exception would otherwise terminate the process.
        /// </summary>
        private async void ConfigRefreshCallback(object state)
        {
            try
            {
                if (_isRunning)
                {
                    await RefreshPlcConfigsAsync();
                }
            }
            catch (ObjectDisposedException)
            {
                // Raced with Dispose; nothing to refresh any more.
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "刷新PLC配置异常,使用旧配置");
            }
        }

        /// <summary>
        /// Timer entry point for one poll round. A round is skipped while the previous one is
        /// still running. It is <c>async void</c>, so every statement is inside the guard.
        /// </summary>
        private async void TimerCallback(object state)
        {
            var lockTaken = false;
            try
            {
                if (!_isRunning)
                {
                    _logger.LogDebug("PLC服务已停止,跳过轮询");
                    return;
                }

                lockTaken = await _timerAsyncLock.WaitAsync(0);
                if (!lockTaken)
                {
                    _logger.LogDebug("前一轮PLC轮询未完成,跳过本次执行");
                    return;
                }

                await PollPlcDataAsync();

                // Re-evaluate the poll rate from the activity seen so far.
                AdjustTimerInterval();
            }
            catch (ObjectDisposedException) when (!_isRunning)
            {
                // Raced with shutdown / Dispose.
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "PLC轮询回调异常");
            }
            finally
            {
                if (lockTaken)
                {
                    ReleaseQuietly(_timerAsyncLock);
                }
            }
        }

        /// <summary>
        /// Switches the polling timer between the active and idle interval. The active
        /// interval is used while at least one PLC delivered data within the configured
        /// active duration. The timer is only touched when the interval actually changes,
        /// and never after the service has stopped.
        /// </summary>
        private void AdjustTimerInterval()
        {
            var settings = _currentSettings;
            var now = DateTime.UtcNow;

            var activePlcCount = _connectionStatusCache.Values
                .Count(s => (now - s.LastRequestTime).TotalSeconds <= settings.ActiveDuration);

            var newInterval = activePlcCount > 0
                ? TimeSpan.FromSeconds(settings.ActivePollInterval)
                : TimeSpan.FromSeconds(settings.IdlePollInterval);

            lock (_timerLock)
            {
                if (_timer == null || !_isRunning || newInterval == _currentPollInterval)
                {
                    return;
                }

                try
                {
                    _timer.Change(newInterval, newInterval);
                    _currentPollInterval = newInterval;
                    _logger.LogDebug($"调整轮询频率:{newInterval.TotalSeconds}s(活跃PLC数:{activePlcCount})");
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "调整轮询频率失败");
                }
            }
        }

        /// <summary>
        /// Disables both timers without disposing them.
        /// </summary>
        private void StopTimers()
        {
            lock (_timerLock)
            {
                try
                {
                    _timer?.Change(Timeout.Infinite, Timeout.Infinite);
                    _configRefreshTimer?.Change(Timeout.Infinite, Timeout.Infinite);
                }
                catch (ObjectDisposedException)
                {
                    // Already disposed; nothing to stop.
                }
            }
        }

        /// <summary>
        /// Connects to all configured PLCs in parallel, bounded by the global gate. Each PLC
        /// gets up to <c>MaxRetryTimes</c> attempts with exponential backoff between attempts.
        /// </summary>
        /// <param name="cancellationToken">Host stop token; cancellation aborts the batch.</param>
        /// <exception cref="OperationCanceledException">The host requested shutdown.</exception>
        private async Task BatchConnectPlcAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("开始批量连接所有PLC...");

            var validConfigs = _plcConfigs.Where(c => !string.IsNullOrWhiteSpace(c.Ip)).ToList();
            if (!validConfigs.Any())
            {
                _logger.LogWarning("无有效PLC配置,跳过批量连接");
                return;
            }

            var settings = _currentSettings;

            // Capture the gate: a settings hot-update may swap the field while we hold a slot,
            // and the slot must be returned to the instance it was taken from.
            var globalGate = _globalSemaphore;

            var tasks = validConfigs.Select(async config =>
            {
                await globalGate.WaitAsync(cancellationToken);
                try
                {
                    var connectSuccess = false;
                    for (var retry = 0; retry < settings.MaxRetryTimes && !connectSuccess; retry++)
                    {
                        if (retry > 0)
                        {
                            // Back off before every re-attempt (1x, 2x, 4x ... the initial interval),
                            // regardless of how the previous attempt failed, and never after the last.
                            var waitTime = TimeSpan.FromSeconds(
                                settings.InitialRetryInterval * Math.Pow(2, retry - 1));
                            await Task.Delay(waitTime, cancellationToken);
                        }

                        try
                        {
                            using var timeoutTokenSource =
                                CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
                            timeoutTokenSource.CancelAfter(TimeSpan.FromSeconds(settings.ConnectTimeoutSeconds));

                            var result = await _plcService.TestSinglePlcAsync(config, timeoutTokenSource.Token);
                            connectSuccess = result.ConnectSuccess;
                        }
                        catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
                        {
                            // Only the per-attempt timeout fired; host shutdown is rethrown below.
                            _logger.LogWarning($"[{config.PlcName}] 连接重试{retry + 1}次超时 | IP:{config.Ip}");
                        }
                        catch (Exception ex) when (ex is not OperationCanceledException)
                        {
                            _logger.LogError(ex, $"[{config.PlcName}] 连接重试{retry + 1}次异常 | IP:{config.Ip}");
                        }
                    }

                    if (_connectionStatusCache.TryGetValue(config.Ip, out var status))
                    {
                        status.IsConnected = connectSuccess;
                        status.LastCheckTime = DateTime.UtcNow;
                        status.FailCount = connectSuccess ? 0 : status.FailCount + 1;
                        status.Level = connectSuccess ? ConnectionLevel.Normal : ConnectionLevel.Disconnected;
                    }

                    if (connectSuccess)
                    {
                        _logger.LogInformation($"[{config.PlcName}] 连接成功 | IP:{config.Ip}");
                    }
                    else
                    {
                        _logger.LogWarning($"[{config.PlcName}] 连接失败({settings.MaxRetryTimes}次重试) | IP:{config.Ip}");
                    }
                }
                catch (Exception ex) when (ex is not OperationCanceledException || !cancellationToken.IsCancellationRequested)
                {
                    _logger.LogError(ex, $"[{config.PlcName}] 批量连接异常 | IP:{config.Ip}");
                }
                finally
                {
                    ReleaseQuietly(globalGate);
                }
            });

            await Task.WhenAll(tasks);
        }

        /// <summary>
        /// Runs one poll round: every PLC with a non-blank IP is polled in parallel and a
        /// summary of the round is written at debug level.
        /// </summary>
        private async Task PollPlcDataAsync()
        {
            var validConfigs = _plcConfigs.Where(c => !string.IsNullOrWhiteSpace(c.Ip)).ToList();
            if (!validConfigs.Any())
            {
                _logger.LogWarning("无有效PLC配置,跳过数据轮询");
                return;
            }

            // Snapshot settings and gate so the whole round sees one consistent view.
            var settings = _currentSettings;
            var globalGate = _globalSemaphore;

            var outcomes = await Task.WhenAll(
                validConfigs.Select(config => PollSinglePlcAsync(config, settings, globalGate)));

            var activeCount = outcomes.Count(o => o == PollOutcome.SucceededWithData);
            var successCount = activeCount + outcomes.Count(o => o == PollOutcome.Succeeded);
            var failCount = outcomes.Count(o => o == PollOutcome.Failed);
            var skipCount = outcomes.Count(o => o == PollOutcome.Skipped);

            _logger.LogDebug($"PLC轮询完成 | 成功:{successCount} | 失败:{failCount} | 跳过:{skipCount} | 有数据:{activeCount}");
        }

        /// <summary>
        /// Polls a single PLC: re-tests the connection when the cached state is stale, skips
        /// disconnected devices, otherwise reads production data and updates the status cache.
        /// </summary>
        /// <param name="config">PLC to poll; its IP must be non-blank.</param>
        /// <param name="settings">Settings snapshot for this poll round.</param>
        /// <param name="globalGate">Global concurrency gate captured for this poll round.</param>
        /// <returns>What happened for this PLC.</returns>
        private async Task<PollOutcome> PollSinglePlcAsync(
            PlcConfig config,
            PlcPollingSettings settings,
            SemaphoreSlim globalGate)
        {
            if (!_plcSemaphores.TryGetValue(config.Ip, out var plcSemaphore))
            {
                _logger.LogWarning($"[{config.PlcName}] 未找到IP隔离信号量,跳过 | IP:{config.Ip}");
                return PollOutcome.Skipped;
            }

            // Take the per-PLC gate first so requests queued behind a busy PLC do not occupy
            // global slots. Holders of a global slot never wait on a per-PLC gate, so this
            // ordering cannot deadlock.
            await plcSemaphore.WaitAsync();
            try
            {
                await globalGate.WaitAsync();
                try
                {
                    if (!_connectionStatusCache.TryGetValue(config.Ip, out var status))
                    {
                        _logger.LogWarning($"[{config.PlcName}] 未找到连接状态缓存,跳过 | IP:{config.Ip}");
                        return PollOutcome.Skipped;
                    }

                    // 1. Is the cached connection state stale?
                    var needRecheck =
                        (DateTime.UtcNow - status.LastCheckTime).TotalSeconds > settings.ConfigRefreshInterval;
                    var isConnected = status.IsConnected;

                    // 2. Stale: test the connection again.
                    if (needRecheck)
                    {
                        isConnected = await RecheckPlcConnectionAsync(config, status);
                    }

                    // 3. Disconnected and over the failure threshold: raise the alarm log.
                    if (!isConnected && status.FailCount >= settings.MaxRetryTimes)
                    {
                        _logger.LogError($"[{config.PlcName}] 连续{status.FailCount}次连接失败,触发告警 | IP:{config.Ip}");
                        return PollOutcome.Skipped;
                    }

                    // 4. Disconnected: nothing to read.
                    if (!isConnected)
                    {
                        _logger.LogDebug($"[{config.PlcName}] 连接断开,跳过数据读取 | IP:{config.Ip}");
                        return PollOutcome.Skipped;
                    }

                    // 5. Read production data.
                    var (success, prodData, _) = await _plcService.ReadProductionDataAsync(
                        config.Ip, config.PlcName, config.Rack, config.Slot);

                    if (!success)
                    {
                        status.FailCount++;
                        status.Level = status.FailCount <= 1 ? ConnectionLevel.Weak : ConnectionLevel.Disconnected;
                        return PollOutcome.Failed;
                    }

                    // A successful read marks the PLC as active for interval selection.
                    status.LastRequestTime = DateTime.UtcNow;
                    status.Level = ConnectionLevel.Normal;
                    status.FailCount = 0;

                    if (prodData == null)
                    {
                        return PollOutcome.Succeeded;
                    }

                    await ProcessPlcProductionDataAsync(config, prodData);
                    return PollOutcome.SucceededWithData;
                }
                finally
                {
                    ReleaseQuietly(globalGate);
                }
            }
            catch (OperationCanceledException)
            {
                _logger.LogWarning($"[{config.PlcName}] 轮询操作超时 | IP:{config.Ip}");
                UpdateStatusOnFailure(config.Ip);
                return PollOutcome.Failed;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, $"[{config.PlcName}] 轮询异常 | IP:{config.Ip}");
                UpdateStatusOnFailure(config.Ip);
                return PollOutcome.Failed;
            }
            finally
            {
                ReleaseQuietly(plcSemaphore);
            }
        }

        /// <summary>
        /// Tests the connection to a PLC, bounded by the configured connect timeout, and
        /// records the result in <paramref name="status"/>.
        /// </summary>
        /// <returns>True when the PLC answered the connection test; false otherwise.</returns>
        private async Task<bool> RecheckPlcConnectionAsync(PlcConfig config, PlcConnectionStatus status)
        {
            try
            {
                using var timeoutTokenSource = new CancellationTokenSource(
                    TimeSpan.FromSeconds(_currentSettings.ConnectTimeoutSeconds));
                var connectResult = await _plcService.TestSinglePlcAsync(config, timeoutTokenSource.Token);
                var connected = connectResult.ConnectSuccess;

                status.IsConnected = connected;
                status.LastCheckTime = DateTime.UtcNow;
                status.FailCount = connected ? 0 : status.FailCount + 1;
                status.Level = connected ? ConnectionLevel.Normal : ConnectionLevel.Disconnected;

                if (!connected)
                {
                    _logger.LogDebug($"[{config.PlcName}] 连接缓存过期,重新检测仍断开 | IP:{config.Ip}");
                }

                return connected;
            }
            catch (OperationCanceledException)
            {
                _logger.LogWarning($"[{config.PlcName}] 连接重检超时 | IP:{config.Ip}");
                MarkDisconnected(status);
                return false;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, $"[{config.PlcName}] 连接重检异常 | IP:{config.Ip}");
                MarkDisconnected(status);
                return false;
            }
        }

        /// <summary>
        /// Records a failed connection test: disconnected, one more failure, checked now.
        /// </summary>
        private static void MarkDisconnected(PlcConnectionStatus status)
        {
            status.IsConnected = false;
            status.LastCheckTime = DateTime.UtcNow;
            status.FailCount++;
            status.Level = ConnectionLevel.Disconnected;
        }

        /// <summary>
        /// Records a failed poll for the PLC with the given IP: the first failure downgrades
        /// it to <see cref="ConnectionLevel.Weak"/>, further ones to disconnected.
        /// </summary>
        private void UpdateStatusOnFailure(string ip)
        {
            if (_connectionStatusCache.TryGetValue(ip, out var status))
            {
                status.IsConnected = false;
                status.FailCount++;
                status.Level = status.FailCount <= 1 ? ConnectionLevel.Weak : ConnectionLevel.Disconnected;
            }
        }

        /// <summary>
        /// Extension point for handling production data read from a PLC. Currently a no-op;
        /// failures are logged and never propagate to the poll loop.
        /// </summary>
        private async Task ProcessPlcProductionDataAsync(PlcConfig config, PlcProductionData prodData)
        {
            try
            {
                // Business handling goes here (persist / publish / cache ...).
                await Task.CompletedTask;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, $"[{config.PlcName}] 生产数据处理失败 | IP:{config.Ip}");
            }
        }

        /// <summary>
        /// Stops polling: disables the timers, gives an in-flight poll round up to one second
        /// to finish, then completes the base shutdown.
        /// </summary>
        /// <param name="cancellationToken">Signals that shutdown should no longer be graceful.</param>
        public override async Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("PLC后台监听服务停止中...");
            _isRunning = false;

            StopTimers();

            try
            {
                // The poll lock is held for the duration of a round, so acquiring it means the
                // round has finished. Returns immediately when no round is running.
                if (await _timerAsyncLock.WaitAsync(StopDrainTimeout, cancellationToken))
                {
                    ReleaseQuietly(_timerAsyncLock);
                }
                else
                {
                    _logger.LogWarning("等待当前PLC轮询完成超时");
                }
            }
            catch (OperationCanceledException)
            {
                // Host shutdown deadline reached; continue so the base class still stops.
            }
            catch (ObjectDisposedException)
            {
                // Already disposed; nothing to wait for.
            }

            await base.StopAsync(cancellationToken);
            _logger.LogInformation("PLC后台监听服务已停止");
        }

        /// <summary>
        /// Releases timers, semaphores and the settings subscription. Safe to call more than once.
        /// </summary>
        public override void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;
            _isRunning = false;

            _settingsChangeSubscription?.Dispose();

            lock (_timerLock)
            {
                _timer?.Dispose();
                _configRefreshTimer?.Dispose();
            }

            _globalSemaphore?.Dispose();
            _timerAsyncLock?.Dispose();
            _configRefreshLock?.Dispose();

            foreach (var semaphore in _plcSemaphores.Values)
            {
                semaphore.Dispose();
            }

            base.Dispose();
            _logger.LogInformation("PLC后台监听服务已释放所有资源");
        }
    }
}