// fg_yida_2/YiDa_WinForm/Service/MqttClientService.cs
//
// Repository viewer header (not part of the program): 273 lines, 11 KiB, C#.
//
// Viewer warning: this file contains ambiguous Unicode characters that might be
// confused with other characters. If this is intentional, the warning can be
// safely ignored; the viewer's Escape button reveals them.

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MySql.Data.MySqlClient;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using YiDa_WinForm.Config;
using YiDa_WinForm.Model;
namespace YiDa_WinForm.Service.Mqtt
{
/// <summary>
/// MQTT client service: connects to the broker, subscribes to the device topics,
/// and persists every received message to the <c>yida_mqtt_message</c> MySQL table.
/// </summary>
/// <remarks>
/// Messages whose device code resolves to <c>device3</c>/<c>device4</c> are not inserted as
/// rows of their own; their <c>PV</c> value is merged into the newest <c>device1</c>/<c>device2</c> row.
/// All other messages are inserted verbatim. Progress and errors are reported through
/// <see cref="MessageReceived"/> rather than thrown.
/// </remarks>
public class MqttClientService
{
    // NOTE(review): the broker endpoint and credentials are hard-coded in source.
    // They should move to configuration (e.g. next to AppConfig.MySqlConnectionString)
    // so that secrets are not committed — left in place here because the available
    // AppConfig members are not visible from this file.
    private const string ClientId = "fg_yida_server_07";
    private const string BrokerHost = "192.168.1.103";
    private const int BrokerPort = 1883;
    private const string BrokerUser = "admin";
    private const string BrokerPassword = "admin123";

    // Wildcard filter covering every topic below "fg_yida/".
    private const string SubscriptionTopic = "fg_yida/#";

    // Device code used when the topic has fewer than three segments.
    private const string UnknownDeviceCode = "unknown";

    // Inserts one raw message row.
    private const string InsertMessageSql = @"INSERT INTO yida_mqtt_message (device_code, receive_data, receive_time, create_time)
VALUES (@device_code, @receive_data, @receive_time, @create_time)";

    // Fetches the newest row written for device1/device2 (highest id).
    private const string SelectLatestMachineSql = @"SELECT id, receive_data FROM yida_mqtt_message
WHERE device_code IN ('device1','device2')
ORDER BY id DESC LIMIT 1";

    // Rewrites the JSON of a single row, addressed by primary key.
    private const string UpdateMergedDataSql = @"UPDATE yida_mqtt_message
SET receive_data = @mergedData
WHERE id = @latestId";

    // Maps the third topic segment (device name) to the device code stored in the database.
    // Names that are not listed are stored unchanged.
    private static readonly Dictionary<string, string> DeviceCodesByName =
        new Dictionary<string, string>(StringComparer.Ordinal)
        {
            { "fg_ht_zs_01", "device1" },
            { "fg_ht_zs_02", "device2" },
            { "fg_ht_wk_01", "device3" },
            { "fg_ht_wk_02", "device4" }
        };

    // MQTT connection state.
    private readonly IMqttClient _mqttClient;
    private readonly MqttClientOptions _options;

    // Database connection string.
    private readonly string _connectionString = AppConfig.MySqlConnectionString;

    /// <summary>
    /// Raised with a human-readable status line for the UI layer (connection state,
    /// received messages, database results and errors).
    /// </summary>
    /// <remarks>
    /// NOTE(review): this is raised from the MQTT client's event callbacks, presumably not
    /// on the UI thread — subscribers that touch controls should marshal the call; confirm
    /// against the caller.
    /// </remarks>
    public event Action<string> MessageReceived;

    /// <summary>
    /// Creates the MQTT client, wires its connected / disconnected / message-received
    /// handlers and builds the connection options. No network traffic happens until
    /// <see cref="MqttClientStartAsync"/> is called.
    /// </summary>
    public MqttClientService()
    {
        _mqttClient = new MqttFactory().CreateMqttClient();

        // Connected: report it, then subscribe to the device topics.
        _mqttClient.ConnectedAsync += async e =>
        {
            MessageReceived?.Invoke("已连接MQTT服务器");
            await SubscribeToDeviceTopicsAsync();
        };

        // Disconnected: only report it; no reconnect is attempted here.
        _mqttClient.DisconnectedAsync += e =>
        {
            MessageReceived?.Invoke("已断开连接!");
            return Task.CompletedTask;
        };

        // Message received: report it, then persist it.
        _mqttClient.ApplicationMessageReceivedAsync += async e =>
        {
            string topic = e.ApplicationMessage.Topic;
            string payload = DecodePayload(e.ApplicationMessage.PayloadSegment);
            MessageReceived?.Invoke($" 收到消息:{topic} → {payload}");
            await SaveMessageToDatabaseAsync(topic, payload);
        };

        _options = new MqttClientOptionsBuilder()
            .WithClientId(ClientId)
            .WithTcpServer(BrokerHost, BrokerPort)
            .WithCredentials(BrokerUser, BrokerPassword)
            .WithCleanSession()
            .Build();
    }

