2026-01-28 17:14:43 +08:00

576 lines
23 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using MDM.Services.Plant;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RIZO.Admin.WebApi.PLC.Model;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace RIZO.Admin.WebApi.PLC.Service
{
public class PlcHostedService : BackgroundService, IDisposable
{
private readonly ILogger<PlcHostedService> _logger;
private readonly PlcService _plcService;
// Polling timer; created in ExecuteAsync and re-armed by AdjustTimerInterval.
private Timer _timer;
// Set to true in ExecuteAsync and to false in StopAsync; TimerCallback skips polling when false.
private bool _isRunning;
// NOTE(review): not referenced anywhere in this class as shown — possibly a leftover.
private readonly object _timerLock = new object();
// 1. One semaphore per PLC IP, so that a single faulty PLC cannot block all the others.
private readonly ConcurrentDictionary<string, SemaphoreSlim> _plcSemaphores = new();
// 2. Per-IP connection status (level, consecutive failure count, last request time).
private readonly ConcurrentDictionary<string, PlcConnectionStatus> _connectionStatusCache = new();
// 3. Configuration refresh: periodic timer plus a lock that prevents overlapping refreshes.
private Timer _configRefreshTimer;
private readonly SemaphoreSlim _configRefreshLock = new(1, 1);
// Polling settings (configuration-driven; replaced when the options monitor reports a change).
private readonly IOptionsMonitor<PlcPollingSettings> _pollingSettingsMonitor;
private PlcPollingSettings _currentSettings;
// Caps how many PLC operations run at the same time across all devices.
private SemaphoreSlim _globalSemaphore;
// Held for the duration of one polling cycle so that timer ticks never overlap.
private readonly SemaphoreSlim _timerAsyncLock = new(1, 1);
// PLC configuration list; replaced as a whole by RefreshPlcConfigsAsync.
private List<PlcConfig> _plcConfigs = new();
private PlantWorkstationService _plantWorkstationService;
/// <summary>
/// Mutable bookkeeping record for one PLC, stored in the connection status cache keyed by IP.
/// </summary>
private class PlcConnectionStatus
{
    /// <summary>Graded connection health; a fresh instance starts as <see cref="ConnectionLevel.Disconnected"/>.</summary>
    public ConnectionLevel Level { get; set; } = ConnectionLevel.Disconnected;

    /// <summary>Whether the PLC was reachable at the last connectivity check.</summary>
    public bool IsConnected { get; set; }

    /// <summary>Number of consecutive failures; set back to 0 after a success.</summary>
    public int FailCount { get; set; }

    /// <summary>When the connectivity of this PLC was last checked.</summary>
    public DateTime LastCheckTime { get; set; }

    /// <summary>When a data read from this PLC last succeeded.</summary>
    public DateTime LastRequestTime { get; set; }
}
/// <summary>
/// Graded connection health of a PLC.
/// </summary>
private enum ConnectionLevel
{
Disconnected = 0, // Disconnected
Weak = 1, // Weak connection (occasional failures)
Normal = 2 // Healthy connection
}
/// <summary>
/// Creates the hosted service, takes an initial snapshot of the polling settings,
/// subscribes to settings hot-reload and loads the PLC configuration list.
/// </summary>
/// <param name="logger">Logger for this service.</param>
/// <param name="plcService">Service used for PLC connectivity tests and production data reads.</param>
/// <param name="pollingSettingsMonitor">
/// Optional settings monitor. When null, a default <see cref="PlcPollingSettings"/> instance is used
/// and hot-reload is disabled.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="logger"/> or <paramref name="plcService"/> is null.
/// </exception>
public PlcHostedService(
    ILogger<PlcHostedService> logger,
    PlcService plcService,
    IOptionsMonitor<PlcPollingSettings> pollingSettingsMonitor = null)
{
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    _plcService = plcService ?? throw new ArgumentNullException(nameof(plcService));
    _plantWorkstationService = new PlantWorkstationService();

    // Settings snapshot: falls back to defaults when no monitor is registered.
    _pollingSettingsMonitor = pollingSettingsMonitor;
    _currentSettings = pollingSettingsMonitor?.CurrentValue ?? new PlcPollingSettings();

    // Create the global semaphore before subscribing, so the change handler never
    // observes a half-initialized instance.
    _globalSemaphore = CreateGlobalSemaphore(_currentSettings);

    _pollingSettingsMonitor?.OnChange(newSettings =>
    {
        _currentSettings = newSettings;
        _logger.LogInformation("PLC轮询配置已热更新新配置{Settings}",
            Newtonsoft.Json.JsonConvert.SerializeObject(newSettings));

        // Apply the new polling frequency right away.
        AdjustTimerInterval();

        // Swap in a semaphore with the new parallel degree. The previous instance is
        // deliberately NOT disposed: in-flight polls may still be waiting on it or about
        // to release it, and SemaphoreSlim.Dispose is not safe to call concurrently with
        // other members. An abandoned SemaphoreSlim holds no OS handle unless
        // AvailableWaitHandle was accessed, which this class never does.
        _globalSemaphore = CreateGlobalSemaphore(newSettings);
    });

    // Initial configuration load. NOTE(review): ExecuteAsync checks _plcConfigs right at
    // start-up, so it appears to rely on this call having completed by then — confirm.
    _ = RefreshPlcConfigsAsync();

    // SemaphoreSlim throws for a maximum count below 1, so a misconfigured parallel
    // degree is clamped to 1 instead of crashing construction or the change callback.
    static SemaphoreSlim CreateGlobalSemaphore(PlcPollingSettings settings)
    {
        var degree = Math.Max(1, settings.GlobalParallelDegree);
        return new SemaphoreSlim(degree, degree);
    }
}
/// <summary>
/// Reloads the enabled PLC configurations from the workstation service and makes sure
/// every configured IP has a per-PLC semaphore and a connection status entry.
/// </summary>
/// <remarks>
/// Skips the refresh when another one is already running. If the query throws, or returns
/// no rows, the previously loaded configuration list is kept.
/// </remarks>
private async Task RefreshPlcConfigsAsync()
{
    // Zero-timeout wait: do not queue behind a refresh that is already in progress.
    if (!await _configRefreshLock.WaitAsync(0))
    {
        return;
    }

    try
    {
        List<PlcConfig> newConfigs;
        try
        {
            newConfigs = _plantWorkstationService.Queryable()
                .Where(it => it.Status == 1)
                .Select(it => new PlcConfig
                {
                    PlcName = it.WorkstationCode,
                    Ip = it.PlcIP,
                    // NOTE(review): the original comment claimed a null fallback here, but the
                    // casts apply none — confirm Rack/Slot can never be null in the data source.
                    Rack = (short)it.Rack,
                    Slot = (short)it.Slot
                })
                .ToList();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "刷新PLC配置异常使用旧配置");
            return;
        }

        if (!newConfigs.Any())
        {
            return;
        }

        _plcConfigs = newConfigs;

        // A per-PLC semaphore needs a maximum count of at least 1.
        var perPlcLimit = Math.Max(1, _currentSettings.MaxConcurrentPerPlc);
        foreach (var config in newConfigs)
        {
            // Rows without an IP are ignored by the connect/poll paths as well, and a null
            // key would make ConcurrentDictionary throw ArgumentNullException.
            if (string.IsNullOrWhiteSpace(config.Ip))
            {
                continue;
            }

            // GetOrAdd with a factory only allocates for IPs that are not cached yet.
            _plcSemaphores.GetOrAdd(config.Ip, _ => new SemaphoreSlim(perPlcLimit, perPlcLimit));
            _connectionStatusCache.GetOrAdd(config.Ip, _ => new PlcConnectionStatus
            {
                IsConnected = false,
                LastCheckTime = DateTime.MinValue,
                FailCount = 0,
                LastRequestTime = DateTime.MinValue,
                Level = ConnectionLevel.Disconnected
            });
        }

        _logger.LogInformation($"刷新PLC配置完成当前有效配置数{_plcConfigs.Count}");
    }
    finally
    {
        _configRefreshLock.Release();
    }
}
/// <summary>
/// Background entry point: connects to all configured PLCs, starts the configuration
/// refresh timer and the polling timer, then waits until the host requests shutdown.
/// </summary>
/// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
/// <remarks>
/// Returns immediately, without starting any timer, when no PLC configuration was loaded.
/// </remarks>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    _logger.LogInformation("PLC后台监听服务启动中...");
    if (!_plcConfigs.Any())
    {
        _logger.LogWarning("未配置有效PLC参数跳过PLC自动连接");
        return;
    }

    _isRunning = true;

    // 1. Connect to all PLCs in parallel at start-up.
    await BatchConnectPlcAsync(stoppingToken);

    // 2. Periodic configuration refresh. The lambda compiles to an async void delegate,
    //    so any exception leaving it would be unhandled; catch and log instead.
    _configRefreshTimer = new Timer(
        async _ =>
        {
            try
            {
                await RefreshPlcConfigsAsync();
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "定时刷新PLC配置异常");
            }
        },
        null,
        TimeSpan.Zero,
        TimeSpan.FromSeconds(_currentSettings.ConfigRefreshInterval));

    // 3. Data polling timer; starts at the idle frequency and is adjusted after each cycle.
    _timer = new Timer(
        TimerCallback,
        null,
        TimeSpan.Zero,
        TimeSpan.FromSeconds(_currentSettings.IdlePollInterval));

    _logger.LogInformation($"PLC服务启动完成 | 全局并行度:{_currentSettings.GlobalParallelDegree} | 设备总数:{_plcConfigs.Count}");

    try
    {
        // Task.Delay throws when the token is cancelled; without the catch below the
        // shutdown steps that follow would never execute.
        await Task.Delay(Timeout.Infinite, stoppingToken);
    }
    catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
    {
        // Expected: the host asked the service to stop.
    }

    // Disable both timers (infinite due time = never fire again).
    _timer?.Change(Timeout.Infinite, 0);
    _configRefreshTimer?.Change(Timeout.Infinite, 0);
    _logger.LogInformation("PLC后台监听服务已收到停止信号");
}
/// <summary>
/// Polling timer callback: runs one polling cycle unless the service is stopped or the
/// previous cycle is still running, then re-evaluates the polling frequency.
/// </summary>
/// <param name="state">Unused timer state.</param>
/// <remarks>
/// This is an <c>async void</c> method because the timer delegate returns void, so an
/// exception that escapes it cannot be observed by any caller. Every statement that can
/// throw is therefore kept inside the try/catch, including the lock acquisition and release.
/// </remarks>
private async void TimerCallback(object state)
{
    var lockTaken = false;
    try
    {
        if (!_isRunning)
        {
            _logger.LogDebug("PLC服务已停止跳过轮询");
            return;
        }

        // Zero-timeout wait: skip this tick instead of overlapping with the previous cycle.
        lockTaken = await _timerAsyncLock.WaitAsync(0);
        if (!lockTaken)
        {
            _logger.LogDebug("前一轮PLC轮询未完成跳过本次执行");
            return;
        }

        await PollPlcDataAsync();

        // Switch between the active and idle frequency based on recent activity.
        AdjustTimerInterval();
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "PLC轮询回调异常");
    }
    finally
    {
        if (lockTaken)
        {
            try
            {
                _timerAsyncLock.Release();
            }
            catch (ObjectDisposedException)
            {
                // The service was disposed while this cycle was running; nothing left to release.
            }
        }
    }
}
/// <summary>
/// Re-arms the polling timer with the active interval when at least one PLC delivered
/// data within the configured active duration, otherwise with the idle interval.
/// </summary>
/// <remarks>
/// The timer is re-armed on every call, so the next tick is one interval after this call.
/// Does nothing when the service is not running; otherwise a polling cycle that finishes
/// after <c>StopAsync</c> would switch the disabled timer back on.
/// </remarks>
private void AdjustTimerInterval()
{
    var timer = _timer;
    if (!_isRunning || timer == null)
    {
        return;
    }

    // Work on one settings snapshot so a concurrent hot-reload cannot mix old and new values.
    var settings = _currentSettings;
    var now = DateTime.Now;

    // Number of PLCs with a successful data read inside the active window.
    var activePlcCount = _connectionStatusCache.Values
        .Count(s => (now - s.LastRequestTime).TotalSeconds <= settings.ActiveDuration);

    var newInterval = activePlcCount > 0
        ? TimeSpan.FromSeconds(settings.ActivePollInterval)
        : TimeSpan.FromSeconds(settings.IdlePollInterval);

    try
    {
        timer.Change(newInterval, newInterval);
        _logger.LogDebug($"调整轮询频率:{newInterval.TotalSeconds}s活跃PLC数{activePlcCount}");
    }
    catch (Exception ex)
    {
        // For example the timer was disposed concurrently, or the interval is out of range.
        _logger.LogError(ex, "调整轮询频率失败");
    }
}
/// <summary>
/// Connects to every configured PLC in parallel at start-up, bounded by the global
/// semaphore, retrying each PLC with exponential backoff, and records the outcome in
/// the connection status cache.
/// </summary>
/// <param name="cancellationToken">Cancels waiting, connecting and retrying.</param>
private async Task BatchConnectPlcAsync(CancellationToken cancellationToken)
{
    _logger.LogInformation("开始批量连接所有PLC...");
    var validConfigs = _plcConfigs.Where(c => !string.IsNullOrWhiteSpace(c.Ip)).ToList();
    if (!validConfigs.Any())
    {
        _logger.LogWarning("无有效PLC配置跳过批量连接");
        return;
    }

    // Snapshot both: a settings hot-reload may replace these fields while the batch is
    // running, and a slot must be released on the same semaphore it was taken from.
    var settings = _currentSettings;
    var globalSemaphore = _globalSemaphore;

    var tasks = validConfigs.Select(async config =>
    {
        await globalSemaphore.WaitAsync(cancellationToken);
        try
        {
            var connectSuccess = await ConnectWithBackoffAsync(config, settings, cancellationToken);

            if (_connectionStatusCache.TryGetValue(config.Ip, out var status))
            {
                status.IsConnected = connectSuccess;
                status.LastCheckTime = DateTime.Now;
                status.FailCount = connectSuccess ? 0 : status.FailCount + 1;
                status.Level = connectSuccess ? ConnectionLevel.Normal : ConnectionLevel.Disconnected;
            }

            if (connectSuccess)
            {
                _logger.LogInformation($"[{config.PlcName}] 连接成功 | IP{config.Ip}");
            }
            else
            {
                _logger.LogWarning($"[{config.PlcName}] 连接失败({settings.MaxRetryTimes}次重试) | IP{config.Ip}");
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, $"[{config.PlcName}] 批量连接异常 | IP{config.Ip}");
        }
        finally
        {
            globalSemaphore.Release();
        }
    });
    await Task.WhenAll(tasks);
}

