using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using YiDa_WinForm.Config;
using YiDa_WinForm.Model;

namespace YiDa_WinForm
{
    /// <summary>
    /// MQTT connection service. Connects to the broker, subscribes to the
    /// device topics and persists every received message through
    /// <c>DbHelper</c>.
    /// </summary>
    public class MqttClientService
    {
        // Broker connection settings.
        // NOTE(review): host and credentials are hard-coded in source; consider
        // moving them to configuration. For local testing the broker address
        // previously used was 192.168.1.103:1883.
        private const string BrokerHost = "172.16.101.94";
        private const int BrokerPort = 1883;
        private const string ClientId = "fg_yida_server_07";
        private const string BrokerUser = "admin";
        private const string BrokerPassword = "admin";

        // Topic filter subscribed to after every successful connect.
        private const string SubscribeTopicFilter = "fg_yida/#";

        private readonly IMqttClient _mqttClient;
        private readonly MqttClientOptions _options;

        /// <summary>
        /// Raised with a human-readable status or log line for the UI layer.
        /// </summary>
        /// <remarks>
        /// NOTE(review): this is raised from the MQTT client's event handlers,
        /// presumably not on the UI thread; confirm that subscribers marshal
        /// to the UI thread before touching controls.
        /// </remarks>
        public event Action<string> MessageReceived;

        /// <summary>
        /// Creates the MQTT client, wires its connected / disconnected /
        /// message-received handlers and builds the connection options.
        /// No connection is opened here; call <see cref="MqttClientStartAsync"/>.
        /// </summary>
        public MqttClientService()
        {
            var factory = new MqttFactory();
            _mqttClient = factory.CreateMqttClient();

            // On connect: report it and subscribe to the device topics.
            _mqttClient.ConnectedAsync += async e =>
            {
                Notify("已连接MQTT服务器!");
                await SubscribeToTopicsAsync();
            };

            // On disconnect: only report it (no automatic reconnect).
            _mqttClient.DisconnectedAsync += e =>
            {
                Notify("已断开连接!");
                return Task.CompletedTask;
            };

            // On message: report it, then persist it.
            _mqttClient.ApplicationMessageReceivedAsync += async e =>
            {
                string topic = e.ApplicationMessage.Topic;
                string payload = DecodePayload(e.ApplicationMessage.PayloadSegment);

                Notify($" 收到消息:{topic} → {payload}");
                await SaveMessageToDatabaseAsync(topic, payload);
            };

            _options = new MqttClientOptionsBuilder()
                .WithClientId(ClientId)
                .WithTcpServer(BrokerHost, BrokerPort)
                .WithCredentials(BrokerUser, BrokerPassword)
                .WithCleanSession()
                .Build();
        }

        /// <summary>
        /// Opens the connection to the broker using the options built in the
        /// constructor. Exceptions thrown by the client's connect call are not
        /// caught here and propagate to the caller.
        /// </summary>
        public async Task MqttClientStartAsync()
        {
            await _mqttClient.ConnectAsync(_options);
        }

        /// <summary>
        /// Disconnects from the broker if currently connected and reports the
        /// manual disconnect. Does nothing when the client is not connected.
        /// </summary>
        public async Task MqttClientStopAsync()
        {
            if (!_mqttClient.IsConnected)
            {
                return;
            }

            await _mqttClient.DisconnectAsync();
            Notify("手动断开连接。");
        }

        /// <summary>
        /// Raises <see cref="MessageReceived"/> if anyone is subscribed.
        /// </summary>
        private void Notify(string message)
        {
            MessageReceived?.Invoke(message);
        }

        /// <summary>
        /// Subscribes to <see cref="SubscribeTopicFilter"/> with QoS
        /// "at least once". Failures are reported through
        /// <see cref="MessageReceived"/> instead of being thrown.
        /// </summary>
        private async Task SubscribeToTopicsAsync()
        {
            var subscribeOptions = new MqttClientSubscribeOptions
            {
                TopicFilters = new List<MQTTnet.Packets.MqttTopicFilter>
                {
                    new MQTTnet.Packets.MqttTopicFilter
                    {
                        Topic = SubscribeTopicFilter,
                        QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce
                    }
                }
            };

            try
            {
                await _mqttClient.SubscribeAsync(subscribeOptions, CancellationToken.None);
                Notify("已订阅主题:fg_yida");
            }
            catch (Exception ex)
            {
                Notify($"订阅失败:{ex.Message}");
            }
        }