    /// <summary>
    /// Connects to the broker using the options built in the constructor.
    /// </summary>
    /// <returns>A task that completes when the connect call returns.</returns>
    public async Task MqttClientStartAsync()
    {
        await _mqttClient.ConnectAsync(_options);
    }

    /// <summary>
    /// Disconnects from the broker if currently connected and reports the manual
    /// disconnect; does nothing when the client is not connected.
    /// </summary>
    /// <returns>A task that completes when the disconnect call returns.</returns>
    public async Task MqttClientStopAsync()
    {
        if (!_mqttClient.IsConnected)
        {
            return;
        }

        await _mqttClient.DisconnectAsync();
        MessageReceived?.Invoke("手动断开连接。");
    }

    /// <summary>
    /// Subscribes to <c>fg_yida/#</c> at QoS "at least once" and reports the outcome.
    /// A failed subscription is reported through <see cref="MessageReceived"/>, not thrown.
    /// </summary>
    private async Task SubscribeToDeviceTopicsAsync()
    {
        var subscribeOptions = new MqttClientSubscribeOptions
        {
            TopicFilters = new List<MQTTnet.Packets.MqttTopicFilter>
            {
                new MQTTnet.Packets.MqttTopicFilter
                {
                    Topic = SubscriptionTopic,
                    QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce
                }
            }
        };

        try
        {
            await _mqttClient.SubscribeAsync(subscribeOptions, CancellationToken.None);
            MessageReceived?.Invoke("已订阅主题fg_yida");
        }
        catch (Exception ex)
        {
            MessageReceived?.Invoke($"订阅失败:{ex.Message}");
        }
    }

    /// <summary>
    /// Decodes a message payload as UTF-8 text.
    /// </summary>
    /// <param name="payloadSegment">The payload bytes of the received message.</param>
    /// <returns>The decoded text, or an empty string for an empty payload.</returns>
    private static string DecodePayload(ArraySegment<byte> payloadSegment)
    {
        // A default ArraySegment has a null Array; copying it with ToArray() would throw.
        // NOTE(review): presumably that is what an empty MQTT payload looks like — confirm
        // against the MQTTnet version in use.
        if (payloadSegment.Array == null || payloadSegment.Count == 0)
        {
            return string.Empty;
        }

        // Decode in place instead of copying the segment into a new array first.
        return Encoding.UTF8.GetString(payloadSegment.Array, payloadSegment.Offset, payloadSegment.Count);
    }

    /// <summary>
    /// Derives the device code from a topic: the third '/'-separated segment is looked
    /// up in the device table and used unchanged when it is not listed.
    /// </summary>
    /// <param name="topic">The MQTT topic the message arrived on.</param>
    /// <returns>The device code, or <c>unknown</c> when the topic has fewer than three segments.</returns>
    private static string ResolveDeviceCode(string topic)
    {
        string[] segments = topic.Split('/');
        if (segments.Length < 3)
        {
            return UnknownDeviceCode;
        }

        string deviceName = segments[2];
        string deviceCode;
        return DeviceCodesByName.TryGetValue(deviceName, out deviceCode) ? deviceCode : deviceName;
    }

    /// <summary>
    /// Converts the message's <c>time</c> field (Unix milliseconds) to local time.
    /// </summary>
    /// <param name="timeToken">The <c>time</c> token of the payload; may be absent.</param>
    /// <returns>
    /// The local time for the timestamp, or the current local time when the field is
    /// missing, JSON <c>null</c>, or zero.
    /// </returns>
    private static DateTime ResolveReceiveTime(JToken timeToken)
    {
        // An explicit JSON null cannot be converted to long; treat it like a missing field
        // so the message is still stored instead of being dropped by the catch block.
        bool hasValue = timeToken != null && timeToken.Type != JTokenType.Null;
        long timestamp = hasValue ? timeToken.Value<long>() : 0;

        return timestamp == 0
            ? DateTime.Now
            : DateTimeOffset.FromUnixTimeMilliseconds(timestamp).LocalDateTime;
    }

