shgx_zp_DevcieDataMove/MQTT-WinformV1/MqttClientService.cs
2025-12-09 09:28:39 +08:00

270 lines
11 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using MQTT_WinformV1.Model;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MySql.Data.MySqlClient;
using Newtonsoft.Json.Linq;
using NPOI.XWPF.UserModel;
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlTypes;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
namespace MqttClient
{
public class MqttClientService
{
private IMqttClient _mqttClient;
private MqttClientOptions _options;
private readonly string _connectionString = "server=139.224.232.211;port=3308;database=fgassembly;user=root;password=doantech123;";
public event Action<string> MessageReceived; // 通知UI层
public MqttClientService()
{
var factory = new MqttFactory();
_mqttClient = factory.CreateMqttClient();
// 连接成功事件
_mqttClient.ConnectedAsync += async e =>
{
MessageReceived?.Invoke("已连接MQTT服务器");
await _mqttClient.SubscribeAsync("fg_yida/#", MqttQualityOfServiceLevel.AtLeastOnce);
MessageReceived?.Invoke("已订阅主题fg_yida/#");
};
// 断开事件
_mqttClient.DisconnectedAsync += e =>
{
MessageReceived?.Invoke("已断开连接!");
return Task.CompletedTask;
};
// 收到消息事件
_mqttClient.ApplicationMessageReceivedAsync += async e =>
{
string topic = e.ApplicationMessage.Topic;
string payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment.ToArray());
DateTime timeSave = DateTime.Now;
MessageReceived?.Invoke($" 收到消息:{topic} → {payload}");
await SaveMessageToDatabaseAsync(topic, payload, timeSave);
};
// 构建连接参数
_options = new MqttClientOptionsBuilder()
.WithClientId("FG-Winform-Client")
.WithTcpServer("192.168.1.4", 1883)
.WithCredentials("admin", "admin")
.WithCleanSession()
.Build();
}
//开启连接
public async Task MqttClientStartAsync()
{
await _mqttClient.ConnectAsync(_options);
}
//断开连接
public async Task MqttClientStopAsync()
{
if (_mqttClient.IsConnected)
{
await _mqttClient.DisconnectAsync();
MessageReceived?.Invoke("手动断开连接。");
}
}
//保存数据到数据库
private async Task SaveMessageToDatabaseAsync(string topic, string payload, DateTime timeSave)
{
try
{
// 解析 topic 获取 device_code
// 例如 topic = fg_yida/device/2 → device2
string[] parts = topic.Split('/');
string deviceCode = parts.Length >= 3 ? parts[1] + parts[2] : "unknown";
// 解析 payload 获取时间戳
JObject json = JObject.Parse(payload);
long timestamp = json["time"]?.Value<long>() ?? 0;
DateTime timeReceive = DateTimeOffset.FromUnixTimeMilliseconds(timestamp).LocalDateTime;
using (var conn = new MySqlConnection(_connectionString))
{
await conn.OpenAsync();
string sql = @"INSERT INTO fg_init_message (device_code, message, time_receive, time_save)
VALUES (@device_code, @message, @time_receive, @time_save)";
using (var cmd = new MySqlCommand(sql, conn))
{
cmd.Parameters.AddWithValue("@device_code", deviceCode);
cmd.Parameters.AddWithValue("@message", payload);
cmd.Parameters.AddWithValue("@time_receive", timeReceive);
cmd.Parameters.AddWithValue("@time_save", timeSave);
await cmd.ExecuteNonQueryAsync();
}
}
MessageReceived?.Invoke("数据已保存到数据库。");
}
catch (Exception ex)
{
MessageReceived?.Invoke($"保存数据库时出错:{ex.Message} {ex.InnerException?.Message}");
}
}
//保存配方表格数据到数据库中
public async Task SavePeiFangByExcel(DataTable dtExcel)
{
try
{
using (var conn = new MySqlConnection(_connectionString))
{
await conn.OpenAsync();
string sqlDel = @"DELETE FROM peifang_import;";
using (var delCmd = new MySqlCommand(sqlDel, conn))
{
// 异步执行删除await 等待完成后再继续
await delCmd.ExecuteNonQueryAsync();
}
foreach (DataRow dr in dtExcel.Rows)
{
string sql = string.Format(@"INSERT INTO peifang_import (supplier_code, supplier_name, vehicle_model,part_number,part_name,configuration,leader_part)
VALUES (@supplier_code, @supplier_name, @vehicle_model,@part_number,@part_name,@configuration,@leader_part)");
using (var cmd = new MySqlCommand(sql, conn))
{
cmd.Parameters.AddWithValue("@supplier_code", dr["供应商代码"] == DBNull.Value ? (object)DBNull.Value : dr["供应商代码"].ToString());
cmd.Parameters.AddWithValue("@supplier_name", dr["供应商名称"] == DBNull.Value ? (object)DBNull.Value : dr["供应商名称"].ToString());
cmd.Parameters.AddWithValue("@vehicle_model", dr["车型"] == DBNull.Value ? (object)DBNull.Value : dr["车型"].ToString());
cmd.Parameters.AddWithValue("@part_number", dr["零件号"] == DBNull.Value ? (object)DBNull.Value : dr["零件号"].ToString());
cmd.Parameters.AddWithValue("@part_name", dr["零件名"] == DBNull.Value ? (object)DBNull.Value : dr["零件名"].ToString());
cmd.Parameters.AddWithValue("@configuration", dr["配置"] == DBNull.Value ? (object)DBNull.Value : dr["配置"].ToString());
cmd.Parameters.AddWithValue("@leader_part", dr["零件负责人"] == DBNull.Value ? (object)DBNull.Value : dr["零件负责人"].ToString());
await cmd.ExecuteNonQueryAsync();
}
}
}
}
catch (Exception ex)
{
MessageReceived?.Invoke($"保存数据库时出错:{ex.Message} {ex.InnerException?.Message}");
}
}
public async Task<DataTable> QueryPeifangAsync()
{
// 初始化DataTable用于存储结果
DataTable dt = new DataTable();
using (var conn = new MySqlConnection(_connectionString))
{
// 异步打开连接必须用await等待完成
await conn.OpenAsync();
string strSql = @"SELECT * FROM peifang_import";
using (var queryCmd = new MySqlCommand(strSql, conn))
{
// 设置命令超时时间(可选,根据需要调整)
queryCmd.CommandTimeout = 60;
// 异步执行查询,获取数据读取器
using (var reader = await queryCmd.ExecuteReaderAsync())
{
// 通过读取器填充DataTable
dt.Load(reader);
}
}
}
return dt;
}
//获取最新的Mqtt网关数据
public async Task<DataTable> GetLatestMqttDataAsync()
{
DataTable dt = new DataTable();
using (var conn = new MySqlConnection(_connectionString))
{
await conn.OpenAsync();
string strSql = @"SELECT * FROM fg_init_message ORDER BY time_save DESC LIMIT 1;";
using (var queryCmd = new MySqlCommand(strSql, conn))
{
// 设置命令超时时间(可选,根据需要调整)
queryCmd.CommandTimeout = 120;
using (var reader = await queryCmd.ExecuteReaderAsync())
{
dt.Load(reader);
}
}
}
return dt;
}
//获取Mqtt字典
public async Task<DataTable> InitMqttDic()
{
DataTable dt = new DataTable();
using (var conn = new MySqlConnection(_connectionString))
{
await conn.OpenAsync();
string strSql = @"SELECT * from trace_dict_eol_parameter";
using (var queryCmd = new MySqlCommand(strSql, conn))
{
// 设置命令超时时间(可选,根据需要调整)
queryCmd.CommandTimeout = 120;
using (var reader = await queryCmd.ExecuteReaderAsync())
{
dt.Load(reader);
}
}
}
return dt;
}
//保存配方表格数据到数据库中
public async Task CreateSuccessLog(List<MQTTModel> mqttLists)
{
try
{
using (var conn = new MySqlConnection(_connectionString))
{
await conn.OpenAsync();
DateTime time = DateTime.Now;
foreach (MQTTModel mqttItem in mqttLists)
{
string sql = string.Format(@"INSERT INTO yida_success_log (supplier_code, supplier_name, vehicle_model,part_number,part_name,configuration,leader_part,time_upload_database)
VALUES (@supplier_code, @supplier_name, @vehicle_model,@part_number,@part_name,@configuration,@leader_part,@time_upload_database)");
using (var cmd = new MySqlCommand(sql, conn))
{
cmd.Parameters.AddWithValue("@supplier_code", mqttItem.SupplierCode);
cmd.Parameters.AddWithValue("@supplier_name", mqttItem.SupplierName);
cmd.Parameters.AddWithValue("@vehicle_model", mqttItem.VehicleModel);
cmd.Parameters.AddWithValue("@part_number", mqttItem.PartNumber);
cmd.Parameters.AddWithValue("@part_name", mqttItem.PartName);
cmd.Parameters.AddWithValue("@configuration", mqttItem.Configuration);
cmd.Parameters.AddWithValue("@leader_part", mqttItem.LeaderPart);
cmd.Parameters.AddWithValue("@time_upload_database", time);
await cmd.ExecuteNonQueryAsync();
}
}
}
}
catch (Exception ex)
{
}
}
}
}