2026-01-17 10:42:02 +08:00

236 lines
9.2 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
namespace RIZO.Admin.WebApi.PLC
{
/// <summary>
/// Hosted background service that tests every configured PLC connection once at startup
/// and then polls the PLCs on a fixed interval, limiting how many are handled at once.
/// </summary>
public class PlcHostedService : IHostedService, IDisposable
{
    private readonly ILogger<PlcHostedService> _logger;
    private readonly PlcService _plcService;
    private readonly List<PlcConfig> _plcConfigs;
    private Timer _timer;

    // Written by StartAsync/StopAsync and read on timer threads, hence volatile.
    private volatile bool _isRunning;

    // Bounds how many PLCs are tested/polled concurrently.
    private readonly SemaphoreSlim _semaphore;

    // Re-entrancy guard for the timer callback: 0 = idle, 1 = a poll round is in flight.
    // An interlocked flag is used instead of Monitor because the guarded section contains
    // an await, and Monitor.Exit must run on the same thread that acquired the lock.
    private int _pollInProgress;

    // The most recently started poll round; StopAsync waits on it before resources are disposed.
    private volatile Task _activePoll = Task.CompletedTask;

    // Connection state cache: avoids repeating the connection test within a short time window.
    private readonly ConcurrentDictionary<string, PlcConnectionState> _connectionStateCache;

    // Tunable parameters (ideally these would come from configuration).
    private readonly int _parallelDegree = 10; // Max concurrent PLC operations.
    private readonly int _pollingIntervalSeconds = 5; // Polling interval.
    private readonly int _connectTimeoutSeconds = 3; // Timeout for a single PLC connection test.
    private readonly int _stateCacheExpireSeconds = 10; // How long a cached connection state is trusted.

    /// <summary>
    /// Cached connection state of a single PLC.
    /// </summary>
    private class PlcConnectionState
    {
        public bool IsConnected { get; set; }

        /// <summary>Time of the last connection test, stored in UTC.</summary>
        public DateTime LastCheckTime { get; set; }
    }

    /// <summary>
    /// Creates the service and seeds the connection state cache with one
    /// "disconnected / never checked" entry per configured PLC IP.
    /// </summary>
    /// <param name="logger">Logger for this service.</param>
    /// <param name="plcService">Service used to test connections and read PLC data.</param>
    /// <param name="plcConfigs">Configured PLC list; a null value is treated as an empty list.</param>
    public PlcHostedService(
        ILogger<PlcHostedService> logger,
        PlcService plcService,
        IOptions<List<PlcConfig>> plcConfigs)
    {
        _logger = logger;
        _plcService = plcService;
        _plcConfigs = plcConfigs.Value ?? new List<PlcConfig>();

        _semaphore = new SemaphoreSlim(_parallelDegree, _parallelDegree);

        _connectionStateCache = new ConcurrentDictionary<string, PlcConnectionState>();
        foreach (var config in _plcConfigs)
        {
            // TryAdd keeps the first entry when two configs share the same IP.
            _connectionStateCache.TryAdd(config.Ip, new PlcConnectionState
            {
                IsConnected = false,
                LastCheckTime = DateTime.MinValue
            });
        }
    }

    /// <summary>
    /// Tests all PLC connections in parallel, then starts the polling timer.
    /// Does nothing (beyond logging) when no PLC is configured.
    /// </summary>
    /// <param name="cancellationToken">Token observed while waiting for a concurrency slot during the startup test.</param>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("PLC后台监听服务启动中...");
        if (!_plcConfigs.Any())
        {
            _logger.LogWarning("未配置PLC参数跳过PLC自动连接");
            return;
        }

        _isRunning = true;

        // 1. Test every PLC connection in parallel at startup.
        await BatchConnectPlcAsync(cancellationToken);

        // 2. Start the timer; the callback itself prevents overlapping rounds.
        _timer = new Timer(
            TimerCallback,
            null,
            TimeSpan.Zero,
            TimeSpan.FromSeconds(_pollingIntervalSeconds));

        _logger.LogInformation($"PLC服务启动完成 | 并行度:{_parallelDegree} | 轮询间隔:{_pollingIntervalSeconds}s | 设备总数:{_plcConfigs.Count}");
    }

    /// <summary>
    /// Timer callback that runs one poll round, skipping the tick when the service is
    /// stopped or when the previous round has not finished yet.
    /// </summary>
    /// <param name="state">Unused timer state.</param>
    private async void TimerCallback(object state)
    {
        if (!_isRunning)
        {
            return;
        }

        // Claim the "in progress" flag atomically; a non-zero previous value means another round is still running.
        if (Interlocked.CompareExchange(ref _pollInProgress, 1, 0) != 0)
        {
            _logger.LogDebug("前一轮PLC轮询未完成跳过本次执行");
            return;
        }

        try
        {
            var poll = PollPlcDataAsync();
            _activePoll = poll;
            await poll;
        }
        catch (Exception ex)
        {
            // async void: anything escaping here would be unhandled, so log and swallow.
            _logger.LogError(ex, "PLC轮询回调异常");
        }
        finally
        {
            Interlocked.Exchange(ref _pollInProgress, 0);
        }
    }

