170 lines
5.5 KiB
C#
170 lines
5.5 KiB
C#
using MQTTnet;
|
|
using MQTTnet.Formatter;
|
|
using MQTTnet.Protocol;
|
|
using System;
|
|
using System.Threading.Tasks;
|
|
namespace DOAN.Infrastructure.Helper
{
    /// <summary>
    /// Helper for publishing messages to an MQTT broker through a single,
    /// lazily created MQTTnet client.
    /// </summary>
    /// <remarks>
    /// Progress and errors are reported with <see cref="Console.WriteLine(string)"/>.
    /// The connect/publish/disconnect methods report failure through their
    /// return value (or by logging) instead of throwing. The class performs
    /// no locking of its own.
    /// </remarks>
    public class MqttHelper : IDisposable
    {
        // MQTT broker settings (may be set by the caller or loaded from configuration).
        // They are read when ConnectAsync builds its options, so changes made while
        // connected only take effect on the next connection attempt.

        /// <summary>Host name or address of the MQTT broker.</summary>
        public string MqttBrokerUrl { get; set; } = "broker.hivemq.com";

        /// <summary>TCP port of the MQTT broker.</summary>
        public int MqttBrokerPort { get; set; } = 1883;

        /// <summary>User name passed to the broker; <c>null</c> by default.</summary>
        public string MqttUsername { get; set; } = null;

        /// <summary>Password passed to the broker; <c>null</c> by default.</summary>
        public string MqttPassword { get; set; } = null;

        /// <summary>
        /// Topic used by the <c>PublishMessageAsync</c> overload that takes no topic argument.
        /// </summary>
        public string DefaultTopic { get; set; } = "product/topic";

        private IMqttClient _mqttClient;

        private bool _disposed = false;

        /// <summary>
        /// Creates a helper that uses the default broker settings.
        /// </summary>
        public MqttHelper() { }

        /// <summary>
        /// Creates a helper with custom broker settings.
        /// </summary>
        /// <param name="brokerUrl">Host name or address of the broker.</param>
        /// <param name="brokerPort">TCP port of the broker.</param>
        /// <param name="username">Optional user name.</param>
        /// <param name="password">Optional password.</param>
        public MqttHelper(string brokerUrl, int brokerPort, string username = null, string password = null)
        {
            MqttBrokerUrl = brokerUrl;
            MqttBrokerPort = brokerPort;
            MqttUsername = username;
            MqttPassword = password;
        }

        /// <summary>
        /// Connects to the configured MQTT broker.
        /// </summary>
        /// <returns>
        /// <c>true</c> if the client is already connected or the connection
        /// succeeded; <c>false</c> if the helper has been disposed or the
        /// connection attempt threw.
        /// </returns>
        public async Task<bool> ConnectAsync()
        {
            // A disposed helper must not create a new client: Dispose() would
            // never run again to release it.
            if (_disposed)
            {
                Console.WriteLine($"❌ MQTT 连接失败: {nameof(MqttHelper)} 已释放");
                return false;
            }

            try
            {
                if (_mqttClient?.IsConnected == true)
                {
                    Console.WriteLine("MQTT 客户端已经连接");
                    return true;
                }

                // Release the previous, disconnected client before replacing it.
                // The field is cleared first so a failing Dispose cannot leave a
                // half-disposed client behind for later calls.
                var staleClient = _mqttClient;
                _mqttClient = null;
                staleClient?.Dispose();

                var factory = new MqttClientFactory();
                _mqttClient = factory.CreateMqttClient();

                var options = new MqttClientOptionsBuilder()
                    .WithTcpServer(MqttBrokerUrl, MqttBrokerPort)
                    .WithCredentials(MqttUsername, MqttPassword)
                    .WithCleanSession()
                    .Build();

                Console.WriteLine($"正在连接到 MQTT Broker: {MqttBrokerUrl}:{MqttBrokerPort}...");

                // ConfigureAwait(false): this helper never needs the caller's
                // synchronization context after an await.
                await _mqttClient.ConnectAsync(options).ConfigureAwait(false);

                Console.WriteLine("✅ MQTT 连接成功!");
                return true;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"❌ MQTT 连接失败: {ex.Message}");
                return false;
            }
        }

