424 lines
17 KiB
C#
Raw Normal View History

2026-01-19 13:46:29 +08:00
using MDM.Services.Plant;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RIZO.Admin.WebApi.PLC.Model;
using RIZO.Admin.WebApi.PLC.Service;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace RIZO.Admin.WebApi.PLC.Service
{
public class PlcHostedService : BackgroundService, IDisposable
{
private readonly ILogger<PlcHostedService> _logger;
private readonly PlcService _plcService;
private readonly List<PlcConfig> _plcConfigs;
private Timer _timer;
private bool _isRunning;
private readonly SemaphoreSlim _semaphore;
private readonly object _timerLock = new object();
// 连接状态缓存:减少短时间内重复连接测试
private readonly ConcurrentDictionary<string, PlcConnectionState> _connectionStateCache;
// 可配置参数建议放到配置文件中通过IOptions注入
2026-01-24 17:28:23 +08:00
private readonly int _parallelDegree = 15; // 并行度20+PLC建议8-12
private readonly double _pollingIntervalSeconds = 0.2; // 轮询间隔
2026-01-21 15:09:02 +08:00
private readonly int _connectTimeoutSeconds = 1; // 单个PLC连接超时时间
2026-01-24 17:28:23 +08:00
private readonly int _stateCacheExpireSeconds = 5; // 连接状态缓存有效期
2026-01-19 13:46:29 +08:00
private PlantWorkstationService plantWorkstationService = new PlantWorkstationService();
/// <summary>
/// PLC 连接状态缓存对象
/// </summary>
private class PlcConnectionState
{
public bool IsConnected { get; set; }
public DateTime LastCheckTime { get; set; }
}
public PlcHostedService(
ILogger<PlcHostedService> logger,
2026-01-19 15:41:58 +08:00
PlcService plcService)
2026-01-19 13:46:29 +08:00
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_plcService = plcService ?? throw new ArgumentNullException(nameof(plcService));
//初始化plcConfigs
_plcConfigs = initPlcConfigs(_plcConfigs);
// 初始化并行控制信号量
_semaphore = new SemaphoreSlim(_parallelDegree, _parallelDegree);
// 初始化连接状态缓存
_connectionStateCache = new ConcurrentDictionary<string, PlcConnectionState>();
foreach (var config in _plcConfigs)
{
if (!string.IsNullOrWhiteSpace(config.Ip))
{
_connectionStateCache.TryAdd(config.Ip, new PlcConnectionState
{
IsConnected = false,
LastCheckTime = DateTime.MinValue
});
}
else
{
_logger.LogWarning("发现空IP的PLC配置已跳过缓存初始化");
}
}
}
private List<PlcConfig> initPlcConfigs(List<PlcConfig> result)
{
var defaultResult = result ?? new List<PlcConfig>();
try
{
List<PlcConfig> query = plantWorkstationService.Queryable()
.Where(it => it.Status == 1)
.Select(it => new PlcConfig
{
PlcName = it.WorkstationCode,
Ip = it.PlcIP,
Rack = (short)it.Rack, // 直接强制转换it.Rack是int非空
Slot = (short)it.Slot // 同理it.Slot是int非空
})
.ToList();
return query.Count > 0 ? query : defaultResult;
}
catch (Exception ex)
{
Console.WriteLine($"初始化PLC配置异常{ex.Message}");
return defaultResult;
}
}
/// <summary>
/// 重写BackgroundService的ExecuteAsync替代StartAsync
/// </summary>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("PLC后台监听服务启动中...");
//获取PLC配置
if (!_plcConfigs.Any())
{
_logger.LogWarning("未配置PLC参数跳过PLC自动连接");
return;
}
_isRunning = true;
// 1. 启动时并行测试所有PLC连接
await BatchConnectPlcAsync(stoppingToken);
// 2. 启动安全定时器(防重叠执行)
_timer = new Timer(
TimerCallback,
null,
TimeSpan.Zero,
TimeSpan.FromSeconds(_pollingIntervalSeconds));
_logger.LogInformation($"PLC服务启动完成 | 并行度:{_parallelDegree} | 轮询间隔:{_pollingIntervalSeconds}s | 设备总数:{_plcConfigs.Count}");
// 等待停止信号
await Task.Delay(Timeout.Infinite, stoppingToken);
// 停止定时器
_timer?.Change(Timeout.Infinite, 0);
_logger.LogInformation("PLC后台监听服务已收到停止信号");
}
// 1. 定义异步锁(全局变量)
private readonly SemaphoreSlim _timerAsyncLock = new SemaphoreSlim(1, 1);
// 2. 重构TimerCallback为异步锁版本
private async void TimerCallback(object state)
{
if (!_isRunning)
{
_logger.LogDebug("PLC服务已停止跳过轮询");
return;
}
// 尝试获取异步锁超时时间设为0等价于TryEnter
bool isLockAcquired = await _timerAsyncLock.WaitAsync(0);
if (!isLockAcquired)
{
_logger.LogDebug("前一轮PLC轮询未完成跳过本次执行");
return;
}
try
{
await PollPlcDataAsync();
}
catch (Exception ex)
{
_logger.LogError(ex, "PLC轮询回调异常");
}
finally
{
// 释放异步锁(无异常风险)
_timerAsyncLock.Release();
}
}
/// <summary>
/// 启动时批量并行连接PLC
/// </summary>
private async Task BatchConnectPlcAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("开始批量连接所有PLC...");
// 过滤空IP配置避免无效任务
var validConfigs = _plcConfigs.Where(c => !string.IsNullOrWhiteSpace(c.Ip)).ToList();
if (!validConfigs.Any())
{
_logger.LogWarning("无有效PLC配置跳过批量连接");
return;
}
var tasks = validConfigs.Select(async config =>
{
await _semaphore.WaitAsync(cancellationToken);
try
{
// 带超时的连接测试合并cancellationToken支持外部停止
using var timeoutTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutTokenSource.CancelAfter(TimeSpan.FromSeconds(_connectTimeoutSeconds));
var result = await _plcService.TestSinglePlcAsync(config, timeoutTokenSource.Token);
// 安全更新缓存ConcurrentDictionary线程安全
if (_connectionStateCache.TryGetValue(config.Ip, out var state))
{
state.IsConnected = result.ConnectSuccess;
state.LastCheckTime = DateTime.Now;
}
if (result.ConnectSuccess)
_logger.LogInformation($"[{config.PlcName}] 连接成功 | IP{config.Ip}");
else
_logger.LogWarning($"[{config.PlcName}] 连接失败 | IP{config.Ip} | 原因:{result.ConnectMessage}");
}
catch (OperationCanceledException)
{
_logger.LogWarning($"[{config.PlcName}] 连接超时 | IP{config.Ip} | 超时时间:{_connectTimeoutSeconds}s");
// 超时标记为断开
if (_connectionStateCache.TryGetValue(config.Ip, out var state))
{
state.IsConnected = false;
state.LastCheckTime = DateTime.Now;
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"[{config.PlcName}] 连接异常 | IP{config.Ip}");
// 异常标记为断开
if (_connectionStateCache.TryGetValue(config.Ip, out var state))
{
state.IsConnected = false;
state.LastCheckTime = DateTime.Now;
}
}
finally
{
_semaphore.Release();
}
});
await Task.WhenAll(tasks);
}
/// <summary>
/// 并行轮询PLC数据读取前先验证连接状态修复传参/日志/数据处理问题)
/// </summary>
private async Task PollPlcDataAsync()
{
// 过滤有效配置非空IP
var validConfigs = _plcConfigs.Where(c => !string.IsNullOrWhiteSpace(c.Ip)).ToList();
if (!validConfigs.Any())
{
_logger.LogWarning("无有效PLC配置跳过数据轮询");
return;
}
// 统计本次轮询结果
int successCount = 0;
int failCount = 0;
int skipCount = 0;
var tasks = validConfigs.Select(async config =>
{
await _semaphore.WaitAsync();
try
{
// 1. 校验缓存是否存在
if (!_connectionStateCache.TryGetValue(config.Ip, out var state))
{
_logger.LogWarning($"[{config.PlcName}] 未找到连接状态缓存,跳过 | IP{config.Ip}");
Interlocked.Increment(ref skipCount);
return;
}
// 2. 判断是否需要重新检查连接(缓存过期)
var needRecheck = (DateTime.Now - state.LastCheckTime).TotalSeconds > _stateCacheExpireSeconds;
bool isConnected = state.IsConnected;
// 3. 缓存过期则重新测试连接
if (needRecheck)
{
using var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(_connectTimeoutSeconds));
try
{
var connectResult = await _plcService.TestSinglePlcAsync(config, timeoutTokenSource.Token);
isConnected = connectResult.ConnectSuccess;
// 更新缓存状态
state.IsConnected = isConnected;
state.LastCheckTime = DateTime.Now;
if (!isConnected)
{
_logger.LogDebug($"[{config.PlcName}] 连接缓存过期,重新检测仍断开 | IP{config.Ip}");
}
}
catch (OperationCanceledException)
{
_logger.LogWarning($"[{config.PlcName}] 连接重检超时 | IP{config.Ip} | 超时:{_connectTimeoutSeconds}s");
isConnected = false;
state.IsConnected = false;
state.LastCheckTime = DateTime.Now;
}
catch (Exception ex)
{
_logger.LogError(ex, $"[{config.PlcName}] 连接重检异常 | IP{config.Ip}");
isConnected = false;
state.IsConnected = false;
state.LastCheckTime = DateTime.Now;
}
}
// 4. 连接断开则跳过读取
if (!isConnected)
{
_logger.LogDebug($"[{config.PlcName}] 连接断开,跳过数据读取 | IP{config.Ip}");
Interlocked.Increment(ref skipCount);
return;
}
// 5. 读取PLC生产数据核心修复移除多余的TestReadAddress参数
var (success, prodData, message) = await _plcService.ReadProductionDataAsync(
2026-01-21 15:34:40 +08:00
config.Ip, config.PlcName, config.Rack, config.Slot); // 仅传3个必要参数
2026-01-19 13:46:29 +08:00
if (success)
{
// 数据处理逻辑(示例:可替换为入库/推MQ/存Redis等
await ProcessPlcProductionDataAsync(config, prodData);
Interlocked.Increment(ref successCount);
}
else
{
_logger.LogWarning($"[{config.PlcName}] 生产数据读取失败 | IP{config.Ip} | 原因:{message}");
// 读取失败标记连接断开
state.IsConnected = false;
Interlocked.Increment(ref failCount);
}
}
catch (OperationCanceledException)
{
_logger.LogWarning($"[{config.PlcName}] 轮询操作超时 | IP{config.Ip}");
if (_connectionStateCache.TryGetValue(config.Ip, out var state))
{
state.IsConnected = false;
}
Interlocked.Increment(ref failCount);
}
catch (Exception ex)
{
_logger.LogError(ex, $"[{config.PlcName}] 轮询异常 | IP{config.Ip}");
if (_connectionStateCache.TryGetValue(config.Ip, out var state))
{
state.IsConnected = false;
}
Interlocked.Increment(ref failCount);
}
finally
{
_semaphore.Release();
}
});
// 等待所有轮询任务完成
await Task.WhenAll(tasks);
}
/// <summary>
/// 处理PLC生产数据示例方法可根据业务扩展
/// </summary>
/// <param name="config">PLC配置</param>
/// <param name="prodData">生产数据</param>
private async Task ProcessPlcProductionDataAsync(PlcConfig config, PlcProductionData prodData)
{
try
{
// 示例1写入数据库需注入仓储/服务)
// await _plcProductionDataRepository.AddAsync(prodData);
// 示例2推送至消息队列如RabbitMQ/Kafka
// await _messageQueueService.PublishAsync("plc_production_data", prodData);
// 示例3缓存至Redis
// await _redisCache.SetAsync($"plc:production:{config.Ip}", prodData, TimeSpan.FromMinutes(5));
await Task.CompletedTask; // 异步占位
}
catch (Exception ex)
{
_logger.LogError(ex, $"[{config.PlcName}] 生产数据处理失败 | IP{config.Ip}");
// 数据处理失败不影响轮询,仅记录日志
}
}
/// <summary>
/// 重写停止逻辑(更安全)
/// </summary>
public override async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("PLC后台监听服务停止中...");
_isRunning = false;
// 停止定时器
_timer?.Change(Timeout.Infinite, 0);
// 等待当前轮询完成
await Task.Delay(100, cancellationToken);
await base.StopAsync(cancellationToken);
_logger.LogInformation("PLC后台监听服务已停止");
}
/// <summary>
/// 资源释放完整实现IDisposable
/// </summary>
public override void Dispose()
{
// 停止定时器
_timer?.Dispose();
// 释放信号量
_semaphore?.Dispose();
// 调用基类释放
base.Dispose();
_logger.LogInformation("PLC后台监听服务已释放所有资源");
}
}
}