宜搭 mqtt

This commit is contained in:
qianhao.xu 2024-08-27 17:29:21 +08:00
parent b42ea529f6
commit 30fc6e963e
5 changed files with 125 additions and 71 deletions

View File

@ -116,13 +116,15 @@
"vuePath": "" //egD:\Work\DOANAdmin-Vue3
},
//MQTT
"MqttPublisher": {
"MqttPublicConfig": {
//ip
"broker_ip": "192.168.0.58",
"BrokerIp": "192.168.0.58",
//
"Port": 1883,
//
"topic": "yida/log",
"Topic": "yida/log",
//id
"clientId": ""
"ClientId": "server01"
}
}

View File

@ -2,6 +2,7 @@
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Net;
using System.Threading.Tasks;
namespace DOAN.Common
@ -13,12 +14,12 @@ namespace DOAN.Common
private IMqttClient _client;
private MqttClientOptions _options;
public MqttPublisher(string brokerAddress, string clientId)
public MqttPublisher(string brokerAddress, string clientId, int port)
{
_client = new MqttFactory().CreateMqttClient();
_options = new MqttClientOptionsBuilder()
.WithTcpServer(brokerAddress)
.WithTcpServer(brokerAddress,port)
.WithClientId(clientId)
.Build();
}
@ -33,6 +34,7 @@ namespace DOAN.Common
var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithRetainFlag(true)
.WithQualityOfServiceLevel(qualityOfService)
.Build();

View File

@ -20,6 +20,15 @@ namespace DOAN.Model
[SugarColumn(IsPrimaryKey = true, IsIdentity = true)]
public int Id { get; set; }
/// <summary>
/// 设备ID
/// </summary>
[SugarColumn(ColumnName = "fk_device_id")]
public int? FkDeviceId { get; set; }
/// <summary>
/// FkTenantId
/// </summary>

View File

