using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System; using System.Collections.Concurrent; using System.Net; using System.Net.Sockets; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ZR.Service.tcp { /// /// 信号去重记录 /// public class SignalDeduplicationRecord { /// /// 信号唯一标识 /// public string SignalId { get; set; } /// /// 接收时间戳(毫秒) /// public long Timestamp { get; set; } } /// /// TCP服务器服务 /// 基于BackgroundService实现,支持多客户端并发连接 /// 监听端口:9090 /// public class TcpServerService : BackgroundService { private readonly ILogger _logger; private TcpListener _tcpListener; private CancellationTokenSource _cts; private readonly int _port = 9090; private readonly string _ipAddress = "0.0.0.0"; // 监听所有网络接口 // 线程安全的客户端连接列表,用于管理所有已连接的客户端 private ConcurrentDictionary _connectedClients; // 信号去重相关 private ConcurrentDictionary _recentSignals; // 键:信号唯一标识,值:时间戳 private long _deduplicationCount; // 去重统计计数 private object _deduplicationLock = new object(); // 用于统计计数的锁 private readonly int _timeWindowMs = 1000; // 时间窗口:1000毫秒 private readonly int _cleanupIntervalMs = 5000; // 清理间隔:5000毫秒 /// /// 构造函数 /// /// 日志服务 public TcpServerService(ILogger logger) { _logger = logger; _connectedClients = new ConcurrentDictionary(); _recentSignals = new ConcurrentDictionary(); _deduplicationCount = 0; } /// /// 执行TCP服务器逻辑 /// /// 停止令牌 /// protected override async Task ExecuteAsync(CancellationToken stoppingToken) { // 创建与停止令牌关联的取消令牌源 _cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken); try { // 初始化TCP监听器 IPAddress localAddr = IPAddress.Parse(_ipAddress); _tcpListener = new TcpListener(localAddr, _port); // 启动监听器 _tcpListener.Start(); _logger.LogInformation($"TCP服务器启动成功,监听地址: {_ipAddress}:{_port}"); Console.WriteLine($"[TCP服务器] 启动成功,监听地址: {_ipAddress}:{_port}"); // 启动定期清理任务 _ = Task.Run(() => CleanupExpiredSignalsAsync(_cts.Token), _cts.Token); // 启动统计报告任务 _ = Task.Run(() => ReportDeduplicationStatsAsync(_cts.Token), _cts.Token); // 循环接受客户端连接 while (!_cts.Token.IsCancellationRequested) { try { // 检查是否有挂起的连接 if (_tcpListener.Pending()) { // 接受客户端连接 TcpClient client = await _tcpListener.AcceptTcpClientAsync(); string clientEndPoint = client.Client.RemoteEndPoint?.ToString() ?? "未知客户端"; // 将客户端添加到连接列表 _connectedClients.TryAdd(clientEndPoint, client); _logger.LogInformation($"客户端连接成功: {clientEndPoint},当前连接数: {_connectedClients.Count}"); Console.WriteLine($"[TCP服务器] 客户端连接成功: {clientEndPoint},当前连接数: {_connectedClients.Count}"); // 异步处理客户端连接,避免阻塞主线程 _ = HandleClientAsync(client, clientEndPoint, _cts.Token); } else { // 避免CPU占用过高,添加适当延迟 await Task.Delay(100, _cts.Token); } } catch (OperationCanceledException) { // 正常停止,无需处理 break; } catch (Exception ex) { _logger.LogError(ex, "接受客户端连接时发生错误"); Console.WriteLine($"[TCP服务器] 接受客户端连接时发生错误: {ex.Message}"); // 继续运行,不影响服务器整体 } } } catch (Exception ex) { _logger.LogError(ex, "TCP服务器启动失败"); Console.WriteLine($"[TCP服务器] 启动失败: {ex.Message}"); } finally { // 清理所有客户端连接 foreach (var client in _connectedClients.Values) { try { client.Close(); } catch { } } _connectedClients.Clear(); // 清理信号记录 _recentSignals.Clear(); // 停止监听器 _tcpListener?.Stop(); _logger.LogInformation("TCP服务器已停止"); Console.WriteLine("[TCP服务器] 已停止"); } } /// /// 定期清理过期的信号记录 /// /// 取消令牌 /// private async Task CleanupExpiredSignalsAsync(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) { try { long currentTime = GetCurrentTimestampMs(); int removedCount = 0; // 清理过期的信号记录 foreach (var signalEntry in _recentSignals) { if (currentTime - signalEntry.Value > _timeWindowMs) { if (_recentSignals.TryRemove(signalEntry.Key, out _)) { removedCount++; } } } if (removedCount > 0) { _logger.LogInformation($"清理了 {removedCount} 条过期信号记录,剩余 {_recentSignals.Count} 条"); } } catch (Exception ex) { _logger.LogError(ex, "清理过期信号记录时发生错误"); } // 等待下一次清理 await Task.Delay(_cleanupIntervalMs, cancellationToken); } } /// /// 报告去重统计信息 /// /// 取消令牌 /// private async Task ReportDeduplicationStatsAsync(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) { try { long currentCount = Interlocked.Exchange(ref _deduplicationCount, 0); if (currentCount > 0) { _logger.LogInformation($"过去5秒内去重了 {currentCount} 条信号"); Console.WriteLine($"[TCP服务器] 过去5秒内去重了 {currentCount} 条信号"); } } catch (Exception ex) { _logger.LogError(ex, "报告去重统计信息时发生错误"); } // 每5秒报告一次 await Task.Delay(5000, cancellationToken); } } /// /// 检查信号是否需要去重 /// /// 信号数据 /// 是否需要去重(true:需要去重,false:不需要去重) private bool ShouldDeduplicate(string signalData) { try { // 生成信号唯一标识 string signalId = GenerateSignalId(signalData); long currentTime = GetCurrentTimestampMs(); // 检查是否存在近期相同的信号 if (_recentSignals.TryGetValue(signalId, out long existingTime)) { // 检查是否在时间窗口内 if (currentTime - existingTime <= _timeWindowMs) { // 记录去重计数 Interlocked.Increment(ref _deduplicationCount); _logger.LogInformation($"信号去重:{signalId},时间差:{currentTime - existingTime}ms"); return true; } } // 更新或添加信号记录 _recentSignals[signalId] = currentTime; return false; } catch (Exception ex) { _logger.LogError(ex, "信号去重检查时发生错误"); // 发生错误时,允许信号通过,避免影响主流程 return false; } } /// /// 生成信号唯一标识 /// /// 信号数据 /// 信号唯一标识 private string GenerateSignalId(string signalData) { // 对于shgx/production/changePackagePrevention/XXXX格式的信号,使用完整字符串作为唯一标识 // 对于其他信号,可以根据具体格式生成唯一标识 return signalData; } /// /// 获取当前时间戳(毫秒) /// /// 时间戳(毫秒) private long GetCurrentTimestampMs() { // 使用DateTime.UtcNow确保时间一致性,减少系统时间跳变的影响 return (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalMilliseconds; } /// /// 处理客户端连接 /// /// TCP客户端 /// 客户端端点信息 /// 取消令牌 /// private async Task HandleClientAsync(TcpClient client, string clientEndPoint, CancellationToken cancellationToken) { using (client) using (NetworkStream stream = client.GetStream()) { try { // 设置读取超时 client.ReceiveTimeout = 30000; // 30秒 client.SendTimeout = 30000; // 30秒 // 缓冲区大小 byte[] buffer = new byte[4096]; int bytesRead; // 循环读取客户端数据 while ((bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken)) > 0) { try { // 解析接收到的数据 string receivedData = Encoding.UTF8.GetString(buffer, 0, bytesRead); _logger.LogInformation($"从客户端 {clientEndPoint} 接收到数据: {receivedData}"); Console.WriteLine($"[TCP服务器] 从客户端 {clientEndPoint} 接收到数据: {receivedData}"); // 检查是否是需要广播的消息格式 if (receivedData.StartsWith("shgx/cx/gz/")) { // 检查是否需要去重 if (!ShouldDeduplicate(receivedData)) { // 向所有已连接的客户端广播消息 await BroadcastMessageAsync(receivedData, cancellationToken); } else { // 信号已去重,记录日志 Console.WriteLine($"[TCP服务器] 信号已去重: {receivedData}"); } } else { // 处理数据并生成响应 string response = ProcessData(receivedData); byte[] responseBytes = Encoding.UTF8.GetBytes(response); // 发送响应 await stream.WriteAsync(responseBytes, 0, responseBytes.Length, cancellationToken); _logger.LogInformation($"向客户端 {clientEndPoint} 发送响应: {response}"); Console.WriteLine($"[TCP服务器] 向客户端 {clientEndPoint} 发送响应: {response}"); } } catch (Exception ex) { _logger.LogError(ex, $"处理客户端 {clientEndPoint} 数据时发生错误"); Console.WriteLine($"[TCP服务器] 处理客户端 {clientEndPoint} 数据时发生错误: {ex.Message}"); // 发送错误响应 string errorResponse = $"Error: {ex.Message}"; byte[] errorBytes = Encoding.UTF8.GetBytes(errorResponse); await stream.WriteAsync(errorBytes, 0, errorBytes.Length, cancellationToken); } } // 客户端断开连接 _connectedClients.TryRemove(clientEndPoint, out _); _logger.LogInformation($"客户端 {clientEndPoint} 断开连接,当前连接数: {_connectedClients.Count}"); Console.WriteLine($"[TCP服务器] 客户端 {clientEndPoint} 断开连接,当前连接数: {_connectedClients.Count}"); } catch (OperationCanceledException) { // 正常停止,无需处理 _connectedClients.TryRemove(clientEndPoint, out _); } catch (Exception ex) { _logger.LogError(ex, $"处理客户端 {clientEndPoint} 连接时发生错误"); Console.WriteLine($"[TCP服务器] 处理客户端 {clientEndPoint} 连接时发生错误: {ex.Message}"); _connectedClients.TryRemove(clientEndPoint, out _); } } } /// /// 向所有已连接的客户端广播消息 /// /// 要广播的消息 /// 取消令牌 /// private async Task BroadcastMessageAsync(string message, CancellationToken cancellationToken) { _logger.LogInformation($"开始广播消息: {message},目标客户端数: {_connectedClients.Count}"); Console.WriteLine($"[TCP服务器] 开始广播消息: {message},目标客户端数: {_connectedClients.Count}"); byte[] messageBytes = Encoding.UTF8.GetBytes(message); int broadcastCount = 0; // 遍历所有客户端并发送消息 foreach (var clientEntry in _connectedClients) { string clientEndPoint = clientEntry.Key; TcpClient client = clientEntry.Value; try { if (client.Connected) { // 直接获取网络流,不要使用using块,避免重复释放 NetworkStream stream = client.GetStream(); await stream.WriteAsync(messageBytes, 0, messageBytes.Length, cancellationToken); broadcastCount++; _logger.LogInformation($"向客户端 {clientEndPoint} 广播消息成功"); Console.WriteLine($"[TCP服务器] 向客户端 {clientEndPoint} 广播消息成功"); } else { // 移除已断开的客户端 _connectedClients.TryRemove(clientEndPoint, out _); _logger.LogInformation($"客户端 {clientEndPoint} 已断开,从连接列表中移除"); Console.WriteLine($"[TCP服务器] 客户端 {clientEndPoint} 已断开,从连接列表中移除"); } } catch (Exception ex) { _logger.LogError(ex, $"向客户端 {clientEndPoint} 广播消息时发生错误"); Console.WriteLine($"[TCP服务器] 向客户端 {clientEndPoint} 广播消息时发生错误: {ex.Message}"); // 移除失败的客户端 _connectedClients.TryRemove(clientEndPoint, out _); } } _logger.LogInformation($"广播完成,成功发送到 {broadcastCount} 个客户端"); Console.WriteLine($"[TCP服务器] 广播完成,成功发送到 {broadcastCount} 个客户端"); } /// /// 处理接收到的数据 /// /// 接收到的数据 /// 响应数据 private string ProcessData(string data) { // 这里可以根据实际业务需求处理数据 // 示例:简单回显接收到的数据 return $"Server response: {data}"; } } }