fg_yida_2/YiDa_WinForm/Service/MqttClientService.cs

186 lines
6.6 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MySql.Data.MySqlClient;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using YiDa_WinForm.Config;
using MQTTnet.Packets;
namespace YiDa_WinForm.Service.Mqtt
{
/// <summary>
/// MQTT client service: connects to the broker, subscribes to the device topics,
/// stores every received message in MySQL and reports progress through
/// <see cref="MessageReceived"/>.
/// </summary>
public class MqttClientService
{
    // Topic filter covering every topic published below the fg_yida root.
    private const string SubscriptionTopicFilter = "fg_yida/#";

    // Parameterized insert used for every received message.
    private const string InsertMessageSql = @"INSERT INTO yida_mqtt_message (device_code, receive_data, receive_time, create_time)
VALUES (@device_code, @receive_data, @receive_time, @create_time)";

    // Known device names (third topic segment) and the device code stored for each.
    // Names that are not listed here are stored unchanged.
    private static readonly Dictionary<string, string> DeviceCodesByName =
        new Dictionary<string, string>(StringComparer.Ordinal)
        {
            { "fg_ht_zs_01", "device1" },
            { "fg_ht_zs_02", "device2" },
            { "fg_ht_wk_01", "device3" },
            { "fg_ht_wk_02", "device4" }
        };

    // MQTT connection state and options.
    private readonly IMqttClient _mqttClient;
    private readonly MqttClientOptions _options;

    // Database connection string, read from the application configuration.
    private readonly string _connectionString = AppConfig.MySqlConnectionString;

    /// <summary>
    /// Raised with a human-readable status line for the UI layer (connection state,
    /// received messages, persistence results and errors).
    /// </summary>
    /// <remarks>
    /// NOTE(review): the event is raised from the MQTT client's callbacks, so UI
    /// subscribers presumably have to marshal to the UI thread themselves - confirm.
    /// </remarks>
    public event Action<string> MessageReceived;

    /// <summary>
    /// Creates the MQTT client, wires its connection and message events, and builds
    /// the connection options. No network activity happens until
    /// <see cref="MqttClientStartAsync"/> is called.
    /// </summary>
    public MqttClientService()
    {
        var factory = new MqttFactory();
        _mqttClient = factory.CreateMqttClient();

        // Connected: announce it and subscribe to the device topics.
        _mqttClient.ConnectedAsync += e => OnConnectedAsync();

        // Disconnected: only report it; reconnecting is left to the caller.
        _mqttClient.DisconnectedAsync += e =>
        {
            MessageReceived?.Invoke("已断开连接!");
            return Task.CompletedTask;
        };

        // Message received: report it, then persist it.
        _mqttClient.ApplicationMessageReceivedAsync += async e =>
        {
            string topic = e.ApplicationMessage.Topic;
            string payload = DecodePayload(e.ApplicationMessage.PayloadSegment);
            MessageReceived?.Invoke($" 收到消息:{topic} → {payload}");
            await SaveMessageToDatabaseAsync(topic, payload);
        };

        // NOTE(review): broker endpoint, client id and credentials are hard-coded here;
        // consider moving them to configuration so secrets do not live in source.
        _options = new MqttClientOptionsBuilder()
            .WithClientId("fg_yida_server_07")
            .WithTcpServer("192.168.1.103", 1883)
            .WithCredentials("admin", "admin123")
            .WithCleanSession()
            .Build();
    }

    /// <summary>
    /// Connects to the broker using the options built in the constructor.
    /// </summary>
    /// <returns>A task that completes when the connect call has finished.</returns>
    public async Task MqttClientStartAsync()
    {
        await _mqttClient.ConnectAsync(_options);
    }

    /// <summary>
    /// Disconnects from the broker if the client is currently connected and reports
    /// the manual disconnect through <see cref="MessageReceived"/>; otherwise does nothing.
    /// </summary>
    /// <returns>A task that completes when the disconnect call has finished.</returns>
    public async Task MqttClientStopAsync()
    {
        if (!_mqttClient.IsConnected)
        {
            return;
        }

        await _mqttClient.DisconnectAsync();
        MessageReceived?.Invoke("手动断开连接。");
    }

    /// <summary>
    /// Announces the connection and subscribes to <see cref="SubscriptionTopicFilter"/>
    /// with at-least-once delivery. A subscription failure is reported through
    /// <see cref="MessageReceived"/> instead of being thrown.
    /// </summary>
    private async Task OnConnectedAsync()
    {
        MessageReceived?.Invoke("已连接MQTT服务器");

        var subscribeOptions = new MqttClientSubscribeOptions
        {
            TopicFilters = new List<MqttTopicFilter>
            {
                new MqttTopicFilter
                {
                    Topic = SubscriptionTopicFilter,
                    QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce
                }
            }
        };

        try
        {
            await _mqttClient.SubscribeAsync(subscribeOptions, CancellationToken.None);
            MessageReceived?.Invoke("已订阅主题fg_yida");
        }
        catch (Exception ex)
        {
            MessageReceived?.Invoke($"订阅失败:{ex.Message}");
        }
    }