        /// <summary>
        /// Decodes a message payload as UTF-8 text. A segment without a backing
        /// array, or with zero length, yields an empty string.
        /// </summary>
        private static string DecodePayload(ArraySegment<byte> segment)
        {
            if (segment.Array == null || segment.Count == 0)
            {
                return string.Empty;
            }

            // Decode in place; no intermediate copy of the payload is needed.
            return Encoding.UTF8.GetString(segment.Array, segment.Offset, segment.Count);
        }

        /// <summary>
        /// Maps a topic to the device code stored in the database. The third
        /// '/'-separated topic segment is taken as the device name; known names
        /// map to device1..device4, any other name is used as-is, and topics
        /// with fewer than three segments map to "unknown".
        /// </summary>
        private static string ResolveDeviceCode(string topic)
        {
            string[] segments = topic.Split('/');
            if (segments.Length < 3)
            {
                return "unknown";
            }

            string deviceName = segments[2];
            switch (deviceName)
            {
                case "fg_ht_zs_01": return "device1";
                case "fg_ht_zs_02": return "device2";
                case "fg_ht_wk_01": return "device3";
                case "fg_ht_wk_02": return "device4";
                default: return deviceName;
            }
        }

        /// <summary>
        /// Persists one MQTT message. Messages from device3/device4 are merged
        /// into the latest row of device1/device2 respectively (see
        /// <see cref="MergeAndSaveData"/>); every other message is inserted as
        /// a new row with its raw payload. Any exception is reported through
        /// <see cref="MessageReceived"/> and not rethrown.
        /// </summary>
        /// <param name="topic">Topic the message was received on.</param>
        /// <param name="payload">Message payload as text (expected to be JSON).</param>
        private async Task SaveMessageToDatabaseAsync(string topic, string payload)
        {
            try
            {
                string deviceCode = ResolveDeviceCode(topic);

                // device3/device4: only their PV value is merged into an existing row.
                if (deviceCode == "device3" || deviceCode == "device4")
                {
                    await MergeAndSaveData(deviceCode, payload);
                    return;
                }

                // Everything else: insert the raw payload.
                JObject json = JObject.Parse(payload);

                // "time" is read as Unix milliseconds. A missing or JSON-null
                // value falls back to 0, which means "use the current time".
                long timestamp = json["time"]?.Value<long?>() ?? 0;
                DateTime timeReceive = timestamp == 0
                    ? DateTime.Now
                    : DateTimeOffset.FromUnixTimeMilliseconds(timestamp).LocalDateTime;

                string sql = @"INSERT INTO yida_mqtt_message
                                   (device_code, receive_data, receive_time, create_time)
                               VALUES
                                   (@device_code, @receive_data, @receive_time, @create_time)";
                var parameters = new[]
                {
                    new MySql.Data.MySqlClient.MySqlParameter("@device_code", deviceCode),
                    new MySql.Data.MySqlClient.MySqlParameter("@receive_data", payload),
                    new MySql.Data.MySqlClient.MySqlParameter("@receive_time", timeReceive),
                    new MySql.Data.MySqlClient.MySqlParameter("@create_time", DateTime.Now)
                };

                await DbHelper.ExecuteNonQueryAsync(sql, parameters);
                Notify("数据已保存到数据库。");
            }
            catch (Exception ex)
            {
                Notify($"保存数据库时出错:{ex.Message} {ex.InnerException?.Message}");
            }
        }

