fg_yida_2/YiDa_WinForm/Service/MqttClientService.cs
2026-02-04 09:53:15 +08:00

279 lines
11 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using YiDa_WinForm.Config;
using YiDa_WinForm.Model;
namespace YiDa_WinForm
{
/// <summary>
/// MQTT client service. Connects to the broker, subscribes to the <c>fg_yida/#</c> topic tree
/// and persists every received message through <c>DbHelper</c>.
/// </summary>
public class MqttClientService
{
    // Broker connection settings.
    // NOTE(review): the broker address and credentials are hard-coded in source;
    // consider moving them to configuration so they are not committed with the code.
    private const string BrokerHost = "172.16.101.94";
    private const int BrokerPort = 1883;
    private const string BrokerClientId = "fg_yida_server_07";
    private const string BrokerUser = "admin";
    private const string BrokerPassword = "admin";
    private const string SubscriptionTopic = "fg_yida/#";

    // MQTT connection state.
    private readonly IMqttClient _mqttClient;
    private readonly MqttClientOptions _options;

    /// <summary>
    /// Raised with a human-readable status line (connection changes, received messages,
    /// database results and errors) so the UI layer can display it.
    /// </summary>
    public event Action<string> MessageReceived;

    /// <summary>
    /// Creates the MQTT client, wires the connected / disconnected / message-received
    /// handlers and builds the connection options. Does not connect; call
    /// <see cref="MqttClientStartAsync"/> for that.
    /// </summary>
    public MqttClientService()
    {
        var factory = new MqttFactory();
        _mqttClient = factory.CreateMqttClient();

        // Connected: subscribe to the topic tree.
        _mqttClient.ConnectedAsync += async e =>
        {
            MessageReceived?.Invoke("已连接MQTT服务器");

            // Topics to subscribe to, with their QoS.
            var topicFilters = new List<MQTTnet.Packets.MqttTopicFilter>
            {
                new MQTTnet.Packets.MqttTopicFilter
                {
                    Topic = SubscriptionTopic,
                    QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce
                }
            };
            var subscribeOptions = new MqttClientSubscribeOptions
            {
                TopicFilters = topicFilters
            };

            // A failed subscription is reported to the UI instead of being thrown
            // out of the event handler.
            try
            {
                await _mqttClient.SubscribeAsync(subscribeOptions, CancellationToken.None);
                MessageReceived?.Invoke("已订阅主题fg_yida");
            }
            catch (Exception ex)
            {
                MessageReceived?.Invoke($"订阅失败:{ex.Message}");
            }
        };

        // Disconnected: only notify the UI.
        _mqttClient.DisconnectedAsync += e =>
        {
            MessageReceived?.Invoke("已断开连接!");
            return Task.CompletedTask;
        };

        // Message received: notify the UI, then persist.
        _mqttClient.ApplicationMessageReceivedAsync += async e =>
        {
            string topic = e.ApplicationMessage.Topic;
            string payload = DecodePayload(e.ApplicationMessage.PayloadSegment);
            MessageReceived?.Invoke($" 收到消息:{topic} → {payload}");
            await SaveMessageToDatabaseAsync(topic, payload);
        };

        // Connection options.
        _options = new MqttClientOptionsBuilder()
            .WithClientId(BrokerClientId)
            .WithTcpServer(BrokerHost, BrokerPort)
            .WithCredentials(BrokerUser, BrokerPassword)
            .WithCleanSession()
            .Build();
    }

    /// <summary>
    /// Connects to the broker using the options built in the constructor.
    /// </summary>
    public async Task MqttClientStartAsync()
    {
        await _mqttClient.ConnectAsync(_options);
    }

    /// <summary>
    /// Disconnects from the broker if currently connected and notifies the UI.
    /// Does nothing when the client is not connected.
    /// </summary>
    public async Task MqttClientStopAsync()
    {
        if (_mqttClient.IsConnected)
        {
            await _mqttClient.DisconnectAsync();
            MessageReceived?.Invoke("手动断开连接。");
        }
    }

