diff --git a/src/main/java/com/zhitan/MQTTGatewayApplication.java b/src/main/java/com/zhitan/MQTTGatewayApplication.java index 01e4c81..fa60e05 100644 --- a/src/main/java/com/zhitan/MQTTGatewayApplication.java +++ b/src/main/java/com/zhitan/MQTTGatewayApplication.java @@ -1,6 +1,8 @@ package com.zhitan; +import com.zhitan.collcet.WaterDataCollector; import org.mybatis.spring.annotation.MapperScan; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; diff --git a/src/main/java/com/zhitan/handler/MqttMessageHandler.java b/src/main/java/com/zhitan/handler/MqttMessageHandler.java index d1f8b83..4388ea6 100644 --- a/src/main/java/com/zhitan/handler/MqttMessageHandler.java +++ b/src/main/java/com/zhitan/handler/MqttMessageHandler.java @@ -1,6 +1,7 @@ package com.zhitan.handler; import com.fasterxml.jackson.databind.ObjectMapper; +import com.zhitan.model.dto.ElectricDTO; import com.zhitan.model.entity.ElectricPower; import com.zhitan.service.IDataService; import lombok.extern.slf4j.Slf4j; @@ -31,8 +32,8 @@ public class MqttMessageHandler implements MessageHandler { ObjectMapper objectMapper = new ObjectMapper(); try { // 将 JSON 字符串转换为 SensorData 对象 - ElectricPower electricPower = objectMapper.readValue(payload, ElectricPower.class); - dataService.writeTimeSeriesData(electricPower); + ElectricDTO electricDTO = objectMapper.readValue(payload, ElectricDTO.class); + dataService.writeTimeSeriesData(electricDTO); } catch (Exception e) { log.error(e.getMessage()); } diff --git a/src/main/java/com/zhitan/job/WaterDataTimingJob.java b/src/main/java/com/zhitan/job/WaterDataTimingJob.java index 77ae2d5..564ad2b 100644 --- a/src/main/java/com/zhitan/job/WaterDataTimingJob.java +++ b/src/main/java/com/zhitan/job/WaterDataTimingJob.java @@ -24,6 +24,8 @@ public class WaterDataTimingJob { * 代表【每天凌晨1点整】执行一次 */ @Scheduled(cron = "0 0 1 * * ?") + // 每天 11:04 执行(24小时制,若要11点04分下午则用 23 代替 11) +// @Scheduled(cron = "0 4 11 * * ?") public void autoCollectWaterData() { // 校验:如果上一次任务还在执行,本次直接跳过,避免重复采集数据 if (isRunning) { diff --git a/src/main/java/com/zhitan/model/dto/ElectricDTO.java b/src/main/java/com/zhitan/model/dto/ElectricDTO.java new file mode 100644 index 0000000..ebac7d5 --- /dev/null +++ b/src/main/java/com/zhitan/model/dto/ElectricDTO.java @@ -0,0 +1,24 @@ +package com.zhitan.model.dto; + +public class ElectricDTO { + + private Long time; + + private Object params; + + public Long getTime() { + return time; + } + + public void setTime(Long time) { + this.time = time; + } + + public Object getParams() { + return params; + } + + public void setParams(Object params) { + this.params = params; + } +} diff --git a/src/main/java/com/zhitan/model/entity/ElectricPower.java b/src/main/java/com/zhitan/model/entity/ElectricPower.java index 325ef98..70ac1b2 100644 --- a/src/main/java/com/zhitan/model/entity/ElectricPower.java +++ b/src/main/java/com/zhitan/model/entity/ElectricPower.java @@ -8,77 +8,10 @@ import lombok.Data; */ @Data public class ElectricPower { - @JsonProperty("SN") - private String sn; - @JsonProperty("Pt") - private double pt; + private String meterId; - @JsonProperty("Ua") - private double ua; + private Double cumulant; - @JsonProperty("Ub") - private double ub; - - @JsonProperty("Uc") - private double uc; - - @JsonProperty("Uab") - private double uab; - - @JsonProperty("Ubc") - private double ubc; - - @JsonProperty("Uca") - private double uca; - @JsonProperty("Ia") - private double ia; - - @JsonProperty("Ib") - private double ib; - - @JsonProperty("Ic") - private double ic; - - @JsonProperty("Pw") - private double pw; - - @JsonProperty("Pwa") - private double pwa; - - @JsonProperty("Pwb") - private double pwb; - - @JsonProperty("Pwc") - private double pwc; - - @JsonProperty("Pq") - private double pq; - - @JsonProperty("Pqa") - private double pqa; - - @JsonProperty("Pqb") - private double pqb; - - @JsonProperty("Pqc") - private double pqc; - - @JsonProperty("Q") - private double q; - - @JsonProperty("Qa") - private double qa; - - @JsonProperty("Qb") - private double qb; - - @JsonProperty("Qc") - private double qc; - - @JsonProperty("Time") - private String time; - - @JsonProperty("Type") - private int type; + private String gatewayKey = "zt"; } diff --git a/src/main/java/com/zhitan/service/IDataService.java b/src/main/java/com/zhitan/service/IDataService.java index f04f0cc..73ecd7b 100644 --- a/src/main/java/com/zhitan/service/IDataService.java +++ b/src/main/java/com/zhitan/service/IDataService.java @@ -1,5 +1,6 @@ package com.zhitan.service; +import com.zhitan.model.dto.ElectricDTO; import com.zhitan.model.entity.ElectricPower; import org.jetbrains.annotations.NotNull; @@ -18,7 +19,7 @@ public interface IDataService { /** * 写入电力相关数据-固定格式,可自定义修改 * - * @param electricPower 固定格式的数据 + * @param electricDTO 固定格式的数据 */ - void writeTimeSeriesData(@NotNull ElectricPower electricPower); + void writeTimeSeriesData(@NotNull ElectricDTO electricDTO); } diff --git a/src/main/java/com/zhitan/service/impl/DataServiceImpl.java b/src/main/java/com/zhitan/service/impl/DataServiceImpl.java index fc1af3d..7851449 100644 --- a/src/main/java/com/zhitan/service/impl/DataServiceImpl.java +++ b/src/main/java/com/zhitan/service/impl/DataServiceImpl.java @@ -8,6 +8,7 @@ import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.write.Point; import com.zhitan.config.influxdb.InfluxdbConfig; import com.zhitan.model.IndexTemplate; +import com.zhitan.model.dto.ElectricDTO; import com.zhitan.model.entity.ElectricPower; import com.zhitan.influxdb.InfluxdbRepository; import com.zhitan.mapper.CommonMapper; @@ -33,6 +34,7 @@ public class DataServiceImpl implements IDataService { private final String TAG = "tag"; private final String FIELD_VALUE = "value"; + private final String TYPE = "type"; private final InfluxdbRepository repository; private final InfluxdbConfig influxdbConfig; private final CommonMapper commonMapper; @@ -82,40 +84,92 @@ public class DataServiceImpl implements IDataService { } /** - * 写入电力相关数据-固定格式,可自定义修改 - * - * @param electricPower 固定格式的数据 + * 写入电力相关数据-固定格式,完美适配:MQTT→ElectricDTO→ElectricPower→InfluxDB + * @param electricDTO 固定格式的数据 */ @Override - public void writeTimeSeriesData(@NotNull ElectricPower electricPower) { - List templates = getIndexTemplate(); - // 获取类中所有声明的字段 - Field[] fields = electricPower.getClass().getDeclaredFields(); + public void writeTimeSeriesData(@NotNull ElectricDTO electricDTO) { + // 1. 定义返回的点位集合,批量入库用 List points = new ArrayList<>(); - for (Field field : fields) { - IndexTemplate indexTemplate = templates.stream().filter(template -> - field.getName().equalsIgnoreCase(template.getGatewayKey())) - .findFirst().orElse(null); - if (indexTemplate != null) { + // 2. 安全获取params并强转Map,解决Object接收的问题,核心修复 + Object paramsObj = electricDTO.getParams(); + if (!(paramsObj instanceof Map)) { + log.error("ElectricDTO的params不是Map格式,数据异常:{}", paramsObj); + return; + } + // 万能接收params: key=电表ID(40001/40002), value=电能值(100/200) + Map paramsMap = (Map) paramsObj; + + // 3. 获取设备上报的时间戳,转Instant用于InfluxDB,优先用设备时间,无则用当前时间 + Long dataTimeStamp = electricDTO.getTime(); + // 👇 直接用毫秒时间戳写入,指定WritePrecision.MS,时区+精度双正确 + Instant dataInstant = dataTimeStamp != null && dataTimeStamp > 0 + ? Instant.ofEpochMilli(dataTimeStamp) + : Instant.now(); + + // 4. 获取数据库中的模板配置 (gatewayKey对应电表ID,有redis缓存,不用改) + List templates = getIndexTemplate(); + + // 5. 遍历paramsMap,核心逻辑:把Map的key-value转成ElectricPower实体+匹配模板 + for (Map.Entry entry : paramsMap.entrySet()) { + // ========== 核心步骤1:把MQTT的Map数据 赋值给你的ElectricPower实体 ========== + ElectricPower electricPower = new ElectricPower(); + String meterId = entry.getKey(); // params的key → 电表ID → ElectricPower.meterId + Object valueObj = entry.getValue(); // params的value → 电能值 + String gatewayKey = electricPower.getGatewayKey(); + + // 安全转换数值:Integer/Double 都能转成Double,解决类型转换报错,核心修复 + Double cumulant = null; + if (valueObj instanceof Number) { + cumulant = ((Number) valueObj).doubleValue(); + } + // 给实体赋值,这就是你要存的最终数据格式 ✅ + electricPower.setMeterId(meterId); + electricPower.setCumulant(cumulant); + + // ========== 核心步骤2:用【电表ID】匹配数据库的IndexTemplate模板 ========== + if (electricPower.getCumulant() == null) { + log.warn("电表ID:{} 无有效电能值,跳过", meterId); + continue; + } + // 匹配规则:模板的gatewayKey + IndexTemplate matchTemplate = templates.stream() + .filter(template -> gatewayKey.equalsIgnoreCase(template.getGatewayKey())) + .findFirst() + .orElse(null); + + // ========== 核心步骤3:模板匹配成功,封装InfluxDB的Point对象 ========== + if (matchTemplate != null) { Point point = Point - .measurement(influxdbConfig.getMeasurement()) - .addTag(TAG, electricPower.getSn() + "_" + indexTemplate.getCode()) - .time(Instant.now(), WritePrecision.S); - // 设置字段可访问,允许访问私有字段 - field.setAccessible(true); - if (Number.class.isAssignableFrom(field.getType()) || field.getType().isPrimitive()) { - try { - // 获取字段值 - double value = field.getDouble(electricPower); - point.addField(FIELD_VALUE, value); - points.add(point); - } catch (IllegalAccessException e) { - log.error("获取属性值失败:{}", e.getMessage()); - } - } + .measurement(influxdbConfig.getMeasurement()) // 库名,配置文件读取 + // Tag拼接:电表ID_模板code (和你原反射逻辑一致) + .addTag(TAG, electricPower.getMeterId()) + .addTag(TYPE, "electricity") + .time(dataInstant, WritePrecision.MS) // 数据采集时间 + .addField(FIELD_VALUE, electricPower.getCumulant()); // 存入的电能值 + + points.add(point); + } else { + log.warn("电表ID:{} 未匹配到数据库模板,跳过", meterId); } } - repository.writePoints(points); + + // ========== 核心步骤4:批量写入InfluxDB,循环外写入,性能最优 ========== + if (!points.isEmpty()) { + // ======== 新增:打印每条写入InfluxDB的【最终完整格式】 ======== + log.info("========== 开始打印本次写入InfluxDB的完整数据格式 =========="); + points.forEach(point -> { + String lineProtocol = point.toLineProtocol(); + log.info("写入InfluxDB行协议:{}", lineProtocol); + }); + log.info("========================================================="); + + // 原有写入逻辑不变 + repository.writePoints(points); + log.info("成功处理电表数据,写入InfluxDB共{}条点位数据", points.size()); + } else { + log.info("本次处理电表数据,无有效点位可写入"); + } } /** @@ -123,6 +177,7 @@ public class DataServiceImpl implements IDataService { */ protected List getIndexTemplate() { String TEMPLATE_KEY = "template"; + redisCache.deleteObject(TEMPLATE_KEY); List result = redisCache.getCacheList(TEMPLATE_KEY); if (result == null || result.isEmpty()) { result = commonMapper.getIndexTemplate();