海康云平台kafka车辆进出消息监控处理

This commit is contained in:
hanrenchun 2025-12-01 11:26:34 +08:00
parent a8f324ee5c
commit 66e9d8f877
3 changed files with 377 additions and 0 deletions

View File

@ -47,5 +47,12 @@
<artifactId>alipay-sdk-java</artifactId>
<version>4.40.523.ALL</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,122 @@
package com.ruoyi.database.domain;
import com.alipay.api.domain.ImageInfo;
import lombok.Data;
import java.util.List;
/**
* 车辆进出场记录消息
*/
@Data
public class VehicleRecord {
/**
* 车位编码
*/
private String berthCode;
/**
* 车位号码
*/
private String berthNumber;
/**
* 车辆类型
*/
private Integer carType;
/**
* 经销商编码
*/
private String dealerCode;
/**
* 方向 (0-进场, 1-出场)
*/
private Integer direct;
/**
* 停车类型
*/
private Integer parkType;
/**
* 通行时间
*/
private String passTime;
/**
* 车牌颜色
*/
private Integer plateColor;
/**
* 车牌号码
*/
private String plateNo;
/**
* 车牌类型
*/
private Integer plateType;
/**
* 停车场编码
*/
private String parkCode;
/**
* 进场唯一ID
*/
private String inUnid;
/**
* 出场唯一ID
*/
private String outUnid;
/**
* 免费时长(分钟)
*/
private Integer freeTime;
/**
* 进场时间
*/
private String inTime;
/**
* 出场时间
*/
private String outTime;
/**
* 道闸编码
*/
private String gateCode;
/**
* 车道编号
*/
private String laneNo;
/**
* 进场图片列表
*/
private List<ImageInfo> inImages;
/**
* 出场图片列表
*/
private List<ImageInfo> outImages;
/**
* 总费用()
*/
private Integer totalCost;
/**
* 实付费用()
*/
private Integer payCost;
}

View File