    /// <summary>
    /// Decodes an MQTT payload as UTF-8 text directly from the segment, without copying it.
    /// </summary>
    /// <param name="segment">Payload bytes of the received message.</param>
    /// <returns>The decoded text, or an empty string when the payload is empty.</returns>
    private static string DecodePayload(ArraySegment<byte> segment)
    {
        // A default segment has no backing array; treat it like an empty payload.
        if (segment.Array == null || segment.Count == 0)
        {
            return string.Empty;
        }

        return Encoding.UTF8.GetString(segment.Array, segment.Offset, segment.Count);
    }

    /// <summary>
    /// Maps a topic to the device code stored in the database. The device name is the
    /// third '/'-separated segment of the topic; known names are mapped to
    /// device1..device4, any other name is used as-is.
    /// </summary>
    /// <param name="topic">Full MQTT topic of the received message.</param>
    /// <returns>The device code, or "unknown" when the topic has fewer than three segments.</returns>
    private static string ResolveDeviceCode(string topic)
    {
        string[] segments = topic.Split('/');
        if (segments.Length < 3)
        {
            return "unknown";
        }

        string deviceName = segments[2];
        switch (deviceName)
        {
            case "fg_ht_zs_01": return "device1";
            case "fg_ht_zs_02": return "device2";
            case "fg_ht_wk_01": return "device3";
            case "fg_ht_wk_02": return "device4";
            default: return deviceName;
        }
    }

    /// <summary>
    /// Returns the query that fetches the newest stored row of the device whose data
    /// the given temperature-controller device is merged into (device3 into device1,
    /// device4 into device2).
    /// </summary>
    /// <param name="deviceCode">Device code of the temperature controller.</param>
    /// <returns>The SQL text, or <c>null</c> when the device code has no merge target.</returns>
    private static string GetLatestMachineRowSql(string deviceCode)
    {
        switch (deviceCode)
        {
            case "device3":
                return @"SELECT id, receive_data FROM yida_mqtt_message
                         WHERE device_code = 'device1'
                         ORDER BY id DESC LIMIT 1";
            case "device4":
                return @"SELECT id, receive_data FROM yida_mqtt_message
                         WHERE device_code = 'device2'
                         ORDER BY id DESC LIMIT 1";
            default:
                return null;
        }
    }

    /// <summary>
    /// Persists one received MQTT message. Messages from device3/device4 are merged into
    /// the latest row of the paired device; all other messages are inserted as new rows.
    /// Any failure is reported through <see cref="MessageReceived"/> instead of being thrown.
    /// </summary>
    /// <param name="topic">Topic the message was received on.</param>
    /// <param name="payload">Message payload as text; expected to be JSON.</param>
    private async Task SaveMessageToDatabaseAsync(string topic, string payload)
    {
        try
        {
            string deviceCode = ResolveDeviceCode(topic);

            // Temperature-controller devices (device3/4): merge PV into the paired device's row.
            if (deviceCode == "device3" || deviceCode == "device4")
            {
                await MergeAndSaveData(deviceCode, payload);
                return;
            }

            // Other devices: insert the raw payload.
            JObject json = JObject.Parse(payload);

            // Read "time" as nullable so an explicit JSON null falls back to the current
            // time instead of throwing while converting null to a non-nullable long.
            long timestamp = json["time"]?.Value<long?>() ?? 0;
            DateTime timeReceive = timestamp == 0
                ? DateTime.Now
                : DateTimeOffset.FromUnixTimeMilliseconds(timestamp).LocalDateTime;

            string sql = @"INSERT INTO yida_mqtt_message (device_code, receive_data, receive_time, create_time)
                           VALUES (@device_code, @receive_data, @receive_time, @create_time)";
            var parameters = new[]
            {
                new MySql.Data.MySqlClient.MySqlParameter("@device_code", deviceCode),
                new MySql.Data.MySqlClient.MySqlParameter("@receive_data", payload),
                new MySql.Data.MySqlClient.MySqlParameter("@receive_time", timeReceive),
                new MySql.Data.MySqlClient.MySqlParameter("@create_time", DateTime.Now)
            };

            await DbHelper.ExecuteNonQueryAsync(sql, parameters);
            MessageReceived?.Invoke("数据已保存到数据库。");
        }
        catch (Exception ex)
        {
            MessageReceived?.Invoke($"保存数据库时出错:{ex.Message} {ex.InnerException?.Message}");
        }
    }