    /// <summary>
    /// Persists one received message. Messages from <c>device3</c>/<c>device4</c> are merged
    /// into the newest <c>device1</c>/<c>device2</c> row; everything else is inserted as a new row
    /// holding the raw payload.
    /// </summary>
    /// <param name="topic">The MQTT topic the message arrived on.</param>
    /// <param name="payload">The message payload as text; must be a JSON object for the insert path.</param>
    /// <returns>A task that completes when the message has been handled.</returns>
    /// <remarks>Any exception is reported through <see cref="MessageReceived"/> and swallowed.</remarks>
    private async Task SaveMessageToDatabaseAsync(string topic, string payload)
    {
        try
        {
            string deviceCode = ResolveDeviceCode(topic);

            // device3/device4 only contribute their PV to an existing row.
            if (deviceCode == "device3" || deviceCode == "device4")
            {
                await MergeAndSaveData(payload);
                return;
            }

            // Everything else is stored as-is; the payload is parsed only for its timestamp.
            JObject json = JObject.Parse(payload);
            DateTime timeReceive = ResolveReceiveTime(json["time"]);

            using (var conn = new MySqlConnection(_connectionString))
            {
                await conn.OpenAsync();
                using (var cmd = new MySqlCommand(InsertMessageSql, conn))
                {
                    cmd.Parameters.AddWithValue("@device_code", deviceCode);
                    cmd.Parameters.AddWithValue("@receive_data", payload);
                    cmd.Parameters.AddWithValue("@receive_time", timeReceive);
                    cmd.Parameters.AddWithValue("@create_time", DateTime.Now);
                    await cmd.ExecuteNonQueryAsync();
                }
            }

            MessageReceived?.Invoke("数据已保存到数据库。");
        }
        catch (Exception ex)
        {
            MessageReceived?.Invoke($"保存数据库时出错:{ex.Message} {ex.InnerException?.Message}");
        }
    }

    /// <summary>
    /// Merges the <c>PV</c> value of a <c>device3</c>/<c>device4</c> message into the newest
    /// <c>device1</c>/<c>device2</c> row: reads that row's JSON, sets <c>params.PV</c>, and writes
    /// the JSON back to the same row.
    /// </summary>
    /// <param name="payload">The message payload as JSON text.</param>
    /// <returns>A task that completes when the merge has been attempted.</returns>
    /// <remarks>
    /// Any exception is reported through <see cref="MessageReceived"/> and swallowed.
    /// NOTE(review): the read and the update are two separate statements without a
    /// transaction, so a row inserted in between is not taken into account.
    /// </remarks>
    private async Task MergeAndSaveData(string payload)
    {
        try
        {
            // 1. Extract the PV value from the incoming message.
            var tcDataModel = JsonConvert.DeserializeObject<TcDataModel>(payload);
            if (tcDataModel == null || tcDataModel.@params == null || string.IsNullOrEmpty(tcDataModel.@params.PV))
            {
                MessageReceived?.Invoke("MQTT消息格式错误无有效PV字段");
                return;
            }

            string newPvValue = tcDataModel.@params.PV;

            using (var conn = new MySqlConnection(_connectionString))
            {
                await conn.OpenAsync();

                // 2. Load the newest device1/device2 row; the primary key is kept so the
                //    update can address exactly that row.
                long latestId = 0;
                string latestReceiveData = null;
                using (var selectCmd = new MySqlCommand(SelectLatestMachineSql, conn))
                using (var reader = await selectCmd.ExecuteReaderAsync())
                {
                    if (await reader.ReadAsync())
                    {
                        latestId = reader.GetInt64(0);
                        latestReceiveData = GetSafeString(reader, "receive_data");
                    }
                }

                // Nothing to merge into.
                if (string.IsNullOrEmpty(latestReceiveData))
                {
                    MessageReceived?.Invoke("数据库中无注塑机数据,无法合并");
                    return;
                }

                // 3. Deserialize the stored JSON and fill in only what is missing, so that
                //    existing fields of the stored record (including its time) survive.
                var dataModel = JsonConvert.DeserializeObject<SQLDataModel>(latestReceiveData);
                if (dataModel == null)
                {
                    dataModel = new SQLDataModel { time = tcDataModel.time };
                }

                if (dataModel.@params == null)
                {
                    dataModel.@params = new SQLParamModel();
                }

                // 4. Update only the PV field, then 5. serialize back to JSON.
                dataModel.@params.PV = newPvValue;
                string mergedData = JsonConvert.SerializeObject(dataModel);

                // 6. Write the merged JSON back to the row that was read.
                using (var updateCmd = new MySqlCommand(UpdateMergedDataSql, conn))
                {
                    updateCmd.Parameters.AddWithValue("@mergedData", mergedData);
                    updateCmd.Parameters.AddWithValue("@latestId", latestId);
                    int affectedRows = await updateCmd.ExecuteNonQueryAsync();

                    MessageReceived?.Invoke(affectedRows > 0
                        ? $"成功合并PV字段{newPvValue}更新ID={latestId}"
                        : "更新失败:未找到匹配的注塑机数据");
                }
            }
        }
        catch (Exception ex)
        {
            MessageReceived?.Invoke($"合并并入库失败:{ex.Message} {ex.InnerException?.Message}");
        }
    }

    /// <summary>
    /// Reads a string column by name, mapping a database NULL to an empty string.
    /// </summary>
    /// <param name="reader">A reader positioned on a row.</param>
    /// <param name="columnName">The name of the column to read.</param>
    /// <returns>The column value, or <see cref="string.Empty"/> when the column is NULL.</returns>
    private static string GetSafeString(DbDataReader reader, string columnName)
    {
        int ordinal = reader.GetOrdinal(columnName);
        return reader.IsDBNull(ordinal) ? string.Empty : reader.GetString(ordinal);
    }
}
}