/// <summary>
/// Tries to connect to one PLC up to <c>MaxRetryTimes</c> times, waiting
/// <c>InitialRetryInterval * 2^attempt</c> seconds between attempts.
/// </summary>
/// <returns>True as soon as one attempt succeeds; false when all attempts fail or shutdown is requested.</returns>
private async Task<bool> ConnectWithBackoffAsync(
    PlcConfig config,
    PlcPollingSettings settings,
    CancellationToken cancellationToken)
{
    for (int retry = 0; retry < settings.MaxRetryTimes; retry++)
    {
        try
        {
            // Per-attempt timeout, linked so that shutdown also aborts the attempt.
            using var timeoutTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            timeoutTokenSource.CancelAfter(TimeSpan.FromSeconds(settings.ConnectTimeoutSeconds));
            var result = await _plcService.TestSinglePlcAsync(config, timeoutTokenSource.Token);
            if (result.ConnectSuccess)
            {
                return true;
            }
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Shutdown was requested: this is not a timeout, so stop retrying.
            return false;
        }
        catch (OperationCanceledException)
        {
            _logger.LogWarning($"[{config.PlcName}] 连接重试{retry + 1}次超时 | IP{config.Ip}");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, $"[{config.PlcName}] 连接重试{retry + 1}次异常 | IP{config.Ip}");
        }

        // No point in backing off after the final attempt.
        if (retry >= settings.MaxRetryTimes - 1)
        {
            break;
        }

        // Back off after every kind of failure (plain failure, timeout or exception).
        var waitTime = TimeSpan.FromSeconds(settings.InitialRetryInterval * Math.Pow(2, retry));
        try
        {
            await Task.Delay(waitTime, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            return false;
        }
    }

    return false;
}
/// <summary>
/// Runs one polling cycle: for every configured PLC, re-checks the connection when the
/// cached status has expired, reads production data when connected, and updates the
/// per-PLC status. PLCs are processed in parallel, bounded by the per-IP and global semaphores.
/// </summary>
private async Task PollPlcDataAsync()
{
    var validConfigs = _plcConfigs.Where(c => !string.IsNullOrWhiteSpace(c.Ip)).ToList();
    if (!validConfigs.Any())
    {
        _logger.LogWarning("无有效PLC配置跳过数据轮询");
        return;
    }

    // Snapshot both: a settings hot-reload may replace these fields mid-cycle. Releasing a
    // slot on a different semaphore than the one it was taken from would either throw
    // SemaphoreFullException or silently raise the new semaphore's available count.
    var globalSemaphore = _globalSemaphore;
    var settings = _currentSettings;

    // Per-cycle statistics, reported at Debug level once the cycle finishes.
    int successCount = 0, failCount = 0, skipCount = 0, activeCount = 0;

    var tasks = validConfigs.Select(async config =>
    {
        if (!_plcSemaphores.TryGetValue(config.Ip, out var plcSemaphore))
        {
            _logger.LogWarning($"[{config.PlcName}] 未找到IP隔离信号量跳过 | IP{config.Ip}");
            Interlocked.Increment(ref skipCount);
            return;
        }

        // Take the per-PLC semaphore first, so a task queued behind a busy PLC does not
        // occupy a global slot while it waits.
        await plcSemaphore.WaitAsync();
        try
        {
            await globalSemaphore.WaitAsync();
            try
            {
                if (!_connectionStatusCache.TryGetValue(config.Ip, out var status))
                {
                    _logger.LogWarning($"[{config.PlcName}] 未找到连接状态缓存,跳过 | IP{config.Ip}");
                    Interlocked.Increment(ref skipCount);
                    return;
                }

                // 1. The cached connection state expires after ConfigRefreshInterval seconds.
                bool needRecheck = (DateTime.Now - status.LastCheckTime).TotalSeconds > settings.ConfigRefreshInterval;
                bool isConnected = status.IsConnected;

                // 2. Expired: test the connection again.
                if (needRecheck)
                {
                    isConnected = await RecheckPlcConnectionAsync(config, status);
                }

                // 3. Disconnected with too many consecutive failures: raise an alarm log.
                if (!isConnected && status.FailCount >= settings.MaxRetryTimes)
                {
                    _logger.LogError($"[{config.PlcName}] 连续{status.FailCount}次连接失败,触发告警 | IP{config.Ip}");
                    Interlocked.Increment(ref skipCount);
                    return;
                }

                // 4. Disconnected: nothing to read in this cycle.
                if (!isConnected)
                {
                    _logger.LogDebug($"[{config.PlcName}] 连接断开,跳过数据读取 | IP{config.Ip}");
                    Interlocked.Increment(ref skipCount);
                    return;
                }

                // 5. Read the production data.
                var (success, prodData, _) = await _plcService.ReadProductionDataAsync(
                    config.Ip, config.PlcName, config.Rack, config.Slot);
                if (success)
                {
                    // LastRequestTime marks the PLC as active for AdjustTimerInterval.
                    status.LastRequestTime = DateTime.Now;
                    status.Level = ConnectionLevel.Normal;
                    status.FailCount = 0;
                    Interlocked.Increment(ref successCount);

                    if (prodData != null)
                    {
                        Interlocked.Increment(ref activeCount);
                        await ProcessPlcProductionDataAsync(config, prodData);
                    }
                }
                else
                {
                    // One failure degrades the PLC to Weak, repeated failures to Disconnected.
                    status.FailCount++;
                    status.Level = status.FailCount <= 1 ? ConnectionLevel.Weak : ConnectionLevel.Disconnected;
                    Interlocked.Increment(ref failCount);
                }
            }
            catch (OperationCanceledException)
            {
                _logger.LogWarning($"[{config.PlcName}] 轮询操作超时 | IP{config.Ip}");
                UpdateStatusOnFailure(config.Ip);
                Interlocked.Increment(ref failCount);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, $"[{config.PlcName}] 轮询异常 | IP{config.Ip}");
                UpdateStatusOnFailure(config.Ip);
                Interlocked.Increment(ref failCount);
            }
            finally
            {
                globalSemaphore.Release();
            }
        }
        finally
        {
            // Nested finally: the per-PLC slot is returned even if taking the global slot throws.
            plcSemaphore.Release();
        }
    });

    await Task.WhenAll(tasks);

    _logger.LogDebug($"PLC轮询完成 | 成功:{successCount} | 失败:{failCount} | 跳过:{skipCount} | 有数据:{activeCount}");
}
/// <summary>
/// Tests the connection to one PLC, bounded by the configured connect timeout, and writes
/// the outcome into the given status entry.
/// </summary>
/// <param name="config">PLC to test.</param>
/// <param name="status">Status entry updated in place with the result.</param>
/// <returns>True when the PLC answered the connectivity test; false on failure, timeout or error.</returns>
private async Task<bool> RecheckPlcConnectionAsync(PlcConfig config, PlcConnectionStatus status)
{
    // Single place that records the result of a check on the status entry.
    void Record(bool reachable)
    {
        status.IsConnected = reachable;
        status.LastCheckTime = DateTime.Now;
        status.FailCount = reachable ? 0 : status.FailCount + 1;
        status.Level = reachable ? ConnectionLevel.Normal : ConnectionLevel.Disconnected;
    }

    try
    {
        var timeout = TimeSpan.FromSeconds(_currentSettings.ConnectTimeoutSeconds);
        using var timeoutCts = new CancellationTokenSource(timeout);
        var probe = await _plcService.TestSinglePlcAsync(config, timeoutCts.Token);
        var reachable = probe.ConnectSuccess;

        Record(reachable);
        if (!reachable)
        {
            _logger.LogDebug($"[{config.PlcName}] 连接缓存过期,重新检测仍断开 | IP{config.Ip}");
        }

        return reachable;
    }
    catch (OperationCanceledException)
    {
        // The connect timeout elapsed before the test completed.
        _logger.LogWarning($"[{config.PlcName}] 连接重检超时 | IP{config.Ip}");
        Record(false);
        return false;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, $"[{config.PlcName}] 连接重检异常 | IP{config.Ip}");
        Record(false);
        return false;
    }
}
/// <summary>
/// Marks the PLC with the given IP as disconnected after a failed poll and bumps its
/// consecutive failure counter. Does nothing when the IP has no status entry.
/// </summary>
/// <param name="ip">IP address used as the key of the status cache.</param>
private void UpdateStatusOnFailure(string ip)
{
    if (!_connectionStatusCache.TryGetValue(ip, out var entry))
    {
        return;
    }

    entry.IsConnected = false;
    entry.FailCount++;

    // The first failure only degrades the link to Weak; any further one counts as Disconnected.
    if (entry.FailCount <= 1)
    {
        entry.Level = ConnectionLevel.Weak;
    }
    else
    {
        entry.Level = ConnectionLevel.Disconnected;
    }
}
/// <summary>
/// Extension point for handling production data read from a PLC. Currently a no-op:
/// the body only awaits an already completed task.
/// </summary>
/// <param name="config">PLC the data was read from; used in the failure log message.</param>
/// <param name="prodData">Production data returned by the read; not used yet.</param>
private async Task ProcessPlcProductionDataAsync(PlcConfig config, PlcProductionData prodData)
{
try
{
// Business processing goes here: persist to the database, publish to a message queue, cache, etc.
// Example: await _plcDataService.SaveProductionDataAsync(prodData);
await Task.CompletedTask;
}
catch (Exception ex)
{
_logger.LogError(ex, $"[{config.PlcName}] 生产数据处理失败 | IP{config.Ip}");
}
}
/// <summary>
/// Stops the service: disables both timers, waits for the polling cycle that is currently
/// running (if any), then lets the base class cancel and await <c>ExecuteAsync</c>.
/// </summary>
/// <param name="cancellationToken">Signals that shutdown should no longer be graceful.</param>
public override async Task StopAsync(CancellationToken cancellationToken)
{
    _logger.LogInformation("PLC后台监听服务停止中...");
    _isRunning = false;

    // Disable both timers (infinite due time = never fire again).
    _timer?.Change(Timeout.Infinite, 0);
    _configRefreshTimer?.Change(Timeout.Infinite, 0);

    // TimerCallback holds _timerAsyncLock for the whole polling cycle, so acquiring it
    // means no cycle is in flight. The wait is bounded so that a hung PLC read cannot
    // block shutdown indefinitely, and it returns at once when nothing is running.
    var gracePeriod = TimeSpan.FromSeconds(5);
    try
    {
        if (await _timerAsyncLock.WaitAsync(gracePeriod, cancellationToken))
        {
            _timerAsyncLock.Release();
        }
        else
        {
            _logger.LogWarning("等待当前PLC轮询结束超时继续执行停止流程");
        }
    }
    catch (OperationCanceledException)
    {
        // Graceful shutdown window is over. Still fall through to base.StopAsync, which is
        // what actually cancels the stopping token observed by ExecuteAsync.
        _logger.LogWarning("停止过程被取消不再等待当前PLC轮询结束");
    }

    await base.StopAsync(cancellationToken);
    _logger.LogInformation("PLC后台监听服务已停止");
}
/// <summary>
/// Releases everything this service owns: both timers, the global semaphore, the two
/// internal locks and every per-IP semaphore, then disposes the base class.
/// </summary>
public override void Dispose()
{
    // Timers first, then the semaphores; entries that were never created are null.
    IDisposable[] ownedResources =
    {
        _timer,
        _configRefreshTimer,
        _globalSemaphore,
        _timerAsyncLock,
        _configRefreshLock
    };
    foreach (var resource in ownedResources)
    {
        resource?.Dispose();
    }

    // Per-IP isolation semaphores.
    foreach (var perIpSemaphore in _plcSemaphores.Values)
    {
        perIpSemaphore.Dispose();
    }

    base.Dispose();
    _logger.LogInformation("PLC后台监听服务已释放所有资源");
}
}
}