        /// <summary>
        /// Takes the PV value from a device3/device4 payload and writes it into
        /// the <c>params.PV</c> field of the most recent row of the paired
        /// device (device3 -> device1, device4 -> device2), marking that row
        /// with <c>pv_status = 1</c>. Any exception is reported through
        /// <see cref="MessageReceived"/> and not rethrown.
        /// </summary>
        /// <param name="deviceCode">"device3" or "device4"; anything else is rejected.</param>
        /// <param name="payload">JSON payload carrying <c>params.PV</c>.</param>
        private async Task MergeAndSaveData(string deviceCode, string payload)
        {
            try
            {
                // 1. Extract PV from the incoming payload.
                // NOTE(review): the generic type argument of this call was not
                // recoverable from the source as received. SQLDataModel is used
                // because it exposes the members read here (time, @params.PV);
                // confirm against the model type originally used.
                var tcDataModel = JsonConvert.DeserializeObject<SQLDataModel>(payload);
                if (tcDataModel == null
                    || tcDataModel.@params == null
                    || string.IsNullOrEmpty(tcDataModel.@params.PV))
                {
                    Notify("MQTT消息格式错误,无有效PV字段");
                    return;
                }

                string newPvValue = tcDataModel.@params.PV;

                // 2. Pick the query for the paired device. The query and its
                // result are locals so one call can never reuse another call's
                // (possibly stale or null) state.
                string latestSql;
                switch (deviceCode)
                {
                    case "device3":
                        latestSql = @"SELECT id, receive_data FROM yida_mqtt_message
                                      WHERE device_code = 'device1' ORDER BY id DESC LIMIT 1";
                        break;
                    case "device4":
                        latestSql = @"SELECT id, receive_data FROM yida_mqtt_message
                                      WHERE device_code = 'device2' ORDER BY id DESC LIMIT 1";
                        break;
                    default:
                        Notify("不是有效设备");
                        return;
                }

                DataTable latestTable = await DbHelper.ExecuteQueryAsync(latestSql);
                if (latestTable == null || latestTable.Rows.Count == 0)
                {
                    Notify("数据库中无注塑机数据,无法合并");
                    return;
                }

                // 3. Read the primary key and stored JSON of the latest row.
                DataRow latestRow = latestTable.Rows[0];
                long latestId = Convert.ToInt64(latestRow["id"]);
                object rawReceiveData = latestRow["receive_data"];
                string latestReceiveData =
                    rawReceiveData == null || rawReceiveData == DBNull.Value
                        ? null
                        : rawReceiveData.ToString();

                if (string.IsNullOrEmpty(latestReceiveData))
                {
                    Notify("数据库中注塑机数据无有效receive_data");
                    return;
                }

                // 4. Deserialize the stored JSON; if it has no usable params
                // object, start from an empty one carrying the incoming time.
                var dataModel = JsonConvert.DeserializeObject<SQLDataModel>(latestReceiveData);
                if (dataModel == null || dataModel.@params == null)
                {
                    dataModel = new SQLDataModel
                    {
                        time = tcDataModel.time,
                        @params = new SQLParamModel()
                    };
                }

                // 5. Only the PV field is overwritten; then re-serialize.
                dataModel.@params.PV = newPvValue;
                string mergedData = JsonConvert.SerializeObject(dataModel);

                // 6. Write the merged JSON back to the same row.
                string updateSql = @"UPDATE yida_mqtt_message
                                     SET receive_data = @mergedData, pv_status = 1
                                     WHERE id = @latestId";
                var updateParameters = new[]
                {
                    new MySql.Data.MySqlClient.MySqlParameter("@mergedData", mergedData),
                    new MySql.Data.MySqlClient.MySqlParameter("@latestId", latestId)
                };

                int affectedRows = await DbHelper.ExecuteNonQueryAsync(updateSql, updateParameters);
                if (affectedRows > 0)
                {
                    Notify($"成功合并PV字段:{newPvValue},更新ID={latestId}");
                }
                else
                {
                    Notify("更新失败:未找到匹配的注塑机数据");
                }
            }
            catch (Exception ex)
            {
                Notify($"合并并入库失败:{ex.Message} {ex.InnerException?.Message}");
            }
        }
    }
}