98 lines
3.5 KiB
C#
98 lines
3.5 KiB
C#
using Infrastructure.Attribute;
|
|
using Microsoft.Extensions.Configuration;
|
|
using Microsoft.Extensions.Logging;
|
|
using MQTTnet;
|
|
using MQTTnet.Client;
|
|
using MQTTnet.Packets;
|
|
using MQTTnet.Protocol;
|
|
using MQTTnet.Server;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Text;
|
|
using System.Threading.Tasks;
|
|
using static Org.BouncyCastle.Math.EC.ECCurve;
|
|
|
|
namespace ZR.Common.MqttHelper
|
|
{
|
|
[AppService(ServiceLifetime = LifeTime.Singleton)]
|
|
public class MyMqttConfig
|
|
{
|
|
// 配置常量
|
|
private const string MqttConfigSection = "MqttConfig";
|
|
private const string TopicsConfigKey = "Topics";
|
|
private const int DefaultReconnectDelaySeconds = 5;
|
|
private const int MaxReconnectAttempts = 10;
|
|
private const int MaxReconnectDelaySeconds = 60;
|
|
|
|
private readonly ILogger<MyMqttConfig> _logger;
|
|
private readonly IConfiguration _configuration;
|
|
|
|
public MyMqttConfig(ILogger<MyMqttConfig> logger, IConfiguration configuration)
|
|
{
|
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
|
_configuration = configuration;
|
|
}
|
|
|
|
public MqttClientOptions BuildMqttClientOptions()
|
|
{
|
|
var mqttConfig = _configuration.GetSection(MqttConfigSection);
|
|
var builder = new MqttClientOptionsBuilder()
|
|
.WithClientId(mqttConfig["ClientId"] ?? Guid.NewGuid().ToString())
|
|
.WithTcpServer(mqttConfig["Server"], mqttConfig.GetValue<int>("Port", 1883))
|
|
.WithCleanSession();
|
|
return builder.Build();
|
|
}
|
|
|
|
public MqttClientSubscribeOptions GetSubscribeToTopicsAsync()
|
|
{
|
|
var topicsSection = _configuration.GetSection($"{MqttConfigSection}:{TopicsConfigKey}");
|
|
var topicConfigs = topicsSection.Get<List<TopicSubscriptionConfig>>() ?? new List<TopicSubscriptionConfig>();
|
|
|
|
if (!topicConfigs.Any())
|
|
{
|
|
_logger.LogInformation("没有配置要订阅的MQTT主题");
|
|
return null; // 或返回空配置,根据业务需求决定
|
|
}
|
|
|
|
var subscribeOptionsBuilder = new MqttClientSubscribeOptionsBuilder();
|
|
|
|
foreach (var config in topicConfigs)
|
|
{
|
|
var topicFilter = config.ToMqttTopicFilter();
|
|
subscribeOptionsBuilder.WithTopicFilter(topicFilter); // 直接添加单个主题过滤器
|
|
}
|
|
|
|
return subscribeOptionsBuilder.Build();
|
|
}
|
|
|
|
|
|
|
|
// 主题订阅配置类
|
|
private class TopicSubscriptionConfig
|
|
{
|
|
public string Topic { get; set; }
|
|
public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; } = MqttQualityOfServiceLevel.AtLeastOnce;
|
|
public bool NoLocal { get; set; } = false;
|
|
public bool RetainAsPublished { get; set; } = false;
|
|
public MqttRetainHandling RetainHandling { get; set; } = MqttRetainHandling.SendAtSubscribe;
|
|
|
|
public MqttTopicFilter ToMqttTopicFilter()
|
|
{
|
|
return new MqttTopicFilterBuilder()
|
|
.WithTopic(Topic)
|
|
.WithQualityOfServiceLevel(QualityOfServiceLevel)
|
|
.WithNoLocal(NoLocal)
|
|
.WithRetainAsPublished(RetainAsPublished)
|
|
.WithRetainHandling(RetainHandling)
|
|
.Build();
|
|
}
|
|
|
|
public override string ToString()
|
|
{
|
|
return $"{Topic}@QoS{QualityOfServiceLevel}";
|
|
}
|
|
}
|
|
}
|
|
}
|