From b4879f821d921a73f34fe513d582119d0f594dba Mon Sep 17 00:00:00 2001 From: git_rabbit Date: Fri, 6 Feb 2026 14:32:46 +0800 Subject: [PATCH] =?UTF-8?q?feat(tcp):=20=E6=B7=BB=E5=8A=A0TCP=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E5=99=A8=E6=9C=8D=E5=8A=A1=E5=AE=9E=E7=8E=B0=E5=A4=9A?= =?UTF-8?q?=E5=AE=A2=E6=88=B7=E7=AB=AF=E5=B9=B6=E5=8F=91=E8=BF=9E=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 实现基于BackgroundService的TCP服务器,支持多客户端并发连接和消息广播 包含信号去重功能,防止短时间内重复处理相同信号 添加客户端连接管理和统计报告功能 --- ZR.Admin.WebApi/Program.cs | 3 + ZR.Service/tcp/TcpServerService.cs | 441 +++++++++++++++++++++++++++++ 2 files changed, 444 insertions(+) create mode 100644 ZR.Service/tcp/TcpServerService.cs diff --git a/ZR.Admin.WebApi/Program.cs b/ZR.Admin.WebApi/Program.cs index a9d03142..84d677a0 100644 --- a/ZR.Admin.WebApi/Program.cs +++ b/ZR.Admin.WebApi/Program.cs @@ -42,6 +42,9 @@ builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddHostedService(sp => sp.GetRequiredService()); /// =============================================================================== + +// 注册TCP服务器服务 +builder.Services.AddHostedService(); // 跨域配置 builder.Services.AddCors(builder.Configuration); // 显示logo diff --git a/ZR.Service/tcp/TcpServerService.cs b/ZR.Service/tcp/TcpServerService.cs new file mode 100644 index 00000000..a78caf94 --- /dev/null +++ b/ZR.Service/tcp/TcpServerService.cs @@ -0,0 +1,441 @@ +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Concurrent; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace ZR.Service.tcp +{ + /// + /// 信号去重记录 + /// + public class SignalDeduplicationRecord + { + /// + /// 信号唯一标识 + /// + public string SignalId { get; set; } + + /// + /// 接收时间戳(毫秒) + /// + public long Timestamp { get; set; } + } + + /// + /// TCP服务器服务 + /// 基于BackgroundService实现,支持多客户端并发连接 + /// 监听端口:9090 + /// + public class TcpServerService : BackgroundService + { + private readonly ILogger _logger; + private TcpListener _tcpListener; + private CancellationTokenSource _cts; + private readonly int _port = 9090; + private readonly string _ipAddress = "0.0.0.0"; // 监听所有网络接口 + + // 线程安全的客户端连接列表,用于管理所有已连接的客户端 + private ConcurrentDictionary _connectedClients; + + // 信号去重相关 + private ConcurrentDictionary _recentSignals; // 键:信号唯一标识,值:时间戳 + private long _deduplicationCount; // 去重统计计数 + private object _deduplicationLock = new object(); // 用于统计计数的锁 + private readonly int _timeWindowMs = 1000; // 时间窗口:1000毫秒 + private readonly int _cleanupIntervalMs = 5000; // 清理间隔:5000毫秒 + + /// + /// 构造函数 + /// + /// 日志服务 + public TcpServerService(ILogger logger) + { + _logger = logger; + _connectedClients = new ConcurrentDictionary(); + _recentSignals = new ConcurrentDictionary(); + _deduplicationCount = 0; + } + + /// + /// 执行TCP服务器逻辑 + /// + /// 停止令牌 + /// + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + // 创建与停止令牌关联的取消令牌源 + _cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken); + + try + { + // 初始化TCP监听器 + IPAddress localAddr = IPAddress.Parse(_ipAddress); + _tcpListener = new TcpListener(localAddr, _port); + + // 启动监听器 + _tcpListener.Start(); + _logger.LogInformation($"TCP服务器启动成功,监听地址: {_ipAddress}:{_port}"); + Console.WriteLine($"[TCP服务器] 启动成功,监听地址: {_ipAddress}:{_port}"); + + // 启动定期清理任务 + _ = Task.Run(() => CleanupExpiredSignalsAsync(_cts.Token), _cts.Token); + + // 启动统计报告任务 + _ = Task.Run(() => ReportDeduplicationStatsAsync(_cts.Token), _cts.Token); + + // 循环接受客户端连接 + while (!_cts.Token.IsCancellationRequested) + { + try + { + // 检查是否有挂起的连接 + if (_tcpListener.Pending()) + { + // 接受客户端连接 + TcpClient client = await _tcpListener.AcceptTcpClientAsync(); + string clientEndPoint = client.Client.RemoteEndPoint?.ToString() ?? "未知客户端"; + + // 将客户端添加到连接列表 + _connectedClients.TryAdd(clientEndPoint, client); + _logger.LogInformation($"客户端连接成功: {clientEndPoint},当前连接数: {_connectedClients.Count}"); + Console.WriteLine($"[TCP服务器] 客户端连接成功: {clientEndPoint},当前连接数: {_connectedClients.Count}"); + + // 异步处理客户端连接,避免阻塞主线程 + _ = HandleClientAsync(client, clientEndPoint, _cts.Token); + } + else + { + // 避免CPU占用过高,添加适当延迟 + await Task.Delay(100, _cts.Token); + } + } + catch (OperationCanceledException) + { + // 正常停止,无需处理 + break; + } + catch (Exception ex) + { + _logger.LogError(ex, "接受客户端连接时发生错误"); + Console.WriteLine($"[TCP服务器] 接受客户端连接时发生错误: {ex.Message}"); + // 继续运行,不影响服务器整体 + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, "TCP服务器启动失败"); + Console.WriteLine($"[TCP服务器] 启动失败: {ex.Message}"); + } + finally + { + // 清理所有客户端连接 + foreach (var client in _connectedClients.Values) + { + try + { + client.Close(); + } + catch { } + } + _connectedClients.Clear(); + + // 清理信号记录 + _recentSignals.Clear(); + + // 停止监听器 + _tcpListener?.Stop(); + _logger.LogInformation("TCP服务器已停止"); + Console.WriteLine("[TCP服务器] 已停止"); + } + } + + /// + /// 定期清理过期的信号记录 + /// + /// 取消令牌 + /// + private async Task CleanupExpiredSignalsAsync(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + try + { + long currentTime = GetCurrentTimestampMs(); + int removedCount = 0; + + // 清理过期的信号记录 + foreach (var signalEntry in _recentSignals) + { + if (currentTime - signalEntry.Value > _timeWindowMs) + { + if (_recentSignals.TryRemove(signalEntry.Key, out _)) + { + removedCount++; + } + } + } + + if (removedCount > 0) + { + _logger.LogInformation($"清理了 {removedCount} 条过期信号记录,剩余 {_recentSignals.Count} 条"); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "清理过期信号记录时发生错误"); + } + + // 等待下一次清理 + await Task.Delay(_cleanupIntervalMs, cancellationToken); + } + } + + /// + /// 报告去重统计信息 + /// + /// 取消令牌 + /// + private async Task ReportDeduplicationStatsAsync(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + try + { + long currentCount = Interlocked.Exchange(ref _deduplicationCount, 0); + if (currentCount > 0) + { + _logger.LogInformation($"过去5秒内去重了 {currentCount} 条信号"); + Console.WriteLine($"[TCP服务器] 过去5秒内去重了 {currentCount} 条信号"); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "报告去重统计信息时发生错误"); + } + + // 每5秒报告一次 + await Task.Delay(5000, cancellationToken); + } + } + + /// + /// 检查信号是否需要去重 + /// + /// 信号数据 + /// 是否需要去重(true:需要去重,false:不需要去重) + private bool ShouldDeduplicate(string signalData) + { + try + { + // 生成信号唯一标识 + string signalId = GenerateSignalId(signalData); + long currentTime = GetCurrentTimestampMs(); + + // 检查是否存在近期相同的信号 + if (_recentSignals.TryGetValue(signalId, out long existingTime)) + { + // 检查是否在时间窗口内 + if (currentTime - existingTime <= _timeWindowMs) + { + // 记录去重计数 + Interlocked.Increment(ref _deduplicationCount); + _logger.LogInformation($"信号去重:{signalId},时间差:{currentTime - existingTime}ms"); + return true; + } + } + + // 更新或添加信号记录 + _recentSignals[signalId] = currentTime; + return false; + } + catch (Exception ex) + { + _logger.LogError(ex, "信号去重检查时发生错误"); + // 发生错误时,允许信号通过,避免影响主流程 + return false; + } + } + + /// + /// 生成信号唯一标识 + /// + /// 信号数据 + /// 信号唯一标识 + private string GenerateSignalId(string signalData) + { + // 对于shgx/production/changePackagePrevention/XXXX格式的信号,使用完整字符串作为唯一标识 + // 对于其他信号,可以根据具体格式生成唯一标识 + return signalData; + } + + /// + /// 获取当前时间戳(毫秒) + /// + /// 时间戳(毫秒) + private long GetCurrentTimestampMs() + { + // 使用DateTime.UtcNow确保时间一致性,减少系统时间跳变的影响 + return (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalMilliseconds; + } + + /// + /// 处理客户端连接 + /// + /// TCP客户端 + /// 客户端端点信息 + /// 取消令牌 + /// + private async Task HandleClientAsync(TcpClient client, string clientEndPoint, CancellationToken cancellationToken) + { + using (client) + using (NetworkStream stream = client.GetStream()) + { + try + { + // 设置读取超时 + client.ReceiveTimeout = 30000; // 30秒 + client.SendTimeout = 30000; // 30秒 + + // 缓冲区大小 + byte[] buffer = new byte[4096]; + int bytesRead; + + // 循环读取客户端数据 + while ((bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken)) > 0) + { + try + { + // 解析接收到的数据 + string receivedData = Encoding.UTF8.GetString(buffer, 0, bytesRead); + _logger.LogInformation($"从客户端 {clientEndPoint} 接收到数据: {receivedData}"); + Console.WriteLine($"[TCP服务器] 从客户端 {clientEndPoint} 接收到数据: {receivedData}"); + + // 检查是否是需要广播的消息格式 + if (receivedData.StartsWith("shgx/cx/gz/")) + { + // 检查是否需要去重 + if (!ShouldDeduplicate(receivedData)) + { + // 向所有已连接的客户端广播消息 + await BroadcastMessageAsync(receivedData, cancellationToken); + } + else + { + // 信号已去重,记录日志 + Console.WriteLine($"[TCP服务器] 信号已去重: {receivedData}"); + } + } + else + { + // 处理数据并生成响应 + string response = ProcessData(receivedData); + byte[] responseBytes = Encoding.UTF8.GetBytes(response); + + // 发送响应 + await stream.WriteAsync(responseBytes, 0, responseBytes.Length, cancellationToken); + _logger.LogInformation($"向客户端 {clientEndPoint} 发送响应: {response}"); + Console.WriteLine($"[TCP服务器] 向客户端 {clientEndPoint} 发送响应: {response}"); + } + } + catch (Exception ex) + { + _logger.LogError(ex, $"处理客户端 {clientEndPoint} 数据时发生错误"); + Console.WriteLine($"[TCP服务器] 处理客户端 {clientEndPoint} 数据时发生错误: {ex.Message}"); + + // 发送错误响应 + string errorResponse = $"Error: {ex.Message}"; + byte[] errorBytes = Encoding.UTF8.GetBytes(errorResponse); + await stream.WriteAsync(errorBytes, 0, errorBytes.Length, cancellationToken); + } + } + + // 客户端断开连接 + _connectedClients.TryRemove(clientEndPoint, out _); + _logger.LogInformation($"客户端 {clientEndPoint} 断开连接,当前连接数: {_connectedClients.Count}"); + Console.WriteLine($"[TCP服务器] 客户端 {clientEndPoint} 断开连接,当前连接数: {_connectedClients.Count}"); + } + catch (OperationCanceledException) + { + // 正常停止,无需处理 + _connectedClients.TryRemove(clientEndPoint, out _); + } + catch (Exception ex) + { + _logger.LogError(ex, $"处理客户端 {clientEndPoint} 连接时发生错误"); + Console.WriteLine($"[TCP服务器] 处理客户端 {clientEndPoint} 连接时发生错误: {ex.Message}"); + _connectedClients.TryRemove(clientEndPoint, out _); + } + } + } + + /// + /// 向所有已连接的客户端广播消息 + /// + /// 要广播的消息 + /// 取消令牌 + /// + private async Task BroadcastMessageAsync(string message, CancellationToken cancellationToken) + { + _logger.LogInformation($"开始广播消息: {message},目标客户端数: {_connectedClients.Count}"); + Console.WriteLine($"[TCP服务器] 开始广播消息: {message},目标客户端数: {_connectedClients.Count}"); + + byte[] messageBytes = Encoding.UTF8.GetBytes(message); + int broadcastCount = 0; + + // 遍历所有客户端并发送消息 + foreach (var clientEntry in _connectedClients) + { + string clientEndPoint = clientEntry.Key; + TcpClient client = clientEntry.Value; + + try + { + if (client.Connected) + { + // 直接获取网络流,不要使用using块,避免重复释放 + NetworkStream stream = client.GetStream(); + await stream.WriteAsync(messageBytes, 0, messageBytes.Length, cancellationToken); + broadcastCount++; + _logger.LogInformation($"向客户端 {clientEndPoint} 广播消息成功"); + Console.WriteLine($"[TCP服务器] 向客户端 {clientEndPoint} 广播消息成功"); + } + else + { + // 移除已断开的客户端 + _connectedClients.TryRemove(clientEndPoint, out _); + _logger.LogInformation($"客户端 {clientEndPoint} 已断开,从连接列表中移除"); + Console.WriteLine($"[TCP服务器] 客户端 {clientEndPoint} 已断开,从连接列表中移除"); + } + } + catch (Exception ex) + { + _logger.LogError(ex, $"向客户端 {clientEndPoint} 广播消息时发生错误"); + Console.WriteLine($"[TCP服务器] 向客户端 {clientEndPoint} 广播消息时发生错误: {ex.Message}"); + // 移除失败的客户端 + _connectedClients.TryRemove(clientEndPoint, out _); + } + } + + _logger.LogInformation($"广播完成,成功发送到 {broadcastCount} 个客户端"); + Console.WriteLine($"[TCP服务器] 广播完成,成功发送到 {broadcastCount} 个客户端"); + } + + /// + /// 处理接收到的数据 + /// + /// 接收到的数据 + /// 响应数据 + private string ProcessData(string data) + { + // 这里可以根据实际业务需求处理数据 + // 示例:简单回显接收到的数据 + return $"Server response: {data}"; + } + } +}