using MDM.Services.Plant; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using RIZO.Admin.WebApi.PLC.Model; using RIZO.Admin.WebApi.PLC.Service; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; namespace RIZO.Admin.WebApi.PLC.Service { public class PlcHostedService : BackgroundService, IDisposable { private readonly ILogger _logger; private readonly PlcService _plcService; private readonly List _plcConfigs; private Timer _timer; private bool _isRunning; private readonly SemaphoreSlim _semaphore; private readonly object _timerLock = new object(); // 连接状态缓存:减少短时间内重复连接测试 private readonly ConcurrentDictionary _connectionStateCache; // 可配置参数(建议放到配置文件中,通过IOptions注入) private readonly int _parallelDegree = 15; // 并行度(20+PLC建议8-12) private readonly double _pollingIntervalSeconds = 0.2; // 轮询间隔 private readonly int _connectTimeoutSeconds = 1; // 单个PLC连接超时时间 private readonly int _stateCacheExpireSeconds = 5; // 连接状态缓存有效期 private PlantWorkstationService plantWorkstationService = new PlantWorkstationService(); /// /// PLC 连接状态缓存对象 /// private class PlcConnectionState { public bool IsConnected { get; set; } public DateTime LastCheckTime { get; set; } } public PlcHostedService( ILogger logger, PlcService plcService) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _plcService = plcService ?? throw new ArgumentNullException(nameof(plcService)); //初始化plcConfigs _plcConfigs = initPlcConfigs(_plcConfigs); // 初始化并行控制信号量 _semaphore = new SemaphoreSlim(_parallelDegree, _parallelDegree); // 初始化连接状态缓存 _connectionStateCache = new ConcurrentDictionary(); foreach (var config in _plcConfigs) { if (!string.IsNullOrWhiteSpace(config.Ip)) { _connectionStateCache.TryAdd(config.Ip, new PlcConnectionState { IsConnected = false, LastCheckTime = DateTime.MinValue }); } else { _logger.LogWarning("发现空IP的PLC配置,已跳过缓存初始化"); } } } private List initPlcConfigs(List result) { var defaultResult = result ?? new List(); try { List query = plantWorkstationService.Queryable() .Where(it => it.Status == 1) .Select(it => new PlcConfig { PlcName = it.WorkstationCode, Ip = it.PlcIP, Rack = (short)it.Rack, // 直接强制转换(it.Rack是int,非空) Slot = (short)it.Slot // 同理,it.Slot是int,非空 }) .ToList(); return query.Count > 0 ? query : defaultResult; } catch (Exception ex) { Console.WriteLine($"初始化PLC配置异常:{ex.Message}"); return defaultResult; } } /// /// 重写BackgroundService的ExecuteAsync(替代StartAsync) /// protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation("PLC后台监听服务启动中..."); //获取PLC配置 if (!_plcConfigs.Any()) { _logger.LogWarning("未配置PLC参数,跳过PLC自动连接"); return; } _isRunning = true; // 1. 启动时并行测试所有PLC连接 await BatchConnectPlcAsync(stoppingToken); // 2. 启动安全定时器(防重叠执行) _timer = new Timer( TimerCallback, null, TimeSpan.Zero, TimeSpan.FromSeconds(_pollingIntervalSeconds)); _logger.LogInformation($"PLC服务启动完成 | 并行度:{_parallelDegree} | 轮询间隔:{_pollingIntervalSeconds}s | 设备总数:{_plcConfigs.Count}"); // 等待停止信号 await Task.Delay(Timeout.Infinite, stoppingToken); // 停止定时器 _timer?.Change(Timeout.Infinite, 0); _logger.LogInformation("PLC后台监听服务已收到停止信号"); } // 1. 定义异步锁(全局变量) private readonly SemaphoreSlim _timerAsyncLock = new SemaphoreSlim(1, 1); // 2. 重构TimerCallback为异步锁版本 private async void TimerCallback(object state) { if (!_isRunning) { _logger.LogDebug("PLC服务已停止,跳过轮询"); return; } // 尝试获取异步锁(超时时间设为0,等价于TryEnter) bool isLockAcquired = await _timerAsyncLock.WaitAsync(0); if (!isLockAcquired) { _logger.LogDebug("前一轮PLC轮询未完成,跳过本次执行"); return; } try { await PollPlcDataAsync(); } catch (Exception ex) { _logger.LogError(ex, "PLC轮询回调异常"); } finally { // 释放异步锁(无异常风险) _timerAsyncLock.Release(); } } /// /// 启动时批量并行连接PLC /// private async Task BatchConnectPlcAsync(CancellationToken cancellationToken) { _logger.LogInformation("开始批量连接所有PLC..."); // 过滤空IP配置,避免无效任务 var validConfigs = _plcConfigs.Where(c => !string.IsNullOrWhiteSpace(c.Ip)).ToList(); if (!validConfigs.Any()) { _logger.LogWarning("无有效PLC配置,跳过批量连接"); return; } var tasks = validConfigs.Select(async config => { await _semaphore.WaitAsync(cancellationToken); try { // 带超时的连接测试(合并cancellationToken,支持外部停止) using var timeoutTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); timeoutTokenSource.CancelAfter(TimeSpan.FromSeconds(_connectTimeoutSeconds)); var result = await _plcService.TestSinglePlcAsync(config, timeoutTokenSource.Token); // 安全更新缓存(ConcurrentDictionary线程安全) if (_connectionStateCache.TryGetValue(config.Ip, out var state)) { state.IsConnected = result.ConnectSuccess; state.LastCheckTime = DateTime.Now; } if (result.ConnectSuccess) _logger.LogInformation($"[{config.PlcName}] 连接成功 | IP:{config.Ip}"); else _logger.LogWarning($"[{config.PlcName}] 连接失败 | IP:{config.Ip} | 原因:{result.ConnectMessage}"); } catch (OperationCanceledException) { _logger.LogWarning($"[{config.PlcName}] 连接超时 | IP:{config.Ip} | 超时时间:{_connectTimeoutSeconds}s"); // 超时标记为断开 if (_connectionStateCache.TryGetValue(config.Ip, out var state)) { state.IsConnected = false; state.LastCheckTime = DateTime.Now; } } catch (Exception ex) { _logger.LogError(ex, $"[{config.PlcName}] 连接异常 | IP:{config.Ip}"); // 异常标记为断开 if (_connectionStateCache.TryGetValue(config.Ip, out var state)) { state.IsConnected = false; state.LastCheckTime = DateTime.Now; } } finally { _semaphore.Release(); } }); await Task.WhenAll(tasks); } /// /// 并行轮询PLC数据(读取前先验证连接状态,修复传参/日志/数据处理问题) /// private async Task PollPlcDataAsync() { // 过滤有效配置(非空IP) var validConfigs = _plcConfigs.Where(c => !string.IsNullOrWhiteSpace(c.Ip)).ToList(); if (!validConfigs.Any()) { _logger.LogWarning("无有效PLC配置,跳过数据轮询"); return; } // 统计本次轮询结果 int successCount = 0; int failCount = 0; int skipCount = 0; var tasks = validConfigs.Select(async config => { await _semaphore.WaitAsync(); try { // 1. 校验缓存是否存在 if (!_connectionStateCache.TryGetValue(config.Ip, out var state)) { _logger.LogWarning($"[{config.PlcName}] 未找到连接状态缓存,跳过 | IP:{config.Ip}"); Interlocked.Increment(ref skipCount); return; } // 2. 判断是否需要重新检查连接(缓存过期) var needRecheck = (DateTime.Now - state.LastCheckTime).TotalSeconds > _stateCacheExpireSeconds; bool isConnected = state.IsConnected; // 3. 缓存过期则重新测试连接 if (needRecheck) { using var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(_connectTimeoutSeconds)); try { var connectResult = await _plcService.TestSinglePlcAsync(config, timeoutTokenSource.Token); isConnected = connectResult.ConnectSuccess; // 更新缓存状态 state.IsConnected = isConnected; state.LastCheckTime = DateTime.Now; if (!isConnected) { _logger.LogDebug($"[{config.PlcName}] 连接缓存过期,重新检测仍断开 | IP:{config.Ip}"); } } catch (OperationCanceledException) { _logger.LogWarning($"[{config.PlcName}] 连接重检超时 | IP:{config.Ip} | 超时:{_connectTimeoutSeconds}s"); isConnected = false; state.IsConnected = false; state.LastCheckTime = DateTime.Now; } catch (Exception ex) { _logger.LogError(ex, $"[{config.PlcName}] 连接重检异常 | IP:{config.Ip}"); isConnected = false; state.IsConnected = false; state.LastCheckTime = DateTime.Now; } } // 4. 连接断开则跳过读取 if (!isConnected) { _logger.LogDebug($"[{config.PlcName}] 连接断开,跳过数据读取 | IP:{config.Ip}"); Interlocked.Increment(ref skipCount); return; } // 5. 读取PLC生产数据(核心修复:移除多余的TestReadAddress参数) var (success, prodData, message) = await _plcService.ReadProductionDataAsync( config.Ip, config.PlcName, config.Rack, config.Slot); // 仅传3个必要参数 if (success) { // 数据处理逻辑(示例:可替换为入库/推MQ/存Redis等) await ProcessPlcProductionDataAsync(config, prodData); Interlocked.Increment(ref successCount); } else { _logger.LogWarning($"[{config.PlcName}] 生产数据读取失败 | IP:{config.Ip} | 原因:{message}"); // 读取失败标记连接断开 state.IsConnected = false; Interlocked.Increment(ref failCount); } } catch (OperationCanceledException) { _logger.LogWarning($"[{config.PlcName}] 轮询操作超时 | IP:{config.Ip}"); if (_connectionStateCache.TryGetValue(config.Ip, out var state)) { state.IsConnected = false; } Interlocked.Increment(ref failCount); } catch (Exception ex) { _logger.LogError(ex, $"[{config.PlcName}] 轮询异常 | IP:{config.Ip}"); if (_connectionStateCache.TryGetValue(config.Ip, out var state)) { state.IsConnected = false; } Interlocked.Increment(ref failCount); } finally { _semaphore.Release(); } }); // 等待所有轮询任务完成 await Task.WhenAll(tasks); } /// /// 处理PLC生产数据(示例方法:可根据业务扩展) /// /// PLC配置 /// 生产数据 private async Task ProcessPlcProductionDataAsync(PlcConfig config, PlcProductionData prodData) { try { // 示例1:写入数据库(需注入仓储/服务) // await _plcProductionDataRepository.AddAsync(prodData); // 示例2:推送至消息队列(如RabbitMQ/Kafka) // await _messageQueueService.PublishAsync("plc_production_data", prodData); // 示例3:缓存至Redis // await _redisCache.SetAsync($"plc:production:{config.Ip}", prodData, TimeSpan.FromMinutes(5)); await Task.CompletedTask; // 异步占位 } catch (Exception ex) { _logger.LogError(ex, $"[{config.PlcName}] 生产数据处理失败 | IP:{config.Ip}"); // 数据处理失败不影响轮询,仅记录日志 } } /// /// 重写停止逻辑(更安全) /// public override async Task StopAsync(CancellationToken cancellationToken) { _logger.LogInformation("PLC后台监听服务停止中..."); _isRunning = false; // 停止定时器 _timer?.Change(Timeout.Infinite, 0); // 等待当前轮询完成 await Task.Delay(100, cancellationToken); await base.StopAsync(cancellationToken); _logger.LogInformation("PLC后台监听服务已停止"); } /// /// 资源释放(完整实现IDisposable) /// public override void Dispose() { // 停止定时器 _timer?.Dispose(); // 释放信号量 _semaphore?.Dispose(); // 调用基类释放 base.Dispose(); _logger.LogInformation("PLC后台监听服务已释放所有资源"); } } }