@ -0,0 +1,248 @@
package com.ruoyi.database.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ruoyi.database.domain.VehicleRecord;
import org.apache.kafka.clients.consumer.*;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
@Component
public class HikKafkaConsumerService implements InitializingBean, DisposableBean {
private final AtomicBoolean running = new AtomicBoolean(false);
private ExecutorService executorService;
private KafkaConsumer<String, String> consumer;
// 配置参数 - 建议放到配置文件中
private final String bootstrapServers = "kafka.hikparking.com:38999,kafka.hikparking.com:38991,kafka.hikparking.com:38992";
private final String groupId = "gp_32856UK2Rr9BT";
private final String topic = "topic_pass32856ik8M5V";
private final String username = "103251113CU2C82TCA6M9";
private final String password = "ZR56NoW%qxov$Gl4";
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void afterPropertiesSet() throws Exception {
startConsumer();
}
@Override
public void destroy() throws Exception {
stopConsumer();
}
private Properties createConsumerConfig() {
Properties properties = new Properties();
// 基础配置
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// SASL认证配置 - 修正了你的配置中的拼写错误
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "SCRAM-SHA-256"); // 修正sssl -> sasl
// JAAS配置 - 修正了配置格式
String jaasConfig = String.format(
"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";",
username, password
);
properties.put("sasl.jaas.config", jaasConfig); // 修正sssl -> sasl
// 其他优化配置
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
return properties;
}
private void startConsumer() {
try {
Properties config = createConsumerConfig();
consumer = new KafkaConsumer<>(config);
consumer.subscribe(Arrays.asList(topic));
running.set(true);
executorService = Executors.newSingleThreadExecutor();
executorService.submit(this::consumeMessages);
System.out.println("Kafka消费者启动成功开始监听主题: " + topic);
} catch (Exception e) {
System.err.println("Kafka消费者启动失败: " + e.getMessage());
e.printStackTrace();
}
}
private void consumeMessages() {
int emptyPollCount = 0;
final int MAX_EMPTY_POLLS = 10; // 连续空轮询次数阈值
while (running.get()) {
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
if (records != null && !records.isEmpty()) {
emptyPollCount = 0; // 重置空轮询计数
System.out.println("拉取到 " + records.count() + " 条消息");
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
// 手动提交偏移量
consumer.commitSync();
System.out.println("偏移量提交成功");
} else {
emptyPollCount++;
// 如果连续多次拉取不到消息记录日志可选
if (emptyPollCount >= MAX_EMPTY_POLLS) {
System.out.println("连续 " + MAX_EMPTY_POLLS + " 次未拉取到消息,消费者运行正常...");
emptyPollCount = 0; // 重置计数
}
}
} catch (Exception e) {
System.err.println("消费消息时发生异常: " + e.getMessage());
e.printStackTrace();
// 异常后短暂休眠
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
private void processRecord(ConsumerRecord<String, String> record) {
try {
// 这里实现你的业务逻辑
System.out.printf("接收到消息: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
VehicleRecord vehicleRecord = objectMapper.readValue(record.value(), VehicleRecord.class);
// 示例业务处理
handleBusinessLogic(vehicleRecord, record);
} catch (Exception e) {
System.err.println("处理消息失败: " + e.getMessage());
// 根据业务需求决定是否抛出异常
// 如果抛出异常这批消息的偏移量不会提交会被重新消费
}
}
private void handleBusinessLogic(VehicleRecord record, ConsumerRecord<String, String> kafkaRecord) {
try {
// 根据方向处理不同的业务逻辑
if (record.getDirect() == 0) {
processInRecord(record); // 进场记录
} else if (record.getDirect() == 1) {
processOutRecord(record); // 出场记录
} else {
System.err.println("未知的方向类型: " + record.getDirect());
}
} catch (Exception e) {
System.err.println("业务处理失败: " + e.getMessage());
throw new RuntimeException("业务处理异常", e);
}
}
/**
* 处理进场记录
*/
private void processInRecord(VehicleRecord record) {
System.out.println("=== 处理进场记录 ===");
System.out.println("停车场: " + record.getParkCode());
System.out.println("车牌: " + record.getPlateNo());
System.out.println("进场时间: " + record.getInTime());
System.out.println("车道: " + record.getLaneNo());
System.out.println("进场图片数量: " +
(record.getInImages() != null ? record.getInImages().size() : 0));
// 这里添加具体的进场业务逻辑
// 例如保存到数据库发送通知等
}
/**
* 处理出场记录
*/
private void processOutRecord(VehicleRecord record) {
System.out.println("=== 处理出场记录 ===");
System.out.println("停车场: " + record.getParkCode());
System.out.println("车牌: " + record.getPlateNo());
System.out.println("出场时间: " + record.getOutTime());
System.out.println("停车时长: " + calculateParkingDuration(record));
System.out.println("总费用: " + record.getTotalCost() + "");
System.out.println("实付费用: " + record.getPayCost() + "");
System.out.println("出场图片数量: " +
(record.getOutImages() != null ? record.getOutImages().size() : 0));
// 这里添加具体的出场业务逻辑
// 例如计算费用更新停车记录扣费等
}
/**
* 计算停车时长分钟
*/
private Long calculateParkingDuration(VehicleRecord record) {
try {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date inTime = sdf.parse(record.getInTime());
Date outTime = sdf.parse(record.getOutTime());
return (outTime.getTime() - inTime.getTime()) / (1000 * 60);
} catch (Exception e) {
System.err.println("计算停车时长失败: " + e.getMessage());
return null;
}
}
private void stopConsumer() {
System.out.println("正在停止Kafka消费者...");
running.set(false);
if (executorService != null) {
executorService.shutdown();
}
if (consumer != null) {
consumer.close();
System.out.println("Kafka消费者已关闭");
}
}
/**
* 手动重启消费者可选
*/
public void restartConsumer() {
stopConsumer();
try {
Thread.sleep(3000);
startConsumer();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}