shgx_tz_mom/ZR.Service/tcp/TcpServerService.cs
git_rabbit b4879f821d feat(tcp): 添加TCP服务器服务实现多客户端并发连接
实现基于BackgroundService的TCP服务器,支持多客户端并发连接和消息广播
包含信号去重功能,防止短时间内重复处理相同信号
添加客户端连接管理和统计报告功能
2026-02-06 14:32:46 +08:00

442 lines
19 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ZR.Service.tcp
{
/// <summary>
/// 信号去重记录
/// </summary>
public class SignalDeduplicationRecord
{
/// <summary>
/// 信号唯一标识
/// </summary>
public string SignalId { get; set; }
/// <summary>
/// 接收时间戳(毫秒)
/// </summary>
public long Timestamp { get; set; }
}
/// <summary>
/// TCP服务器服务
/// 基于BackgroundService实现支持多客户端并发连接
/// 监听端口9090
/// </summary>
public class TcpServerService : BackgroundService
{
private readonly ILogger<TcpServerService> _logger;
private TcpListener _tcpListener;
private CancellationTokenSource _cts;
private readonly int _port = 9090;
private readonly string _ipAddress = "0.0.0.0"; // 监听所有网络接口
// 线程安全的客户端连接列表,用于管理所有已连接的客户端
private ConcurrentDictionary<string, TcpClient> _connectedClients;
// 信号去重相关
private ConcurrentDictionary<string, long> _recentSignals; // 键:信号唯一标识,值:时间戳
private long _deduplicationCount; // 去重统计计数
private object _deduplicationLock = new object(); // 用于统计计数的锁
private readonly int _timeWindowMs = 1000; // 时间窗口1000毫秒
private readonly int _cleanupIntervalMs = 5000; // 清理间隔5000毫秒
/// <summary>
/// 构造函数
/// </summary>
/// <param name="logger">日志服务</param>
public TcpServerService(ILogger<TcpServerService> logger)
{
_logger = logger;
_connectedClients = new ConcurrentDictionary<string, TcpClient>();
_recentSignals = new ConcurrentDictionary<string, long>();
_deduplicationCount = 0;
}
/// <summary>
/// 执行TCP服务器逻辑
/// </summary>
/// <param name="stoppingToken">停止令牌</param>
/// <returns></returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// 创建与停止令牌关联的取消令牌源
_cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
try
{
// 初始化TCP监听器
IPAddress localAddr = IPAddress.Parse(_ipAddress);
_tcpListener = new TcpListener(localAddr, _port);
// 启动监听器
_tcpListener.Start();
_logger.LogInformation($"TCP服务器启动成功监听地址: {_ipAddress}:{_port}");
Console.WriteLine($"[TCP服务器] 启动成功,监听地址: {_ipAddress}:{_port}");
// 启动定期清理任务
_ = Task.Run(() => CleanupExpiredSignalsAsync(_cts.Token), _cts.Token);
// 启动统计报告任务
_ = Task.Run(() => ReportDeduplicationStatsAsync(_cts.Token), _cts.Token);
// 循环接受客户端连接
while (!_cts.Token.IsCancellationRequested)
{
try
{
// 检查是否有挂起的连接
if (_tcpListener.Pending())
{
// 接受客户端连接
TcpClient client = await _tcpListener.AcceptTcpClientAsync();
string clientEndPoint = client.Client.RemoteEndPoint?.ToString() ?? "未知客户端";
// 将客户端添加到连接列表
_connectedClients.TryAdd(clientEndPoint, client);
_logger.LogInformation($"客户端连接成功: {clientEndPoint},当前连接数: {_connectedClients.Count}");
Console.WriteLine($"[TCP服务器] 客户端连接成功: {clientEndPoint},当前连接数: {_connectedClients.Count}");
// 异步处理客户端连接,避免阻塞主线程
_ = HandleClientAsync(client, clientEndPoint, _cts.Token);
}
else
{
// 避免CPU占用过高添加适当延迟
await Task.Delay(100, _cts.Token);
}
}
catch (OperationCanceledException)
{
// 正常停止,无需处理
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "接受客户端连接时发生错误");
Console.WriteLine($"[TCP服务器] 接受客户端连接时发生错误: {ex.Message}");
// 继续运行,不影响服务器整体
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "TCP服务器启动失败");
Console.WriteLine($"[TCP服务器] 启动失败: {ex.Message}");
}
finally
{
// 清理所有客户端连接
foreach (var client in _connectedClients.Values)
{
try
{
client.Close();
}
catch { }
}
_connectedClients.Clear();
// 清理信号记录
_recentSignals.Clear();
// 停止监听器
_tcpListener?.Stop();
_logger.LogInformation("TCP服务器已停止");
Console.WriteLine("[TCP服务器] 已停止");
}
}
/// <summary>
/// 定期清理过期的信号记录
/// </summary>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
private async Task CleanupExpiredSignalsAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
long currentTime = GetCurrentTimestampMs();
int removedCount = 0;
// 清理过期的信号记录
foreach (var signalEntry in _recentSignals)
{
if (currentTime - signalEntry.Value > _timeWindowMs)
{
if (_recentSignals.TryRemove(signalEntry.Key, out _))
{
removedCount++;
}
}
}
if (removedCount > 0)
{
_logger.LogInformation($"清理了 {removedCount} 条过期信号记录,剩余 {_recentSignals.Count} 条");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "清理过期信号记录时发生错误");
}
// 等待下一次清理
await Task.Delay(_cleanupIntervalMs, cancellationToken);
}
}
/// <summary>
/// 报告去重统计信息
/// </summary>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
private async Task ReportDeduplicationStatsAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
long currentCount = Interlocked.Exchange(ref _deduplicationCount, 0);
if (currentCount > 0)
{
_logger.LogInformation($"过去5秒内去重了 {currentCount} 条信号");
Console.WriteLine($"[TCP服务器] 过去5秒内去重了 {currentCount} 条信号");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "报告去重统计信息时发生错误");
}
// 每5秒报告一次
await Task.Delay(5000, cancellationToken);
}
}
/// <summary>
/// 检查信号是否需要去重
/// </summary>
/// <param name="signalData">信号数据</param>
/// <returns>是否需要去重true需要去重false不需要去重</returns>
private bool ShouldDeduplicate(string signalData)
{
try
{
// 生成信号唯一标识
string signalId = GenerateSignalId(signalData);
long currentTime = GetCurrentTimestampMs();
// 检查是否存在近期相同的信号
if (_recentSignals.TryGetValue(signalId, out long existingTime))
{
// 检查是否在时间窗口内
if (currentTime - existingTime <= _timeWindowMs)
{
// 记录去重计数
Interlocked.Increment(ref _deduplicationCount);
_logger.LogInformation($"信号去重:{signalId},时间差:{currentTime - existingTime}ms");
return true;
}
}
// 更新或添加信号记录
_recentSignals[signalId] = currentTime;
return false;
}
catch (Exception ex)
{
_logger.LogError(ex, "信号去重检查时发生错误");
// 发生错误时,允许信号通过,避免影响主流程
return false;
}
}
/// <summary>
/// 生成信号唯一标识
/// </summary>
/// <param name="signalData">信号数据</param>
/// <returns>信号唯一标识</returns>
private string GenerateSignalId(string signalData)
{
// 对于shgx/production/changePackagePrevention/XXXX格式的信号使用完整字符串作为唯一标识
// 对于其他信号,可以根据具体格式生成唯一标识
return signalData;
}
/// <summary>
/// 获取当前时间戳(毫秒)
/// </summary>
/// <returns>时间戳(毫秒)</returns>
private long GetCurrentTimestampMs()
{
// 使用DateTime.UtcNow确保时间一致性减少系统时间跳变的影响
return (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalMilliseconds;
}
/// <summary>
/// 处理客户端连接
/// </summary>
/// <param name="client">TCP客户端</param>
/// <param name="clientEndPoint">客户端端点信息</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
private async Task HandleClientAsync(TcpClient client, string clientEndPoint, CancellationToken cancellationToken)
{
using (client)
using (NetworkStream stream = client.GetStream())
{
try
{
// 设置读取超时
client.ReceiveTimeout = 30000; // 30秒
client.SendTimeout = 30000; // 30秒
// 缓冲区大小
byte[] buffer = new byte[4096];
int bytesRead;
// 循环读取客户端数据
while ((bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken)) > 0)
{
try
{
// 解析接收到的数据
string receivedData = Encoding.UTF8.GetString(buffer, 0, bytesRead);
_logger.LogInformation($"从客户端 {clientEndPoint} 接收到数据: {receivedData}");
Console.WriteLine($"[TCP服务器] 从客户端 {clientEndPoint} 接收到数据: {receivedData}");
// 检查是否是需要广播的消息格式
if (receivedData.StartsWith("shgx/cx/gz/"))
{
// 检查是否需要去重
if (!ShouldDeduplicate(receivedData))
{
// 向所有已连接的客户端广播消息
await BroadcastMessageAsync(receivedData, cancellationToken);
}
else
{
// 信号已去重,记录日志
Console.WriteLine($"[TCP服务器] 信号已去重: {receivedData}");
}
}
else
{
// 处理数据并生成响应
string response = ProcessData(receivedData);
byte[] responseBytes = Encoding.UTF8.GetBytes(response);
// 发送响应
await stream.WriteAsync(responseBytes, 0, responseBytes.Length, cancellationToken);
_logger.LogInformation($"向客户端 {clientEndPoint} 发送响应: {response}");
Console.WriteLine($"[TCP服务器] 向客户端 {clientEndPoint} 发送响应: {response}");
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"处理客户端 {clientEndPoint} 数据时发生错误");
Console.WriteLine($"[TCP服务器] 处理客户端 {clientEndPoint} 数据时发生错误: {ex.Message}");
// 发送错误响应
string errorResponse = $"Error: {ex.Message}";
byte[] errorBytes = Encoding.UTF8.GetBytes(errorResponse);
await stream.WriteAsync(errorBytes, 0, errorBytes.Length, cancellationToken);
}
}
// 客户端断开连接
_connectedClients.TryRemove(clientEndPoint, out _);
_logger.LogInformation($"客户端 {clientEndPoint} 断开连接,当前连接数: {_connectedClients.Count}");
Console.WriteLine($"[TCP服务器] 客户端 {clientEndPoint} 断开连接,当前连接数: {_connectedClients.Count}");
}
catch (OperationCanceledException)
{
// 正常停止,无需处理
_connectedClients.TryRemove(clientEndPoint, out _);
}
catch (Exception ex)
{
_logger.LogError(ex, $"处理客户端 {clientEndPoint} 连接时发生错误");
Console.WriteLine($"[TCP服务器] 处理客户端 {clientEndPoint} 连接时发生错误: {ex.Message}");
_connectedClients.TryRemove(clientEndPoint, out _);
}
}
}
/// <summary>
/// 向所有已连接的客户端广播消息
/// </summary>
/// <param name="message">要广播的消息</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
private async Task BroadcastMessageAsync(string message, CancellationToken cancellationToken)
{
_logger.LogInformation($"开始广播消息: {message},目标客户端数: {_connectedClients.Count}");
Console.WriteLine($"[TCP服务器] 开始广播消息: {message},目标客户端数: {_connectedClients.Count}");
byte[] messageBytes = Encoding.UTF8.GetBytes(message);
int broadcastCount = 0;
// 遍历所有客户端并发送消息
foreach (var clientEntry in _connectedClients)
{
string clientEndPoint = clientEntry.Key;
TcpClient client = clientEntry.Value;
try
{
if (client.Connected)
{
// 直接获取网络流不要使用using块避免重复释放
NetworkStream stream = client.GetStream();
await stream.WriteAsync(messageBytes, 0, messageBytes.Length, cancellationToken);
broadcastCount++;
_logger.LogInformation($"向客户端 {clientEndPoint} 广播消息成功");
Console.WriteLine($"[TCP服务器] 向客户端 {clientEndPoint} 广播消息成功");
}
else
{
// 移除已断开的客户端
_connectedClients.TryRemove(clientEndPoint, out _);
_logger.LogInformation($"客户端 {clientEndPoint} 已断开,从连接列表中移除");
Console.WriteLine($"[TCP服务器] 客户端 {clientEndPoint} 已断开,从连接列表中移除");
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"向客户端 {clientEndPoint} 广播消息时发生错误");
Console.WriteLine($"[TCP服务器] 向客户端 {clientEndPoint} 广播消息时发生错误: {ex.Message}");
// 移除失败的客户端
_connectedClients.TryRemove(clientEndPoint, out _);
}
}
_logger.LogInformation($"广播完成,成功发送到 {broadcastCount} 个客户端");
Console.WriteLine($"[TCP服务器] 广播完成,成功发送到 {broadcastCount} 个客户端");
}
/// <summary>
/// 处理接收到的数据
/// </summary>
/// <param name="data">接收到的数据</param>
/// <returns>响应数据</returns>
private string ProcessData(string data)
{
// 这里可以根据实际业务需求处理数据
// 示例:简单回显接收到的数据
return $"Server response: {data}";
}
}
}