水表接入

This commit is contained in:
杨晓东 2026-02-09 14:08:45 +08:00
parent 730dc7c420
commit 57728719b2
7 changed files with 120 additions and 102 deletions

View File

@ -1,6 +1,8 @@
package com.zhitan;
import com.zhitan.collcet.WaterDataCollector;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

View File

@ -1,6 +1,7 @@
package com.zhitan.handler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zhitan.model.dto.ElectricDTO;
import com.zhitan.model.entity.ElectricPower;
import com.zhitan.service.IDataService;
import lombok.extern.slf4j.Slf4j;
@ -31,8 +32,8 @@ public class MqttMessageHandler implements MessageHandler {
ObjectMapper objectMapper = new ObjectMapper();
try {
// JSON 字符串转换为 SensorData 对象
ElectricPower electricPower = objectMapper.readValue(payload, ElectricPower.class);
dataService.writeTimeSeriesData(electricPower);
ElectricDTO electricDTO = objectMapper.readValue(payload, ElectricDTO.class);
dataService.writeTimeSeriesData(electricDTO);
} catch (Exception e) {
log.error(e.getMessage());
}

View File

@ -24,6 +24,8 @@ public class WaterDataTimingJob {
* 代表每天凌晨1点整执行一次
*/
@Scheduled(cron = "0 0 1 * * ?")
// 每天 11:04 执行24小时制若要11点04分下午则用 23 代替 11
// @Scheduled(cron = "0 4 11 * * ?")
public void autoCollectWaterData() {
// 校验如果上一次任务还在执行本次直接跳过避免重复采集数据
if (isRunning) {

View File

@ -0,0 +1,24 @@
package com.zhitan.model.dto;
public class ElectricDTO {
private Long time;
private Object params;
public Long getTime() {
return time;
}
public void setTime(Long time) {
this.time = time;
}
public Object getParams() {
return params;
}
public void setParams(Object params) {
this.params = params;
}
}

View File

@ -8,77 +8,10 @@ import lombok.Data;
*/
@Data
public class ElectricPower {
@JsonProperty("SN")
private String sn;
@JsonProperty("Pt")
private double pt;
private String meterId;
@JsonProperty("Ua")
private double ua;
private Double cumulant;
@JsonProperty("Ub")
private double ub;
@JsonProperty("Uc")
private double uc;
@JsonProperty("Uab")
private double uab;
@JsonProperty("Ubc")
private double ubc;
@JsonProperty("Uca")
private double uca;
@JsonProperty("Ia")
private double ia;
@JsonProperty("Ib")
private double ib;
@JsonProperty("Ic")
private double ic;
@JsonProperty("Pw")
private double pw;
@JsonProperty("Pwa")
private double pwa;
@JsonProperty("Pwb")
private double pwb;
@JsonProperty("Pwc")
private double pwc;
@JsonProperty("Pq")
private double pq;
@JsonProperty("Pqa")
private double pqa;
@JsonProperty("Pqb")
private double pqb;
@JsonProperty("Pqc")
private double pqc;
@JsonProperty("Q")
private double q;
@JsonProperty("Qa")
private double qa;
@JsonProperty("Qb")
private double qb;
@JsonProperty("Qc")
private double qc;
@JsonProperty("Time")
private String time;
@JsonProperty("Type")
private int type;
private String gatewayKey = "zt";
}

View File

@ -1,5 +1,6 @@
package com.zhitan.service;
import com.zhitan.model.dto.ElectricDTO;
import com.zhitan.model.entity.ElectricPower;
import org.jetbrains.annotations.NotNull;
@ -18,7 +19,7 @@ public interface IDataService {
/**
* 写入电力相关数据-固定格式可自定义修改
*
* @param electricPower 固定格式的数据
* @param electricDTO 固定格式的数据
*/
void writeTimeSeriesData(@NotNull ElectricPower electricPower);
void writeTimeSeriesData(@NotNull ElectricDTO electricDTO);
}

View File

@ -8,6 +8,7 @@ import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.zhitan.config.influxdb.InfluxdbConfig;
import com.zhitan.model.IndexTemplate;
import com.zhitan.model.dto.ElectricDTO;
import com.zhitan.model.entity.ElectricPower;
import com.zhitan.influxdb.InfluxdbRepository;
import com.zhitan.mapper.CommonMapper;
@ -33,6 +34,7 @@ public class DataServiceImpl implements IDataService {
private final String TAG = "tag";
private final String FIELD_VALUE = "value";
private final String TYPE = "type";
private final InfluxdbRepository repository;
private final InfluxdbConfig influxdbConfig;
private final CommonMapper commonMapper;
@ -82,40 +84,92 @@ public class DataServiceImpl implements IDataService {
}
/**
* 写入电力相关数据-固定格式可自定义修改
*
* @param electricPower 固定格式的数据
* 写入电力相关数据-固定格式完美适配MQTTElectricDTOElectricPowerInfluxDB
* @param electricDTO 固定格式的数据
*/
@Override
public void writeTimeSeriesData(@NotNull ElectricPower electricPower) {
List<IndexTemplate> templates = getIndexTemplate();
// 获取类中所有声明的字段
Field[] fields = electricPower.getClass().getDeclaredFields();
public void writeTimeSeriesData(@NotNull ElectricDTO electricDTO) {
// 1. 定义返回的点位集合批量入库用
List<Point> points = new ArrayList<>();
for (Field field : fields) {
IndexTemplate indexTemplate = templates.stream().filter(template ->
field.getName().equalsIgnoreCase(template.getGatewayKey()))
.findFirst().orElse(null);
if (indexTemplate != null) {
// 2. 安全获取params并强转Map解决Object接收的问题核心修复
Object paramsObj = electricDTO.getParams();
if (!(paramsObj instanceof Map)) {
log.error("ElectricDTO的params不是Map格式数据异常:{}", paramsObj);
return;
}
// 万能接收params: key=电表ID(40001/40002), value=电能值(100/200)
Map<String, Object> paramsMap = (Map<String, Object>) paramsObj;
// 3. 获取设备上报的时间戳转Instant用于InfluxDB优先用设备时间无则用当前时间
Long dataTimeStamp = electricDTO.getTime();
// 👇 直接用毫秒时间戳写入指定WritePrecision.MS时区+精度双正确
Instant dataInstant = dataTimeStamp != null && dataTimeStamp > 0
? Instant.ofEpochMilli(dataTimeStamp)
: Instant.now();
// 4. 获取数据库中的模板配置 (gatewayKey对应电表ID有redis缓存不用改)
List<IndexTemplate> templates = getIndexTemplate();
// 5. 遍历paramsMap核心逻辑把Map的key-value转成ElectricPower实体+匹配模板
for (Map.Entry<String, Object> entry : paramsMap.entrySet()) {
// ========== 核心步骤1把MQTT的Map数据 赋值给你的ElectricPower实体 ==========
ElectricPower electricPower = new ElectricPower();
String meterId = entry.getKey(); // params的key 电表ID ElectricPower.meterId
Object valueObj = entry.getValue(); // params的value 电能值
String gatewayKey = electricPower.getGatewayKey();
// 安全转换数值Integer/Double 都能转成Double解决类型转换报错核心修复
Double cumulant = null;
if (valueObj instanceof Number) {
cumulant = ((Number) valueObj).doubleValue();
}
// 给实体赋值这就是你要存的最终数据格式
electricPower.setMeterId(meterId);
electricPower.setCumulant(cumulant);
// ========== 核心步骤2电表ID匹配数据库的IndexTemplate模板 ==========
if (electricPower.getCumulant() == null) {
log.warn("电表ID:{} 无有效电能值,跳过", meterId);
continue;
}
// 匹配规则模板的gatewayKey
IndexTemplate matchTemplate = templates.stream()
.filter(template -> gatewayKey.equalsIgnoreCase(template.getGatewayKey()))
.findFirst()
.orElse(null);
// ========== 核心步骤3模板匹配成功封装InfluxDB的Point对象 ==========
if (matchTemplate != null) {
Point point = Point
.measurement(influxdbConfig.getMeasurement())
.addTag(TAG, electricPower.getSn() + "_" + indexTemplate.getCode())
.time(Instant.now(), WritePrecision.S);
// 设置字段可访问允许访问私有字段
field.setAccessible(true);
if (Number.class.isAssignableFrom(field.getType()) || field.getType().isPrimitive()) {
try {
// 获取字段值
double value = field.getDouble(electricPower);
point.addField(FIELD_VALUE, value);
points.add(point);
} catch (IllegalAccessException e) {
log.error("获取属性值失败:{}", e.getMessage());
}
}
.measurement(influxdbConfig.getMeasurement()) // 库名配置文件读取
// Tag拼接电表ID_模板code (和你原反射逻辑一致)
.addTag(TAG, electricPower.getMeterId())
.addTag(TYPE, "electricity")
.time(dataInstant, WritePrecision.MS) // 数据采集时间
.addField(FIELD_VALUE, electricPower.getCumulant()); // 存入的电能值
points.add(point);
} else {
log.warn("电表ID:{} 未匹配到数据库模板,跳过", meterId);
}
}
repository.writePoints(points);
// ========== 核心步骤4批量写入InfluxDB循环外写入性能最优 ==========
if (!points.isEmpty()) {
// ======== 新增打印每条写入InfluxDB的最终完整格式 ========
log.info("========== 开始打印本次写入InfluxDB的完整数据格式 ==========");
points.forEach(point -> {
String lineProtocol = point.toLineProtocol();
log.info("写入InfluxDB行协议{}", lineProtocol);
});
log.info("=========================================================");
// 原有写入逻辑不变
repository.writePoints(points);
log.info("成功处理电表数据写入InfluxDB共{}条点位数据", points.size());
} else {
log.info("本次处理电表数据,无有效点位可写入");
}
}
/**
@ -123,6 +177,7 @@ public class DataServiceImpl implements IDataService {
*/
protected List<IndexTemplate> getIndexTemplate() {
String TEMPLATE_KEY = "template";
redisCache.deleteObject(TEMPLATE_KEY);
List<IndexTemplate> result = redisCache.getCacheList(TEMPLATE_KEY);
if (result == null || result.isEmpty()) {
result = commonMapper.getIndexTemplate();