        /// <summary>
        /// Publishes a message to the given topic, connecting first if the
        /// client is not currently connected.
        /// </summary>
        /// <param name="topic">Target topic.</param>
        /// <param name="messageContent">Message payload.</param>
        /// <param name="qosLevel">QoS level; defaults to AtLeastOnce (1).</param>
        /// <param name="retain">Whether the broker should retain the message; defaults to <c>false</c>.</param>
        /// <returns>
        /// <c>true</c> if the publish result reports success; <c>false</c> if
        /// connecting failed, the publish result reports a failure, or an
        /// exception was thrown.
        /// </returns>
        public async Task<bool> PublishMessageAsync(string topic, string messageContent,
            MqttQualityOfServiceLevel qosLevel = MqttQualityOfServiceLevel.AtLeastOnce,
            bool retain = false)
        {
            try
            {
                if (_mqttClient?.IsConnected != true)
                {
                    Console.WriteLine("MQTT 客户端未连接,尝试重新连接...");
                    if (!await ConnectAsync().ConfigureAwait(false))
                    {
                        Console.WriteLine("❌ 重连失败,无法发送消息");
                        return false;
                    }
                }

                var message = new MqttApplicationMessageBuilder()
                    .WithTopic(topic)
                    .WithPayload(messageContent)
                    .WithQualityOfServiceLevel(qosLevel)
                    .WithRetainFlag(retain)
                    .Build();

                Console.WriteLine($"正在向主题 '{topic}' 发送消息: {messageContent}");

                // The publish call can complete without throwing while the result
                // still carries a failure reason code, so check the result
                // instead of assuming success.
                var publishResult = await _mqttClient.PublishAsync(message).ConfigureAwait(false);
                if (!publishResult.IsSuccess)
                {
                    Console.WriteLine($"❌ 消息发送失败: {publishResult.ReasonCode}");
                    return false;
                }

                Console.WriteLine("✅ 消息发送成功!");
                return true;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"❌ 消息发送失败: {ex.Message}");
                return false;
            }
        }

        /// <summary>
        /// Publishes a message to <see cref="DefaultTopic"/>.
        /// </summary>
        /// <param name="messageContent">Message payload.</param>
        /// <param name="qosLevel">QoS level; defaults to AtLeastOnce (1).</param>
        /// <param name="retain">Whether the broker should retain the message.</param>
        /// <returns>The result of the topic-taking overload.</returns>
        public async Task<bool> PublishMessageAsync(string messageContent,
            MqttQualityOfServiceLevel qosLevel = MqttQualityOfServiceLevel.AtLeastOnce,
            bool retain = false)
        {
            return await PublishMessageAsync(DefaultTopic, messageContent, qosLevel, retain).ConfigureAwait(false);
        }

        /// <summary>
        /// Disconnects from the broker if currently connected. Errors are
        /// logged to the console and not rethrown.
        /// </summary>
        public async Task DisconnectAsync()
        {
            try
            {
                if (_mqttClient?.IsConnected == true)
                {
                    await _mqttClient.DisconnectAsync().ConfigureAwait(false);
                    Console.WriteLine("🔌 MQTT 已断开连接");
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"❌ 断开连接时发生错误: {ex.Message}");
            }
        }

        /// <summary>
        /// Gets whether the underlying client exists and reports itself as connected.
        /// </summary>
        public bool IsConnected => _mqttClient?.IsConnected == true;

        /// <summary>
        /// Disconnects and releases the underlying client. Calling it more
        /// than once has no further effect.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            try
            {
                // DisconnectAsync catches its own exceptions and awaits with
                // ConfigureAwait(false), so its continuation does not need the
                // thread that is blocked here. GetAwaiter().GetResult() avoids
                // the AggregateException wrapping that Wait() would add.
                DisconnectAsync().GetAwaiter().GetResult();
            }
            finally
            {
                _mqttClient?.Dispose();
                _mqttClient = null;
            }
        }
    }
}
|