@ -27,52 +27,77 @@ using DOAN.Model.huate_group.recipe;
using ZR.Model.Cloud.Dto;
using DOAN.Common;
using Infrastructure.Model;
using Microsoft.Extensions.Options;
using DOAN.Model;
using System.Net;
namespace ZR.Tasks.TaskScheduler
namespace DOAN.Tasks.TaskScheduler
{
[AppService(ServiceType = typeof(YIDA_dataUpload_Task), ServiceLifetime = LifeTime.Scoped)]
internal class YIDA_dataUpload_Task : JobBase, IJob
public class YIDA_dataUpload_Task : JobBase, IJob
{
private readonly NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger();
private readonly MqttPublisher publisher = null;
public YIDA_dataUpload_Task()
private readonly MqttPublicConfig mqttPublicConfig;
public YIDA_dataUpload_Task(IOptions<OptionsSetting> options)
{
var options = App.OptionsSetting;
MqttPublicConfig mqttPublicConfig = options.mqttPublicConfig;
OptionsSetting OptionsSetting = options.Value;
mqttPublicConfig = OptionsSetting.mqttPublicConfig;
//init MQTT
publisher = new MqttPublisher(mqttPublicConfig.BrokerIp, mqttPublicConfig.ClientId);
publisher = new MqttPublisher(mqttPublicConfig.BrokerIp, mqttPublicConfig.ClientId, mqttPublicConfig.Port);
}
public async Task Execute(IJobExecutionContext context)
{
Console.WriteLine("---------------------------------上传开始");
AbstractTrigger trigger = (context as JobExecutionContextImpl).Trigger as AbstractTrigger;
var infoTask = await DbScoped.SugarScope.CopyNew()
.Queryable<SysTasks>()
.FirstAsync(f => f.ID == trigger.JobName) ?? throw new CustomException($"任务{trigger?.JobName}宜搭数据上传任务执行失败,任务不存在");
if (infoTask != null)
try
{
// 执行异步上传任务
//Task<int> t1 = dataUpLoad<ZiyuanDevice01>();
//Task<int> t2 = dataUpLoad<ZiyuanDevice02>();
//Task<int> t3 = dataUpLoad<ZiyuanDevice03>();
//Task<int> t4 = dataUpLoad<ZiyuanDevice04>();
//Task<int> t5 = dataUpLoad<ZiyuanDevice05>();
//Task<int> t6 = dataUpLoad<ZiyuanDevice06>();
//Task<int> t7 = dataUpLoad<ZiyuanDevice07>();
//Task<int> t8 = dataUpLoad<ZiyuanDevice08>();
//Task<int> t9 = dataUpLoad<ZiyuanDevice09>();
//Task<int> t10 = dataUpLoad<ZiyuanDevice10>();
//Task<int> t11 = dataUpLoad<ZiyuanDevice11>();
await publisher.ConnectAsync();
Console.WriteLine("---------------------------------上传开始");
AbstractTrigger trigger = (context as JobExecutionContextImpl).Trigger as AbstractTrigger;
var infoTask = await DbScoped.SugarScope.CopyNew()
.Queryable<SysTasks>()
.FirstAsync(f => f.ID == trigger.JobName) ?? throw new CustomException($"任务{trigger?.JobName}宜搭数据上传任务执行失败,任务不存在");
if (infoTask != null)
{
// 执行异步上传任务
Task<int> t1 = dataUpLoad<ZiyuanDevice01>();
//Task<int> t2 = dataUpLoad<ZiyuanDevice02>();
//Task<int> t3 = dataUpLoad<ZiyuanDevice03>();
//Task<int> t4 = dataUpLoad<ZiyuanDevice04>();
//Task<int> t5 = dataUpLoad<ZiyuanDevice05>();
//Task<int> t6 = dataUpLoad<ZiyuanDevice06>();
//Task<int> t7 = dataUpLoad<ZiyuanDevice07>();
//Task<int> t8 = dataUpLoad<ZiyuanDevice08>();
//Task<int> t9 = dataUpLoad<ZiyuanDevice09>();
//Task<int> t10 = dataUpLoad<ZiyuanDevice10>();
//Task<int> t11 = dataUpLoad<ZiyuanDevice11>();
//await Task.WhenAll(t1, t2, t3, t4, t5, t6, t7, t8, t9, t10, t11);
await Task.WhenAll(t1);
Console.WriteLine("上传代码,内部已屏蔽");
//获取上传设备id
string tablecode = typeof(ZiyuanDevice01).Name;
ReflexAccount reflexAccount= DbScoped.SugarScope.CopyNew().AsTenant()
.QueryableWithAttr<ReflexAccount>().Where(it=>it.TableName== tablecode).First();
Console.WriteLine("---------------------------------上传结束");
await publisher.PublishAsync(mqttPublicConfig.Topic+$"/{reflexAccount.FkDeviceId}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"))
;
Console.WriteLine("---------------------------------上传结束");
}
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
}
finally
{
await publisher.DisconnectAsync();
}
}
@ -80,13 +105,13 @@ namespace ZR.Tasks.TaskScheduler
/// 异步任务 数据上传
/// </summary>
/// <returns></returns>
public Task<int> dataUpLoad<T>() where T : ZiyuanDevice01, new()
public Task<int> dataUpLoad<T>() where T : ZiyuanDevice01, new()
{
return Task.Run(() =>
return Task.Run(() =>
{
int flag = 0;
//1.读取设备数据
List<T> device01s = DbScoped.SugarScope.Queryable<T>().Where(it => it.IsUpload == 0).Take(60).ToList();
List<T> device01s = DbScoped.SugarScope.Queryable<T>().Where(it => it.IsUpload == 0).Take(600).ToList();
List<string> yidas = new List<string>();
//2.上传到宜搭
@ -95,7 +120,7 @@ namespace ZR.Tasks.TaskScheduler
//2.2 上传数据
if (device01s != null && device01s.Count > 0)
{
var query= DbScoped.SugarScope.Queryable<Recipee>().ToList();
var query = DbScoped.SugarScope.Queryable<Recipee>().ToList();
foreach (T item in device01s)
{
//1.1 判断设备配置情况
@ -105,7 +130,7 @@ namespace ZR.Tasks.TaskScheduler
{ //不可上传
if (!(bool)options.IsUpload)
{
Console.WriteLine("设备名为" + typeof(T).Name+ "不可上传,禁止上传");
Console.WriteLine("设备名为" + typeof(T).Name + "不可上传,禁止上传");
device01s.Remove(item);
continue;
@ -113,53 +138,58 @@ namespace ZR.Tasks.TaskScheduler
//开启过滤
if ((bool)options.Isfilter)
{
if(item.Value>item.UpValue||item.Value<item.LowValue)
if (item.Value > item.UpValue || item.Value < item.LowValue)
{
Console.WriteLine("设备名为" + typeof(T).Name + "不合格数据,禁止上传");
device01s.Remove(item);
continue;
}
}
item.IsUpload = 1;
}
}
foreach (T item in device01s)
}
if (device01s.Count > 0)
{
ZiyuanDevice_YIDA ziyuan = new ZiyuanDevice_YIDA();
ziyuan.textField_l3plle21 = item.SupplierCode;
ziyuan.textField_l3plle22 = item.SupplierName;
ziyuan.textField_l3plle23 = item.Vehiclemodel;
ziyuan.textField_l3plle24 = item.Partnumber;
ziyuan.textField_l3plle25 = item.Partname;
ziyuan.textField_l3plle26 = item.Configuration;
ziyuan.textField_l3plle27 = item.Workstation;
foreach (T item in device01s)
{
ZiyuanDevice_YIDA ziyuan = new ZiyuanDevice_YIDA();
ziyuan.textField_l3plle21 = item.SupplierCode;
ziyuan.textField_l3plle22 = item.SupplierName;
ziyuan.textField_l3plle23 = item.Vehiclemodel;
ziyuan.textField_l3plle24 = item.Partnumber;
ziyuan.textField_l3plle25 = item.Partname;
ziyuan.textField_l3plle26 = item.Configuration;
ziyuan.textField_l3plle27 = item.Workstation;
ziyuan.textField_l3plle29 = item.Paramter;
ziyuan.numberField_l3plle2x = item.Value;
ziyuan.textField_l3plle2o = item.Value;
ziyuan.numberField_l3plle2y = item.LowValue;
ziyuan.textField_l3plle2q = item.LowValue;
ziyuan.textField_l3plle2s = item.UpValue;
ziyuan.numberField_l3plle2z = item.UpValue;
// ziyuan.dateField_l3plle30 = ((DateTime)item.MeasureTime).ToString("yyyyMMddHHmmss");
ziyuan.dateField_l3plle30 = (long)(((DateTime)item.MeasureTime).ToUniversalTime().Subtract(new DateTime(1970, 1, 1))).TotalMilliseconds;
ziyuan.textField_l3plle2u = ((DateTime)item.MeasureTime).ToString("yyyy-MM-dd HH:mm:ss");
ziyuan.textField_l3plle2m = "张帆";
yidas.Add(JsonConvert.SerializeObject(ziyuan));
ziyuan.textField_l3plle29 = item.Paramter;
ziyuan.numberField_l3plle2x = item.Value;
ziyuan.textField_l3plle2o = item.Value;
ziyuan.numberField_l3plle2y = item.LowValue;
ziyuan.textField_l3plle2q = item.LowValue;
ziyuan.textField_l3plle2s = item.UpValue;
ziyuan.numberField_l3plle2z = item.UpValue;
// ziyuan.dateField_l3plle30 = ((DateTime)item.MeasureTime).ToString("yyyyMMddHHmmss");
ziyuan.dateField_l3plle30 = (long)(((DateTime)item.MeasureTime).ToUniversalTime().Subtract(new DateTime(1970, 1, 1))).TotalMilliseconds;
ziyuan.textField_l3plle2u = ((DateTime)item.MeasureTime).ToString("yyyy-MM-dd HH:mm:ss");
ziyuan.textField_l3plle2m = "张帆";
yidas.Add(JsonConvert.SerializeObject(ziyuan));
}
}
}
BatchSaveFormDataResponse response = UploadYIDABatch(tokenResponse.Body.AccessToken.ToString(), yidas);
BatchSaveFormDataResponse response = UploadYIDABatch(tokenResponse.Body.AccessToken.ToString(), yidas);
if (response != null && response.StatusCode == 200)
{
flag= DbScoped.SugarScope.Updateable<T>(device01s).ExecuteCommand();
flag = DbScoped.SugarScope.Updateable<T>(device01s).ExecuteCommand();
}
@ -295,7 +325,7 @@ namespace ZR.Tasks.TaskScheduler
try
{
response= client.BatchSaveFormDataWithOptions(batchSaveFormDataRequest, batchSaveFormDataHeaders, new AlibabaCloud.TeaUtil.Models.RuntimeOptions());
response = client.BatchSaveFormDataWithOptions(batchSaveFormDataRequest, batchSaveFormDataHeaders, new AlibabaCloud.TeaUtil.Models.RuntimeOptions());
}
catch (TeaException err)
{
@ -321,7 +351,7 @@ namespace ZR.Tasks.TaskScheduler
}
return response;
}

View File

@ -54,6 +54,10 @@ namespace Infrastructure.Model
/// Mqtt发布配置
/// </summary>
public MqttPublicConfig mqttPublicConfig { get; set; }
}
/// <summary>
/// 发送邮件数据配置
@ -189,6 +193,10 @@ namespace Infrastructure.Model
/// 代理服务器 ip
/// </summary>
public string BrokerIp { get; set; }
/// <summary>
/// 端口号
/// </summary>
public int Port { get; set; }
/// <summary>
/// 主题
@ -199,5 +207,8 @@ namespace Infrastructure.Model
/// 客户端ID
/// </summary>
public string ClientId { get; set; }
}
}