    /// <summary>
    /// Tests the connection of every configured PLC in parallel and records the result in the cache.
    /// Each test is limited by the per-PLC connect timeout; failures are logged, not thrown.
    /// </summary>
    /// <param name="cancellationToken">Token observed while waiting for a concurrency slot.</param>
    private async Task BatchConnectPlcAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("开始批量连接所有PLC...");
        var tasks = _plcConfigs.Select(async config =>
        {
            await _semaphore.WaitAsync(cancellationToken);
            try
            {
                // Connection test bounded by the connect timeout.
                using var timeoutToken = new CancellationTokenSource(TimeSpan.FromSeconds(_connectTimeoutSeconds));
                var result = await _plcService.TestSinglePlcAsync(config, timeoutToken.Token);

                // Record the outcome in the cache.
                var state = _connectionStateCache[config.Ip];
                state.IsConnected = result.ConnectSuccess;
                state.LastCheckTime = DateTime.UtcNow;

                if (result.ConnectSuccess)
                {
                    _logger.LogInformation($"[{config.PlcName}] 连接成功 | IP{config.Ip}");
                }
                else
                {
                    _logger.LogWarning($"[{config.PlcName}] 连接失败 | IP{config.Ip} | 原因:{result.ConnectMessage}");
                }
            }
            catch (OperationCanceledException)
            {
                _logger.LogWarning($"[{config.PlcName}] 连接超时 | IP{config.Ip} | 超时时间:{_connectTimeoutSeconds}s");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, $"[{config.PlcName}] 连接异常 | IP{config.Ip}");
            }
            finally
            {
                _semaphore.Release();
            }
        });
        await Task.WhenAll(tasks);
    }

    /// <summary>
    /// Runs one poll round over all configured PLCs in parallel. For each PLC the cached
    /// connection state is used while fresh and re-tested once expired; data is read only
    /// when the PLC is considered connected.
    /// </summary>
    private async Task PollPlcDataAsync()
    {
        _logger.LogInformation("开始轮询PLC数据...");
        var tasks = _plcConfigs.Select(async config =>
        {
            await _semaphore.WaitAsync();
            try
            {
                // 1. Use the cached connection state unless it has expired.
                var state = _connectionStateCache[config.Ip];
                var needRecheck = (DateTime.UtcNow - state.LastCheckTime).TotalSeconds > _stateCacheExpireSeconds;
                bool isConnected = state.IsConnected;
                if (needRecheck)
                {
                    // Cache expired: re-test the connection, bounded by the connect timeout.
                    using var timeoutToken = new CancellationTokenSource(TimeSpan.FromSeconds(_connectTimeoutSeconds));
                    var connectResult = await _plcService.TestSinglePlcAsync(config, timeoutToken.Token);
                    isConnected = connectResult.ConnectSuccess;

                    state.IsConnected = isConnected;
                    state.LastCheckTime = DateTime.UtcNow;
                }

                // 2. Not connected: skip the read for this round.
                if (!isConnected)
                {
                    _logger.LogDebug($"[{config.PlcName}] 连接断开,跳过数据读取 | IP{config.Ip}");
                    return;
                }

                // 3. Connected: read the configured test address.
                var (success, value, message) = await _plcService.ReadPlcDataAsync(
                    config.Ip, config.Rack, config.Slot, config.TestReadAddress);
                if (success)
                {
                    _logger.LogInformation($"[{config.PlcName}] 数据读取成功 | 地址:{config.TestReadAddress} | 值:{value}");
                    // Extension point for data handling (persist / push to clients / cache).
                }
                else
                {
                    _logger.LogWarning($"[{config.PlcName}] 数据读取失败 | 原因:{message}");
                    // A failed read marks the PLC as disconnected; it is re-tested once the cache entry expires.
                    state.IsConnected = false;
                }
            }
            catch (OperationCanceledException)
            {
                _logger.LogWarning($"[{config.PlcName}] 操作超时 | IP{config.Ip}");
                _connectionStateCache[config.Ip].IsConnected = false;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, $"[{config.PlcName}] 轮询异常 | IP{config.Ip}");
                _connectionStateCache[config.Ip].IsConnected = false;
            }
            finally
            {
                _semaphore.Release();
            }
        });
        await Task.WhenAll(tasks);
        _logger.LogInformation($"PLC数据轮询完成 | 本次轮询设备数:{_plcConfigs.Count}");
    }

    /// <summary>
    /// Stops the timer and waits for a poll round that is already in flight, so that
    /// <see cref="Dispose"/> does not release the semaphore while it is still in use.
    /// The wait ends early when <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <param name="cancellationToken">Host shutdown token that bounds the wait.</param>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("PLC后台监听服务停止中...");
        _isRunning = false;
        _timer?.Change(Timeout.Infinite, 0);

        var activePoll = _activePoll;
        if (activePoll.IsCompleted)
        {
            return;
        }

        // Complete this signal when the host gives up on graceful shutdown.
        var shutdownSignal = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        using (cancellationToken.Register(() => shutdownSignal.TrySetResult(true)))
        {
            // WhenAny never throws for a faulted poll; the poll's own exception is observed in TimerCallback.
            await Task.WhenAny(activePoll, shutdownSignal.Task);
        }
    }

    /// <summary>
    /// Releases the timer and the semaphore.
    /// </summary>
    public void Dispose()
    {
        _timer?.Dispose();
        _semaphore?.Dispose();
        _logger.LogInformation("PLC后台监听服务已释放资源");
    }
}
}