From 84f3bd6807199ae2112c8bba284825accf2aa0c8 Mon Sep 17 00:00:00 2001 From: Yang <17363321594@163.com> Date: Tue, 25 Nov 2025 16:23:46 +0800 Subject: [PATCH] =?UTF-8?q?MQTT=E9=87=87=E9=9B=86=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DryEquipmentScreenController.java | 1 - shgx-dryingroom/pom.xml | 15 + .../dryingroom/alarm/AlarmLightHandler.java | 10 +- .../collect/AutoCollectService.java | 606 ++++++++++-------- .../com/shgx/dryingroom/mqtt/MqttConsole.java | 149 ----- .../shgx/dryingroom/mqtt/MqttDataHandler.java | 121 ++++ .../com/shgx/dryingroom/mqtt/MqttService.java | 117 +--- .../utils/AdcToTemperatureConverter.java | 33 + .../dryingroom/web/domain/DryAlarmLog.java | 10 +- .../web/domain/DryEquipmentDictionary.java | 111 +++- .../mapper/DryEquipmentDictionaryMapper.java | 1 + .../impl/DryEquipmentScreenServiceImpl.java | 315 ++++++--- 12 files changed, 819 insertions(+), 670 deletions(-) delete mode 100644 shgx-dryingroom/src/main/java/com/shgx/dryingroom/mqtt/MqttConsole.java create mode 100644 shgx-dryingroom/src/main/java/com/shgx/dryingroom/mqtt/MqttDataHandler.java create mode 100644 shgx-dryingroom/src/main/java/com/shgx/dryingroom/utils/AdcToTemperatureConverter.java diff --git a/shgx-admin/src/main/java/com/shgx/web/controller/dryingroom/DryEquipmentScreenController.java b/shgx-admin/src/main/java/com/shgx/web/controller/dryingroom/DryEquipmentScreenController.java index 346310f..1524de8 100644 --- a/shgx-admin/src/main/java/com/shgx/web/controller/dryingroom/DryEquipmentScreenController.java +++ b/shgx-admin/src/main/java/com/shgx/web/controller/dryingroom/DryEquipmentScreenController.java @@ -4,7 +4,6 @@ import com.shgx.common.core.controller.BaseController; import com.shgx.common.core.domain.AjaxResult; import com.shgx.dryingroom.web.domain.dto.EquipmentScreenDTO; import com.shgx.dryingroom.web.service.IDryEquipmentScreenService; -import com.shgx.dryingroom.mqtt.MqttConsole; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; diff --git a/shgx-dryingroom/pom.xml b/shgx-dryingroom/pom.xml index 7f47b53..a0a7b88 100644 --- a/shgx-dryingroom/pom.xml +++ b/shgx-dryingroom/pom.xml @@ -51,6 +51,21 @@ compile + + + org.mapstruct + mapstruct + 1.5.5.Final + + + + org.mapstruct + mapstruct-processor + 1.5.5.Final + provided + + + \ No newline at end of file diff --git a/shgx-dryingroom/src/main/java/com/shgx/dryingroom/alarm/AlarmLightHandler.java b/shgx-dryingroom/src/main/java/com/shgx/dryingroom/alarm/AlarmLightHandler.java index 33cd797..6cd6b09 100644 --- a/shgx-dryingroom/src/main/java/com/shgx/dryingroom/alarm/AlarmLightHandler.java +++ b/shgx-dryingroom/src/main/java/com/shgx/dryingroom/alarm/AlarmLightHandler.java @@ -7,14 +7,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * 报警灯TCP消息处理器(多设备版本,携带deviceId) + * 报警灯TCP消息处理器 */ public class AlarmLightHandler extends SimpleChannelInboundHandler { private static final Logger log = LoggerFactory.getLogger(AlarmLightHandler.class); private final MqttService mqttService; - private final String deviceId; // 新增:当前处理器对应的设备ID + + // 当前处理器对应的设备ID + private final String deviceId; /** * 构造方法:传入MQTT服务和设备ID @@ -25,7 +27,7 @@ public class AlarmLightHandler extends SimpleChannelInboundHandler { } /** - * 读取设备响应(核心修改:响应中携带deviceId) + * 读取设备响应 */ @Override protected void channelRead0(ChannelHandlerContext ctx, byte[] msg) { @@ -41,7 +43,7 @@ public class AlarmLightHandler extends SimpleChannelInboundHandler { } } - // ---------------------- 以下方法增加deviceId日志和主题 ---------------------- + // ---------------------- deviceId日志和主题 ---------------------- @Override public void channelActive(ChannelHandlerContext ctx) { log.info("🔗 设备[{}]报警灯TCP连接已建立", deviceId); diff --git a/shgx-dryingroom/src/main/java/com/shgx/dryingroom/collect/AutoCollectService.java b/shgx-dryingroom/src/main/java/com/shgx/dryingroom/collect/AutoCollectService.java index 09b592e..9f3d5d5 100644 --- a/shgx-dryingroom/src/main/java/com/shgx/dryingroom/collect/AutoCollectService.java +++ b/shgx-dryingroom/src/main/java/com/shgx/dryingroom/collect/AutoCollectService.java @@ -1,26 +1,31 @@ package com.shgx.dryingroom.collect; +import com.alibaba.fastjson2.JSONObject; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; -import com.shgx.common.utils.SecurityUtils; +import com.shgx.common.utils.DateUtils; import com.shgx.dryingroom.alarm.AlarmLightService; +import com.shgx.dryingroom.mqtt.MqttDataHandler; import com.shgx.dryingroom.mqtt.MqttService; import com.shgx.dryingroom.web.domain.DryAlarmLog; -import com.shgx.dryingroom.web.mapper.DryAlarmLogMapper; -import lombok.extern.slf4j.Slf4j; -import com.baomidou.dynamic.datasource.annotation.DS; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.shgx.common.utils.DateUtils; import com.shgx.dryingroom.web.domain.DryEquipmentInfo; import com.shgx.dryingroom.web.domain.DryMonitoringData; import com.shgx.dryingroom.web.domain.DryParamThreshold; +import com.shgx.dryingroom.web.mapper.DryAlarmLogMapper; import com.shgx.dryingroom.web.mapper.DryEquipmentInfoMapper; import com.shgx.dryingroom.web.mapper.DryMonitoringDataMapper; import com.shgx.dryingroom.web.mapper.DryParamThresholdMapper; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import com.baomidou.dynamic.datasource.annotation.DS; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; + import javax.annotation.Resource; import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import java.util.*; import java.util.stream.Collectors; @@ -50,310 +55,345 @@ public class AutoCollectService { @Resource private AlarmLightService alarmLightService; - // 设备编码列表(延迟加载,首次采集时初始化) - private List allEquipmentCodes; + // 设备缓存(设备ID→设备详情,仅加载一次) + private Map equipmentInfoCache; - // 采集次数计数器(用于判断报警轮次) - private int collectCount = 0; + // Redis Key 前缀 + private static final String REDIS_EQUIP_PREFIX = "dry_equipment:"; + // 前端 MQTT 推送主题 + private static final String FRONTEND_TOPIC = "equipment/data_update"; + // 设备总数(30台) + private static final int TOTAL_EQUIPMENT = 30; + // 时间格式化器 + private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - // 随机数生成器 - private final Random random = new Random(); + @Resource + private RedisTemplate redisTemplate; - /** - * 定时任务:每10秒执行一次数据采集 - */ - @Scheduled(cron = "0/59 * * * * ?") + // ==================== 独立任务1:每5分钟持久化Redis数据到数据库 ==================== + @Scheduled(cron = "0 0/5 * * * ?") @Transactional(rollbackFor = Exception.class) - public void autoCollectData() { - log.info("===== 开始执行自动采集任务 ====="); + public void persistRedisDataToDb() { + log.info("===== 【持久化任务】开始执行Redis数据→数据库 ====="); - // 1. 加载设备列表(首次执行时) - if (allEquipmentCodes == null || allEquipmentCodes.isEmpty()) { - loadEquipmentCodes(); - if (allEquipmentCodes.isEmpty()) { - log.warn("未加载到任何设备,本次采集任务终止"); + // 1. 加载设备缓存(首次执行时) + if (equipmentInfoCache == null || equipmentInfoCache.isEmpty()) { + loadEquipmentCache(); + if (equipmentInfoCache.isEmpty()) { + log.warn("【持久化任务】未加载到设备信息,任务终止"); return; } - log.info("成功加载{}台设备,开始生成监测数据", allEquipmentCodes.size()); } - // 2. 判断报警轮次 - collectCount++; - boolean isAlarmRound = collectCount % 2 == 0; - List alarmEquipCodes = new ArrayList<>(); - if (isAlarmRound) { - alarmEquipCodes = selectRandomEquipments(1); - log.info("本次为报警轮次,随机选中1台报警设备:{}", alarmEquipCodes); - } else { - log.info("本次为正常轮次(第{}次采集),无报警设备", collectCount); - } - - // 3. 生成并循环插入监测数据 - LocalDateTime collectTime = LocalDateTime.now(); - int insertSuccess = 0; - int total = allEquipmentCodes.size(); - - for (String equipCode : allEquipmentCodes) { - try { - if (equipmentInfoMapper.selectOne(new LambdaQueryWrapper() - .eq(DryEquipmentInfo::getEquipmentCode, equipCode)) - .getEquipmentType() - .equals("2")) { - // 生成露点数据 - DryMonitoringData data1 = generateMonitoringData(equipCode, alarmEquipCodes.contains(equipCode), collectTime, "DEW"); - monitoringDataMapper.insert(data1); - judgeAlarm(data1, "DEW"); - log.debug("设备[{}]数据插入成功:{}(状态:{})", - equipCode, data1.getDataValue(), - data1.getDataStatus().equals("1") ? "报警" : "正常"); - } - // 生成温度数据 - DryMonitoringData data2 = generateMonitoringData(equipCode, alarmEquipCodes.contains(equipCode), collectTime, "TEMP"); - monitoringDataMapper.insert(data2); - // 判断是否需要报警 - judgeAlarm(data2, "TEMP"); - insertSuccess++; - log.debug("设备[{}]数据插入成功:{}(状态:{})", - equipCode, data2.getDataValue(), - data2.getDataStatus().equals("1") ? "报警" : "正常"); - } catch (Exception e) { - log.error("设备[{}]数据插入失败", equipCode, e); - // 若需保证所有数据要么全成功要么全失败,可抛出异常触发事务回滚 - // throw new RuntimeException("设备[" + equipCode + "]数据插入失败", e); - } - } - // 4. 每30秒推送消息 "1" 给前端 - sendDataUpdateNotification(); - - // 4. 打印插入结果汇总 - log.info("===== 本次采集任务结束 ====="); - log.info("数据插入汇总:共需插入{}条,成功{}条,失败{}条", - total, insertSuccess, total - insertSuccess); - } - - /** - * 加载所有设备(首次采集时执行) - */ - private void loadEquipmentCodes() { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); -// List equipmentList = equipmentInfoMapper.selectList(queryWrapper); -// System.out.println("从数据库查询到的设备数量:" + equipmentList.size() ); // 打印设备数量 - allEquipmentCodes = equipmentInfoMapper.selectList(queryWrapper) - .stream() - .map(DryEquipmentInfo::getEquipmentCode) - .collect(Collectors.toList()); - } - - /** - * 随机选择N台设备作为报警设备 - */ - private List selectRandomEquipments(int n) { - // 复制设备列表并打乱顺序 - List shuffled = new ArrayList<>(allEquipmentCodes); - Collections.shuffle(shuffled); - // 取前N台设备(若设备总数不足N,返回全部) - return shuffled.subList(0, Math.min(n, shuffled.size())); - } - - /** - * 生成单台设备的监测数据 - * @param equipCode 设备编码 - * @param isAlarm 是否强制报警(超阈值) - * @param collectTime 采集时间 - */ - private DryMonitoringData generateMonitoringData(String equipCode, boolean isAlarm, LocalDateTime collectTime, String paramCode) { - // 查询该设备的阈值规则 - LambdaQueryWrapper thresholdWrapper = new LambdaQueryWrapper<>(); - thresholdWrapper.eq(DryParamThreshold::getEquipmentCode, equipCode).eq(DryParamThreshold::getParamCode, paramCode); - DryParamThreshold threshold = thresholdMapper.selectOne(thresholdWrapper); - // 新建监测数据 - DryMonitoringData data = new DryMonitoringData(); - data.setEquipmentCode(equipCode); - // 判断设备类型 - if ("TEMP".equals(paramCode)) { - data.setParamCode("TEMP"); - } else { - data.setParamCode("DEW"); - } - data.setCollectorCode("AUTO-COLLECTOR-001"); - data.setCreateBy("shgx"); - data.setCreateTime(DateUtils.getNowDate()); - data.setCollectTime(collectTime); - data.setRemark("自动采集数据"); - // 如果是没有采集规则 - if (threshold == null) { - double value; - if (isAlarm) { - // 报警:生成超阈值数据 - if (paramCode == "TEMP") { - double min = 50; - double max = 60; - value = random.nextBoolean() - ? min - (1 + random.nextDouble() * 4) // 低于最小值 - : max + (1 + random.nextDouble() * 4); // 高于最大值 - data.setDataStatus("1"); // 异常状态 - } else { - double min = 5; - double max = 6; - value = random.nextBoolean() - ? min - (1 + random.nextDouble() * 4) // 低于最小值 - : max + (1 + random.nextDouble() * 4); // 高于最大值 - data.setDataStatus("1"); // 异常状态 - } - } else { - // 正常:生成阈值范围内的随机值 - if (paramCode == "TEMP") { - double min = 50; - double max = 60; - value = min + random.nextDouble() * (max - min); - data.setDataStatus("0"); // 正常状态 - } else { - double min = 5; - double max = 6; - value = min + random.nextDouble() * (max - min); - data.setDataStatus("0"); // 正常状态 - } - } - - // 格式化数据(保留1位小数,拼接单位) - String unit = "°C"; - data.setDataValue(String.format("%.1f%s", value, unit)); - - log.info("生成设备[{}]的监测数据:{},状态:{}", - equipCode, - data.getDataValue(), - isAlarm ? "报警" : "正常"); - return data; - } - // 有采集规则 - else { - // 安全解析阈值 - double min = parseThresholdValue(threshold.getMinValue()); - double max = parseThresholdValue(threshold.getMaxValue()); - double value; - - if (isAlarm) { - // 报警:生成超阈值数据 - value = random.nextBoolean() - ? min - (1 + random.nextDouble() * 4) // 低于最小值 - : max + (1 + random.nextDouble() * 4); // 高于最大值 - data.setDataStatus("1"); // 异常状态 - } else { - // 正常:生成阈值范围内的随机值 - value = min + random.nextDouble() * (max - min); - data.setDataStatus("0"); // 正常状态 - } - - // 格式化数据(保留1位小数,拼接单位) - String unit = threshold.getUnit(); - data.setDataValue(String.format("%.1f%s", value, unit)); - - log.info("生成设备[{}]的监测数据:{},状态:{}", - equipCode, - data.getDataValue(), - isAlarm ? "报警" : "正常"); - return data; - } - } - - /** - * 判断是否报警 - */ - private void judgeAlarm(DryMonitoringData data, String paramCode) { - - LambdaUpdateWrapper updateWrapper = new LambdaUpdateWrapper<>(); - // 查询该设备规则列表 - List list = thresholdMapper.selectList(new LambdaQueryWrapper().eq(DryParamThreshold::getEquipmentCode, data.getEquipmentCode())); - if (list == null || list.size() == 0) { + // 2. 扫描Redis中所有设备数据Key + Set redisKeys = redisTemplate.keys(REDIS_EQUIP_PREFIX + "*:*"); + if (redisKeys == null || redisKeys.isEmpty()) { + log.warn("【持久化任务】Redis中无待持久化数据,任务终止"); return; } - // 新建报警日志对象 - DryAlarmLog log = new DryAlarmLog(); - // 循环规则列表 - for (DryParamThreshold threshold : list) { - if (threshold.getParamCode().equals(paramCode)) { - if (parseThresholdValue(threshold.getMaxValue()) < parseThresholdValue(data.getDataValue())) { - // 插入报警日志 - log.setAlarmType("1"); - log.setEquipmentCode(data.getEquipmentCode()); - log.setParamCode(threshold.getParamCode()); - log.setAlarmValue(data.getDataValue()); - log.setAlarmTime(LocalDateTime.now()); - log.setAlarmStatus("0"); - log.setCreateBy("shgx"); - log.setCreateTime(DateUtils.getNowDate()); - logMapper.insert(log); - // 修改设备状态 - updateWrapper.eq(DryEquipmentInfo::getEquipmentCode, data.getEquipmentCode()) - .set(DryEquipmentInfo::getStatus, '2'); - equipmentInfoMapper.update(updateWrapper); - /** MQTT向报警灯发送报警 */ - alarmLightService.triggerAllAlarmsByEquipment(data.getEquipmentCode()); + + // 3. 解析Redis数据,转换为DryMonitoringData + List dbDataList = new ArrayList<>(); + for (String redisKey : redisKeys) { + try { + MqttDataHandler.EquipmentData redisData = (MqttDataHandler.EquipmentData) redisTemplate.opsForValue().get(redisKey); + if (redisData == null) continue; + + // 转换为数据库实体 + DryMonitoringData dbData = convertToDryMonitoringData(redisData); + if (dbData == null) continue; + + // 报警判断(持久化时顺便校验,不影响前端推送) + judgeAlarm(dbData); + + dbDataList.add(dbData); + } catch (Exception e) { + log.error("【持久化任务】解析Redis数据失败 → Key:{}", redisKey, e); + } + } + + // 4. 批量写入数据库 + if (!dbDataList.isEmpty()) { + monitoringDataMapper.batchInsert(dbDataList); + log.info("【持久化任务】✅ 成功 - 共{}条数据写入dry_monitoring_data表", dbDataList.size()); + } else { + log.warn("【持久化任务】无有效数据可持久化"); + } + + log.info("===== 【持久化任务】执行结束 ====="); + } + + // ==================== 独立任务2:每10秒从Redis推数据给前端 ==================== + @Scheduled(cron = "0/10 * * * * ?") + public void pushRedisDataToFrontend() { + log.info("===== 【前端推送任务】开始执行Redis数据→前端 ====="); + + // 1. 加载设备缓存(首次执行时) + if (equipmentInfoCache == null || equipmentInfoCache.isEmpty()) { + loadEquipmentCache(); + if (equipmentInfoCache.isEmpty()) { + log.warn("【前端推送任务】未加载到设备信息,任务终止"); + return; + } + } + + // 2. 从Redis读取30台设备的最新数据(仅读,不修改Redis) + List result = buildFrontendDataList(); + + // 3. 过滤掉空数据(只保留有效数据) + List validResult = result.stream() + .filter(Objects::nonNull) // 过滤null对象 + .filter(data -> data.getEquipmentCode() != null && !data.getEquipmentCode().isEmpty()) // 设备编码非空 + .filter(data -> data.getDataValue() != null && !data.getDataValue().isEmpty()) // 数据值非空 + .collect(Collectors.toList()); + + // 4. 组装JSON并推送(只推送有效数据) + try { + JSONObject pushData = new JSONObject(); + pushData.put("data", validResult); // 推送过滤后的有效数据 + pushData.put("pushTime", LocalDateTime.now().format(DATETIME_FORMATTER)); + pushData.put("total", validResult.size()); // 有效数据条数 + String jsonStr = pushData.toJSONString(); + + // 发送到前端MQTT主题(仅推送,不涉及数据库) + boolean success = mqttService.sendMessage(FRONTEND_TOPIC, jsonStr); + if (success) { + log.info("【前端推送任务】✅ 成功 → 主题:{},有效数据条数:{}", FRONTEND_TOPIC, validResult.size()); + } else { + log.warn("【前端推送任务】❌ 失败 → 主题:{}", FRONTEND_TOPIC); + } + } catch (Exception e) { + log.error("【前端推送任务】推送异常", e); + } + + log.info("===== 【前端推送任务】执行结束 ====="); + } + + // ==================== 工具方法(仅内部调用,分离职责) ==================== + /** + * 加载设备缓存 + */ + private void loadEquipmentCache() { + log.info("加载设备信息缓存..."); + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + List equipmentList = equipmentInfoMapper.selectList(queryWrapper); + + // 构建缓存:key=设备ID(格式化后:001-030),value=设备详情 + equipmentInfoCache = equipmentList.stream() + .collect(Collectors.toMap( + eq -> { + // 增加空判断,避免格式转换异常 + if (eq.getLocation() == null || eq.getLocation().isEmpty()) { + log.warn("设备location为空,跳过该设备 → 设备编码:{}", eq.getEquipmentCode()); + return ""; + } + try { + return String.format("%03d", Integer.parseInt(eq.getLocation())); + } catch (NumberFormatException e) { + log.warn("设备location格式异常 → location:{},设备编码:{}", eq.getLocation(), eq.getEquipmentCode()); + return eq.getLocation(); + } + }, + eq -> eq, + (k1, k2) -> k1 // 避免重复ID冲突 + )) + // 过滤掉key为空的缓存项 + .entrySet().stream() + .filter(entry -> !entry.getKey().isEmpty()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + log.info("设备缓存加载完成,共{}台有效设备", equipmentInfoCache.size()); + } + + /** + * 从Redis读取数据 + */ + private List buildFrontendDataList() { + List result = new ArrayList<>(); + + // 遍历30台设备(001-030) + for (int i = 1; i <= TOTAL_EQUIPMENT; i++) { + String equipmentId = String.format("%03d", i); // 设备ID:001-030 + DryEquipmentInfo equipmentInfo = equipmentInfoCache.get(equipmentId); + if (equipmentInfo == null) { + log.warn("【前端推送任务】设备ID{}无对应信息,跳过", equipmentId); + continue; + } + + // 从Redis读取该设备的最新数据(Key格式:dry_equipment:001:TEMP) + String redisKey = REDIS_EQUIP_PREFIX + equipmentId + ":TEMP"; // 假设参数编码是TEMP + MqttDataHandler.EquipmentData redisData = (MqttDataHandler.EquipmentData) redisTemplate.opsForValue().get(redisKey); + + // 增加redisData空判断,避免传递null + if (redisData == null) { + log.debug("【前端推送任务】设备ID{}无Redis数据,跳过", equipmentId); + continue; + } + + // 转换为数据库实体(此时redisData非null) + DryMonitoringData dbData = convertToDryMonitoringData(redisData); + if (dbData == null) { + log.debug("【前端推送任务】设备ID{}数据转换失败,跳过", equipmentId); + continue; + } + + // 报警判断(持久化时顺便校验,不影响前端推送) + judgeAlarm(dbData); + + result.add(dbData); + } + + return result; + } + + /** + * Redis数据转换为数据库实体(增加空判断) + */ + private DryMonitoringData convertToDryMonitoringData(MqttDataHandler.EquipmentData redisData) { + // 入参空判断(避免null指针) + if (redisData == null) { + log.error("【转换失败】redisData为null,无法转换"); + return null; + } + + // equipmentId空判断 + String equipmentId = redisData.getEquipmentId(); + if (equipmentId == null || equipmentId.isEmpty()) { + log.error("【转换失败】redisData的equipmentId为空"); + return null; + } + + DryEquipmentInfo equipmentInfo = equipmentInfoCache.get(equipmentId); + if (equipmentInfo == null) { + log.warn("【持久化任务】设备ID{}无对应信息,跳过", equipmentId); + return null; + } + + DryMonitoringData dbData = new DryMonitoringData(); + dbData.setEquipmentCode(equipmentInfo.getEquipmentCode()); + dbData.setParamCode(redisData.getParamCode()); + // 数据值空判断(避免存储空值) + if (redisData.getValue() != null) { + dbData.setDataValue(String.valueOf(redisData.getValue())); + } else { + log.warn("【转换失败】设备ID{}的redis数据值为空", equipmentId); + return null; + } + dbData.setCollectTime(LocalDateTime.ofInstant( + java.time.Instant.ofEpochMilli(redisData.getTime()), + java.time.ZoneId.systemDefault() + )); + dbData.setDataStatus("1"); // 默认为正常 + dbData.setCollectorCode("MQTT-REDIS"); + dbData.setCreateBy("shgx"); + dbData.setCreateTime(DateUtils.getNowDate()); + dbData.setRemark("Redis定时持久化数据"); + + return dbData; + } + + /** + * 温度报警判断(增加空判断) + */ + private void judgeAlarm(DryMonitoringData data) { + // 入参空判断 + if (data == null) { + log.warn("【报警判断】数据为空,跳过"); + return; + } + + // 设备编码空判断 + String equipmentCode = data.getEquipmentCode(); + if (equipmentCode == null || equipmentCode.isEmpty()) { + log.warn("【报警判断】设备编码为空,跳过"); + return; + } + + DryEquipmentInfo info = equipmentInfoMapper.selectOne( + new LambdaQueryWrapper().eq(DryEquipmentInfo::getEquipmentCode, equipmentCode) + ); + if (info == null) { + log.warn("【报警判断】设备编码{}无对应设备信息,跳过", equipmentCode); + return; + } + + List thresholds = thresholdMapper.selectList( + new LambdaQueryWrapper().eq(DryParamThreshold::getEquipmentCode, equipmentCode) + ); + if (thresholds.isEmpty()) return; + + DryAlarmLog alarmLog = new DryAlarmLog(); + LambdaUpdateWrapper updateWrapper = new LambdaUpdateWrapper<>(); + boolean isAlarm = false; + + for (DryParamThreshold threshold : thresholds) { + // 增加阈值对象空判断 + if (threshold == null) { + log.warn("【报警判断】设备编码{}的阈值规则为空,跳过", equipmentCode); + continue; + } + // 规则过滤(修复逻辑:用||改为&&,满足一个条件就跳过,原逻辑可能有误) + if (!threshold.getRuleCode().equals(info.getRuleCode()) + || !threshold.getIsEnabled().equals("1") + || !threshold.getParamCode().equals("TEMP")) { + continue; + } + + try { + // 阈值空判断 + if (threshold.getMaxValue() == null || threshold.getMinValue() == null) { + log.warn("【报警判断】设备编码{}的阈值为空,跳过", equipmentCode); + continue; } - if (parseThresholdValue(threshold.getMinValue()) > parseThresholdValue(data.getDataValue())) { - // 插入报警日志 - log.setAlarmType("2"); - log.setEquipmentCode(data.getEquipmentCode()); - log.setParamCode(threshold.getParamCode()); - log.setAlarmValue(data.getDataValue()); - log.setAlarmTime(LocalDateTime.now()); - log.setAlarmStatus("0"); - log.setCreateBy("shgx"); - log.setCreateTime(DateUtils.getNowDate()); - updateWrapper.eq(DryEquipmentInfo::getEquipmentCode, data.getEquipmentCode()) - .set(DryEquipmentInfo::getStatus, '2'); - // 修改设备状态 - equipmentInfoMapper.update(updateWrapper); - logMapper.insert(log); - /** MQTT向报警灯发送报警 */ - alarmLightService.triggerAllAlarmsByEquipment(data.getEquipmentCode()); - } else { - return; + double max = parseThresholdValue(threshold.getMaxValue()); + double min = parseThresholdValue(threshold.getMinValue()); + double value = parseThresholdValue(data.getDataValue()); + + if (value > max) { + alarmLog.setAlarmType("1"); // 超上限 + isAlarm = true; + } else if (value < min) { + alarmLog.setAlarmType("2"); // 超下限 + isAlarm = true; } + if (isAlarm) { + alarmLog.setEquipmentCode(equipmentCode); + alarmLog.setParamCode(threshold.getParamCode()); + alarmLog.setAlarmValue(data.getDataValue()); + alarmLog.setAlarmTime(LocalDateTime.now()); + alarmLog.setCreateBy("system"); + alarmLog.setCreateTime(DateUtils.getNowDate()); + logMapper.insert(alarmLog); + + // 更新设备状态为报警 + updateWrapper.eq(DryEquipmentInfo::getEquipmentCode, equipmentCode) + .set(DryEquipmentInfo::getStatus, '2'); + equipmentInfoMapper.update(null, updateWrapper); + + // 触发报警灯 + alarmLightService.triggerAllAlarmsByEquipment(equipmentCode); + break; + } + } catch (Exception e) { + log.error("【报警判断】设备编码{}校验异常", equipmentCode, e); } } } /** - * 安全解析阈值数值(处理包含百分号、单位等特殊情况) + * 阈值解析工具 */ private double parseThresholdValue(String valueStr) { if (valueStr == null || valueStr.trim().isEmpty()) { - throw new IllegalArgumentException("阈值数值不能为空"); + throw new IllegalArgumentException("阈值不能为空"); } - try { - // 移除所有非数字字符(除了小数点、负号) String cleanValue = valueStr.replaceAll("[^\\d.-]", "").trim(); - - if (cleanValue.isEmpty()) { - throw new IllegalArgumentException("清理后的数值为空,原始值: " + valueStr); - } - return Double.parseDouble(cleanValue); } catch (NumberFormatException e) { - log.error("解析阈值数值失败: {}, 错误: {}", valueStr, e.getMessage()); - throw new RuntimeException("阈值数值格式错误: " + valueStr, e); - } - } - - /** - * 每30s更新数据时推送"1"给前端 - */ - private void sendDataUpdateNotification() { - try { - // 发送数字 "1" 到前端 - boolean success = mqttService.sendMessage("equipment/data_update", "1"); - - if (success) { - log.info("✅ 数据更新通知已发送到前端: 1"); - } else { - log.warn("❌ 数据更新通知发送失败"); - } - - } catch (Exception e) { - log.error("发送数据更新通知异常: {}", e.getMessage()); + log.error("解析阈值失败:{}", valueStr, e); + throw new RuntimeException("阈值格式错误:" + valueStr); } } } \ No newline at end of file diff --git a/shgx-dryingroom/src/main/java/com/shgx/dryingroom/mqtt/MqttConsole.java b/shgx-dryingroom/src/main/java/com/shgx/dryingroom/mqtt/MqttConsole.java deleted file mode 100644 index 7ab6840..0000000 --- a/shgx-dryingroom/src/main/java/com/shgx/dryingroom/mqtt/MqttConsole.java +++ /dev/null @@ -1,149 +0,0 @@ -package com.shgx.dryingroom.mqtt; - -import com.shgx.dryingroom.alarm.AlarmLightService; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.CommandLineRunner; -import org.springframework.stereotype.Component; - -import java.util.Scanner; -import java.util.function.Consumer; - -/** - * MQTT调试控制台(适配多报警灯设备) - */ -@Component -public class MqttConsole implements CommandLineRunner { - - @Autowired - private MqttService mqttService; - - @Autowired - private AlarmLightService alarmLightService; - - @Override - public void run(String... args) { - new Thread(() -> { - try { - Thread.sleep(3000); // 等待服务初始化 - - // 关键修正:添加监听器(参数类型为 Consumer,仅接收消息体) - // 若需要获取消息主题,需在 MqttService 中修改监听器定义(见下文补充) - mqttService.addMessageListener(new Consumer() { - @Override - public void accept(MqttMessage message) { - // 从消息体获取内容(若需主题,需修改 MqttService 监听器为接收 topic+message) - String payload = new String(message.getPayload()); - // 这里无法直接获取topic,若需topic需按补充方案修改MqttService - System.out.println("\n🎯 收到MQTT消息: " + payload); - } - }); - - startConsole(); // 启动控制台交互 - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - System.err.println("控制台线程中断: " + e.getMessage()); - } - }).start(); - } - - // 以下方法保持不变(仅修正监听器部分,其他逻辑无问题) - private void startConsole() { - Scanner scanner = new Scanner(System.in); - - System.out.println("\n=== MQTT调试控制台 ==="); - System.out.println("命令列表:"); - System.out.println(" 打印指令:"); - System.out.println(" print [设备ID] [消息] - 发送打印指令(示例:print PRINTER_01 测试打印)"); - System.out.println(" 通用设备指令:"); - System.out.println(" device [系统] [类型] [设备ID] [消息] [QoS] - 发送设备指令(示例:device temp sensor TEMP_01 25 1)"); - System.out.println(" 报警灯指令(多设备版):"); - System.out.println(" alarm [设备ID] ON - 打开指定报警灯(示例:alarm ALARM_01 ON)"); - System.out.println(" alarm [设备ID] OFF - 关闭指定报警灯(示例:alarm ALARM_02 OFF)"); - System.out.println(" alarm [设备ID] SLOW - 慢闪(示例:alarm ALARM_03 SLOW)"); - System.out.println(" alarm [设备ID] FAST - 快闪(示例:alarm ALARM_01 FAST)"); - System.out.println(" alarm [设备ID] STATUS - 查询状态(示例:alarm ALARM_02 STATUS)"); - System.out.println(" alarm [设备ID] CONNECTION - 检查连接(示例:alarm ALARM_03 CONNECTION)"); - System.out.println(" alarm [设备ID] [十六进制指令] - 发送自定义指令(示例:alarm ALARM_01 01 06 04 0E 00 03 A9 38)"); - System.out.println(" 状态检查:"); - System.out.println(" status - 查看MQTT连接状态"); - System.out.println(" exit - 退出控制台"); - System.out.println("=============================\n"); - - while (true) { - System.out.print("mqtt> "); - String input = scanner.nextLine().trim(); - if (input.equalsIgnoreCase("exit")) { - break; - } - processCommand(input); - } - - scanner.close(); - System.out.println("控制台已退出"); - } - - private void processCommand(String input) { - try { - if (input.startsWith("print ")) { - handlePrint(input); - } else if (input.startsWith("device ")) { - handleDevice(input); - } else if (input.equalsIgnoreCase("status")) { - handleStatus(); - } else if (input.startsWith("alarm ")) { - handleAlarmLight(input); - } else if (!input.isEmpty()) { - System.out.println("❌ 未知命令,请输入 'exit' 退出或查看命令列表"); - } - } catch (Exception e) { - System.out.println("❌ 命令执行失败: " + e.getMessage()); - } - } - - private void handlePrint(String input) { - String[] parts = input.substring(6).split(" ", 2); - if (parts.length == 2) { - boolean success = mqttService.sendPrintCommand(parts[0], parts[1]); - System.out.println(success ? "✅ 打印指令发送成功" : "❌ 打印指令发送失败"); - } else { - System.out.println("❌ 格式错误,正确格式: print [设备ID] [消息]"); - } - } - - private void handleDevice(String input) { - String[] parts = input.substring(7).split(" ", 5); - if (parts.length >= 4) { - String system = parts[0]; - String commandType = parts[1]; - String deviceId = parts[2]; - String message = parts[3]; - int qos = parts.length >= 5 ? Integer.parseInt(parts[4]) : 0; - - boolean success = mqttService.sendDeviceCommand(system, commandType, deviceId, message, qos); - System.out.println(success ? "✅ 设备指令发送成功" : "❌ 设备指令发送失败"); - } else { - System.out.println("❌ 格式错误,正确格式: device [系统] [类型] [设备ID] [消息] [QoS]"); - } - } - - private void handleStatus() { - boolean connected = mqttService.isConnected(); - System.out.println(connected ? "✅ MQTT已连接到 " + mqttService.getBrokerUrl() : "❌ MQTT未连接"); - } - - private void handleAlarmLight(String input) { - String[] parts = input.substring(6).trim().split(" ", 2); - if (parts.length < 2) { - System.out.println("❌ 格式错误,正确格式: alarm [设备ID] [指令]"); - System.out.println(" 示例: alarm ALARM_01 ON (打开ID为ALARM_01的报警灯)"); - return; - } - - String deviceId = parts[0]; - String command = parts[1].toUpperCase(); - boolean success = false; - - System.out.println("设备[" + deviceId + "] " + (success ? "✅ 指令发送成功" : "❌ 指令发送失败")); - } -} \ No newline at end of file diff --git a/shgx-dryingroom/src/main/java/com/shgx/dryingroom/mqtt/MqttDataHandler.java b/shgx-dryingroom/src/main/java/com/shgx/dryingroom/mqtt/MqttDataHandler.java new file mode 100644 index 0000000..1127160 --- /dev/null +++ b/shgx-dryingroom/src/main/java/com/shgx/dryingroom/mqtt/MqttDataHandler.java @@ -0,0 +1,121 @@ +package com.shgx.dryingroom.mqtt; + +import com.alibaba.fastjson2.JSONObject; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; +import java.util.Set; +import java.util.function.Consumer; +import javax.annotation.Resource; + +import static com.shgx.dryingroom.utils.AdcToTemperatureConverter.convert; + +@Component +public class MqttDataHandler implements Consumer { + + private static final Logger log = LoggerFactory.getLogger(MqttDataHandler.class); + + // Redis 存储 Key 前缀(统一覆盖30台设备) + private static final String REDIS_EQUIP_PREFIX = "dry_equipment:"; + + // 参数编码(固定业务约定) + private static final String PARAM_TEMP = "TEMP"; // 温度参数(默认) + private static final String PARAM_DEW = "DEW";// 自定义参数(对应消息中字段"1") + + // 设备ID范围校验(30台设备:001-030,对应干燥机001-026、露点干燥机001-004) + private static final int MIN_EQUIP_ID = 1; + private static final int MAX_EQUIP_ID = 30; + + @Resource + private RedisTemplate redisTemplate; + + @Override + public void accept(MqttMessage message) { + try { + String payload = new String(message.getPayload()); + JSONObject data = JSONObject.parseObject(payload); + + // 1. 解析基础字段 + Long time = data.getLong("time"); + JSONObject params = data.getJSONObject("params"); + if (time == null || params == null) { + log.warn("MQTT消息缺少time或params字段 → 消息:{}", payload); + return; + } + + // 2. 遍历params中所有设备ID,存储对应参数(默认温度) + Set equipIdSet = params.keySet(); + for (String equipIdStr : equipIdSet) { + + // 校验设备ID合法性(必须是1-30的数字) + Integer equipId = validateEquipId(equipIdStr); + if (equipId == null) continue; + + // 格式化设备ID(补0为3位,如1→001,21→021) + String formatEquipId = String.format("%03d", equipId); + // 解析参数值 + Double paramValue = convert(params.getDouble(equipIdStr)); + if (paramValue == null) { + log.warn("设备{}参数值为空 → 消息:{}", formatEquipId, payload); + continue; + } + + // 3. 存储到Redis(设备ID-温度参数) +// String redisKey = REDIS_EQUIP_PREFIX + formatEquipId + ":" + PARAM_TEMP; + String redisKey = REDIS_EQUIP_PREFIX + formatEquipId + ":" + PARAM_TEMP; + redisTemplate.opsForValue().set(redisKey, new EquipmentData(time, paramValue, PARAM_TEMP, formatEquipId)); + log.info("✅ 存储设备数据 - 设备:{},参数:{},值:{},Key:{}", + formatEquipId, PARAM_TEMP, paramValue, redisKey); + } + } catch (Exception e) { + log.error("❌ 处理MQTT设备数据失败:{}", e.getMessage(), e); + } + } + + /** + * 校验设备ID合法性(必须是1-30的整数) + */ + private Integer validateEquipId(String equipIdStr) { + try { + int equipId = Integer.parseInt(equipIdStr); + if (equipId >= MIN_EQUIP_ID && equipId <= MAX_EQUIP_ID) { + return equipId; + } else { + log.warn("设备ID超出范围(1-30) → 非法ID:{}", equipIdStr); + return null; + } + } catch (NumberFormatException e) { + log.warn("设备ID格式非法(必须是数字) → 非法ID:{}", equipIdStr); + return null; + } + } + + /** + * 设备数据实体类(包含设备ID,便于后续持久化) + */ + public static class EquipmentData { + private Long time; // 采集时间戳 + private Double value; // 参数值 + private String paramCode; // 参数编码(TEMP/PARAM1) + private String equipmentId;// 设备ID(格式化后,如021) + + public EquipmentData(Long time, Double value, String paramCode, String equipmentId) { + this.time = time; + this.value = value; + this.paramCode = paramCode; + this.equipmentId = equipmentId; + } + + // Getter/Setter(JSON序列化必需) + public Long getTime() { return time; } + public void setTime(Long time) { this.time = time; } + public Double getValue() { return value; } + public void setValue(Double value) { this.value = value; } + public String getParamCode() { return paramCode; } + public void setParamCode(String paramCode) { this.paramCode = paramCode; } + public String getEquipmentId() { return equipmentId; } + public void setEquipmentId(String equipmentId) { this.equipmentId = equipmentId; } + } +} \ No newline at end of file diff --git a/shgx-dryingroom/src/main/java/com/shgx/dryingroom/mqtt/MqttService.java b/shgx-dryingroom/src/main/java/com/shgx/dryingroom/mqtt/MqttService.java index 3c04acc..bda59fd 100644 --- a/shgx-dryingroom/src/main/java/com/shgx/dryingroom/mqtt/MqttService.java +++ b/shgx-dryingroom/src/main/java/com/shgx/dryingroom/mqtt/MqttService.java @@ -1,7 +1,6 @@ package com.shgx.dryingroom.mqtt; import com.alibaba.fastjson2.JSONObject; -import com.shgx.common.utils.StringUtils; import org.eclipse.paho.client.mqttv3.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -10,12 +9,14 @@ import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.*; import java.util.function.Consumer; /** - * 统一的MQTT消息服务(适配多报警灯设备) + * MQTT消息服务 */ @Service public class MqttService implements MqttCallback { @@ -32,9 +33,13 @@ public class MqttService implements MqttCallback { // 消息监听器列表(支持多组件注册) private List> messageListeners = new ArrayList<>(); + @Resource + private MqttDataHandler mqttDataHandler; + @PostConstruct public void init() { connect(); + addMessageListener(mqttDataHandler); } @PreDestroy @@ -59,8 +64,8 @@ public class MqttService implements MqttCallback { // 订阅主题:报警灯控制主题(支持多设备通配符) String[] topics = { - "alarm/#", // 原有报警主题 - "alarm_light/+/control" // 新增:报警灯控制主题(格式:alarm_light/[设备ID]/control) + "alarm_light/+/control", + "shgx/equipment/data/#" }; int[] qos = {1, 1}; // 对应主题的QoS mqttClient.subscribe(topics, qos); @@ -84,13 +89,13 @@ public class MqttService implements MqttCallback { } } - // ==================== 消息接收相关(保持原有逻辑,适配多设备) ==================== + // ==================== 消息接收相关 ==================== @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String payload = new String(message.getPayload()); log.info("📨 收到消息 - 主题: {}, QoS: {}, 内容: {}", topic, message.getQos(), payload); - // 控制台显示(保持原有) + // 控制台显示 System.out.println("\n🎯 收到客户端消息:"); System.out.println(" 主题: " + topic); System.out.println(" QoS: " + message.getQos()); @@ -131,7 +136,7 @@ public class MqttService implements MqttCallback { log.info("✅ 已移除MQTT消息监听器: {}", listener.getClass().getSimpleName()); } - // ==================== 基础发送方法(保持原有) ==================== + // ==================== 基础发送方法 ==================== public boolean sendMessage(String topic, String message) { return sendMessage(topic, message, 1); } @@ -174,102 +179,6 @@ public class MqttService implements MqttCallback { } } - // ==================== 业务发送方法(新增报警灯相关) ==================== - - // 1. 原有业务方法(保持不变) - public boolean sendToUser(Long userId, String message) { - String topic = TOPIC_PREFIX + "user/" + userId; - return sendMessage(topic, message); - } - - public boolean sendSystemNotification(Long userId, String title, String content) { - String topic = TOPIC_PREFIX + "user/" + userId; - JSONObject notification = new JSONObject(); - notification.put("type", "system_notification"); - notification.put("title", title); - notification.put("content", content); - notification.put("timestamp", System.currentTimeMillis()); - return sendMessage(topic, notification.toJSONString()); - } - - public boolean sendApprovalNotification(Long userId, String processName, String status, String remark) { - String topic = TOPIC_PREFIX + "user/" + userId; - JSONObject approval = new JSONObject(); - approval.put("type", "approval_notification"); - approval.put("processName", processName); - approval.put("status", status); - approval.put("remark", StringUtils.isNotEmpty(remark) ? remark : ""); - approval.put("timestamp", System.currentTimeMillis()); - approval.put("content", String.format("您的【%s】申请已%s", processName, status)); - return sendMessage(topic, approval.toJSONString()); - } - - public boolean broadcast(String message) { - String topic = TOPIC_PREFIX + "broadcast"; - return sendMessage(topic, message); - } - - public boolean broadcastJson(String type, Object data) { - String topic = TOPIC_PREFIX + "broadcast"; - return sendJsonMessage(topic, type, data); - } - - public boolean sendOnlineCount(int count) { - JSONObject data = new JSONObject(); - data.put("onlineCount", count); - data.put("timestamp", System.currentTimeMillis()); - return broadcastJson("online_count", data); - } - - public boolean sendPrintCommand(String deviceId, String message) { - return sendDeviceCommand("shgx_mes", "print", deviceId, message, 0); - } - - public boolean sendDeviceCommand(String system, String commandType, String deviceId, String message, int qos) { - String topic = system + "/" + commandType + "/" + deviceId; - JSONObject msgBody = new JSONObject(); - msgBody.put("msg", message); - return sendMessage(topic, msgBody.toJSONString(), qos); - } - - public boolean sendRealtimeData(String dataType, Object data) { - String topic = TOPIC_PREFIX + "realtime/" + dataType; - return sendJsonMessage(topic, "data_update", data); - } - - // 2. 新增:报警灯专用发送方法(适配多设备) - /** - * 发送报警灯控制结果到MQTT(供AlarmLightService调用) - * @param deviceId 报警灯设备ID - * @param command 发送的指令(如ON、OFF) - * @param success 是否成功 - */ - public boolean sendAlarmLightControlResult(String deviceId, String command, boolean success) { - // 主题格式:alarm_light/[设备ID]/control_result(与MqttConsole和前端约定) - String topic = "alarm_light/" + deviceId + "/control_result"; - JSONObject result = new JSONObject(); - result.put("deviceId", deviceId); - result.put("command", command); - result.put("success", success); - result.put("timestamp", System.currentTimeMillis()); - return sendJsonMessage(topic, "alarm_light_control_result", result); - } - - /** - * 发送报警灯状态响应到MQTT(供AlarmLightHandler调用) - * @param deviceId 报警灯设备ID - * @param hexData 设备返回的十六进制状态数据 - */ - public boolean sendAlarmLightStatus(String deviceId, String hexData) { - // 主题格式:alarm_light/[设备ID]/response - String topic = "alarm_light/" + deviceId + "/response"; - JSONObject status = new JSONObject(); - status.put("deviceId", deviceId); - status.put("data", hexData); // 原始十六进制数据 - status.put("timestamp", System.currentTimeMillis()); - return sendJsonMessage(topic, "alarm_light_status", status); - } - // ==================== 工具方法 ==================== public boolean isConnected() { return mqttClient != null && mqttClient.isConnected(); @@ -286,7 +195,7 @@ public class MqttService implements MqttCallback { } } - // 新增:获取当前连接的Broker地址(供控制台显示) + // 获取当前连接的Broker地址(供控制台显示) public String getBrokerUrl() { return brokerUrl; } diff --git a/shgx-dryingroom/src/main/java/com/shgx/dryingroom/utils/AdcToTemperatureConverter.java b/shgx-dryingroom/src/main/java/com/shgx/dryingroom/utils/AdcToTemperatureConverter.java new file mode 100644 index 0000000..f247374 --- /dev/null +++ b/shgx-dryingroom/src/main/java/com/shgx/dryingroom/utils/AdcToTemperatureConverter.java @@ -0,0 +1,33 @@ +package com.shgx.dryingroom.utils; + + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class AdcToTemperatureConverter { + + // 硬件参数(从表格提取,固定值) + private static final int ADC_LOW = 4000; // ADC下限 + private static final int ADC_HIGH = 20000; // ADC上限 + private static final double TEMP_LOW = 0.0; // 温度下限(°C) + private static final double TEMP_HIGH = 200.0; // 温度上限(°C) + + /** + * 核心方法:将ADC电流值转换为温度值 + * @param adcValue 电流采样值(ADC值,如11963) + * @return 温度值(°C,保留1位小数),异常时返回null + */ + public static Double convert(double adcValue) { + // 1. 校验ADC值是否在有效范围内 + if (adcValue < ADC_LOW || adcValue > ADC_HIGH) { + log.warn("ADC值{}超出有效范围[{}, {}],转换失败", adcValue, ADC_LOW, ADC_HIGH); + return null; + } + + // 2. 应用线性转换公式 + double temperature = (adcValue - ADC_LOW) * (TEMP_HIGH - TEMP_LOW) / (ADC_HIGH - ADC_LOW) + TEMP_LOW; + + // 3. 保留1位小数 + return Math.round(temperature * 10) / 10.0; + } +} \ No newline at end of file diff --git a/shgx-dryingroom/src/main/java/com/shgx/dryingroom/web/domain/DryAlarmLog.java b/shgx-dryingroom/src/main/java/com/shgx/dryingroom/web/domain/DryAlarmLog.java index a768b0e..eca925e 100644 --- a/shgx-dryingroom/src/main/java/com/shgx/dryingroom/web/domain/DryAlarmLog.java +++ b/shgx-dryingroom/src/main/java/com/shgx/dryingroom/web/domain/DryAlarmLog.java @@ -58,10 +58,10 @@ public class DryAlarmLog extends BaseEntity @TableField("restore_time") private LocalDateTime restoreTime; - /** 报警状态:0-未处理,1-已处理,2-已忽略 */ - @Excel(name = "报警状态") - @TableField("alarm_status") - private String alarmStatus; +// /** 报警状态:0-未处理,1-已处理,2-已忽略 */ +// @Excel(name = "报警状态") +// @TableField("alarm_status") +// private String alarmStatus; @Override public String toString() { @@ -73,7 +73,7 @@ public class DryAlarmLog extends BaseEntity .append("alarmValue", getAlarmValue()) .append("alarmTime", getAlarmTime()) .append("restoreTime", getRestoreTime()) - .append("alarmStatus", getAlarmStatus()) +// .append("alarmStatus", getAlarmStatus()) .append("createBy", getCreateBy()) .append("createTime", getCreateTime()) .append("updateBy", getUpdateBy()) diff --git a/shgx-dryingroom/src/main/java/com/shgx/dryingroom/web/domain/DryEquipmentDictionary.java b/shgx-dryingroom/src/main/java/com/shgx/dryingroom/web/domain/DryEquipmentDictionary.java index 8f36a04..7ad4839 100644 --- a/shgx-dryingroom/src/main/java/com/shgx/dryingroom/web/domain/DryEquipmentDictionary.java +++ b/shgx-dryingroom/src/main/java/com/shgx/dryingroom/web/domain/DryEquipmentDictionary.java @@ -27,32 +27,93 @@ public class DryEquipmentDictionary extends BaseEntity @TableId(type = IdType.AUTO) private Long id; - /** 设备号 */ - @TableField("equipment_code") - private String equipmentCode; + @TableField("value01") + private String value01; - /** 设备名称 */ - @Excel(name = "设备名称") - @TableField("equipment_name") - private String equipmentName; + @TableField("value02") + private String value02; - /** 站点 */ - @Excel(name = "站点") - @TableField("site") - private String site; + @TableField("value03") + private String value03; - @Override - public String toString() { - return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE) - .append("id", getId()) - .append("equipmentCode", getEquipmentCode()) - .append("equipmentName", getEquipmentName()) - .append("equipmentType", getSite()) - .append("createBy", getCreateBy()) - .append("createTime", getCreateTime()) - .append("updateBy", getUpdateBy()) - .append("updateTime", getUpdateTime()) - .append("remark", getRemark()) - .toString(); - } + @TableField("value04") + private String value04; + + @TableField("value05") + private String value05; + + @TableField("value06") + private String value06; + + @TableField("value07") + private String value07; + + @TableField("value08") + private String value08; + + @TableField("value09") + private String value09; + + @TableField("value10") + private String value10; + + @TableField("value11") + private String value11; + + @TableField("value12") + private String value12; + + @TableField("value13") + private String value13; + + @TableField("value14") + private String value14; + + @TableField("value15") + private String value15; + + @TableField("value16") + private String value16; + + @TableField("value17") + private String value17; + + @TableField("value18") + private String value18; + + @TableField("value19") + private String value19; + + @TableField("value20") + private String value20; + + @TableField("value21") + private String value21; + + @TableField("value22") + private String value22; + + @TableField("value23") + private String value23; + + @TableField("value24") + private String value24; + + @TableField("value25") + private String value25; + + @TableField("value26") + private String value26; + + @TableField("value27") + private String value27; + + @TableField("value28") + private String value28; + + @TableField("value29") + private String value29; + + @TableField("value30") + private String value30; } diff --git a/shgx-dryingroom/src/main/java/com/shgx/dryingroom/web/mapper/DryEquipmentDictionaryMapper.java b/shgx-dryingroom/src/main/java/com/shgx/dryingroom/web/mapper/DryEquipmentDictionaryMapper.java index 3b54fd3..acde9f9 100644 --- a/shgx-dryingroom/src/main/java/com/shgx/dryingroom/web/mapper/DryEquipmentDictionaryMapper.java +++ b/shgx-dryingroom/src/main/java/com/shgx/dryingroom/web/mapper/DryEquipmentDictionaryMapper.java @@ -1,4 +1,5 @@ package com.shgx.dryingroom.web.mapper; public interface DryEquipmentDictionaryMapper { + } diff --git a/shgx-dryingroom/src/main/java/com/shgx/dryingroom/web/service/impl/DryEquipmentScreenServiceImpl.java b/shgx-dryingroom/src/main/java/com/shgx/dryingroom/web/service/impl/DryEquipmentScreenServiceImpl.java index 2e45fd1..e90eb4d 100644 --- a/shgx-dryingroom/src/main/java/com/shgx/dryingroom/web/service/impl/DryEquipmentScreenServiceImpl.java +++ b/shgx-dryingroom/src/main/java/com/shgx/dryingroom/web/service/impl/DryEquipmentScreenServiceImpl.java @@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.shgx.common.utils.StringUtils; import com.shgx.common.utils.bean.BeanUtils; import com.shgx.dryingroom.alarm.AlarmLightService; +import com.shgx.dryingroom.mqtt.MqttDataHandler; import com.shgx.dryingroom.web.domain.DryEquipmentInfo; import com.shgx.dryingroom.web.domain.DryMonitoringData; import com.shgx.dryingroom.web.domain.DryParamThreshold; @@ -15,9 +16,12 @@ import com.shgx.dryingroom.web.mapper.DryMonitoringDataMapper; import com.shgx.dryingroom.web.mapper.DryParamThresholdMapper; import com.shgx.dryingroom.web.service.IDryEquipmentScreenService; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import javax.annotation.Resource; +import java.time.LocalDateTime; import java.util.*; import java.util.stream.Collectors; @@ -43,6 +47,15 @@ public class DryEquipmentScreenServiceImpl implements IDryEquipmentScreenService @Autowired private AlarmLightService alarmLightService; + @Resource + private RedisTemplate redisTemplate; + + // Redis Key 前缀(与 AutoCollectService 保持一致) + private static final String REDIS_EQUIP_PREFIX = "dry_equipment:"; + + // 支持的参数编码(与 MqttDataHandler 一致) + private static final List SUPPORT_PARAM_CODES = Arrays.asList("TEMP", "PARAM1", "DEW"); + // 干燥机列表 @Override public List dryingList() { @@ -56,24 +69,31 @@ public class DryEquipmentScreenServiceImpl implements IDryEquipmentScreenService // 按设备号字符串自然排序(GZ-001 < GZ-002 < ... < GZ-026) equipmentList.sort(Comparator.comparing(DryEquipmentInfo::getEquipmentCode)); - // 2. 提取所有干燥机设备号 - List equipmentCodes = equipmentList.stream() - .map(DryEquipmentInfo::getEquipmentCode) - .collect(Collectors.toList()); + // 2. 提取所有干燥机设备号 + 对应的设备ID(location字段,如021) + Map equipmentCodeToLocationMap = equipmentList.stream() + .collect(Collectors.toMap( + DryEquipmentInfo::getEquipmentCode, // key:设备号(如GZ-021) + DryEquipmentInfo::getLocation, // value:设备ID(如021) + (k1, k2) -> k1 // 避免重复设备号冲突 + )); + List equipmentCodes = new ArrayList<>(equipmentCodeToLocationMap.keySet()); - // 3. 调用新版多参数方法(返回 Map<设备号, Map<参数编码, 最新值>>) - Map> latestParamDataMap = getLatestDataMap(equipmentCodes); + // 3. 从 Redis 读取数据并转换为 DryMonitoringData 列表(核心改造) + List monitoringDataList = getDryMonitoringDataFromRedis(equipmentCodeToLocationMap); - // 4. 查询每个设备的阈值规则列表(不变) + // 4. 按“设备号+参数编码”分组,取最新数据(按采集时间倒序) + Map> latestDataMap = groupLatestMonitoringData(monitoringDataList); + + // 5. 查询每个设备的阈值规则列表(不变) Map> paramMap = getParamMap(equipmentCodes); - // 5. 组装DTO:仅提取TEMP(温度)参数的最新值 + // 6. 组装DTO:提取 TEMP 参数的最新值(复用原有逻辑) return equipmentList.stream() .map(equipment -> { String setTem = (equipment != null && StringUtils.isNotBlank(equipment.getRuleCode())) ? Optional.ofNullable(paramMapper.selectOne( new LambdaQueryWrapper().eq(DryParamThreshold::getRuleCode, equipment.getRuleCode()) - .eq(DryParamThreshold::getIsEnabled,"1") + .eq(DryParamThreshold::getIsEnabled, "1") )).map(DryParamThreshold::getMaxValue).orElse(null) : null; EquipmentScreenDTO.DryingDTO dto = new EquipmentScreenDTO.DryingDTO(); @@ -86,9 +106,10 @@ public class DryEquipmentScreenServiceImpl implements IDryEquipmentScreenService dto.setEquipmentType("1"); dto.setParamList(paramMap.getOrDefault(equipmentCode, Collections.emptyList())); - // 关键:从多参数映射中提取TEMP(温度)值,无数据则返回空字符串 - Map paramData = latestParamDataMap.getOrDefault(equipmentCode, new HashMap<>()); - dto.setDataValue(paramData.getOrDefault("TEMP", "")); // 仅取温度值,匹配原有字段类型 + // 从分组数据中提取 TEMP 参数的最新值 + Map paramDataMap = latestDataMap.getOrDefault(equipmentCode, new HashMap<>()); + DryMonitoringData tempData = paramDataMap.get("TEMP"); + dto.setDataValue(tempData != null ? tempData.getDataValue() : ""); // 直接复用 DryMonitoringData 的 dataValue(带单位) return dto; }) @@ -96,39 +117,200 @@ public class DryEquipmentScreenServiceImpl implements IDryEquipmentScreenService } /** - * 获取每个设备的最新监测值 + * 除湿干燥机列表:同时查询最新的温度(TEMP)和露点(DEW)值 */ - private Map> getLatestDataMap(List equipmentCodes) { - // 1. 批量查询设备的所有监测数据(包含参数编码,用于区分TEMP和DEW) - LambdaQueryWrapper dataWrapper = new LambdaQueryWrapper<>(); - dataWrapper.in(DryMonitoringData::getEquipmentCode, equipmentCodes) - .select(DryMonitoringData::getEquipmentCode, - DryMonitoringData::getParamCode, - DryMonitoringData::getDataValue, - DryMonitoringData::getCreateTime); // 只查需要的字段,提升性能 + @Override + public List drynessList() { + // 1. 查询所有除湿干燥机(equipment_type=2) + LambdaQueryWrapper eqWrapper = new LambdaQueryWrapper<>(); + eqWrapper.eq(DryEquipmentInfo::getEquipmentType, '2'); + List equipmentList = infoMapper.selectList(eqWrapper); + if (equipmentList.isEmpty()) { + return Collections.emptyList(); + } - List dataList = dataMapper.selectList(dataWrapper); + // 按设备号字符串自然排序(GZ-001 < GZ-002 < ... < GZ-026) + equipmentList.sort(Comparator.comparing(DryEquipmentInfo::getEquipmentCode)); - // 2. 按“设备号+参数编码”分组,每组取创建时间最新的一条数据 + // 2. 提取所有除湿干燥机的设备号 + 对应的设备ID(location字段) + Map equipmentCodeToLocationMap = equipmentList.stream() + .collect(Collectors.toMap( + DryEquipmentInfo::getEquipmentCode, + DryEquipmentInfo::getLocation, + (k1, k2) -> k1 + )); + + // 3. 从 Redis 读取数据并转换为 DryMonitoringData 列表(核心改造) + List monitoringDataList = getDryMonitoringDataFromRedis(equipmentCodeToLocationMap); + + // 4. 按“设备号+参数编码”分组,取最新数据 + Map> latestDataMap = groupLatestMonitoringData(monitoringDataList); + + // 5. 查询每个设备的阈值规则列表(不变) + Map> paramMap = getParamMap(new ArrayList<>(equipmentCodeToLocationMap.keySet())); + + // 6. 组装DTO:提取 TEMP 和 DEW 参数的最新值 + return equipmentList.stream() + .map(equipment -> { + String setTem = (equipment != null && StringUtils.isNotBlank(equipment.getRuleCode())) + ? Optional.ofNullable(paramMapper.selectOne( + new LambdaQueryWrapper().eq(DryParamThreshold::getRuleCode, equipment.getRuleCode()) + .eq(DryParamThreshold::getIsEnabled, "1") + )).map(DryParamThreshold::getMaxValue).orElse(null) + : null; + EquipmentScreenDTO.DrynessDTO dto = new EquipmentScreenDTO.DrynessDTO(); + String equipmentCode = equipment.getEquipmentCode(); + // 基础信息赋值 + dto.setEquipmentCode(equipmentCode); + dto.setEquipmentName(equipment.getEquipmentName()); + dto.setStatus(equipment.getStatus()); + dto.setSetTem(setTem); + dto.setEquipmentType("2"); + dto.setParamList(paramMap.getOrDefault(equipmentCode, Collections.emptyList())); + + // 提取 TEMP 和 DEW 参数的最新值(直接复用 DryMonitoringData 的 dataValue) + Map paramDataMap = latestDataMap.getOrDefault(equipmentCode, new HashMap<>()); + DryMonitoringData tempData = paramDataMap.get("TEMP"); + DryMonitoringData dewData = paramDataMap.get("DEW"); // 或 PARAM1,按业务映射 + + dto.setTempValue(tempData != null ? tempData.getDataValue() : ""); + dto.setDewValue(dewData != null ? dewData.getDataValue() : ""); + + return dto; + }) + .collect(Collectors.toList()); + } + + /** + * 核心改造:从 Redis 读取数据并转换为 DryMonitoringData 列表(复用 AutoCollectService 逻辑) + * @param equipmentCodeToLocationMap 设备号→设备ID映射 + * @return DryMonitoringData 列表 + */ + private List getDryMonitoringDataFromRedis(Map equipmentCodeToLocationMap) { + List resultList = new ArrayList<>(); + + // 1. 批量构建 Redis Key 前缀(按设备ID+参数编码) + List redisKeyPatterns = new ArrayList<>(); + for (String location : equipmentCodeToLocationMap.values()) { + for (String paramCode : SUPPORT_PARAM_CODES) { + String keyPattern = REDIS_EQUIP_PREFIX + location + ":" + paramCode; + redisKeyPatterns.add(keyPattern); + } + } + + // 2. 批量扫描 Redis Key + Set matchedKeys = new HashSet<>(); + for (String pattern : redisKeyPatterns) { + Set keys = redisTemplate.keys(pattern); + if (keys != null && !keys.isEmpty()) { + matchedKeys.addAll(keys); + } + } + + // 3. 批量读取 Redis 数据 + List redisDataList = redisTemplate.opsForValue().multiGet(new ArrayList<>(matchedKeys)); + if (redisDataList == null || redisDataList.isEmpty()) { + return resultList; + } + + // 4. 解析并转换为 DryMonitoringData(复用 AutoCollectService 的转换逻辑) + for (int i = 0; i < matchedKeys.size(); i++) { + String redisKey = new ArrayList<>(matchedKeys).get(i); + Object redisData = redisDataList.get(i); + + if (redisData == null || !(redisData instanceof MqttDataHandler.EquipmentData)) { + continue; + } + + // 解析 Redis Key:提取设备ID、参数编码 + String[] keyParts = redisKey.split(":"); + if (keyParts.length != 3) { + continue; // 非法 Key 格式,跳过 + } + String location = keyParts[1]; // 设备ID(如021) + String paramCode = keyParts[2]; // 参数编码(如TEMP) + + // 找到对应的设备号(如 GZ-021) + String equipmentCode = null; + for (Map.Entry entry : equipmentCodeToLocationMap.entrySet()) { + if (entry.getValue().equals(location)) { + equipmentCode = entry.getKey(); + break; + } + } + if (equipmentCode == null) { + continue; // 未找到对应设备,跳过 + } + + // 转换为 DryMonitoringData(完全复用 AutoCollectService 的 getDryMonitoringData 逻辑) + MqttDataHandler.EquipmentData equipmentData = (MqttDataHandler.EquipmentData) redisData; + DryMonitoringData monitoringData = convertToDryMonitoringData(equipmentData, equipmentCode, paramCode); + resultList.add(monitoringData); + } + + return resultList; + } + + /** + * 转换为 DryMonitoringData(完全复用 AutoCollectService 的逻辑,确保格式一致) + */ + private DryMonitoringData convertToDryMonitoringData(MqttDataHandler.EquipmentData redisData, String equipmentCode, String paramCode) { + DryMonitoringData dbData = new DryMonitoringData(); + + // 设备编码(直接从映射中获取,无需再次查询数据库) + dbData.setEquipmentCode(equipmentCode); + // 参数编码(TEMP/PARAM1/DEW) + dbData.setParamCode(paramCode); + + // 数据值(带单位,与 AutoCollectService 保持一致) + String unit = "°C"; + if ("PARAM1".equals(paramCode)) { + unit = "自定义单位"; // 按业务调整 + } else if ("DEW".equals(paramCode)) { + unit = "°C"; // 露点单位 + } + dbData.setDataValue(String.format("%.1f%s", redisData.getValue(), unit)); + + // 采集时间(优先使用 Redis 中的时间戳,无则用当前时间) + LocalDateTime collectTime = redisData.getTime() != null + ? LocalDateTime.ofInstant( + java.time.Instant.ofEpochMilli(redisData.getTime()), + java.time.ZoneId.systemDefault() + ) + : LocalDateTime.now(); + dbData.setCollectTime(collectTime); + + // 其他字段(与 AutoCollectService 一致) + dbData.setDataStatus("1"); // 默认为正常状态(可按业务调整) + dbData.setCollectorCode("MQTT-REDIS"); + dbData.setCreateBy("system"); + dbData.setCreateTime(new Date()); + dbData.setRemark("Redis实时查询数据"); + + return dbData; + } + + /** + * 按“设备号+参数编码”分组,取每组最新的一条数据(按采集时间倒序) + */ + private Map> groupLatestMonitoringData(List dataList) { return dataList.stream() .collect(Collectors.groupingBy( - // 第一层分组:按设备号 - DryMonitoringData::getEquipmentCode, - // 第二层分组:按参数编码(TEMP/DEW),并取每组最新的一条数据 + DryMonitoringData::getEquipmentCode, // 第一层:设备号 Collectors.groupingBy( - DryMonitoringData::getParamCode, + DryMonitoringData::getParamCode, // 第二层:参数编码 Collectors.collectingAndThen( - // 按创建时间倒序,取第一条(最新) - Collectors.maxBy(Comparator.comparing(DryMonitoringData::getCreateTime)), - // 提取最新数据的监测值(无数据时返回空字符串) - opt -> opt.map(DryMonitoringData::getDataValue).orElse("") + // 按采集时间倒序,取第一条(最新) + Collectors.maxBy(Comparator.comparing(DryMonitoringData::getCollectTime)), + // 提取最新数据(无数据时返回null) + opt -> opt.orElse(null) ) ) )); } /** - * 获取每个设备的阈值规则列表 + * 保留原有方法:获取每个设备的阈值规则列表(无需修改) */ private Map> getParamMap(List equipmentCodes) { // 批量查询设备的所有阈值规则 @@ -150,72 +332,7 @@ public class DryEquipmentScreenServiceImpl implements IDryEquipmentScreenService } /** - * 除湿干燥机列表 - * @return - */ - /** - * 除湿干燥机列表:同时查询最新的温度(TEMP)和露点(DEW)值 - */ - @Override - public List drynessList() { - // 1. 查询所有除湿干燥机(equipment_type=2) - LambdaQueryWrapper eqWrapper = new LambdaQueryWrapper<>(); - eqWrapper.eq(DryEquipmentInfo::getEquipmentType, '2'); - List equipmentList = infoMapper.selectList(eqWrapper); - if (equipmentList.isEmpty()) { - return Collections.emptyList(); - } - - // 按设备号字符串自然排序(GZ-001 < GZ-002 < ... < GZ-026) - equipmentList.sort(Comparator.comparing(DryEquipmentInfo::getEquipmentCode)); - - // 2. 提取所有除湿干燥机的设备号 - List equipmentCodes = equipmentList.stream() - .map(DryEquipmentInfo::getEquipmentCode) - .collect(Collectors.toList()); - - // 3. 调用优化后的方法,获取“设备号→{参数编码→最新值}”的映射 - Map> latestParamDataMap = getLatestDataMap(equipmentCodes); - - // 4. 查询每个设备的阈值规则列表(复用原有方法,无需修改) - Map> paramMap = getParamMap(equipmentCodes); - - // 5. 组装DrynessDTO:分别提取TEMP和DEW的最新值 - return equipmentList.stream() - .map(equipment -> { - String setTem = (equipment != null && StringUtils.isNotBlank(equipment.getRuleCode())) - ? Optional.ofNullable(paramMapper.selectOne( - new LambdaQueryWrapper().eq(DryParamThreshold::getRuleCode, equipment.getRuleCode()) - .eq(DryParamThreshold::getIsEnabled,"1") - )).map(DryParamThreshold::getMaxValue).orElse(null) - : null; - EquipmentScreenDTO.DrynessDTO dto = new EquipmentScreenDTO.DrynessDTO(); - String equipmentCode = equipment.getEquipmentCode(); - // 基础信息赋值 - dto.setEquipmentCode(equipmentCode); - dto.setEquipmentName(equipment.getEquipmentName()); - dto.setStatus(equipment.getStatus()); - dto.setSetTem(setTem); - dto.setEquipmentType("2"); - // 阈值规则列表 - dto.setParamList(paramMap.getOrDefault(equipmentCode, Collections.emptyList())); - - // 关键:提取当前设备的TEMP和DEW最新值(无数据时返回空字符串) - Map paramData = latestParamDataMap.getOrDefault(equipmentCode, new HashMap<>()); - String tempValue = paramData.getOrDefault("TEMP", ""); // 温度值 - String dewValue = paramData.getOrDefault("DEW", ""); // 露点值 - - dto.setTempValue(tempValue); - dto.setDewValue(dewValue); - - return dto; - }) - .collect(Collectors.toList()); - } - - /** - * 一键关闭报警 - * @return + * 一键关闭报警(无需修改) */ @Override @Transactional @@ -226,4 +343,4 @@ public class DryEquipmentScreenServiceImpl implements IDryEquipmentScreenService int result = infoMapper.update(null, updateWrapper); return result; } -} +} \ No newline at end of file