2026-01-21 19:31:27 +08:00
|
|
|
|
using MQTTnet;
|
|
|
|
|
|
using MQTTnet.Client;
|
|
|
|
|
|
using MQTTnet.Protocol;
|
2026-01-22 16:59:41 +08:00
|
|
|
|
using Newtonsoft.Json;
|
2026-01-21 19:31:27 +08:00
|
|
|
|
using Newtonsoft.Json.Linq;
|
|
|
|
|
|
using System;
|
|
|
|
|
|
using System.Collections.Generic;
|
2026-01-29 20:29:12 +08:00
|
|
|
|
using System.Data;
|
2026-01-21 19:31:27 +08:00
|
|
|
|
using System.Linq;
|
|
|
|
|
|
using System.Text;
|
|
|
|
|
|
using System.Threading;
|
|
|
|
|
|
using System.Threading.Tasks;
|
|
|
|
|
|
using YiDa_WinForm.Config;
|
2026-01-22 16:59:41 +08:00
|
|
|
|
using YiDa_WinForm.Model;
|
2026-01-21 19:31:27 +08:00
|
|
|
|
|
2026-01-29 20:29:12 +08:00
|
|
|
|
namespace YiDa_WinForm
|
2026-01-21 19:31:27 +08:00
|
|
|
|
{
|
|
|
|
|
|
/// <summary>
/// MQTT connection service (database access goes through <c>DbHelper</c>).
/// Owns one MQTTnet client, subscribes to the <c>fg_yida/#</c> topic tree after
/// connecting, reports progress through <see cref="MessageReceived"/>, and stores
/// every received payload in the <c>yida_mqtt_message</c> table.
/// </summary>
public class MqttClientService
{
    // Broker settings used by the parameterless constructor.
    // NOTE(review): credentials are hard-coded in source; consider loading them from configuration.
    private const string DefaultClientId = "fg_yida_server_07";
    private const string DefaultServer = "192.168.1.103";
    private const int DefaultPort = 1883;
    private const string DefaultUsername = "admin";
    private const string DefaultPassword = "admin123";

    // Topic filter subscribed to after every successful connect.
    private const string SubscriptionTopicFilter = "fg_yida/#";

    // Maps the device name found in the third topic segment to the device code stored in the database.
    private static readonly Dictionary<string, string> DeviceCodesByName =
        new Dictionary<string, string>(StringComparer.Ordinal)
        {
            { "fg_ht_zs_01", "device1" },
            { "fg_ht_zs_02", "device2" },
            { "fg_ht_wk_01", "device3" },
            { "fg_ht_wk_02", "device4" }
        };

    // MQTT connection state.
    private readonly IMqttClient _mqttClient;
    private readonly MqttClientOptions _options;

    /// <summary>
    /// Raised with a human-readable status or log line (used to notify the UI layer).
    /// Handlers are invoked on whatever thread the MQTT client or the caller is running on.
    /// </summary>
    public event Action<string> MessageReceived;

    /// <summary>
    /// Creates the service with the built-in default broker settings.
    /// </summary>
    public MqttClientService()
        : this(DefaultServer, DefaultPort, DefaultUsername, DefaultPassword, DefaultClientId)
    {
    }

    /// <summary>
    /// Creates the service for an explicit broker endpoint.
    /// </summary>
    /// <param name="server">Broker host name or IP address.</param>
    /// <param name="port">Broker TCP port.</param>
    /// <param name="username">User name; when null or empty no credentials are sent.</param>
    /// <param name="password">Password that belongs to <paramref name="username"/>.</param>
    /// <param name="clientId">MQTT client identifier.</param>
    /// <exception cref="ArgumentException"><paramref name="server"/> or <paramref name="clientId"/> is null or empty.</exception>
    public MqttClientService(string server, int port, string username, string password, string clientId)
    {
        if (string.IsNullOrEmpty(server))
        {
            throw new ArgumentException("Broker address must not be empty.", nameof(server));
        }

        if (string.IsNullOrEmpty(clientId))
        {
            throw new ArgumentException("Client id must not be empty.", nameof(clientId));
        }

        var factory = new MqttFactory();
        _mqttClient = factory.CreateMqttClient();

        // Connected: announce it and (re)subscribe.
        _mqttClient.ConnectedAsync += e => OnConnectedAsync();

        // Disconnected: only announce it.
        _mqttClient.DisconnectedAsync += e =>
        {
            MessageReceived?.Invoke("已断开连接!");
            return Task.CompletedTask;
        };

        // Message received: decode, announce, persist.
        _mqttClient.ApplicationMessageReceivedAsync += e =>
            OnMessageAsync(e.ApplicationMessage.Topic, DecodePayload(e.ApplicationMessage.PayloadSegment));

        // Build the connection options.
        var builder = new MqttClientOptionsBuilder()
            .WithClientId(clientId)
            .WithTcpServer(server, port)
            .WithCleanSession();
        if (!string.IsNullOrEmpty(username))
        {
            builder = builder.WithCredentials(username, password);
        }

        _options = builder.Build();
    }

    /// <summary>
    /// Opens the connection to the broker. Does nothing when the client is already
    /// connected (MQTTnet rejects a second connect on a connected client).
    /// </summary>
    public async Task MqttClientStartAsync()
    {
        if (_mqttClient.IsConnected)
        {
            return;
        }

        await _mqttClient.ConnectAsync(_options);
    }

    /// <summary>
    /// Closes the connection if it is currently open and reports the manual disconnect.
    /// </summary>
    public async Task MqttClientStopAsync()
    {
        if (!_mqttClient.IsConnected)
        {
            return;
        }

        await _mqttClient.DisconnectAsync();
        MessageReceived?.Invoke("手动断开连接。");
    }

    /// <summary>
    /// Runs after a successful connect: reports it and subscribes to the topic filter
    /// with QoS "at least once". Subscription failures are reported, not thrown.
    /// </summary>
    private async Task OnConnectedAsync()
    {
        MessageReceived?.Invoke("已连接MQTT服务器!");

        // Topics to subscribe to, each with its QoS level.
        var topicFilters = new List<MQTTnet.Packets.MqttTopicFilter>
        {
            new MQTTnet.Packets.MqttTopicFilter
            {
                Topic = SubscriptionTopicFilter,
                QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce
            }
        };

        // Batch subscribe options.
        var subscribeOptions = new MqttClientSubscribeOptions
        {
            TopicFilters = topicFilters
        };

        try
        {
            await _mqttClient.SubscribeAsync(subscribeOptions, CancellationToken.None);
            MessageReceived?.Invoke("已订阅主题:fg_yida");
        }
        catch (Exception ex)
        {
            MessageReceived?.Invoke($"订阅失败:{ex.Message}");
        }
    }

    /// <summary>
    /// Reports an incoming message and hands it to the persistence step.
    /// </summary>
    private async Task OnMessageAsync(string topic, string payload)
    {
        MessageReceived?.Invoke($" 收到消息:{topic} → {payload}");
        await SaveMessageToDatabaseAsync(topic, payload);
    }

    /// <summary>
    /// Decodes a payload segment as UTF-8 without copying it first.
    /// An unset or empty segment yields an empty string.
    /// </summary>
    private static string DecodePayload(ArraySegment<byte> segment)
    {
        if (segment.Array == null || segment.Count == 0)
        {
            return string.Empty;
        }

        return Encoding.UTF8.GetString(segment.Array, segment.Offset, segment.Count);
    }

    /// <summary>
    /// Derives the device code from a topic. The third '/'-separated segment is the
    /// device name; known names map to "device1".."device4", unknown names are used
    /// as-is, and topics with fewer than three segments yield "unknown".
    /// </summary>
    private static string ResolveDeviceCode(string topic)
    {
        string[] segments = topic.Split('/');
        if (segments.Length < 3)
        {
            return "unknown";
        }

        string deviceName = segments[2];
        string mapped;
        return DeviceCodesByName.TryGetValue(deviceName, out mapped) ? mapped : deviceName;
    }

    /// <summary>
    /// Stores an MQTT message. Temperature-controller messages (device3/device4) are
    /// merged into the latest injection-machine row; every other message is inserted
    /// as a new row. All failures are reported through <see cref="MessageReceived"/>.
    /// </summary>
    /// <param name="topic">Topic the message arrived on.</param>
    /// <param name="payload">UTF-8 decoded message body (expected to be a JSON object).</param>
    private async Task SaveMessageToDatabaseAsync(string topic, string payload)
    {
        try
        {
            // Parse the topic to get the device code.
            string deviceCode = ResolveDeviceCode(topic);

            // Temperature controllers (device3/4): merge PV into injection-machine data.
            if (deviceCode == "device3" || deviceCode == "device4")
            {
                await MergeAndSaveDataAsync(payload);
                return;
            }

            // Injection machines (device1/2) and any other device: insert the raw data.
            JObject json = JObject.Parse(payload);

            // Value<long?> (not Value<long>) so an explicit JSON null in "time" falls back
            // to 0 instead of throwing an InvalidCastException.
            long timestamp = json["time"]?.Value<long?>() ?? 0;
            DateTime timeReceive = timestamp == 0
                ? DateTime.Now
                : DateTimeOffset.FromUnixTimeMilliseconds(timestamp).LocalDateTime;

            // SQL statement and its parameters.
            string sql = @"INSERT INTO yida_mqtt_message (device_code, receive_data, receive_time, create_time)
                           VALUES (@device_code, @receive_data, @receive_time, @create_time)";
            var parameters = new[]
            {
                new MySql.Data.MySqlClient.MySqlParameter("@device_code", deviceCode),
                new MySql.Data.MySqlClient.MySqlParameter("@receive_data", payload),
                new MySql.Data.MySqlClient.MySqlParameter("@receive_time", timeReceive),
                new MySql.Data.MySqlClient.MySqlParameter("@create_time", DateTime.Now)
            };

            // Execute the insert through the shared DbHelper.
            await DbHelper.ExecuteNonQueryAsync(sql, parameters);

            MessageReceived?.Invoke("数据已保存到数据库。");
        }
        catch (Exception ex)
        {
            MessageReceived?.Invoke($"保存数据库时出错:{ex.Message} {ex.InnerException?.Message}");
        }
    }

    /// <summary>
    /// Merges the PV value of a temperature-controller message into the most recent
    /// injection-machine row (device1/device2) and writes that row back.
    /// All failures are reported through <see cref="MessageReceived"/>.
    /// </summary>
    /// <param name="payload">Temperature-controller message body (JSON).</param>
    private async Task MergeAndSaveDataAsync(string payload)
    {
        try
        {
            // 1. Parse the temperature-controller message and extract PV.
            var tcDataModel = JsonConvert.DeserializeObject<TcDataModel>(payload);
            string newPvValue = tcDataModel?.@params?.PV;
            if (string.IsNullOrEmpty(newPvValue))
            {
                MessageReceived?.Invoke("MQTT消息格式错误,无有效PV字段");
                return;
            }

            // 2. Read receive_data (a JSON string) of the latest injection-machine row.
            // NOTE(review): the read and the later update are separate statements, so a row
            // inserted in between is not the one that gets updated — confirm this is acceptable.
            string getLatestSql = @"SELECT id, receive_data FROM yida_mqtt_message
                                    WHERE device_code IN ('device1','device2')
                                    ORDER BY id DESC LIMIT 1";
            DataTable dtLatest = await DbHelper.ExecuteQueryAsync(getLatestSql);

            // Nothing to merge into when there is no injection-machine data yet.
            if (dtLatest.Rows.Count == 0)
            {
                MessageReceived?.Invoke("数据库中无注塑机数据,无法合并");
                return;
            }

            // Primary key and JSON data of the row found.
            DataRow latestRow = dtLatest.Rows[0];
            long latestId = Convert.ToInt64(latestRow["id"]);
            object rawReceiveData = latestRow["receive_data"];
            string latestReceiveData = rawReceiveData == DBNull.Value ? null : rawReceiveData.ToString();
            if (string.IsNullOrEmpty(latestReceiveData))
            {
                MessageReceived?.Invoke("数据库中注塑机数据无有效receive_data");
                return;
            }

            // 3. Deserialize the injection-machine JSON; start from an empty model when it
            //    has no usable "params" section.
            // NOTE(review): round-tripping through SQLDataModel presumably drops JSON fields
            // the model does not declare — verify against the model definition.
            var dataModel = JsonConvert.DeserializeObject<SQLDataModel>(latestReceiveData);
            if (dataModel == null || dataModel.@params == null)
            {
                dataModel = new SQLDataModel
                {
                    time = tcDataModel.time,
                    @params = new SQLParamModel()
                };
            }

            // 4. Update the PV field only.
            dataModel.@params.PV = newPvValue;

            // 5. Serialize back to JSON.
            string mergedData = JsonConvert.SerializeObject(dataModel);

            // 6. Write the merged JSON back to the same row.
            string updateSql = @"UPDATE yida_mqtt_message
                                 SET receive_data = @mergedData
                                 WHERE id = @latestId";
            var updateParameters = new[]
            {
                new MySql.Data.MySqlClient.MySqlParameter("@mergedData", mergedData),
                new MySql.Data.MySqlClient.MySqlParameter("@latestId", latestId)
            };

            // Execute the update and check how many rows were affected.
            int affectedRows = await DbHelper.ExecuteNonQueryAsync(updateSql, updateParameters);
            if (affectedRows > 0)
            {
                MessageReceived?.Invoke($"成功合并PV字段:{newPvValue},更新ID={latestId}");
            }
            else
            {
                MessageReceived?.Invoke("更新失败:未找到匹配的注塑机数据");
            }
        }
        catch (Exception ex)
        {
            MessageReceived?.Invoke($"合并并入库失败:{ex.Message} {ex.InnerException?.Message}");
        }
    }
}
|
2026-01-22 16:59:41 +08:00
|
|
|
|
}
|