    /// <summary>
    /// Merges the PV value of a temperature-controller message into the newest stored row
    /// of the paired device and marks that row with <c>pv_status = 1</c>.
    /// Any failure is reported through <see cref="MessageReceived"/> instead of being thrown.
    /// </summary>
    /// <param name="deviceCode">Device code of the temperature controller (device3 or device4).</param>
    /// <param name="payload">Message payload as JSON text containing <c>params.PV</c>.</param>
    private async Task MergeAndSaveData(string deviceCode, string payload)
    {
        try
        {
            // 1. Parse the temperature-controller message and extract PV.
            var tcDataModel = JsonConvert.DeserializeObject<TcDataModel>(payload);
            if (tcDataModel == null || tcDataModel.@params == null || string.IsNullOrEmpty(tcDataModel.@params.PV))
            {
                MessageReceived?.Invoke("MQTT消息格式错误无有效PV字段");
                return;
            }
            string newPvValue = tcDataModel.@params.PV;

            // 2. Load the newest row of the paired device. The SQL and the result are kept
            //    in locals: handlers for different messages can overlap, so shared fields
            //    could be overwritten between awaits.
            string latestSql = GetLatestMachineRowSql(deviceCode);
            if (latestSql == null)
            {
                // Stop here; there is no row to merge into for an unknown device.
                MessageReceived?.Invoke("不是有效设备");
                return;
            }

            DataTable latestRows = await DbHelper.ExecuteQueryAsync(latestSql);
            if (latestRows == null || latestRows.Rows.Count == 0)
            {
                MessageReceived?.Invoke("数据库中无注塑机数据,无法合并");
                return;
            }

            // Extract the primary key and the stored JSON.
            DataRow latestRow = latestRows.Rows[0];
            long latestId = Convert.ToInt64(latestRow["id"]);
            object storedData = latestRow["receive_data"];
            string latestReceiveData = storedData == DBNull.Value ? null : storedData.ToString();
            if (string.IsNullOrEmpty(latestReceiveData))
            {
                MessageReceived?.Invoke("数据库中注塑机数据无有效receive_data");
                return;
            }

            // 3. Deserialize the stored JSON; start from an empty model if it has no params.
            var dataModel = JsonConvert.DeserializeObject<SQLDataModel>(latestReceiveData);
            if (dataModel == null || dataModel.@params == null)
            {
                dataModel = new SQLDataModel
                {
                    time = tcDataModel.time,
                    @params = new SQLParamModel()
                };
            }

            // 4. Update only the PV field and serialize back to JSON.
            dataModel.@params.PV = newPvValue;
            string mergedData = JsonConvert.SerializeObject(dataModel);

            // 5. Write the merged JSON back to the same row.
            string updateSql = @"UPDATE yida_mqtt_message
                                 SET receive_data = @mergedData, pv_status = 1
                                 WHERE id = @latestId";
            var updateParameters = new[]
            {
                new MySql.Data.MySqlClient.MySqlParameter("@mergedData", mergedData),
                new MySql.Data.MySqlClient.MySqlParameter("@latestId", latestId)
            };

            int affectedRows = await DbHelper.ExecuteNonQueryAsync(updateSql, updateParameters);
            if (affectedRows > 0)
            {
                MessageReceived?.Invoke($"成功合并PV字段{newPvValue}更新ID={latestId}");
            }
            else
            {
                MessageReceived?.Invoke("更新失败:未找到匹配的注塑机数据");
            }
        }
        catch (Exception ex)
        {
            MessageReceived?.Invoke($"合并并入库失败:{ex.Message} {ex.InnerException?.Message}");
        }
    }
}
}