feat(tcp): 添加TCP服务器服务实现多客户端并发连接

实现基于BackgroundService的TCP服务器,支持多客户端并发连接和消息广播
包含信号去重功能,防止短时间内重复处理相同信号
添加客户端连接管理和统计报告功能
This commit is contained in:
赵正易 2026-02-06 14:32:46 +08:00
parent 37c19fbd24
commit b4879f821d
2 changed files with 444 additions and 0 deletions

View File

@ -42,6 +42,9 @@ builder.Services.AddSingleton<MyMqttConfig>();
builder.Services.AddSingleton<MqttService>();
builder.Services.AddHostedService(sp => sp.GetRequiredService<MqttService>());
/// ===============================================================================
// 注册TCP服务器服务
builder.Services.AddHostedService<ZR.Service.tcp.TcpServerService>();
// 跨域配置
builder.Services.AddCors(builder.Configuration);
// 显示logo

View File

@ -0,0 +1,441 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ZR.Service.tcp
{
/// <summary>
/// 信号去重记录
/// </summary>
public class SignalDeduplicationRecord
{
/// <summary>
/// 信号唯一标识
/// </summary>
public string SignalId { get; set; }
/// <summary>
/// 接收时间戳(毫秒)
/// </summary>
public long Timestamp { get; set; }
}
/// <summary>
/// TCP服务器服务
/// 基于BackgroundService实现支持多客户端并发连接
/// 监听端口9090
/// </summary>
public class TcpServerService : BackgroundService
{
private readonly ILogger<TcpServerService> _logger;
private TcpListener _tcpListener;
private CancellationTokenSource _cts;
private readonly int _port = 9090;
private readonly string _ipAddress = "0.0.0.0"; // 监听所有网络接口
// 线程安全的客户端连接列表,用于管理所有已连接的客户端
private ConcurrentDictionary<string, TcpClient> _connectedClients;
// 信号去重相关
private ConcurrentDictionary<string, long> _recentSignals; // 键:信号唯一标识,值:时间戳
private long _deduplicationCount; // 去重统计计数
private object _deduplicationLock = new object(); // 用于统计计数的锁
private readonly int _timeWindowMs = 1000; // 时间窗口1000毫秒
private readonly int _cleanupIntervalMs = 5000; // 清理间隔5000毫秒
/// <summary>
/// 构造函数
/// </summary>
/// <param name="logger">日志服务</param>
public TcpServerService(ILogger<TcpServerService> logger)
{
_logger = logger;
_connectedClients = new ConcurrentDictionary<string, TcpClient>();
_recentSignals = new ConcurrentDictionary<string, long>();
_deduplicationCount = 0;
}
/// <summary>
/// 执行TCP服务器逻辑
/// </summary>
/// <param name="stoppingToken">停止令牌</param>
/// <returns></returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// 创建与停止令牌关联的取消令牌源
_cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
try
{
// 初始化TCP监听器
IPAddress localAddr = IPAddress.Parse(_ipAddress);
_tcpListener = new TcpListener(localAddr, _port);
// 启动监听器
_tcpListener.Start();
_logger.LogInformation($"TCP服务器启动成功监听地址: {_ipAddress}:{_port}");
Console.WriteLine($"[TCP服务器] 启动成功,监听地址: {_ipAddress}:{_port}");
// 启动定期清理任务
_ = Task.Run(() => CleanupExpiredSignalsAsync(_cts.Token), _cts.Token);
// 启动统计报告任务
_ = Task.Run(() => ReportDeduplicationStatsAsync(_cts.Token), _cts.Token);
// 循环接受客户端连接
while (!_cts.Token.IsCancellationRequested)
{
try
{
// 检查是否有挂起的连接
if (_tcpListener.Pending())
{
// 接受客户端连接
TcpClient client = await _tcpListener.AcceptTcpClientAsync();
string clientEndPoint = client.Client.RemoteEndPoint?.ToString() ?? "未知客户端";
// 将客户端添加到连接列表
_connectedClients.TryAdd(clientEndPoint, client);
_logger.LogInformation($"客户端连接成功: {clientEndPoint},当前连接数: {_connectedClients.Count}");
Console.WriteLine($"[TCP服务器] 客户端连接成功: {clientEndPoint},当前连接数: {_connectedClients.Count}");
// 异步处理客户端连接,避免阻塞主线程
_ = HandleClientAsync(client, clientEndPoint, _cts.Token);
}
else
{
// 避免CPU占用过高添加适当延迟
await Task.Delay(100, _cts.Token);
}
}
catch (OperationCanceledException)
{
// 正常停止,无需处理
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "接受客户端连接时发生错误");
Console.WriteLine($"[TCP服务器] 接受客户端连接时发生错误: {ex.Message}");
// 继续运行,不影响服务器整体
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "TCP服务器启动失败");
Console.WriteLine($"[TCP服务器] 启动失败: {ex.Message}");
}
finally
{
// 清理所有客户端连接
foreach (var client in _connectedClients.Values)
{
try
{
client.Close();
}
catch { }
}
_connectedClients.Clear();
// 清理信号记录
_recentSignals.Clear();
// 停止监听器
_tcpListener?.Stop();
_logger.LogInformation("TCP服务器已停止");
Console.WriteLine("[TCP服务器] 已停止");
}
}
/// <summary>
/// 定期清理过期的信号记录
/// </summary>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
private async Task CleanupExpiredSignalsAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
long currentTime = GetCurrentTimestampMs();
int removedCount = 0;
// 清理过期的信号记录
foreach (var signalEntry in _recentSignals)
{
if (currentTime - signalEntry.Value > _timeWindowMs)
{
if (_recentSignals.TryRemove(signalEntry.Key, out _))
{
removedCount++;
}
}
}
if (removedCount > 0)
{
_logger.LogInformation($"清理了 {removedCount} 条过期信号记录,剩余 {_recentSignals.Count} 条");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "清理过期信号记录时发生错误");
}
// 等待下一次清理
await Task.Delay(_cleanupIntervalMs, cancellationToken);
}
}
/// <summary>
/// 报告去重统计信息
/// </summary>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
private async Task ReportDeduplicationStatsAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
long currentCount = Interlocked.Exchange(ref _deduplicationCount, 0);
if (currentCount > 0)
{
_logger.LogInformation($"过去5秒内去重了 {currentCount} 条信号");
Console.WriteLine($"[TCP服务器] 过去5秒内去重了 {currentCount} 条信号");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "报告去重统计信息时发生错误");
}
// 每5秒报告一次
await Task.Delay(5000, cancellationToken);
}
}
/// <summary>
/// 检查信号是否需要去重
/// </summary>
/// <param name="signalData">信号数据</param>
/// <returns>是否需要去重true需要去重false不需要去重</returns>
private bool ShouldDeduplicate(string signalData)
{
try
{
// 生成信号唯一标识
string signalId = GenerateSignalId(signalData);
long currentTime = GetCurrentTimestampMs();
// 检查是否存在近期相同的信号
if (_recentSignals.TryGetValue(signalId, out long existingTime))
{
// 检查是否在时间窗口内
if (currentTime - existingTime <= _timeWindowMs)
{
// 记录去重计数
Interlocked.Increment(ref _deduplicationCount);
_logger.LogInformation($"信号去重:{signalId},时间差:{currentTime - existingTime}ms");
return true;
}
}
// 更新或添加信号记录
_recentSignals[signalId] = currentTime;
return false;
}
catch (Exception ex)
{
_logger.LogError(ex, "信号去重检查时发生错误");
// 发生错误时,允许信号通过,避免影响主流程
return false;
}
}
/// <summary>
/// 生成信号唯一标识
/// </summary>
/// <param name="signalData">信号数据</param>
/// <returns>信号唯一标识</returns>
private string GenerateSignalId(string signalData)
{
// 对于shgx/production/changePackagePrevention/XXXX格式的信号使用完整字符串作为唯一标识
// 对于其他信号,可以根据具体格式生成唯一标识
return signalData;
}
/// <summary>
/// 获取当前时间戳(毫秒)
/// </summary>
/// <returns>时间戳(毫秒)</returns>
private long GetCurrentTimestampMs()
{
// 使用DateTime.UtcNow确保时间一致性减少系统时间跳变的影响
return (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalMilliseconds;
}
/// <summary>
/// 处理客户端连接
/// </summary>
/// <param name="client">TCP客户端</param>
/// <param name="clientEndPoint">客户端端点信息</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
private async Task HandleClientAsync(TcpClient client, string clientEndPoint, CancellationToken cancellationToken)
{
using (client)
using (NetworkStream stream = client.GetStream())
{
try
{
// 设置读取超时
client.ReceiveTimeout = 30000; // 30秒
client.SendTimeout = 30000; // 30秒
// 缓冲区大小
byte[] buffer = new byte[4096];
int bytesRead;
// 循环读取客户端数据
while ((bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken)) > 0)
{
try
{
// 解析接收到的数据
string receivedData = Encoding.UTF8.GetString(buffer, 0, bytesRead);
_logger.LogInformation($"从客户端 {clientEndPoint} 接收到数据: {receivedData}");
Console.WriteLine($"[TCP服务器] 从客户端 {clientEndPoint} 接收到数据: {receivedData}");
// 检查是否是需要广播的消息格式
if (receivedData.StartsWith("shgx/cx/gz/"))
{
// 检查是否需要去重
if (!ShouldDeduplicate(receivedData))
{
// 向所有已连接的客户端广播消息
await BroadcastMessageAsync(receivedData, cancellationToken);
}
else
{
// 信号已去重,记录日志
Console.WriteLine($"[TCP服务器] 信号已去重: {receivedData}");
}
}
else
{
// 处理数据并生成响应
string response = ProcessData(receivedData);
byte[] responseBytes = Encoding.UTF8.GetBytes(response);
// 发送响应
await stream.WriteAsync(responseBytes, 0, responseBytes.Length, cancellationToken);
_logger.LogInformation($"向客户端 {clientEndPoint} 发送响应: {response}");
Console.WriteLine($"[TCP服务器] 向客户端 {clientEndPoint} 发送响应: {response}");
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"处理客户端 {clientEndPoint} 数据时发生错误");
Console.WriteLine($"[TCP服务器] 处理客户端 {clientEndPoint} 数据时发生错误: {ex.Message}");
// 发送错误响应
string errorResponse = $"Error: {ex.Message}";
byte[] errorBytes = Encoding.UTF8.GetBytes(errorResponse);
await stream.WriteAsync(errorBytes, 0, errorBytes.Length, cancellationToken);
}
}
// 客户端断开连接
_connectedClients.TryRemove(clientEndPoint, out _);
_logger.LogInformation($"客户端 {clientEndPoint} 断开连接,当前连接数: {_connectedClients.Count}");
Console.WriteLine($"[TCP服务器] 客户端 {clientEndPoint} 断开连接,当前连接数: {_connectedClients.Count}");
}
catch (OperationCanceledException)
{
// 正常停止,无需处理
_connectedClients.TryRemove(clientEndPoint, out _);
}
catch (Exception ex)
{
_logger.LogError(ex, $"处理客户端 {clientEndPoint} 连接时发生错误");
Console.WriteLine($"[TCP服务器] 处理客户端 {clientEndPoint} 连接时发生错误: {ex.Message}");
_connectedClients.TryRemove(clientEndPoint, out _);
}
}
}
/// <summary>
/// 向所有已连接的客户端广播消息
/// </summary>
/// <param name="message">要广播的消息</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
private async Task BroadcastMessageAsync(string message, CancellationToken cancellationToken)
{
_logger.LogInformation($"开始广播消息: {message},目标客户端数: {_connectedClients.Count}");
Console.WriteLine($"[TCP服务器] 开始广播消息: {message},目标客户端数: {_connectedClients.Count}");
byte[] messageBytes = Encoding.UTF8.GetBytes(message);
int broadcastCount = 0;
// 遍历所有客户端并发送消息
foreach (var clientEntry in _connectedClients)
{
string clientEndPoint = clientEntry.Key;
TcpClient client = clientEntry.Value;
try
{
if (client.Connected)
{
// 直接获取网络流不要使用using块避免重复释放
NetworkStream stream = client.GetStream();
await stream.WriteAsync(messageBytes, 0, messageBytes.Length, cancellationToken);
broadcastCount++;
_logger.LogInformation($"向客户端 {clientEndPoint} 广播消息成功");
Console.WriteLine($"[TCP服务器] 向客户端 {clientEndPoint} 广播消息成功");
}
else
{
// 移除已断开的客户端
_connectedClients.TryRemove(clientEndPoint, out _);
_logger.LogInformation($"客户端 {clientEndPoint} 已断开,从连接列表中移除");
Console.WriteLine($"[TCP服务器] 客户端 {clientEndPoint} 已断开,从连接列表中移除");
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"向客户端 {clientEndPoint} 广播消息时发生错误");
Console.WriteLine($"[TCP服务器] 向客户端 {clientEndPoint} 广播消息时发生错误: {ex.Message}");
// 移除失败的客户端
_connectedClients.TryRemove(clientEndPoint, out _);
}
}
_logger.LogInformation($"广播完成,成功发送到 {broadcastCount} 个客户端");
Console.WriteLine($"[TCP服务器] 广播完成,成功发送到 {broadcastCount} 个客户端");
}
/// <summary>
/// 处理接收到的数据
/// </summary>
/// <param name="data">接收到的数据</param>
/// <returns>响应数据</returns>
private string ProcessData(string data)
{
// 这里可以根据实际业务需求处理数据
// 示例:简单回显接收到的数据
return $"Server response: {data}";
}
}
}