    /// <summary>
    /// Decodes a message payload as UTF-8 text directly from the segment, without
    /// copying it first.
    /// </summary>
    /// <param name="payloadSegment">Raw payload bytes of the received message.</param>
    /// <returns>The decoded text, or an empty string when there is no payload.</returns>
    private static string DecodePayload(ArraySegment<byte> payloadSegment)
    {
        // A default segment has no backing array; treat it like an empty payload.
        if (payloadSegment.Array == null || payloadSegment.Count == 0)
        {
            return string.Empty;
        }

        return Encoding.UTF8.GetString(payloadSegment.Array, payloadSegment.Offset, payloadSegment.Count);
    }

    /// <summary>
    /// Derives the stored device code from a topic such as
    /// <c>fg_yida/424949da1b1e/fg_ht_zs_01/device/data/push</c>.
    /// </summary>
    /// <param name="topic">Full MQTT topic of the received message.</param>
    /// <returns>
    /// The mapped code for a known device name, the raw device name for an unlisted
    /// device, or <c>"unknown"</c> when the topic has fewer than three segments.
    /// </returns>
    private static string ResolveDeviceCode(string topic)
    {
        string[] segments = topic.Split('/');
        if (segments.Length < 3)
        {
            return "unknown";
        }

        // The third segment is the device name, e.g. fg_ht_zs_01 or fg_ht_zs_02.
        string deviceName = segments[2];
        string mappedCode;
        return DeviceCodesByName.TryGetValue(deviceName, out mappedCode) ? mappedCode : deviceName;
    }

    /// <summary>
    /// Reads the device-reported timestamp (Unix milliseconds) from the payload's
    /// <c>time</c> token.
    /// </summary>
    /// <param name="timeToken">The <c>time</c> token of the payload; may be null.</param>
    /// <returns>
    /// The timestamp as local time, or <c>null</c> when the token is missing, null,
    /// not convertible to a number, not positive, or outside the representable range.
    /// </returns>
    private static DateTime? TryGetReportedTime(JToken timeToken)
    {
        if (timeToken == null)
        {
            return null;
        }

        try
        {
            long unixMilliseconds = timeToken.Value<long>();
            if (unixMilliseconds <= 0)
            {
                // Zero or negative would otherwise be stored as 1970-01-01 or earlier.
                return null;
            }

            return DateTimeOffset.FromUnixTimeMilliseconds(unixMilliseconds).LocalDateTime;
        }
        catch (Exception ex) when (ex is InvalidCastException
                                   || ex is FormatException
                                   || ex is OverflowException
                                   || ex is ArgumentException)
        {
            // Unusable timestamp: let the caller fall back instead of losing the message.
            return null;
        }
    }

    /// <summary>
    /// Stores one received MQTT message in the <c>yida_mqtt_message</c> table.
    /// The payload must be a JSON object; its <c>time</c> field is used as the receive
    /// time when it is usable, otherwise the local save time is stored instead.
    /// Any failure is reported through <see cref="MessageReceived"/> and not rethrown.
    /// </summary>
    /// <param name="topic">Full MQTT topic the message arrived on.</param>
    /// <param name="payload">Message payload as UTF-8 text.</param>
    private async Task SaveMessageToDatabaseAsync(string topic, string payload)
    {
        try
        {
            string deviceCode = ResolveDeviceCode(topic);

            // Parsing also validates the payload: non-JSON text is reported, not stored.
            JObject json = JObject.Parse(payload);
            DateTime? reportedTime = TryGetReportedTime(json["time"]);

            using (var conn = new MySqlConnection(_connectionString))
            {
                await conn.OpenAsync();
                DateTime timeSave = DateTime.Now;
                using (var cmd = new MySqlCommand(InsertMessageSql, conn))
                {
                    cmd.Parameters.AddWithValue("@device_code", deviceCode);
                    cmd.Parameters.AddWithValue("@receive_data", payload);
                    cmd.Parameters.AddWithValue("@receive_time", reportedTime ?? timeSave);
                    cmd.Parameters.AddWithValue("@create_time", timeSave);
                    await cmd.ExecuteNonQueryAsync();
                }
            }

            MessageReceived?.Invoke("数据已保存到数据库。");
        }
        catch (Exception ex)
        {
            MessageReceived?.Invoke($"保存数据库时出错:{ex.Message} {ex.InnerException?.Message}");
        }
    }
}
}