MQTT采集功能实现

This commit is contained in:
杨晓东 2025-11-25 16:23:46 +08:00
parent 2c76b1c751
commit 84f3bd6807
12 changed files with 819 additions and 670 deletions

View File

@ -4,7 +4,6 @@ import com.shgx.common.core.controller.BaseController;
import com.shgx.common.core.domain.AjaxResult;
import com.shgx.dryingroom.web.domain.dto.EquipmentScreenDTO;
import com.shgx.dryingroom.web.service.IDryEquipmentScreenService;
import com.shgx.dryingroom.mqtt.MqttConsole;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;

View File

@ -51,6 +51,21 @@
<scope>compile</scope>
</dependency>
<!-- MapStruct 核心依赖 -->
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct</artifactId>
<version>1.5.5.Final</version>
</dependency>
<!-- 编译时生成映射代码 -->
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct-processor</artifactId>
<version>1.5.5.Final</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

View File

@ -7,14 +7,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 报警灯TCP消息处理器多设备版本携带deviceId
* 报警灯TCP消息处理器
*/
public class AlarmLightHandler extends SimpleChannelInboundHandler<byte[]> {
private static final Logger log = LoggerFactory.getLogger(AlarmLightHandler.class);
private final MqttService mqttService;
private final String deviceId; // 新增当前处理器对应的设备ID
// 当前处理器对应的设备ID
private final String deviceId;
/**
* 构造方法传入MQTT服务和设备ID
@ -25,7 +27,7 @@ public class AlarmLightHandler extends SimpleChannelInboundHandler<byte[]> {
}
/**
* 读取设备响应核心修改响应中携带deviceId
* 读取设备响应
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, byte[] msg) {
@ -41,7 +43,7 @@ public class AlarmLightHandler extends SimpleChannelInboundHandler<byte[]> {
}
}
// ---------------------- 以下方法增加deviceId日志和主题 ----------------------
// ---------------------- deviceId日志和主题 ----------------------
@Override
public void channelActive(ChannelHandlerContext ctx) {
log.info("🔗 设备[{}]报警灯TCP连接已建立", deviceId);

View File

@ -1,26 +1,31 @@
package com.shgx.dryingroom.collect;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.shgx.common.utils.SecurityUtils;
import com.shgx.common.utils.DateUtils;
import com.shgx.dryingroom.alarm.AlarmLightService;
import com.shgx.dryingroom.mqtt.MqttDataHandler;
import com.shgx.dryingroom.mqtt.MqttService;
import com.shgx.dryingroom.web.domain.DryAlarmLog;
import com.shgx.dryingroom.web.mapper.DryAlarmLogMapper;
import lombok.extern.slf4j.Slf4j;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.shgx.common.utils.DateUtils;
import com.shgx.dryingroom.web.domain.DryEquipmentInfo;
import com.shgx.dryingroom.web.domain.DryMonitoringData;
import com.shgx.dryingroom.web.domain.DryParamThreshold;
import com.shgx.dryingroom.web.mapper.DryAlarmLogMapper;
import com.shgx.dryingroom.web.mapper.DryEquipmentInfoMapper;
import com.shgx.dryingroom.web.mapper.DryMonitoringDataMapper;
import com.shgx.dryingroom.web.mapper.DryParamThresholdMapper;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
@ -50,310 +55,345 @@ public class AutoCollectService {
@Resource
private AlarmLightService alarmLightService;
// 设备编码列表延迟加载首次采集时初始化
private List<String> allEquipmentCodes;
// 设备缓存设备ID设备详情仅加载一次
private Map<String, DryEquipmentInfo> equipmentInfoCache;
// 采集次数计数器用于判断报警轮次
private int collectCount = 0;
// Redis Key 前缀
private static final String REDIS_EQUIP_PREFIX = "dry_equipment:";
// 前端 MQTT 推送主题
private static final String FRONTEND_TOPIC = "equipment/data_update";
// 设备总数30台
private static final int TOTAL_EQUIPMENT = 30;
// 时间格式化器
private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// 随机数生成器
private final Random random = new Random();
@Resource
private RedisTemplate<String, Object> redisTemplate;
/**
* 定时任务每10秒执行一次数据采集
*/
@Scheduled(cron = "0/59 * * * * ?")
// ==================== 独立任务1每5分钟持久化Redis数据到数据库 ====================
@Scheduled(cron = "0 0/5 * * * ?")
@Transactional(rollbackFor = Exception.class)
public void autoCollectData() {
log.info("===== 开始执行自动采集任务 =====");
public void persistRedisDataToDb() {
log.info("===== 【持久化任务】开始执行Redis数据→数据库 =====");
// 1. 加载设备列表首次执行时
if (allEquipmentCodes == null || allEquipmentCodes.isEmpty()) {
loadEquipmentCodes();
if (allEquipmentCodes.isEmpty()) {
log.warn("未加载到任何设备,本次采集任务终止");
// 1. 加载设备缓存首次执行时
if (equipmentInfoCache == null || equipmentInfoCache.isEmpty()) {
loadEquipmentCache();
if (equipmentInfoCache.isEmpty()) {
log.warn("【持久化任务】未加载到设备信息,任务终止");
return;
}
log.info("成功加载{}台设备,开始生成监测数据", allEquipmentCodes.size());
}
// 2. 判断报警轮次
collectCount++;
boolean isAlarmRound = collectCount % 2 == 0;
List<String> alarmEquipCodes = new ArrayList<>();
if (isAlarmRound) {
alarmEquipCodes = selectRandomEquipments(1);
log.info("本次为报警轮次随机选中1台报警设备{}", alarmEquipCodes);
} else {
log.info("本次为正常轮次(第{}次采集),无报警设备", collectCount);
}
// 3. 生成并循环插入监测数据
LocalDateTime collectTime = LocalDateTime.now();
int insertSuccess = 0;
int total = allEquipmentCodes.size();
for (String equipCode : allEquipmentCodes) {
try {
if (equipmentInfoMapper.selectOne(new LambdaQueryWrapper<DryEquipmentInfo>()
.eq(DryEquipmentInfo::getEquipmentCode, equipCode))
.getEquipmentType()
.equals("2")) {
// 生成露点数据
DryMonitoringData data1 = generateMonitoringData(equipCode, alarmEquipCodes.contains(equipCode), collectTime, "DEW");
monitoringDataMapper.insert(data1);
judgeAlarm(data1, "DEW");
log.debug("设备[{}]数据插入成功:{}(状态:{}",
equipCode, data1.getDataValue(),
data1.getDataStatus().equals("1") ? "报警" : "正常");
}
// 生成温度数据
DryMonitoringData data2 = generateMonitoringData(equipCode, alarmEquipCodes.contains(equipCode), collectTime, "TEMP");
monitoringDataMapper.insert(data2);
// 判断是否需要报警
judgeAlarm(data2, "TEMP");
insertSuccess++;
log.debug("设备[{}]数据插入成功:{}(状态:{}",
equipCode, data2.getDataValue(),
data2.getDataStatus().equals("1") ? "报警" : "正常");
} catch (Exception e) {
log.error("设备[{}]数据插入失败", equipCode, e);
// 若需保证所有数据要么全成功要么全失败可抛出异常触发事务回滚
// throw new RuntimeException("设备[" + equipCode + "]数据插入失败", e);
}
}
// 4. 每30秒推送消息 "1" 给前端
sendDataUpdateNotification();
// 4. 打印插入结果汇总
log.info("===== 本次采集任务结束 =====");
log.info("数据插入汇总:共需插入{}条,成功{}条,失败{}条",
total, insertSuccess, total - insertSuccess);
}
/**
* 加载所有设备首次采集时执行
*/
private void loadEquipmentCodes() {
LambdaQueryWrapper<DryEquipmentInfo> queryWrapper = new LambdaQueryWrapper<>();
// List<DryEquipmentInfo> equipmentList = equipmentInfoMapper.selectList(queryWrapper);
// System.out.println("从数据库查询到的设备数量:" + equipmentList.size() ); // 打印设备数量
allEquipmentCodes = equipmentInfoMapper.selectList(queryWrapper)
.stream()
.map(DryEquipmentInfo::getEquipmentCode)
.collect(Collectors.toList());
}
/**
* 随机选择N台设备作为报警设备
*/
private List<String> selectRandomEquipments(int n) {
// 复制设备列表并打乱顺序
List<String> shuffled = new ArrayList<>(allEquipmentCodes);
Collections.shuffle(shuffled);
// 取前N台设备若设备总数不足N返回全部
return shuffled.subList(0, Math.min(n, shuffled.size()));
}
/**
* 生成单台设备的监测数据
* @param equipCode 设备编码
* @param isAlarm 是否强制报警超阈值
* @param collectTime 采集时间
*/
private DryMonitoringData generateMonitoringData(String equipCode, boolean isAlarm, LocalDateTime collectTime, String paramCode) {
// 查询该设备的阈值规则
LambdaQueryWrapper<DryParamThreshold> thresholdWrapper = new LambdaQueryWrapper<>();
thresholdWrapper.eq(DryParamThreshold::getEquipmentCode, equipCode).eq(DryParamThreshold::getParamCode, paramCode);
DryParamThreshold threshold = thresholdMapper.selectOne(thresholdWrapper);
// 新建监测数据
DryMonitoringData data = new DryMonitoringData();
data.setEquipmentCode(equipCode);
// 判断设备类型
if ("TEMP".equals(paramCode)) {
data.setParamCode("TEMP");
} else {
data.setParamCode("DEW");
}
data.setCollectorCode("AUTO-COLLECTOR-001");
data.setCreateBy("shgx");
data.setCreateTime(DateUtils.getNowDate());
data.setCollectTime(collectTime);
data.setRemark("自动采集数据");
// 如果是没有采集规则
if (threshold == null) {
double value;
if (isAlarm) {
// 报警生成超阈值数据
if (paramCode == "TEMP") {
double min = 50;
double max = 60;
value = random.nextBoolean()
? min - (1 + random.nextDouble() * 4) // 低于最小值
: max + (1 + random.nextDouble() * 4); // 高于最大值
data.setDataStatus("1"); // 异常状态
} else {
double min = 5;
double max = 6;
value = random.nextBoolean()
? min - (1 + random.nextDouble() * 4) // 低于最小值
: max + (1 + random.nextDouble() * 4); // 高于最大值
data.setDataStatus("1"); // 异常状态
}
} else {
// 正常生成阈值范围内的随机值
if (paramCode == "TEMP") {
double min = 50;
double max = 60;
value = min + random.nextDouble() * (max - min);
data.setDataStatus("0"); // 正常状态
} else {
double min = 5;
double max = 6;
value = min + random.nextDouble() * (max - min);
data.setDataStatus("0"); // 正常状态
}
}
// 格式化数据保留1位小数拼接单位
String unit = "°C";
data.setDataValue(String.format("%.1f%s", value, unit));
log.info("生成设备[{}]的监测数据:{},状态:{}",
equipCode,
data.getDataValue(),
isAlarm ? "报警" : "正常");
return data;
}
// 有采集规则
else {
// 安全解析阈值
double min = parseThresholdValue(threshold.getMinValue());
double max = parseThresholdValue(threshold.getMaxValue());
double value;
if (isAlarm) {
// 报警生成超阈值数据
value = random.nextBoolean()
? min - (1 + random.nextDouble() * 4) // 低于最小值
: max + (1 + random.nextDouble() * 4); // 高于最大值
data.setDataStatus("1"); // 异常状态
} else {
// 正常生成阈值范围内的随机值
value = min + random.nextDouble() * (max - min);
data.setDataStatus("0"); // 正常状态
}
// 格式化数据保留1位小数拼接单位
String unit = threshold.getUnit();
data.setDataValue(String.format("%.1f%s", value, unit));
log.info("生成设备[{}]的监测数据:{},状态:{}",
equipCode,
data.getDataValue(),
isAlarm ? "报警" : "正常");
return data;
}
}
/**
* 判断是否报警
*/
private void judgeAlarm(DryMonitoringData data, String paramCode) {
LambdaUpdateWrapper<DryEquipmentInfo> updateWrapper = new LambdaUpdateWrapper<>();
// 查询该设备规则列表
List<DryParamThreshold> list = thresholdMapper.selectList(new LambdaQueryWrapper<DryParamThreshold>().eq(DryParamThreshold::getEquipmentCode, data.getEquipmentCode()));
if (list == null || list.size() == 0) {
// 2. 扫描Redis中所有设备数据Key
Set<String> redisKeys = redisTemplate.keys(REDIS_EQUIP_PREFIX + "*:*");
if (redisKeys == null || redisKeys.isEmpty()) {
log.warn("【持久化任务】Redis中无待持久化数据任务终止");
return;
}
// 新建报警日志对象
DryAlarmLog log = new DryAlarmLog();
// 循环规则列表
for (DryParamThreshold threshold : list) {
if (threshold.getParamCode().equals(paramCode)) {
if (parseThresholdValue(threshold.getMaxValue()) < parseThresholdValue(data.getDataValue())) {
// 插入报警日志
log.setAlarmType("1");
log.setEquipmentCode(data.getEquipmentCode());
log.setParamCode(threshold.getParamCode());
log.setAlarmValue(data.getDataValue());
log.setAlarmTime(LocalDateTime.now());
log.setAlarmStatus("0");
log.setCreateBy("shgx");
log.setCreateTime(DateUtils.getNowDate());
logMapper.insert(log);
// 修改设备状态
updateWrapper.eq(DryEquipmentInfo::getEquipmentCode, data.getEquipmentCode())
.set(DryEquipmentInfo::getStatus, '2');
equipmentInfoMapper.update(updateWrapper);
/** MQTT向报警灯发送报警 */
alarmLightService.triggerAllAlarmsByEquipment(data.getEquipmentCode());
// 3. 解析Redis数据转换为DryMonitoringData
List<DryMonitoringData> dbDataList = new ArrayList<>();
for (String redisKey : redisKeys) {
try {
MqttDataHandler.EquipmentData redisData = (MqttDataHandler.EquipmentData) redisTemplate.opsForValue().get(redisKey);
if (redisData == null) continue;
// 转换为数据库实体
DryMonitoringData dbData = convertToDryMonitoringData(redisData);
if (dbData == null) continue;
// 报警判断持久化时顺便校验不影响前端推送
judgeAlarm(dbData);
dbDataList.add(dbData);
} catch (Exception e) {
log.error("【持久化任务】解析Redis数据失败 → Key{}", redisKey, e);
}
}
// 4. 批量写入数据库
if (!dbDataList.isEmpty()) {
monitoringDataMapper.batchInsert(dbDataList);
log.info("【持久化任务】✅ 成功 - 共{}条数据写入dry_monitoring_data表", dbDataList.size());
} else {
log.warn("【持久化任务】无有效数据可持久化");
}
log.info("===== 【持久化任务】执行结束 =====");
}
// ==================== 独立任务2每10秒从Redis推数据给前端 ====================
@Scheduled(cron = "0/10 * * * * ?")
public void pushRedisDataToFrontend() {
log.info("===== 【前端推送任务】开始执行Redis数据→前端 =====");
// 1. 加载设备缓存首次执行时
if (equipmentInfoCache == null || equipmentInfoCache.isEmpty()) {
loadEquipmentCache();
if (equipmentInfoCache.isEmpty()) {
log.warn("【前端推送任务】未加载到设备信息,任务终止");
return;
}
}
// 2. 从Redis读取30台设备的最新数据仅读不修改Redis
List<DryMonitoringData> result = buildFrontendDataList();
// 3. 过滤掉空数据只保留有效数据
List<DryMonitoringData> validResult = result.stream()
.filter(Objects::nonNull) // 过滤null对象
.filter(data -> data.getEquipmentCode() != null && !data.getEquipmentCode().isEmpty()) // 设备编码非空
.filter(data -> data.getDataValue() != null && !data.getDataValue().isEmpty()) // 数据值非空
.collect(Collectors.toList());
// 4. 组装JSON并推送只推送有效数据
try {
JSONObject pushData = new JSONObject();
pushData.put("data", validResult); // 推送过滤后的有效数据
pushData.put("pushTime", LocalDateTime.now().format(DATETIME_FORMATTER));
pushData.put("total", validResult.size()); // 有效数据条数
String jsonStr = pushData.toJSONString();
// 发送到前端MQTT主题仅推送不涉及数据库
boolean success = mqttService.sendMessage(FRONTEND_TOPIC, jsonStr);
if (success) {
log.info("【前端推送任务】✅ 成功 → 主题:{},有效数据条数:{}", FRONTEND_TOPIC, validResult.size());
} else {
log.warn("【前端推送任务】❌ 失败 → 主题:{}", FRONTEND_TOPIC);
}
} catch (Exception e) {
log.error("【前端推送任务】推送异常", e);
}
log.info("===== 【前端推送任务】执行结束 =====");
}
// ==================== 工具方法仅内部调用分离职责 ====================
/**
* 加载设备缓存
*/
private void loadEquipmentCache() {
log.info("加载设备信息缓存...");
LambdaQueryWrapper<DryEquipmentInfo> queryWrapper = new LambdaQueryWrapper<>();
List<DryEquipmentInfo> equipmentList = equipmentInfoMapper.selectList(queryWrapper);
// 构建缓存key=设备ID格式化后001-030value=设备详情
equipmentInfoCache = equipmentList.stream()
.collect(Collectors.toMap(
eq -> {
// 增加空判断避免格式转换异常
if (eq.getLocation() == null || eq.getLocation().isEmpty()) {
log.warn("设备location为空跳过该设备 → 设备编码:{}", eq.getEquipmentCode());
return "";
}
try {
return String.format("%03d", Integer.parseInt(eq.getLocation()));
} catch (NumberFormatException e) {
log.warn("设备location格式异常 → location{},设备编码:{}", eq.getLocation(), eq.getEquipmentCode());
return eq.getLocation();
}
},
eq -> eq,
(k1, k2) -> k1 // 避免重复ID冲突
))
// 过滤掉key为空的缓存项
.entrySet().stream()
.filter(entry -> !entry.getKey().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
log.info("设备缓存加载完成,共{}台有效设备", equipmentInfoCache.size());
}
/**
* 从Redis读取数据
*/
private List<DryMonitoringData> buildFrontendDataList() {
List<DryMonitoringData> result = new ArrayList<>();
// 遍历30台设备001-030
for (int i = 1; i <= TOTAL_EQUIPMENT; i++) {
String equipmentId = String.format("%03d", i); // 设备ID001-030
DryEquipmentInfo equipmentInfo = equipmentInfoCache.get(equipmentId);
if (equipmentInfo == null) {
log.warn("【前端推送任务】设备ID{}无对应信息,跳过", equipmentId);
continue;
}
// 从Redis读取该设备的最新数据Key格式dry_equipment:001:TEMP
String redisKey = REDIS_EQUIP_PREFIX + equipmentId + ":TEMP"; // 假设参数编码是TEMP
MqttDataHandler.EquipmentData redisData = (MqttDataHandler.EquipmentData) redisTemplate.opsForValue().get(redisKey);
// 增加redisData空判断避免传递null
if (redisData == null) {
log.debug("【前端推送任务】设备ID{}无Redis数据跳过", equipmentId);
continue;
}
// 转换为数据库实体此时redisData非null
DryMonitoringData dbData = convertToDryMonitoringData(redisData);
if (dbData == null) {
log.debug("【前端推送任务】设备ID{}数据转换失败,跳过", equipmentId);
continue;
}
// 报警判断持久化时顺便校验不影响前端推送
judgeAlarm(dbData);
result.add(dbData);
}
return result;
}
/**
* Redis数据转换为数据库实体增加空判断
*/
private DryMonitoringData convertToDryMonitoringData(MqttDataHandler.EquipmentData redisData) {
// 入参空判断避免null指针
if (redisData == null) {
log.error("【转换失败】redisData为null无法转换");
return null;
}
// equipmentId空判断
String equipmentId = redisData.getEquipmentId();
if (equipmentId == null || equipmentId.isEmpty()) {
log.error("【转换失败】redisData的equipmentId为空");
return null;
}
DryEquipmentInfo equipmentInfo = equipmentInfoCache.get(equipmentId);
if (equipmentInfo == null) {
log.warn("【持久化任务】设备ID{}无对应信息,跳过", equipmentId);
return null;
}
DryMonitoringData dbData = new DryMonitoringData();
dbData.setEquipmentCode(equipmentInfo.getEquipmentCode());
dbData.setParamCode(redisData.getParamCode());
// 数据值空判断避免存储空值
if (redisData.getValue() != null) {
dbData.setDataValue(String.valueOf(redisData.getValue()));
} else {
log.warn("【转换失败】设备ID{}的redis数据值为空", equipmentId);
return null;
}
dbData.setCollectTime(LocalDateTime.ofInstant(
java.time.Instant.ofEpochMilli(redisData.getTime()),
java.time.ZoneId.systemDefault()
));
dbData.setDataStatus("1"); // 默认为正常
dbData.setCollectorCode("MQTT-REDIS");
dbData.setCreateBy("shgx");
dbData.setCreateTime(DateUtils.getNowDate());
dbData.setRemark("Redis定时持久化数据");
return dbData;
}
/**
* 温度报警判断增加空判断
*/
private void judgeAlarm(DryMonitoringData data) {
// 入参空判断
if (data == null) {
log.warn("【报警判断】数据为空,跳过");
return;
}
// 设备编码空判断
String equipmentCode = data.getEquipmentCode();
if (equipmentCode == null || equipmentCode.isEmpty()) {
log.warn("【报警判断】设备编码为空,跳过");
return;
}
DryEquipmentInfo info = equipmentInfoMapper.selectOne(
new LambdaQueryWrapper<DryEquipmentInfo>().eq(DryEquipmentInfo::getEquipmentCode, equipmentCode)
);
if (info == null) {
log.warn("【报警判断】设备编码{}无对应设备信息,跳过", equipmentCode);
return;
}
List<DryParamThreshold> thresholds = thresholdMapper.selectList(
new LambdaQueryWrapper<DryParamThreshold>().eq(DryParamThreshold::getEquipmentCode, equipmentCode)
);
if (thresholds.isEmpty()) return;
DryAlarmLog alarmLog = new DryAlarmLog();
LambdaUpdateWrapper<DryEquipmentInfo> updateWrapper = new LambdaUpdateWrapper<>();
boolean isAlarm = false;
for (DryParamThreshold threshold : thresholds) {
// 增加阈值对象空判断
if (threshold == null) {
log.warn("【报警判断】设备编码{}的阈值规则为空,跳过", equipmentCode);
continue;
}
// 规则过滤修复逻辑||改为&&满足一个条件就跳过原逻辑可能有误
if (!threshold.getRuleCode().equals(info.getRuleCode())
|| !threshold.getIsEnabled().equals("1")
|| !threshold.getParamCode().equals("TEMP")) {
continue;
}
try {
// 阈值空判断
if (threshold.getMaxValue() == null || threshold.getMinValue() == null) {
log.warn("【报警判断】设备编码{}的阈值为空,跳过", equipmentCode);
continue;
}
if (parseThresholdValue(threshold.getMinValue()) > parseThresholdValue(data.getDataValue())) {
// 插入报警日志
log.setAlarmType("2");
log.setEquipmentCode(data.getEquipmentCode());
log.setParamCode(threshold.getParamCode());
log.setAlarmValue(data.getDataValue());
log.setAlarmTime(LocalDateTime.now());
log.setAlarmStatus("0");
log.setCreateBy("shgx");
log.setCreateTime(DateUtils.getNowDate());
updateWrapper.eq(DryEquipmentInfo::getEquipmentCode, data.getEquipmentCode())
.set(DryEquipmentInfo::getStatus, '2');
// 修改设备状态
equipmentInfoMapper.update(updateWrapper);
logMapper.insert(log);
/** MQTT向报警灯发送报警 */
alarmLightService.triggerAllAlarmsByEquipment(data.getEquipmentCode());
} else {
return;
double max = parseThresholdValue(threshold.getMaxValue());
double min = parseThresholdValue(threshold.getMinValue());
double value = parseThresholdValue(data.getDataValue());
if (value > max) {
alarmLog.setAlarmType("1"); // 超上限
isAlarm = true;
} else if (value < min) {
alarmLog.setAlarmType("2"); // 超下限
isAlarm = true;
}
if (isAlarm) {
alarmLog.setEquipmentCode(equipmentCode);
alarmLog.setParamCode(threshold.getParamCode());
alarmLog.setAlarmValue(data.getDataValue());
alarmLog.setAlarmTime(LocalDateTime.now());
alarmLog.setCreateBy("system");
alarmLog.setCreateTime(DateUtils.getNowDate());
logMapper.insert(alarmLog);
// 更新设备状态为报警
updateWrapper.eq(DryEquipmentInfo::getEquipmentCode, equipmentCode)
.set(DryEquipmentInfo::getStatus, '2');
equipmentInfoMapper.update(null, updateWrapper);
// 触发报警灯
alarmLightService.triggerAllAlarmsByEquipment(equipmentCode);
break;
}
} catch (Exception e) {
log.error("【报警判断】设备编码{}校验异常", equipmentCode, e);
}
}
}
/**
* 安全解析阈值数值处理包含百分号单位等特殊情况
* 阈值解析工具
*/
private double parseThresholdValue(String valueStr) {
if (valueStr == null || valueStr.trim().isEmpty()) {
throw new IllegalArgumentException("阈值数值不能为空");
throw new IllegalArgumentException("阈值不能为空");
}
try {
// 移除所有非数字字符除了小数点负号
String cleanValue = valueStr.replaceAll("[^\\d.-]", "").trim();
if (cleanValue.isEmpty()) {
throw new IllegalArgumentException("清理后的数值为空,原始值: " + valueStr);
}
return Double.parseDouble(cleanValue);
} catch (NumberFormatException e) {
log.error("解析阈值数值失败: {}, 错误: {}", valueStr, e.getMessage());
throw new RuntimeException("阈值数值格式错误: " + valueStr, e);
}
}
/**
* 每30s更新数据时推送"1"给前端
*/
private void sendDataUpdateNotification() {
try {
// 发送数字 "1" 到前端
boolean success = mqttService.sendMessage("equipment/data_update", "1");
if (success) {
log.info("✅ 数据更新通知已发送到前端: 1");
} else {
log.warn("❌ 数据更新通知发送失败");
}
} catch (Exception e) {
log.error("发送数据更新通知异常: {}", e.getMessage());
log.error("解析阈值失败:{}", valueStr, e);
throw new RuntimeException("阈值格式错误:" + valueStr);
}
}
}

View File

@ -1,149 +0,0 @@
package com.shgx.dryingroom.mqtt;
import com.shgx.dryingroom.alarm.AlarmLightService;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.Scanner;
import java.util.function.Consumer;
/**
* MQTT调试控制台适配多报警灯设备
*/
@Component
public class MqttConsole implements CommandLineRunner {
@Autowired
private MqttService mqttService;
@Autowired
private AlarmLightService alarmLightService;
@Override
public void run(String... args) {
new Thread(() -> {
try {
Thread.sleep(3000); // 等待服务初始化
// 关键修正添加监听器参数类型为 Consumer<MqttMessage>仅接收消息体
// 若需要获取消息主题需在 MqttService 中修改监听器定义见下文补充
mqttService.addMessageListener(new Consumer<MqttMessage>() {
@Override
public void accept(MqttMessage message) {
// 从消息体获取内容若需主题需修改 MqttService 监听器为接收 topic+message
String payload = new String(message.getPayload());
// 这里无法直接获取topic若需topic需按补充方案修改MqttService
System.out.println("\n🎯 收到MQTT消息: " + payload);
}
});
startConsole(); // 启动控制台交互
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("控制台线程中断: " + e.getMessage());
}
}).start();
}
// 以下方法保持不变仅修正监听器部分其他逻辑无问题
private void startConsole() {
Scanner scanner = new Scanner(System.in);
System.out.println("\n=== MQTT调试控制台 ===");
System.out.println("命令列表:");
System.out.println(" 打印指令:");
System.out.println(" print [设备ID] [消息] - 发送打印指令示例print PRINTER_01 测试打印)");
System.out.println(" 通用设备指令:");
System.out.println(" device [系统] [类型] [设备ID] [消息] [QoS] - 发送设备指令示例device temp sensor TEMP_01 25 1");
System.out.println(" 报警灯指令(多设备版):");
System.out.println(" alarm [设备ID] ON - 打开指定报警灯示例alarm ALARM_01 ON");
System.out.println(" alarm [设备ID] OFF - 关闭指定报警灯示例alarm ALARM_02 OFF");
System.out.println(" alarm [设备ID] SLOW - 慢闪示例alarm ALARM_03 SLOW");
System.out.println(" alarm [设备ID] FAST - 快闪示例alarm ALARM_01 FAST");
System.out.println(" alarm [设备ID] STATUS - 查询状态示例alarm ALARM_02 STATUS");
System.out.println(" alarm [设备ID] CONNECTION - 检查连接示例alarm ALARM_03 CONNECTION");
System.out.println(" alarm [设备ID] [十六进制指令] - 发送自定义指令示例alarm ALARM_01 01 06 04 0E 00 03 A9 38");
System.out.println(" 状态检查:");
System.out.println(" status - 查看MQTT连接状态");
System.out.println(" exit - 退出控制台");
System.out.println("=============================\n");
while (true) {
System.out.print("mqtt> ");
String input = scanner.nextLine().trim();
if (input.equalsIgnoreCase("exit")) {
break;
}
processCommand(input);
}
scanner.close();
System.out.println("控制台已退出");
}
private void processCommand(String input) {
try {
if (input.startsWith("print ")) {
handlePrint(input);
} else if (input.startsWith("device ")) {
handleDevice(input);
} else if (input.equalsIgnoreCase("status")) {
handleStatus();
} else if (input.startsWith("alarm ")) {
handleAlarmLight(input);
} else if (!input.isEmpty()) {
System.out.println("❌ 未知命令,请输入 'exit' 退出或查看命令列表");
}
} catch (Exception e) {
System.out.println("❌ 命令执行失败: " + e.getMessage());
}
}
private void handlePrint(String input) {
String[] parts = input.substring(6).split(" ", 2);
if (parts.length == 2) {
boolean success = mqttService.sendPrintCommand(parts[0], parts[1]);
System.out.println(success ? "✅ 打印指令发送成功" : "❌ 打印指令发送失败");
} else {
System.out.println("❌ 格式错误,正确格式: print [设备ID] [消息]");
}
}
private void handleDevice(String input) {
String[] parts = input.substring(7).split(" ", 5);
if (parts.length >= 4) {
String system = parts[0];
String commandType = parts[1];
String deviceId = parts[2];
String message = parts[3];
int qos = parts.length >= 5 ? Integer.parseInt(parts[4]) : 0;
boolean success = mqttService.sendDeviceCommand(system, commandType, deviceId, message, qos);
System.out.println(success ? "✅ 设备指令发送成功" : "❌ 设备指令发送失败");
} else {
System.out.println("❌ 格式错误,正确格式: device [系统] [类型] [设备ID] [消息] [QoS]");
}
}
private void handleStatus() {
boolean connected = mqttService.isConnected();
System.out.println(connected ? "✅ MQTT已连接到 " + mqttService.getBrokerUrl() : "❌ MQTT未连接");
}
private void handleAlarmLight(String input) {
String[] parts = input.substring(6).trim().split(" ", 2);
if (parts.length < 2) {
System.out.println("❌ 格式错误,正确格式: alarm [设备ID] [指令]");
System.out.println(" 示例: alarm ALARM_01 ON 打开ID为ALARM_01的报警灯");
return;
}
String deviceId = parts[0];
String command = parts[1].toUpperCase();
boolean success = false;
System.out.println("设备[" + deviceId + "] " + (success ? "✅ 指令发送成功" : "❌ 指令发送失败"));
}
}

View File

@ -0,0 +1,121 @@
package com.shgx.dryingroom.mqtt;
import com.alibaba.fastjson2.JSONObject;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Set;
import java.util.function.Consumer;
import javax.annotation.Resource;
import static com.shgx.dryingroom.utils.AdcToTemperatureConverter.convert;
@Component
public class MqttDataHandler implements Consumer<MqttMessage> {
private static final Logger log = LoggerFactory.getLogger(MqttDataHandler.class);
// Redis 存储 Key 前缀统一覆盖30台设备
private static final String REDIS_EQUIP_PREFIX = "dry_equipment:";
// 参数编码固定业务约定
private static final String PARAM_TEMP = "TEMP"; // 温度参数默认
private static final String PARAM_DEW = "DEW";// 自定义参数对应消息中字段"1"
// 设备ID范围校验30台设备001-030对应干燥机001-026露点干燥机001-004
private static final int MIN_EQUIP_ID = 1;
private static final int MAX_EQUIP_ID = 30;
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Override
public void accept(MqttMessage message) {
try {
String payload = new String(message.getPayload());
JSONObject data = JSONObject.parseObject(payload);
// 1. 解析基础字段
Long time = data.getLong("time");
JSONObject params = data.getJSONObject("params");
if (time == null || params == null) {
log.warn("MQTT消息缺少time或params字段 → 消息:{}", payload);
return;
}
// 2. 遍历params中所有设备ID存储对应参数默认温度
Set<String> equipIdSet = params.keySet();
for (String equipIdStr : equipIdSet) {
// 校验设备ID合法性必须是1-30的数字
Integer equipId = validateEquipId(equipIdStr);
if (equipId == null) continue;
// 格式化设备ID补0为3位如100121021
String formatEquipId = String.format("%03d", equipId);
// 解析参数值
Double paramValue = convert(params.getDouble(equipIdStr));
if (paramValue == null) {
log.warn("设备{}参数值为空 → 消息:{}", formatEquipId, payload);
continue;
}
// 3. 存储到Redis设备ID-温度参数
// String redisKey = REDIS_EQUIP_PREFIX + formatEquipId + ":" + PARAM_TEMP;
String redisKey = REDIS_EQUIP_PREFIX + formatEquipId + ":" + PARAM_TEMP;
redisTemplate.opsForValue().set(redisKey, new EquipmentData(time, paramValue, PARAM_TEMP, formatEquipId));
log.info("✅ 存储设备数据 - 设备:{},参数:{},值:{}Key{}",
formatEquipId, PARAM_TEMP, paramValue, redisKey);
}
} catch (Exception e) {
log.error("❌ 处理MQTT设备数据失败{}", e.getMessage(), e);
}
}
/**
* 校验设备ID合法性必须是1-30的整数
*/
private Integer validateEquipId(String equipIdStr) {
try {
int equipId = Integer.parseInt(equipIdStr);
if (equipId >= MIN_EQUIP_ID && equipId <= MAX_EQUIP_ID) {
return equipId;
} else {
log.warn("设备ID超出范围1-30 → 非法ID{}", equipIdStr);
return null;
}
} catch (NumberFormatException e) {
log.warn("设备ID格式非法必须是数字 → 非法ID{}", equipIdStr);
return null;
}
}
/**
* 设备数据实体类包含设备ID便于后续持久化
*/
public static class EquipmentData {
private Long time; // 采集时间戳
private Double value; // 参数值
private String paramCode; // 参数编码TEMP/PARAM1
private String equipmentId;// 设备ID格式化后如021
public EquipmentData(Long time, Double value, String paramCode, String equipmentId) {
this.time = time;
this.value = value;
this.paramCode = paramCode;
this.equipmentId = equipmentId;
}
// Getter/SetterJSON序列化必需
public Long getTime() { return time; }
public void setTime(Long time) { this.time = time; }
public Double getValue() { return value; }
public void setValue(Double value) { this.value = value; }
public String getParamCode() { return paramCode; }
public void setParamCode(String paramCode) { this.paramCode = paramCode; }
public String getEquipmentId() { return equipmentId; }
public void setEquipmentId(String equipmentId) { this.equipmentId = equipmentId; }
}
}

View File

@ -1,7 +1,6 @@
package com.shgx.dryingroom.mqtt;
import com.alibaba.fastjson2.JSONObject;
import com.shgx.common.utils.StringUtils;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -10,12 +9,14 @@ import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;
/**
* 统一的MQTT消息服务适配多报警灯设备
* MQTT消息服务
*/
@Service
public class MqttService implements MqttCallback {
@ -32,9 +33,13 @@ public class MqttService implements MqttCallback {
// 消息监听器列表支持多组件注册
private List<Consumer<MqttMessage>> messageListeners = new ArrayList<>();
@Resource
private MqttDataHandler mqttDataHandler;
@PostConstruct
public void init() {
connect();
addMessageListener(mqttDataHandler);
}
@PreDestroy
@ -59,8 +64,8 @@ public class MqttService implements MqttCallback {
// 订阅主题报警灯控制主题支持多设备通配符
String[] topics = {
"alarm/#", // 原有报警主题
"alarm_light/+/control" // 新增报警灯控制主题格式alarm_light/[设备ID]/control
"alarm_light/+/control",
"shgx/equipment/data/#"
};
int[] qos = {1, 1}; // 对应主题的QoS
mqttClient.subscribe(topics, qos);
@ -84,13 +89,13 @@ public class MqttService implements MqttCallback {
}
}
// ==================== 消息接收相关保持原有逻辑适配多设备 ====================
// ==================== 消息接收相关 ====================
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String payload = new String(message.getPayload());
log.info("📨 收到消息 - 主题: {}, QoS: {}, 内容: {}", topic, message.getQos(), payload);
// 控制台显示保持原有
// 控制台显示
System.out.println("\n🎯 收到客户端消息:");
System.out.println(" 主题: " + topic);
System.out.println(" QoS: " + message.getQos());
@ -131,7 +136,7 @@ public class MqttService implements MqttCallback {
log.info("✅ 已移除MQTT消息监听器: {}", listener.getClass().getSimpleName());
}
// ==================== 基础发送方法保持原有 ====================
// ==================== 基础发送方法 ====================
public boolean sendMessage(String topic, String message) {
return sendMessage(topic, message, 1);
}
@ -174,102 +179,6 @@ public class MqttService implements MqttCallback {
}
}
// ==================== 业务发送方法新增报警灯相关 ====================
// 1. 原有业务方法保持不变
public boolean sendToUser(Long userId, String message) {
String topic = TOPIC_PREFIX + "user/" + userId;
return sendMessage(topic, message);
}
public boolean sendSystemNotification(Long userId, String title, String content) {
String topic = TOPIC_PREFIX + "user/" + userId;
JSONObject notification = new JSONObject();
notification.put("type", "system_notification");
notification.put("title", title);
notification.put("content", content);
notification.put("timestamp", System.currentTimeMillis());
return sendMessage(topic, notification.toJSONString());
}
public boolean sendApprovalNotification(Long userId, String processName, String status, String remark) {
String topic = TOPIC_PREFIX + "user/" + userId;
JSONObject approval = new JSONObject();
approval.put("type", "approval_notification");
approval.put("processName", processName);
approval.put("status", status);
approval.put("remark", StringUtils.isNotEmpty(remark) ? remark : "");
approval.put("timestamp", System.currentTimeMillis());
approval.put("content", String.format("您的【%s】申请已%s", processName, status));
return sendMessage(topic, approval.toJSONString());
}
public boolean broadcast(String message) {
String topic = TOPIC_PREFIX + "broadcast";
return sendMessage(topic, message);
}
public boolean broadcastJson(String type, Object data) {
String topic = TOPIC_PREFIX + "broadcast";
return sendJsonMessage(topic, type, data);
}
public boolean sendOnlineCount(int count) {
JSONObject data = new JSONObject();
data.put("onlineCount", count);
data.put("timestamp", System.currentTimeMillis());
return broadcastJson("online_count", data);
}
public boolean sendPrintCommand(String deviceId, String message) {
return sendDeviceCommand("shgx_mes", "print", deviceId, message, 0);
}
public boolean sendDeviceCommand(String system, String commandType, String deviceId, String message, int qos) {
String topic = system + "/" + commandType + "/" + deviceId;
JSONObject msgBody = new JSONObject();
msgBody.put("msg", message);
return sendMessage(topic, msgBody.toJSONString(), qos);
}
public boolean sendRealtimeData(String dataType, Object data) {
String topic = TOPIC_PREFIX + "realtime/" + dataType;
return sendJsonMessage(topic, "data_update", data);
}
// 2. 新增报警灯专用发送方法适配多设备
/**
* 发送报警灯控制结果到MQTT供AlarmLightService调用
* @param deviceId 报警灯设备ID
* @param command 发送的指令如ONOFF
* @param success 是否成功
*/
public boolean sendAlarmLightControlResult(String deviceId, String command, boolean success) {
// 主题格式alarm_light/[设备ID]/control_result与MqttConsole和前端约定
String topic = "alarm_light/" + deviceId + "/control_result";
JSONObject result = new JSONObject();
result.put("deviceId", deviceId);
result.put("command", command);
result.put("success", success);
result.put("timestamp", System.currentTimeMillis());
return sendJsonMessage(topic, "alarm_light_control_result", result);
}
/**
* 发送报警灯状态响应到MQTT供AlarmLightHandler调用
* @param deviceId 报警灯设备ID
* @param hexData 设备返回的十六进制状态数据
*/
public boolean sendAlarmLightStatus(String deviceId, String hexData) {
// 主题格式alarm_light/[设备ID]/response
String topic = "alarm_light/" + deviceId + "/response";
JSONObject status = new JSONObject();
status.put("deviceId", deviceId);
status.put("data", hexData); // 原始十六进制数据
status.put("timestamp", System.currentTimeMillis());
return sendJsonMessage(topic, "alarm_light_status", status);
}
// ==================== 工具方法 ====================
public boolean isConnected() {
return mqttClient != null && mqttClient.isConnected();
@ -286,7 +195,7 @@ public class MqttService implements MqttCallback {
}
}
// 新增获取当前连接的Broker地址供控制台显示
// 获取当前连接的Broker地址供控制台显示
public String getBrokerUrl() {
return brokerUrl;
}

View File

@ -0,0 +1,33 @@
package com.shgx.dryingroom.utils;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class AdcToTemperatureConverter {
// 硬件参数从表格提取固定值
private static final int ADC_LOW = 4000; // ADC下限
private static final int ADC_HIGH = 20000; // ADC上限
private static final double TEMP_LOW = 0.0; // 温度下限°C
private static final double TEMP_HIGH = 200.0; // 温度上限°C
/**
* 核心方法将ADC电流值转换为温度值
* @param adcValue 电流采样值ADC值如11963
* @return 温度值°C保留1位小数异常时返回null
*/
public static Double convert(double adcValue) {
// 1. 校验ADC值是否在有效范围内
if (adcValue < ADC_LOW || adcValue > ADC_HIGH) {
log.warn("ADC值{}超出有效范围[{}, {}],转换失败", adcValue, ADC_LOW, ADC_HIGH);
return null;
}
// 2. 应用线性转换公式
double temperature = (adcValue - ADC_LOW) * (TEMP_HIGH - TEMP_LOW) / (ADC_HIGH - ADC_LOW) + TEMP_LOW;
// 3. 保留1位小数
return Math.round(temperature * 10) / 10.0;
}
}

View File

@ -58,10 +58,10 @@ public class DryAlarmLog extends BaseEntity
@TableField("restore_time")
private LocalDateTime restoreTime;
/** 报警状态0-未处理1-已处理2-已忽略 */
@Excel(name = "报警状态")
@TableField("alarm_status")
private String alarmStatus;
// /** 报警状态0-未处理1-已处理2-已忽略 */
// @Excel(name = "报警状态")
// @TableField("alarm_status")
// private String alarmStatus;
@Override
public String toString() {
@ -73,7 +73,7 @@ public class DryAlarmLog extends BaseEntity
.append("alarmValue", getAlarmValue())
.append("alarmTime", getAlarmTime())
.append("restoreTime", getRestoreTime())
.append("alarmStatus", getAlarmStatus())
// .append("alarmStatus", getAlarmStatus())
.append("createBy", getCreateBy())
.append("createTime", getCreateTime())
.append("updateBy", getUpdateBy())

View File

@ -27,32 +27,93 @@ public class DryEquipmentDictionary extends BaseEntity
@TableId(type = IdType.AUTO)
private Long id;
/** 设备号 */
@TableField("equipment_code")
private String equipmentCode;
@TableField("value01")
private String value01;
/** 设备名称 */
@Excel(name = "设备名称")
@TableField("equipment_name")
private String equipmentName;
@TableField("value02")
private String value02;
/** 站点 */
@Excel(name = "站点")
@TableField("site")
private String site;
@TableField("value03")
private String value03;
@Override
public String toString() {
return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE)
.append("id", getId())
.append("equipmentCode", getEquipmentCode())
.append("equipmentName", getEquipmentName())
.append("equipmentType", getSite())
.append("createBy", getCreateBy())
.append("createTime", getCreateTime())
.append("updateBy", getUpdateBy())
.append("updateTime", getUpdateTime())
.append("remark", getRemark())
.toString();
}
@TableField("value04")
private String value04;
@TableField("value05")
private String value05;
@TableField("value06")
private String value06;
@TableField("value07")
private String value07;
@TableField("value08")
private String value08;
@TableField("value09")
private String value09;
@TableField("value10")
private String value10;
@TableField("value11")
private String value11;
@TableField("value12")
private String value12;
@TableField("value13")
private String value13;
@TableField("value14")
private String value14;
@TableField("value15")
private String value15;
@TableField("value16")
private String value16;
@TableField("value17")
private String value17;
@TableField("value18")
private String value18;
@TableField("value19")
private String value19;
@TableField("value20")
private String value20;
@TableField("value21")
private String value21;
@TableField("value22")
private String value22;
@TableField("value23")
private String value23;
@TableField("value24")
private String value24;
@TableField("value25")
private String value25;
@TableField("value26")
private String value26;
@TableField("value27")
private String value27;
@TableField("value28")
private String value28;
@TableField("value29")
private String value29;
@TableField("value30")
private String value30;
}

View File

@ -1,4 +1,5 @@
package com.shgx.dryingroom.web.mapper;
public interface DryEquipmentDictionaryMapper {
}

View File

@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.shgx.common.utils.StringUtils;
import com.shgx.common.utils.bean.BeanUtils;
import com.shgx.dryingroom.alarm.AlarmLightService;
import com.shgx.dryingroom.mqtt.MqttDataHandler;
import com.shgx.dryingroom.web.domain.DryEquipmentInfo;
import com.shgx.dryingroom.web.domain.DryMonitoringData;
import com.shgx.dryingroom.web.domain.DryParamThreshold;
@ -15,9 +16,12 @@ import com.shgx.dryingroom.web.mapper.DryMonitoringDataMapper;
import com.shgx.dryingroom.web.mapper.DryParamThresholdMapper;
import com.shgx.dryingroom.web.service.IDryEquipmentScreenService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
@ -43,6 +47,15 @@ public class DryEquipmentScreenServiceImpl implements IDryEquipmentScreenService
@Autowired
private AlarmLightService alarmLightService;
@Resource
private RedisTemplate<String, Object> redisTemplate;
// Redis Key 前缀 AutoCollectService 保持一致
private static final String REDIS_EQUIP_PREFIX = "dry_equipment:";
// 支持的参数编码 MqttDataHandler 一致
private static final List<String> SUPPORT_PARAM_CODES = Arrays.asList("TEMP", "PARAM1", "DEW");
// 干燥机列表
@Override
public List<EquipmentScreenDTO.DryingDTO> dryingList() {
@ -56,24 +69,31 @@ public class DryEquipmentScreenServiceImpl implements IDryEquipmentScreenService
// 按设备号字符串自然排序GZ-001 < GZ-002 < ... < GZ-026
equipmentList.sort(Comparator.comparing(DryEquipmentInfo::getEquipmentCode));
// 2. 提取所有干燥机设备号
List<String> equipmentCodes = equipmentList.stream()
.map(DryEquipmentInfo::getEquipmentCode)
.collect(Collectors.toList());
// 2. 提取所有干燥机设备号 + 对应的设备IDlocation字段如021
Map<String, String> equipmentCodeToLocationMap = equipmentList.stream()
.collect(Collectors.toMap(
DryEquipmentInfo::getEquipmentCode, // key设备号如GZ-021
DryEquipmentInfo::getLocation, // value设备ID如021
(k1, k2) -> k1 // 避免重复设备号冲突
));
List<String> equipmentCodes = new ArrayList<>(equipmentCodeToLocationMap.keySet());
// 3. 调用新版多参数方法返回 Map<设备号, Map<参数编码, 最新值>>
Map<String, Map<String, String>> latestParamDataMap = getLatestDataMap(equipmentCodes);
// 3. Redis 读取数据并转换为 DryMonitoringData 列表核心改造
List<DryMonitoringData> monitoringDataList = getDryMonitoringDataFromRedis(equipmentCodeToLocationMap);
// 4. 查询每个设备的阈值规则列表不变
// 4. 设备号+参数编码分组取最新数据按采集时间倒序
Map<String, Map<String, DryMonitoringData>> latestDataMap = groupLatestMonitoringData(monitoringDataList);
// 5. 查询每个设备的阈值规则列表不变
Map<String, List<EquipmentScreenDTO.ParamDTO>> paramMap = getParamMap(equipmentCodes);
// 5. 组装DTO仅提取TEMP温度参数的最新值
// 6. 组装DTO提取 TEMP 参数的最新值复用原有逻辑
return equipmentList.stream()
.map(equipment -> {
String setTem = (equipment != null && StringUtils.isNotBlank(equipment.getRuleCode()))
? Optional.ofNullable(paramMapper.selectOne(
new LambdaQueryWrapper<DryParamThreshold>().eq(DryParamThreshold::getRuleCode, equipment.getRuleCode())
.eq(DryParamThreshold::getIsEnabled,"1")
.eq(DryParamThreshold::getIsEnabled, "1")
)).map(DryParamThreshold::getMaxValue).orElse(null)
: null;
EquipmentScreenDTO.DryingDTO dto = new EquipmentScreenDTO.DryingDTO();
@ -86,9 +106,10 @@ public class DryEquipmentScreenServiceImpl implements IDryEquipmentScreenService
dto.setEquipmentType("1");
dto.setParamList(paramMap.getOrDefault(equipmentCode, Collections.emptyList()));
// 关键从多参数映射中提取TEMP温度无数据则返回空字符串
Map<String, String> paramData = latestParamDataMap.getOrDefault(equipmentCode, new HashMap<>());
dto.setDataValue(paramData.getOrDefault("TEMP", "")); // 仅取温度值匹配原有字段类型
// 从分组数据中提取 TEMP 参数的最新值
Map<String, DryMonitoringData> paramDataMap = latestDataMap.getOrDefault(equipmentCode, new HashMap<>());
DryMonitoringData tempData = paramDataMap.get("TEMP");
dto.setDataValue(tempData != null ? tempData.getDataValue() : ""); // 直接复用 DryMonitoringData dataValue带单位
return dto;
})
@ -96,39 +117,200 @@ public class DryEquipmentScreenServiceImpl implements IDryEquipmentScreenService
}
/**
* 获取每个设备的最新监测
* 除湿干燥机列表同时查询最新的温度TEMP和露点DEW
*/
private Map<String, Map<String, String>> getLatestDataMap(List<String> equipmentCodes) {
// 1. 批量查询设备的所有监测数据包含参数编码用于区分TEMP和DEW
LambdaQueryWrapper<DryMonitoringData> dataWrapper = new LambdaQueryWrapper<>();
dataWrapper.in(DryMonitoringData::getEquipmentCode, equipmentCodes)
.select(DryMonitoringData::getEquipmentCode,
DryMonitoringData::getParamCode,
DryMonitoringData::getDataValue,
DryMonitoringData::getCreateTime); // 只查需要的字段提升性能
@Override
public List<EquipmentScreenDTO.DrynessDTO> drynessList() {
// 1. 查询所有除湿干燥机equipment_type=2
LambdaQueryWrapper<DryEquipmentInfo> eqWrapper = new LambdaQueryWrapper<>();
eqWrapper.eq(DryEquipmentInfo::getEquipmentType, '2');
List<DryEquipmentInfo> equipmentList = infoMapper.selectList(eqWrapper);
if (equipmentList.isEmpty()) {
return Collections.emptyList();
}
List<DryMonitoringData> dataList = dataMapper.selectList(dataWrapper);
// 按设备号字符串自然排序GZ-001 < GZ-002 < ... < GZ-026
equipmentList.sort(Comparator.comparing(DryEquipmentInfo::getEquipmentCode));
// 2. 设备号+参数编码分组每组取创建时间最新的一条数据
// 2. 提取所有除湿干燥机的设备号 + 对应的设备IDlocation字段
Map<String, String> equipmentCodeToLocationMap = equipmentList.stream()
.collect(Collectors.toMap(
DryEquipmentInfo::getEquipmentCode,
DryEquipmentInfo::getLocation,
(k1, k2) -> k1
));
// 3. Redis 读取数据并转换为 DryMonitoringData 列表核心改造
List<DryMonitoringData> monitoringDataList = getDryMonitoringDataFromRedis(equipmentCodeToLocationMap);
// 4. 设备号+参数编码分组取最新数据
Map<String, Map<String, DryMonitoringData>> latestDataMap = groupLatestMonitoringData(monitoringDataList);
// 5. 查询每个设备的阈值规则列表不变
Map<String, List<EquipmentScreenDTO.ParamDTO>> paramMap = getParamMap(new ArrayList<>(equipmentCodeToLocationMap.keySet()));
// 6. 组装DTO提取 TEMP DEW 参数的最新值
return equipmentList.stream()
.map(equipment -> {
String setTem = (equipment != null && StringUtils.isNotBlank(equipment.getRuleCode()))
? Optional.ofNullable(paramMapper.selectOne(
new LambdaQueryWrapper<DryParamThreshold>().eq(DryParamThreshold::getRuleCode, equipment.getRuleCode())
.eq(DryParamThreshold::getIsEnabled, "1")
)).map(DryParamThreshold::getMaxValue).orElse(null)
: null;
EquipmentScreenDTO.DrynessDTO dto = new EquipmentScreenDTO.DrynessDTO();
String equipmentCode = equipment.getEquipmentCode();
// 基础信息赋值
dto.setEquipmentCode(equipmentCode);
dto.setEquipmentName(equipment.getEquipmentName());
dto.setStatus(equipment.getStatus());
dto.setSetTem(setTem);
dto.setEquipmentType("2");
dto.setParamList(paramMap.getOrDefault(equipmentCode, Collections.emptyList()));
// 提取 TEMP DEW 参数的最新值直接复用 DryMonitoringData dataValue
Map<String, DryMonitoringData> paramDataMap = latestDataMap.getOrDefault(equipmentCode, new HashMap<>());
DryMonitoringData tempData = paramDataMap.get("TEMP");
DryMonitoringData dewData = paramDataMap.get("DEW"); // PARAM1按业务映射
dto.setTempValue(tempData != null ? tempData.getDataValue() : "");
dto.setDewValue(dewData != null ? dewData.getDataValue() : "");
return dto;
})
.collect(Collectors.toList());
}
/**
* 核心改造 Redis 读取数据并转换为 DryMonitoringData 列表复用 AutoCollectService 逻辑
* @param equipmentCodeToLocationMap 设备号设备ID映射
* @return DryMonitoringData 列表
*/
private List<DryMonitoringData> getDryMonitoringDataFromRedis(Map<String, String> equipmentCodeToLocationMap) {
List<DryMonitoringData> resultList = new ArrayList<>();
// 1. 批量构建 Redis Key 前缀按设备ID+参数编码
List<String> redisKeyPatterns = new ArrayList<>();
for (String location : equipmentCodeToLocationMap.values()) {
for (String paramCode : SUPPORT_PARAM_CODES) {
String keyPattern = REDIS_EQUIP_PREFIX + location + ":" + paramCode;
redisKeyPatterns.add(keyPattern);
}
}
// 2. 批量扫描 Redis Key
Set<String> matchedKeys = new HashSet<>();
for (String pattern : redisKeyPatterns) {
Set<String> keys = redisTemplate.keys(pattern);
if (keys != null && !keys.isEmpty()) {
matchedKeys.addAll(keys);
}
}
// 3. 批量读取 Redis 数据
List<Object> redisDataList = redisTemplate.opsForValue().multiGet(new ArrayList<>(matchedKeys));
if (redisDataList == null || redisDataList.isEmpty()) {
return resultList;
}
// 4. 解析并转换为 DryMonitoringData复用 AutoCollectService 的转换逻辑
for (int i = 0; i < matchedKeys.size(); i++) {
String redisKey = new ArrayList<>(matchedKeys).get(i);
Object redisData = redisDataList.get(i);
if (redisData == null || !(redisData instanceof MqttDataHandler.EquipmentData)) {
continue;
}
// 解析 Redis Key提取设备ID参数编码
String[] keyParts = redisKey.split(":");
if (keyParts.length != 3) {
continue; // 非法 Key 格式跳过
}
String location = keyParts[1]; // 设备ID如021
String paramCode = keyParts[2]; // 参数编码如TEMP
// 找到对应的设备号 GZ-021
String equipmentCode = null;
for (Map.Entry<String, String> entry : equipmentCodeToLocationMap.entrySet()) {
if (entry.getValue().equals(location)) {
equipmentCode = entry.getKey();
break;
}
}
if (equipmentCode == null) {
continue; // 未找到对应设备跳过
}
// 转换为 DryMonitoringData完全复用 AutoCollectService getDryMonitoringData 逻辑
MqttDataHandler.EquipmentData equipmentData = (MqttDataHandler.EquipmentData) redisData;
DryMonitoringData monitoringData = convertToDryMonitoringData(equipmentData, equipmentCode, paramCode);
resultList.add(monitoringData);
}
return resultList;
}
/**
* 转换为 DryMonitoringData完全复用 AutoCollectService 的逻辑确保格式一致
*/
private DryMonitoringData convertToDryMonitoringData(MqttDataHandler.EquipmentData redisData, String equipmentCode, String paramCode) {
DryMonitoringData dbData = new DryMonitoringData();
// 设备编码直接从映射中获取无需再次查询数据库
dbData.setEquipmentCode(equipmentCode);
// 参数编码TEMP/PARAM1/DEW
dbData.setParamCode(paramCode);
// 数据值带单位 AutoCollectService 保持一致
String unit = "°C";
if ("PARAM1".equals(paramCode)) {
unit = "自定义单位"; // 按业务调整
} else if ("DEW".equals(paramCode)) {
unit = "°C"; // 露点单位
}
dbData.setDataValue(String.format("%.1f%s", redisData.getValue(), unit));
// 采集时间优先使用 Redis 中的时间戳无则用当前时间
LocalDateTime collectTime = redisData.getTime() != null
? LocalDateTime.ofInstant(
java.time.Instant.ofEpochMilli(redisData.getTime()),
java.time.ZoneId.systemDefault()
)
: LocalDateTime.now();
dbData.setCollectTime(collectTime);
// 其他字段 AutoCollectService 一致
dbData.setDataStatus("1"); // 默认为正常状态可按业务调整
dbData.setCollectorCode("MQTT-REDIS");
dbData.setCreateBy("system");
dbData.setCreateTime(new Date());
dbData.setRemark("Redis实时查询数据");
return dbData;
}
/**
* 设备号+参数编码分组取每组最新的一条数据按采集时间倒序
*/
private Map<String, Map<String, DryMonitoringData>> groupLatestMonitoringData(List<DryMonitoringData> dataList) {
return dataList.stream()
.collect(Collectors.groupingBy(
// 第一层分组按设备号
DryMonitoringData::getEquipmentCode,
// 第二层分组按参数编码TEMP/DEW并取每组最新的一条数据
DryMonitoringData::getEquipmentCode, // 第一层设备号
Collectors.groupingBy(
DryMonitoringData::getParamCode,
DryMonitoringData::getParamCode, // 第二层参数编码
Collectors.collectingAndThen(
// 按创建时间倒序取第一条最新
Collectors.maxBy(Comparator.comparing(DryMonitoringData::getCreateTime)),
// 提取最新数据的监测值无数据时返回空字符串
opt -> opt.map(DryMonitoringData::getDataValue).orElse("")
// 采集时间倒序取第一条最新
Collectors.maxBy(Comparator.comparing(DryMonitoringData::getCollectTime)),
// 提取最新数据无数据时返回null
opt -> opt.orElse(null)
)
)
));
}
/**
* 获取每个设备的阈值规则列表
* 保留原有方法获取每个设备的阈值规则列表无需修改
*/
private Map<String, List<EquipmentScreenDTO.ParamDTO>> getParamMap(List<String> equipmentCodes) {
// 批量查询设备的所有阈值规则
@ -150,72 +332,7 @@ public class DryEquipmentScreenServiceImpl implements IDryEquipmentScreenService
}
/**
* 除湿干燥机列表
* @return
*/
/**
* 除湿干燥机列表同时查询最新的温度TEMP和露点DEW
*/
@Override
public List<EquipmentScreenDTO.DrynessDTO> drynessList() {
// 1. 查询所有除湿干燥机equipment_type=2
LambdaQueryWrapper<DryEquipmentInfo> eqWrapper = new LambdaQueryWrapper<>();
eqWrapper.eq(DryEquipmentInfo::getEquipmentType, '2');
List<DryEquipmentInfo> equipmentList = infoMapper.selectList(eqWrapper);
if (equipmentList.isEmpty()) {
return Collections.emptyList();
}
// 按设备号字符串自然排序GZ-001 < GZ-002 < ... < GZ-026
equipmentList.sort(Comparator.comparing(DryEquipmentInfo::getEquipmentCode));
// 2. 提取所有除湿干燥机的设备号
List<String> equipmentCodes = equipmentList.stream()
.map(DryEquipmentInfo::getEquipmentCode)
.collect(Collectors.toList());
// 3. 调用优化后的方法获取设备号{参数编码最新值}的映射
Map<String, Map<String, String>> latestParamDataMap = getLatestDataMap(equipmentCodes);
// 4. 查询每个设备的阈值规则列表复用原有方法无需修改
Map<String, List<EquipmentScreenDTO.ParamDTO>> paramMap = getParamMap(equipmentCodes);
// 5. 组装DrynessDTO分别提取TEMP和DEW的最新值
return equipmentList.stream()
.map(equipment -> {
String setTem = (equipment != null && StringUtils.isNotBlank(equipment.getRuleCode()))
? Optional.ofNullable(paramMapper.selectOne(
new LambdaQueryWrapper<DryParamThreshold>().eq(DryParamThreshold::getRuleCode, equipment.getRuleCode())
.eq(DryParamThreshold::getIsEnabled,"1")
)).map(DryParamThreshold::getMaxValue).orElse(null)
: null;
EquipmentScreenDTO.DrynessDTO dto = new EquipmentScreenDTO.DrynessDTO();
String equipmentCode = equipment.getEquipmentCode();
// 基础信息赋值
dto.setEquipmentCode(equipmentCode);
dto.setEquipmentName(equipment.getEquipmentName());
dto.setStatus(equipment.getStatus());
dto.setSetTem(setTem);
dto.setEquipmentType("2");
// 阈值规则列表
dto.setParamList(paramMap.getOrDefault(equipmentCode, Collections.emptyList()));
// 关键提取当前设备的TEMP和DEW最新值无数据时返回空字符串
Map<String, String> paramData = latestParamDataMap.getOrDefault(equipmentCode, new HashMap<>());
String tempValue = paramData.getOrDefault("TEMP", ""); // 温度值
String dewValue = paramData.getOrDefault("DEW", ""); // 露点值
dto.setTempValue(tempValue);
dto.setDewValue(dewValue);
return dto;
})
.collect(Collectors.toList());
}
/**
* 一键关闭报警
* @return
* 一键关闭报警无需修改
*/
@Override
@Transactional
@ -226,4 +343,4 @@ public class DryEquipmentScreenServiceImpl implements IDryEquipmentScreenService
int result = infoMapper.update(null, updateWrapper);
return result;
}
}
}