using MQTTnet;
using MQTTnet.Formatter;
using MQTTnet.Protocol;
using System;
using System.Threading.Tasks;
namespace DOAN.Infrastructure.Helper
{
///
/// MQTT 消息发送工具类
///
public class MqttHelper : IDisposable
{
// MQTT Broker 配置(可外部设置或配置文件读取)
public string MqttBrokerUrl { get; set; } = "broker.hivemq.com";
public int MqttBrokerPort { get; set; } = 1883;
public string MqttUsername { get; set; } = null;
public string MqttPassword { get; set; } = null;
private IMqttClient _mqttClient;
private bool _disposed = false;
///
/// 构造函数 - 使用默认配置
///
public MqttHelper() { }
///
/// 构造函数 - 自定义配置
///
public MqttHelper(string brokerUrl, int brokerPort, string username = null, string password = null)
{
MqttBrokerUrl = brokerUrl;
MqttBrokerPort = brokerPort;
MqttUsername = username;
MqttPassword = password;
}
///
/// 连接到 MQTT Broker
///
public async Task ConnectAsync()
{
try
{
if (_mqttClient?.IsConnected == true)
{
Console.WriteLine("MQTT 客户端已经连接");
return true;
}
var factory = new MqttClientFactory();
_mqttClient = factory.CreateMqttClient();
var options = new MqttClientOptionsBuilder()
.WithTcpServer(MqttBrokerUrl, MqttBrokerPort)
.WithCredentials(MqttUsername, MqttPassword)
.WithCleanSession()
.Build();
Console.WriteLine($"正在连接到 MQTT Broker: {MqttBrokerUrl}:{MqttBrokerPort}...");
await _mqttClient.ConnectAsync(options);
Console.WriteLine(" MQTT 连接成功!");
return true;
}
catch (Exception ex)
{
Console.WriteLine($" MQTT 连接失败: {ex.Message}");
return false;
}
}
///
/// 发送消息到指定主题
///
/// 目标主题
/// 消息内容
/// QoS 级别,默认为 AtLeastOnce(1)
/// 是否保留消息,默认为 false
public async Task PublishMessageAsync(string topic, string messageContent,
MqttQualityOfServiceLevel qosLevel = MqttQualityOfServiceLevel.AtLeastOnce,
bool retain = false)
{
try
{
if (_mqttClient?.IsConnected != true)
{
Console.WriteLine("MQTT 客户端未连接,尝试重新连接...");
if (!await ConnectAsync())
{
Console.WriteLine(" 重连失败,无法发送消息");
return false;
}
}
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(messageContent)
.WithQualityOfServiceLevel(qosLevel)
.WithRetainFlag(retain)
.Build();
Console.WriteLine($"正在向主题 '{topic}' 发送消息: {messageContent}");
await _mqttClient.PublishAsync(message);
Console.WriteLine(" 消息发送成功!");
return true;
}
catch (Exception ex)
{
Console.WriteLine($" 消息发送失败: {ex.Message}");
return false;
}
}
///
/// 发送消息(使用默认主题配置)
///
/// 消息内容
/// QoS 级别
/// 是否保留消息
public async Task PublishMessageAsync(string messageContent,
MqttQualityOfServiceLevel qosLevel = MqttQualityOfServiceLevel.AtLeastOnce,
bool retain = false)
{
return await PublishMessageAsync("product/topic", messageContent, qosLevel, retain);
}
///
/// 断开 MQTT 连接
///
public async Task DisconnectAsync()
{
try
{
if (_mqttClient?.IsConnected == true)
{
await _mqttClient.DisconnectAsync();
Console.WriteLine(" MQTT 已断开连接");
}
}
catch (Exception ex)
{
Console.WriteLine($" 断开连接时发生错误: {ex.Message}");
}
}
///
/// 检查是否已连接
///
public bool IsConnected => _mqttClient?.IsConnected == true;
///
/// 释放资源
///
public void Dispose()
{
if (!_disposed)
{
DisconnectAsync().Wait();
_mqttClient?.Dispose();
_disposed = true;
}
}
}
}