From 30fc6e963ecdad624f58160008e9604cfea38d51 Mon Sep 17 00:00:00 2001 From: "qianhao.xu" Date: Tue, 27 Aug 2024 17:29:21 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9C=E6=90=AD=20=20mqtt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../appsettings.development.json | 10 +- DOAN.Common/MqttPublisher.cs | 6 +- DOAN.Model/ReflexAccount.cs | 9 + .../TaskScheduler/YIDA_dataUpload_Task.cs | 160 +++++++++++------- Infrastructure/Model/OptionsSetting.cs | 11 ++ 5 files changed, 125 insertions(+), 71 deletions(-) diff --git a/DOAN.Admin.WebApi/appsettings.development.json b/DOAN.Admin.WebApi/appsettings.development.json index c412ac6..f48676e 100644 --- a/DOAN.Admin.WebApi/appsettings.development.json +++ b/DOAN.Admin.WebApi/appsettings.development.json @@ -116,13 +116,15 @@ "vuePath": "" //前端代码存储路径eg:D:\Work\DOANAdmin-Vue3 }, //MQTT 配置 - "MqttPublisher": { + "MqttPublicConfig": { //代理ip - "broker_ip": "192.168.0.58", + "BrokerIp": "192.168.0.58", + //端口号 + "Port": 1883, //主题 - "topic": "yida/log", + "Topic": "yida/log", //客户端id - "clientId": "" + "ClientId": "server01" } } diff --git a/DOAN.Common/MqttPublisher.cs b/DOAN.Common/MqttPublisher.cs index 257a9d3..0780f08 100644 --- a/DOAN.Common/MqttPublisher.cs +++ b/DOAN.Common/MqttPublisher.cs @@ -2,6 +2,7 @@ using MQTTnet.Client; using MQTTnet.Protocol; using System; +using System.Net; using System.Threading.Tasks; namespace DOAN.Common @@ -13,12 +14,12 @@ namespace DOAN.Common private IMqttClient _client; private MqttClientOptions _options; - public MqttPublisher(string brokerAddress, string clientId) + public MqttPublisher(string brokerAddress, string clientId, int port) { _client = new MqttFactory().CreateMqttClient(); _options = new MqttClientOptionsBuilder() - .WithTcpServer(brokerAddress) + .WithTcpServer(brokerAddress,port) .WithClientId(clientId) .Build(); } @@ -33,6 +34,7 @@ namespace DOAN.Common var applicationMessage = new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(payload) + .WithRetainFlag(true) .WithQualityOfServiceLevel(qualityOfService) .Build(); diff --git a/DOAN.Model/ReflexAccount.cs b/DOAN.Model/ReflexAccount.cs index fc9df73..d4eca70 100644 --- a/DOAN.Model/ReflexAccount.cs +++ b/DOAN.Model/ReflexAccount.cs @@ -20,6 +20,15 @@ namespace DOAN.Model [SugarColumn(IsPrimaryKey = true, IsIdentity = true)] public int Id { get; set; } + + /// + /// 设备ID + /// + [SugarColumn(ColumnName = "fk_device_id")] + public int? FkDeviceId { get; set; } + + + /// /// FkTenantId /// diff --git a/DOAN.Tasks/TaskScheduler/YIDA_dataUpload_Task.cs b/DOAN.Tasks/TaskScheduler/YIDA_dataUpload_Task.cs index 5745115..5ea593c 100644 --- a/DOAN.Tasks/TaskScheduler/YIDA_dataUpload_Task.cs +++ b/DOAN.Tasks/TaskScheduler/YIDA_dataUpload_Task.cs @@ -27,52 +27,77 @@ using DOAN.Model.huate_group.recipe; using ZR.Model.Cloud.Dto; using DOAN.Common; using Infrastructure.Model; +using Microsoft.Extensions.Options; +using DOAN.Model; +using System.Net; -namespace ZR.Tasks.TaskScheduler +namespace DOAN.Tasks.TaskScheduler { [AppService(ServiceType = typeof(YIDA_dataUpload_Task), ServiceLifetime = LifeTime.Scoped)] - internal class YIDA_dataUpload_Task : JobBase, IJob + public class YIDA_dataUpload_Task : JobBase, IJob { private readonly NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger(); private readonly MqttPublisher publisher = null; - public YIDA_dataUpload_Task() + + + private readonly MqttPublicConfig mqttPublicConfig; + public YIDA_dataUpload_Task(IOptions options) { - var options = App.OptionsSetting; - MqttPublicConfig mqttPublicConfig = options.mqttPublicConfig; + OptionsSetting OptionsSetting = options.Value; + mqttPublicConfig = OptionsSetting.mqttPublicConfig; //init MQTT - publisher = new MqttPublisher(mqttPublicConfig.BrokerIp, mqttPublicConfig.ClientId); + publisher = new MqttPublisher(mqttPublicConfig.BrokerIp, mqttPublicConfig.ClientId, mqttPublicConfig.Port); } public async Task Execute(IJobExecutionContext context) { - Console.WriteLine("---------------------------------上传开始"); - AbstractTrigger trigger = (context as JobExecutionContextImpl).Trigger as AbstractTrigger; - - var infoTask = await DbScoped.SugarScope.CopyNew() - .Queryable() - .FirstAsync(f => f.ID == trigger.JobName) ?? throw new CustomException($"任务{trigger?.JobName}宜搭数据上传任务执行失败,任务不存在"); - - if (infoTask != null) + try { - // 执行异步上传任务 - //Task t1 = dataUpLoad(); - //Task t2 = dataUpLoad(); - //Task t3 = dataUpLoad(); - //Task t4 = dataUpLoad(); - //Task t5 = dataUpLoad(); - //Task t6 = dataUpLoad(); - //Task t7 = dataUpLoad(); - //Task t8 = dataUpLoad(); - //Task t9 = dataUpLoad(); - //Task t10 = dataUpLoad(); - //Task t11 = dataUpLoad(); + await publisher.ConnectAsync(); + + Console.WriteLine("---------------------------------上传开始"); + AbstractTrigger trigger = (context as JobExecutionContextImpl).Trigger as AbstractTrigger; + var infoTask = await DbScoped.SugarScope.CopyNew() + .Queryable() + .FirstAsync(f => f.ID == trigger.JobName) ?? throw new CustomException($"任务{trigger?.JobName}宜搭数据上传任务执行失败,任务不存在"); + + if (infoTask != null) + { + // 执行异步上传任务 + Task t1 = dataUpLoad(); + //Task t2 = dataUpLoad(); + //Task t3 = dataUpLoad(); + //Task t4 = dataUpLoad(); + //Task t5 = dataUpLoad(); + //Task t6 = dataUpLoad(); + //Task t7 = dataUpLoad(); + //Task t8 = dataUpLoad(); + //Task t9 = dataUpLoad(); + //Task t10 = dataUpLoad(); + //Task t11 = dataUpLoad(); - //await Task.WhenAll(t1, t2, t3, t4, t5, t6, t7, t8, t9, t10, t11); + await Task.WhenAll(t1); - Console.WriteLine("上传代码,内部已屏蔽"); + //获取上传设备id + string tablecode = typeof(ZiyuanDevice01).Name; + ReflexAccount reflexAccount= DbScoped.SugarScope.CopyNew().AsTenant() + .QueryableWithAttr().Where(it=>it.TableName== tablecode).First(); - Console.WriteLine("---------------------------------上传结束"); + await publisher.PublishAsync(mqttPublicConfig.Topic+$"/{reflexAccount.FkDeviceId}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")) + ; + Console.WriteLine("---------------------------------上传结束"); + + } + } + catch (Exception ex) + { + Console.WriteLine($"Error: {ex.Message}"); + } + finally + { + await publisher.DisconnectAsync(); + } } @@ -80,13 +105,13 @@ namespace ZR.Tasks.TaskScheduler /// 异步任务 数据上传 /// /// - public Task dataUpLoad() where T : ZiyuanDevice01, new() + public Task dataUpLoad() where T : ZiyuanDevice01, new() { - return Task.Run(() => + return Task.Run(() => { int flag = 0; //1.读取设备数据 - List device01s = DbScoped.SugarScope.Queryable().Where(it => it.IsUpload == 0).Take(60).ToList(); + List device01s = DbScoped.SugarScope.Queryable().Where(it => it.IsUpload == 0).Take(600).ToList(); List yidas = new List(); //2.上传到宜搭 @@ -95,7 +120,7 @@ namespace ZR.Tasks.TaskScheduler //2.2 上传数据 if (device01s != null && device01s.Count > 0) { - var query= DbScoped.SugarScope.Queryable().ToList(); + var query = DbScoped.SugarScope.Queryable().ToList(); foreach (T item in device01s) { //1.1 判断设备配置情况 @@ -105,7 +130,7 @@ namespace ZR.Tasks.TaskScheduler { //不可上传 if (!(bool)options.IsUpload) { - Console.WriteLine("设备名为" + typeof(T).Name+ "不可上传,禁止上传"); + Console.WriteLine("设备名为" + typeof(T).Name + "不可上传,禁止上传"); device01s.Remove(item); continue; @@ -113,53 +138,58 @@ namespace ZR.Tasks.TaskScheduler //开启过滤 if ((bool)options.Isfilter) { - if(item.Value>item.UpValue||item.Value item.UpValue || item.Value < item.LowValue) { Console.WriteLine("设备名为" + typeof(T).Name + "不合格数据,禁止上传"); device01s.Remove(item); continue; } - + } item.IsUpload = 1; } - - } - foreach (T item in device01s) + } + if (device01s.Count > 0) { - ZiyuanDevice_YIDA ziyuan = new ZiyuanDevice_YIDA(); - ziyuan.textField_l3plle21 = item.SupplierCode; - ziyuan.textField_l3plle22 = item.SupplierName; - ziyuan.textField_l3plle23 = item.Vehiclemodel; - ziyuan.textField_l3plle24 = item.Partnumber; - ziyuan.textField_l3plle25 = item.Partname; - ziyuan.textField_l3plle26 = item.Configuration; - ziyuan.textField_l3plle27 = item.Workstation; + + foreach (T item in device01s) + { + ZiyuanDevice_YIDA ziyuan = new ZiyuanDevice_YIDA(); + ziyuan.textField_l3plle21 = item.SupplierCode; + ziyuan.textField_l3plle22 = item.SupplierName; + ziyuan.textField_l3plle23 = item.Vehiclemodel; + ziyuan.textField_l3plle24 = item.Partnumber; + ziyuan.textField_l3plle25 = item.Partname; + ziyuan.textField_l3plle26 = item.Configuration; + ziyuan.textField_l3plle27 = item.Workstation; - ziyuan.textField_l3plle29 = item.Paramter; - ziyuan.numberField_l3plle2x = item.Value; - ziyuan.textField_l3plle2o = item.Value; - ziyuan.numberField_l3plle2y = item.LowValue; - ziyuan.textField_l3plle2q = item.LowValue; - ziyuan.textField_l3plle2s = item.UpValue; - ziyuan.numberField_l3plle2z = item.UpValue; - // ziyuan.dateField_l3plle30 = ((DateTime)item.MeasureTime).ToString("yyyyMMddHHmmss"); - ziyuan.dateField_l3plle30 = (long)(((DateTime)item.MeasureTime).ToUniversalTime().Subtract(new DateTime(1970, 1, 1))).TotalMilliseconds; - ziyuan.textField_l3plle2u = ((DateTime)item.MeasureTime).ToString("yyyy-MM-dd HH:mm:ss"); - ziyuan.textField_l3plle2m = "张帆"; - yidas.Add(JsonConvert.SerializeObject(ziyuan)); + ziyuan.textField_l3plle29 = item.Paramter; + ziyuan.numberField_l3plle2x = item.Value; + ziyuan.textField_l3plle2o = item.Value; + ziyuan.numberField_l3plle2y = item.LowValue; + ziyuan.textField_l3plle2q = item.LowValue; + ziyuan.textField_l3plle2s = item.UpValue; + ziyuan.numberField_l3plle2z = item.UpValue; + // ziyuan.dateField_l3plle30 = ((DateTime)item.MeasureTime).ToString("yyyyMMddHHmmss"); + ziyuan.dateField_l3plle30 = (long)(((DateTime)item.MeasureTime).ToUniversalTime().Subtract(new DateTime(1970, 1, 1))).TotalMilliseconds; + ziyuan.textField_l3plle2u = ((DateTime)item.MeasureTime).ToString("yyyy-MM-dd HH:mm:ss"); + ziyuan.textField_l3plle2m = "张帆"; + yidas.Add(JsonConvert.SerializeObject(ziyuan)); + } } - + + } - BatchSaveFormDataResponse response = UploadYIDABatch(tokenResponse.Body.AccessToken.ToString(), yidas); + BatchSaveFormDataResponse response = UploadYIDABatch(tokenResponse.Body.AccessToken.ToString(), yidas); if (response != null && response.StatusCode == 200) { - flag= DbScoped.SugarScope.Updateable(device01s).ExecuteCommand(); + + flag = DbScoped.SugarScope.Updateable(device01s).ExecuteCommand(); } - + @@ -295,7 +325,7 @@ namespace ZR.Tasks.TaskScheduler try { - response= client.BatchSaveFormDataWithOptions(batchSaveFormDataRequest, batchSaveFormDataHeaders, new AlibabaCloud.TeaUtil.Models.RuntimeOptions()); + response = client.BatchSaveFormDataWithOptions(batchSaveFormDataRequest, batchSaveFormDataHeaders, new AlibabaCloud.TeaUtil.Models.RuntimeOptions()); } catch (TeaException err) { @@ -321,7 +351,7 @@ namespace ZR.Tasks.TaskScheduler } - + return response; } diff --git a/Infrastructure/Model/OptionsSetting.cs b/Infrastructure/Model/OptionsSetting.cs index 1860eca..2c5a304 100644 --- a/Infrastructure/Model/OptionsSetting.cs +++ b/Infrastructure/Model/OptionsSetting.cs @@ -54,6 +54,10 @@ namespace Infrastructure.Model /// Mqtt发布配置 /// public MqttPublicConfig mqttPublicConfig { get; set; } + + + + } /// /// 发送邮件数据配置 @@ -189,6 +193,10 @@ namespace Infrastructure.Model /// 代理服务器 ip /// public string BrokerIp { get; set; } + /// + /// 端口号 + /// + public int Port { get; set; } /// /// 主题 @@ -199,5 +207,8 @@ namespace Infrastructure.Model /// 客户端ID /// public string ClientId { get; set; } + + + } }