575 lines
23 KiB
C#
575 lines
23 KiB
C#
using MQTT_WinformV1.Model;
|
||
using MySql.Data.MySqlClient;
|
||
using MySqlX.XDevAPI.Common;
|
||
using NPOI.OpenXmlFormats.Dml;
|
||
using System;
|
||
using System.Collections.Generic;
|
||
using System.Data;
|
||
using System.IO;
|
||
using System.Linq;
|
||
using System.Text;
|
||
using System.Threading.Tasks;
|
||
using System.Windows.Forms;
|
||
|
||
namespace MQTT_WinformV1.Service
|
||
{
|
||
public class DataUploadService
|
||
{
|
||
//private readonly string sendconnectionString = "server=139.224.232.211;port=3308;database=ay2509055-guiyang-fluorescence-lmes;user=root;password=doantech123;";
|
||
private string sendconnectionString = string.Empty;
|
||
private readonly string receiveconnectionString = new MySqlConnectionStringBuilder
|
||
{
|
||
Server = Globalstatic.mesIP,
|
||
Port = uint.Parse(Globalstatic.mesPort), // 端口转成uint(匹配Builder的Port类型)
|
||
Database = Globalstatic.mesDBName,
|
||
UserID = Globalstatic.mesUser,
|
||
Password = Globalstatic.mesPwd,
|
||
CharacterSet = "utf8mb4"
|
||
}.ConnectionString;
|
||
//分批次处理数据批次数,保存数据
|
||
private const int InsertBatchSize = 500;
|
||
//分批次处理数据批次数,变更状态
|
||
private const int UpdateBatchSize = 500;
|
||
|
||
public async Task<int> QueryDataAsync(string strIP,string strPort,string strDBName,string strUser,string strPwd)
|
||
{
|
||
try
|
||
{
|
||
MySqlConnectionStringBuilder connStrBuilder = new MySqlConnectionStringBuilder();
|
||
connStrBuilder.Server = strIP;
|
||
uint.TryParse(strPort, out uint portNum);
|
||
connStrBuilder.Port = portNum;
|
||
connStrBuilder.UserID = strUser;
|
||
connStrBuilder.Password = strPwd;
|
||
connStrBuilder.Database = strDBName;
|
||
connStrBuilder.CharacterSet = "utf8mb4";
|
||
sendconnectionString = connStrBuilder.ConnectionString;
|
||
var (firstMachineData, finalMachineData, finalResultMachineData) = await GetTodayInspectionDataAsync(sendconnectionString);
|
||
int iResult = await InsertDataAsync(firstMachineData, finalMachineData, finalResultMachineData, receiveconnectionString);
|
||
if (iResult > 0)
|
||
{
|
||
//变更上传状态
|
||
await BatchUpdateTodayStatusAsync(sendconnectionString, firstMachineData, finalMachineData,finalResultMachineData);
|
||
}
|
||
return iResult;
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
return 0;
|
||
}
|
||
}
|
||
#region 单批次处理数据(变更状态)
|
||
//public async Task<int> BatchUpdateTodayStatusAsync(string connectionString, DataTable firstMachineData, DataTable finalMachineData)
|
||
//{
|
||
// List<int> firstIds = new List<int>();
|
||
// foreach (DataRow dr in firstMachineData.Rows)
|
||
// {
|
||
// firstIds.Add(int.Parse(dr["Id"].ToString()));
|
||
// }
|
||
// List<int> finalIds = new List<int>();
|
||
// foreach (DataRow dr in finalMachineData.Rows)
|
||
// {
|
||
// finalIds.Add(int.Parse(dr["Id"].ToString()));
|
||
// }
|
||
// if (firstIds.Count == 0 && finalIds.Count == 0)
|
||
// {
|
||
// return 0;
|
||
// }
|
||
|
||
// // 存储所有参数
|
||
// List<MySqlParameter> allParams = new List<MySqlParameter>();
|
||
// // 构建第一个表的ID参数
|
||
// string firstIdParams = "";
|
||
// for (int i = 0; i < firstIds.Count; i++)
|
||
// {
|
||
// string paramName = $"@FirstId{i}";
|
||
// firstIdParams += (firstIdParams == "" ? "" : ",") + paramName;
|
||
// allParams.Add(new MySqlParameter(paramName, firstIds[i]));
|
||
// }
|
||
// // 构建第二个表的ID参数
|
||
// string finalIdParams = "";
|
||
// for (int i = 0; i < finalIds.Count; i++)
|
||
// {
|
||
// string paramName = $"@FinalId{i}";
|
||
// finalIdParams += (finalIdParams == "" ? "" : ",") + paramName;
|
||
// allParams.Add(new MySqlParameter(paramName, finalIds[i]));
|
||
// }
|
||
|
||
// // 拼接更新SQL(仅更新有ID的表)
|
||
// StringBuilder updateSql = new StringBuilder();
|
||
// if (firstIds.Count > 0)
|
||
// {
|
||
// updateSql.Append($@"
|
||
// UPDATE qc_firstarticle_inspection
|
||
// SET isload = 1,
|
||
// updated_time = NOW()
|
||
// WHERE isload = 0
|
||
// AND Id IN ({firstIdParams});");
|
||
// }
|
||
// if (finalIds.Count > 0)
|
||
// {
|
||
// updateSql.Append($@"
|
||
// UPDATE qc_final_inspection
|
||
// SET isload = 1,
|
||
// updated_time = NOW()
|
||
// WHERE isload = 0
|
||
// AND Id IN ({finalIdParams});");
|
||
// }
|
||
|
||
// using (var conn = new MySqlConnection(connectionString))
|
||
// {
|
||
// await conn.OpenAsync();
|
||
|
||
// using (var cmd = new MySqlCommand(updateSql.ToString(), conn))
|
||
// {
|
||
// // 添加所有参数
|
||
// cmd.Parameters.AddRange(allParams.ToArray());
|
||
// // 执行多语句更新,返回总行数
|
||
// return await cmd.ExecuteNonQueryAsync();
|
||
// }
|
||
// }
|
||
//}
|
||
|
||
#endregion
|
||
public async Task<int> BatchUpdateTodayStatusAsync(string connectionString, DataTable firstMachineData, DataTable finalMachineData,DataTable finalResultMachineData)
|
||
{
|
||
// 新增:定义批次大小(MySQL IN子句建议不超过1000条,这里设500)
|
||
int totalUpdated = 0;
|
||
|
||
// 1. 处理第一张表的ID(分批次)
|
||
List<int> firstIds = new List<int>();
|
||
if (firstMachineData?.Rows.Count > 0)
|
||
{
|
||
foreach (DataRow dr in firstMachineData.Rows)
|
||
{
|
||
firstIds.Add(int.Parse(dr["Id"].ToString()));
|
||
}
|
||
|
||
// 新增:分批次更新第一张表
|
||
for (int i = 0; i < firstIds.Count; i += UpdateBatchSize)
|
||
{
|
||
int endIndex = Math.Min(i + UpdateBatchSize, firstIds.Count);
|
||
List<int> batchFirstIds = firstIds.GetRange(i, endIndex - i);
|
||
totalUpdated += await UpdateSingleTableBatchAsync(connectionString, "qc_firstarticle_inspection", batchFirstIds);
|
||
}
|
||
}
|
||
|
||
// 2. 处理第二张表的ID(分批次)
|
||
List<int> finalIds = new List<int>();
|
||
if (finalMachineData?.Rows.Count > 0)
|
||
{
|
||
foreach (DataRow dr in finalMachineData.Rows)
|
||
{
|
||
finalIds.Add(int.Parse(dr["Id"].ToString()));
|
||
}
|
||
|
||
// 新增:分批次更新第二张表
|
||
for (int i = 0; i < finalIds.Count; i += UpdateBatchSize)
|
||
{
|
||
int endIndex = Math.Min(i + UpdateBatchSize, finalIds.Count);
|
||
List<int> batchFinalIds = finalIds.GetRange(i, endIndex - i);
|
||
totalUpdated += await UpdateSingleTableBatchAsync(connectionString, "qc_final_inspection", batchFinalIds);
|
||
}
|
||
}
|
||
|
||
// 3. 处理第三张表的ID(分批次)
|
||
List<int> finalResultIds = new List<int>();
|
||
if (finalResultMachineData?.Rows.Count > 0)
|
||
{
|
||
foreach (DataRow dr in finalResultMachineData.Rows)
|
||
{
|
||
finalResultIds.Add(int.Parse(dr["Id"].ToString()));
|
||
}
|
||
|
||
// 新增:分批次更新第二张表
|
||
for (int i = 0; i < finalIds.Count; i += UpdateBatchSize)
|
||
{
|
||
int endIndex = Math.Min(i + UpdateBatchSize, finalIds.Count);
|
||
List<int> batchFinalIds = finalResultIds.GetRange(i, endIndex - i);
|
||
totalUpdated += await UpdateSingleTableBatchAsync(connectionString, "qc_final_inspection_result", batchFinalIds);
|
||
}
|
||
}
|
||
return totalUpdated;
|
||
}
|
||
|
||
private async Task<int> UpdateSingleTableBatchAsync(string connectionString, string tableName, List<int> idList)
|
||
{
|
||
if (idList.Count == 0) return 0;
|
||
|
||
// 原有逻辑:构建参数和SQL
|
||
List<MySqlParameter> batchParams = new List<MySqlParameter>();
|
||
string idParams = "";
|
||
for (int i = 0; i < idList.Count; i++)
|
||
{
|
||
string paramName = $"@Id{i}";
|
||
idParams += (idParams == "" ? "" : ",") + paramName;
|
||
batchParams.Add(new MySqlParameter(paramName, idList[i]));
|
||
}
|
||
string updateSql = $@"
|
||
UPDATE `{tableName}`
|
||
SET isload = 1,
|
||
updated_time = NOW()
|
||
WHERE isload = 0
|
||
AND Id IN ({idParams});";
|
||
using (var conn = new MySqlConnection(connectionString))
|
||
{
|
||
await conn.OpenAsync();
|
||
using (var cmd = new MySqlCommand(updateSql, conn))
|
||
{
|
||
cmd.Parameters.AddRange(batchParams.ToArray());
|
||
return await cmd.ExecuteNonQueryAsync();
|
||
}
|
||
}
|
||
}
|
||
//将首检终检数据插入数据库
|
||
#region 单批次处理数据
|
||
// private async Task<int> InsertDataAsync(
|
||
//DataTable firstMachineData,
|
||
//DataTable finalMachineData,
|
||
//string receiveconnectionString)
|
||
// {
|
||
// int totalInserted = 0;
|
||
|
||
// if (firstMachineData?.Rows.Count > 0)
|
||
// {
|
||
// totalInserted += await BatchInsertFirstMachineDataAsync(
|
||
// firstMachineData, receiveconnectionString);
|
||
// }
|
||
|
||
// if (finalMachineData?.Rows.Count > 0)
|
||
// {
|
||
// totalInserted += await BatchInsertFinalMachineDataAsync(
|
||
// finalMachineData, receiveconnectionString);
|
||
// }
|
||
|
||
// return totalInserted;
|
||
// }
|
||
#endregion
|
||
private async Task<int> InsertDataAsync(
|
||
DataTable firstMachineData,
|
||
DataTable finalMachineData,
|
||
DataTable finalResultMachineData,
|
||
string receiveconnectionString)
|
||
{
|
||
int totalInserted = 0;
|
||
|
||
// 处理第一张表:分批次插入
|
||
if (firstMachineData?.Rows.Count > 0)
|
||
{
|
||
int firstTotal = firstMachineData.Rows.Count;
|
||
for (int i = 0; i < firstTotal; i += InsertBatchSize)
|
||
{
|
||
DataTable batchDt = firstMachineData.Clone();
|
||
int end = Math.Min(i + InsertBatchSize, firstTotal);
|
||
for (int j = i; j < end; j++)
|
||
{
|
||
batchDt.ImportRow(firstMachineData.Rows[j]);
|
||
}
|
||
totalInserted += await BatchInsertFirstMachineDataAsync(batchDt, receiveconnectionString);
|
||
}
|
||
}
|
||
|
||
// 处理第二张表:分批次插入
|
||
if (finalMachineData?.Rows.Count > 0)
|
||
{
|
||
int finalTotal = finalMachineData.Rows.Count;
|
||
for (int i = 0; i < finalTotal; i += InsertBatchSize)
|
||
{
|
||
DataTable batchDt = finalMachineData.Clone();
|
||
int end = Math.Min(i + InsertBatchSize, finalTotal);
|
||
for (int j = i; j < end; j++)
|
||
{
|
||
batchDt.ImportRow(finalMachineData.Rows[j]);
|
||
}
|
||
totalInserted += await BatchInsertFinalMachineDataAsync(batchDt, receiveconnectionString);
|
||
}
|
||
}
|
||
|
||
// 处理第三张表:分批次插入
|
||
if (finalResultMachineData?.Rows.Count > 0)
|
||
{
|
||
int finalResultTotal = finalResultMachineData.Rows.Count;
|
||
for (int i = 0; i < finalResultTotal; i += InsertBatchSize)
|
||
{
|
||
DataTable batchDt = finalResultMachineData.Clone();
|
||
int end = Math.Min(i + InsertBatchSize, finalResultTotal);
|
||
for (int j = i; j < end; j++)
|
||
{
|
||
batchDt.ImportRow(finalResultMachineData.Rows[j]);
|
||
}
|
||
totalInserted += await BatchInsertFinalResultMachineDataAsync(batchDt, receiveconnectionString);
|
||
}
|
||
}
|
||
return totalInserted;
|
||
}
|
||
|
||
private async Task<int> BatchInsertFirstMachineDataAsync(
|
||
DataTable data,
|
||
string connectionString)
|
||
{
|
||
const string insertSql = @"
|
||
INSERT INTO qc_firstarticle_inspection
|
||
(
|
||
fk_workorder, factoryCode, lineCode, machineCode,
|
||
productCode, productname, SNnumber, paramter_name,
|
||
standard_paramter_value, real_paramter_value,
|
||
up_range_limit, low_range_limit, unit, checkResult,
|
||
paramTime, createdby, updatedby, created_time, updated_time
|
||
)
|
||
VALUES
|
||
(
|
||
@fk_workorder, @factoryCode, @lineCode, @machineCode,
|
||
@productCode, @productname, @SNnumber, @paramter_name,
|
||
@standard_paramter_value, @real_paramter_value,
|
||
@up_range_limit, @low_range_limit, @unit, @checkResult,
|
||
@paramTime, @createdby, @updatedby, @created_time, @updated_time
|
||
)";
|
||
|
||
return await BatchInsertAsync(connectionString, insertSql, data, MapFirstMachineParameters);
|
||
}
|
||
|
||
private async Task<int> BatchInsertFinalMachineDataAsync(
|
||
DataTable data,
|
||
string connectionString)
|
||
{
|
||
const string insertSql = @"
|
||
INSERT INTO qc_final_inspection
|
||
(
|
||
fk_workorder, factoryCode, lineCode, machineCode,
|
||
productCode, productname, SNnumber, paramter_name,
|
||
standard_paramter_value, real_paramter_value,
|
||
up_range_limit, low_range_limit, unit, checkResult,
|
||
paramTime, createdby, updatedby, created_time, updated_time
|
||
)
|
||
VALUES
|
||
(
|
||
@fk_workorder, @factoryCode, @lineCode, @machineCode,
|
||
@productCode, @productname, @SNnumber, @paramter_name,
|
||
@standard_paramter_value, @real_paramter_value,
|
||
@up_range_limit, @low_range_limit, @unit, @checkResult,
|
||
@paramTime, @createdby, @updatedby, @created_time, @updated_time
|
||
)";
|
||
|
||
return await BatchInsertAsync(connectionString, insertSql, data, MapFirstMachineParameters);
|
||
}
|
||
|
||
private async Task<int> BatchInsertFinalResultMachineDataAsync(
|
||
DataTable data,
|
||
string connectionString)
|
||
{
|
||
const string insertSql = @"
|
||
INSERT INTO qc_final_inspection_result
|
||
(
|
||
workorder, direction, qualified_quantity, unqualified_quantiy,
|
||
created_by, updated_by, created_time, updated_time
|
||
)
|
||
VALUES
|
||
(
|
||
@workorder, @direction, @qualified_quantity, @unqualified_quantiy,
|
||
@created_by, @updated_by, @created_time, @updated_time
|
||
)";
|
||
|
||
return await BatchInsertAsync(connectionString, insertSql, data, MapFinalResultMachineParameters);
|
||
}
|
||
|
||
private async Task<int> BatchInsertAsync(
|
||
string connectionString,
|
||
string sql,
|
||
DataTable data,
|
||
Action<MySqlCommand, DataRow> parameterMapper)
|
||
{
|
||
int insertedCount = 0;
|
||
|
||
using (var conn = new MySqlConnection(connectionString))
|
||
{
|
||
await conn.OpenAsync();
|
||
|
||
using (var transaction = await conn.BeginTransactionAsync())
|
||
{
|
||
try
|
||
{
|
||
foreach (DataRow row in data.Rows)
|
||
{
|
||
using (var cmd = new MySqlCommand(sql, conn, transaction))
|
||
{
|
||
parameterMapper(cmd, row);
|
||
|
||
insertedCount += await cmd.ExecuteNonQueryAsync();
|
||
}
|
||
}
|
||
|
||
await transaction.CommitAsync();
|
||
return insertedCount;
|
||
}
|
||
catch(Exception ex)
|
||
{
|
||
Console.WriteLine(ex.Message);
|
||
await transaction.RollbackAsync();
|
||
throw;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
private void MapFirstMachineParameters(MySqlCommand cmd, DataRow row)
|
||
{
|
||
cmd.Parameters.AddWithValue("@fk_workorder", row["fk_workorder"]);
|
||
cmd.Parameters.AddWithValue("@factoryCode", row["factoryCode"]);
|
||
cmd.Parameters.AddWithValue("@lineCode", row["lineCode"]);
|
||
cmd.Parameters.AddWithValue("@machineCode", row["machineCode"]);
|
||
cmd.Parameters.AddWithValue("@productCode", row["productCode"]);
|
||
cmd.Parameters.AddWithValue("@productname", row["productname"]);
|
||
cmd.Parameters.AddWithValue("@SNnumber", row["SNnumber"]);
|
||
cmd.Parameters.AddWithValue("@paramter_name", row["paramter_name"]);
|
||
cmd.Parameters.AddWithValue("@standard_paramter_value", ConvertToDecimal(row["standard_paramter_value"]));
|
||
cmd.Parameters.AddWithValue("@real_paramter_value", ConvertToDecimal(row["real_paramter_value"]));
|
||
cmd.Parameters.AddWithValue("@up_range_limit", ConvertToDecimal(row["up_range_limit"]));
|
||
cmd.Parameters.AddWithValue("@low_range_limit", ConvertToDecimal(row["low_range_limit"]));
|
||
cmd.Parameters.AddWithValue("@unit", row["unit"]);
|
||
cmd.Parameters.AddWithValue("@checkResult", ConvertToBoolean(row["checkResult"]));
|
||
cmd.Parameters.AddWithValue("@paramTime", ConvertToDateTime(row["paramTime"]));
|
||
cmd.Parameters.AddWithValue("@createdby", row["createdby"]);
|
||
cmd.Parameters.AddWithValue("@updatedby", row["updatedby"]);
|
||
cmd.Parameters.AddWithValue("@created_time", DateTime.Now);
|
||
cmd.Parameters.AddWithValue("@updated_time", DateTime.Now);
|
||
}
|
||
|
||
private void MapFinalResultMachineParameters(MySqlCommand cmd, DataRow row)
|
||
{
|
||
cmd.Parameters.AddWithValue("@workorder", row["workorder"]);
|
||
cmd.Parameters.AddWithValue("@direction", row["direction"]);
|
||
cmd.Parameters.AddWithValue("@qualified_quantity", Convert.ToInt32(row["qualified_quantity"]));
|
||
cmd.Parameters.AddWithValue("@unqualified_quantiy", Convert.ToInt32(row["unqualified_quantiy"]));
|
||
cmd.Parameters.AddWithValue("@created_by", row["created_by"]);
|
||
cmd.Parameters.AddWithValue("@updated_by", row["updated_by"]);
|
||
cmd.Parameters.AddWithValue("@created_time", DateTime.Now);
|
||
cmd.Parameters.AddWithValue("@updated_time", DateTime.Now);
|
||
}
|
||
|
||
private decimal ConvertToDecimal(object value)
|
||
{
|
||
if (value == null || value == DBNull.Value)
|
||
return 0;
|
||
|
||
return Convert.ToDecimal(value);
|
||
}
|
||
|
||
private DateTime ConvertToDateTime(object value)
|
||
{
|
||
if (value == null || value == DBNull.Value)
|
||
return DateTime.Now;
|
||
|
||
return Convert.ToDateTime(value);
|
||
}
|
||
|
||
private bool ConvertToBoolean(object value)
|
||
{
|
||
if (value == null || value == DBNull.Value)
|
||
return false;
|
||
|
||
return Convert.ToBoolean(value);
|
||
}
|
||
|
||
|
||
// 提取公共方法
|
||
public async Task<(DataTable FirstMachine, DataTable FinalMachine,DataTable FinalResultMachine)>
|
||
GetTodayInspectionDataAsync(string connectionString)
|
||
{
|
||
// 获取今天的日期范围
|
||
var (startTime, endTime) = GetTodayDateRange();
|
||
|
||
// 并行执行查询以提高性能
|
||
var tasks = new[]
|
||
{
|
||
QueryInspectionDataAsync(connectionString, "qc_firstarticle_inspection", startTime, endTime),
|
||
QueryInspectionDataAsync(connectionString, "qc_final_inspection", startTime, endTime),
|
||
QueryInspectionDataAsync(connectionString, "qc_final_inspection_result", startTime, endTime)
|
||
};
|
||
|
||
var results = await Task.WhenAll(tasks);
|
||
|
||
return (results[0], results[1], results[2]);
|
||
}
|
||
|
||
// 参数化查询方法
|
||
private async Task<DataTable> QueryInspectionDataAsync(
|
||
string connectionString,
|
||
string tableName,
|
||
DateTime startTime,
|
||
DateTime endTime)
|
||
{
|
||
// 参数化查询,防止SQL注入
|
||
string sql = $@"
|
||
SELECT *
|
||
FROM {tableName} f
|
||
WHERE f.created_time >= @startTime
|
||
AND f.created_time <= @endTime
|
||
AND f.isload = 0 LIMIT 3000;";
|
||
|
||
return await QueryAsync(connectionString, sql,
|
||
new MySqlParameter("@startTime", startTime),
|
||
new MySqlParameter("@endTime", endTime));
|
||
}
|
||
|
||
// 重构的 QueryAsync 方法,支持参数化查询
|
||
public async Task<DataTable> QueryAsync(string connectionString, string sql, params MySqlParameter[] parameters)
|
||
{
|
||
DataTable dt = new DataTable();
|
||
|
||
using (var conn = new MySqlConnection(connectionString))
|
||
{
|
||
await conn.OpenAsync();
|
||
|
||
using (var cmd = new MySqlCommand(sql, conn))
|
||
{
|
||
cmd.CommandTimeout = 60;
|
||
|
||
// 添加参数
|
||
if (parameters != null && parameters.Length > 0)
|
||
{
|
||
cmd.Parameters.AddRange(parameters);
|
||
}
|
||
|
||
using (var reader = await cmd.ExecuteReaderAsync())
|
||
{
|
||
dt.Load(reader);
|
||
}
|
||
}
|
||
}
|
||
|
||
return dt;
|
||
}
|
||
|
||
// 获取今天日期范围的辅助方法
|
||
private (DateTime StartTime, DateTime EndTime) GetTodayDateRange()
|
||
{
|
||
DateTime today = DateTime.Today;
|
||
// 使用小于下一天的方式,避免精度问题
|
||
DateTime startTime = today;
|
||
DateTime endTime = today.AddDays(1).AddSeconds(-1);
|
||
|
||
return (startTime, endTime);
|
||
}
|
||
|
||
// 更简洁的版本
|
||
private (DateTime StartTime, DateTime EndTime) GetTodayDateRangeSimple()
|
||
{
|
||
var today = DateTime.Today;
|
||
return (today, today.AddDays(1).AddTicks(-1)); // 精确到23:59:59.9999999
|
||
}
|
||
|
||
public async Task<DataTable> getSendDB()
|
||
{
|
||
try
|
||
{
|
||
return await QueryAsync(receiveconnectionString, "select * from qc_db_config");
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
Console.WriteLine(ex.Message);
|
||
return null;
|
||
}
|
||
}
|
||
}
|
||
}
|