diff --git a/gather-app/pom.xml b/gather-app/pom.xml index 4752621..4c10c7a 100644 --- a/gather-app/pom.xml +++ b/gather-app/pom.xml @@ -47,5 +47,12 @@ alipay-sdk-java 4.40.523.ALL + + + org.apache.kafka + kafka-clients + 2.3.0 + + diff --git a/gather-app/src/main/java/com/ruoyi/database/domain/VehicleRecord.java b/gather-app/src/main/java/com/ruoyi/database/domain/VehicleRecord.java new file mode 100644 index 0000000..1083121 --- /dev/null +++ b/gather-app/src/main/java/com/ruoyi/database/domain/VehicleRecord.java @@ -0,0 +1,122 @@ +package com.ruoyi.database.domain; + +import com.alipay.api.domain.ImageInfo; +import lombok.Data; +import java.util.List; + +/** + * 车辆进出场记录消息 + */ +@Data +public class VehicleRecord { + + /** + * 车位编码 + */ + private String berthCode; + + /** + * 车位号码 + */ + private String berthNumber; + + /** + * 车辆类型 + */ + private Integer carType; + + /** + * 经销商编码 + */ + private String dealerCode; + + /** + * 方向 (0-进场, 1-出场) + */ + private Integer direct; + + /** + * 停车类型 + */ + private Integer parkType; + + /** + * 通行时间 + */ + private String passTime; + + /** + * 车牌颜色 + */ + private Integer plateColor; + + /** + * 车牌号码 + */ + private String plateNo; + + /** + * 车牌类型 + */ + private Integer plateType; + + /** + * 停车场编码 + */ + private String parkCode; + + /** + * 进场唯一ID + */ + private String inUnid; + + /** + * 出场唯一ID + */ + private String outUnid; + + /** + * 免费时长(分钟) + */ + private Integer freeTime; + + /** + * 进场时间 + */ + private String inTime; + + /** + * 出场时间 + */ + private String outTime; + + /** + * 道闸编码 + */ + private String gateCode; + + /** + * 车道编号 + */ + private String laneNo; + + /** + * 进场图片列表 + */ + private List inImages; + + /** + * 出场图片列表 + */ + private List outImages; + + /** + * 总费用(分) + */ + private Integer totalCost; + + /** + * 实付费用(分) + */ + private Integer payCost; +} \ No newline at end of file diff --git a/gather-app/src/main/java/com/ruoyi/database/service/HikKafkaConsumerService.java b/gather-app/src/main/java/com/ruoyi/database/service/HikKafkaConsumerService.java new file mode 100644 index 0000000..7f4d3bd --- /dev/null +++ b/gather-app/src/main/java/com/ruoyi/database/service/HikKafkaConsumerService.java @@ -0,0 +1,248 @@ +package com.ruoyi.database.service; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.ruoyi.database.domain.VehicleRecord; +import org.apache.kafka.clients.consumer.*; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.stereotype.Component; + +import java.text.SimpleDateFormat; +import java.time.Duration; +import java.util.Arrays; +import java.util.Date; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +@Component +public class HikKafkaConsumerService implements InitializingBean, DisposableBean { + + private final AtomicBoolean running = new AtomicBoolean(false); + private ExecutorService executorService; + private KafkaConsumer consumer; + + // 配置参数 - 建议放到配置文件中 + private final String bootstrapServers = "kafka.hikparking.com:38999,kafka.hikparking.com:38991,kafka.hikparking.com:38992"; + private final String groupId = "gp_32856UK2Rr9BT"; + private final String topic = "topic_pass32856ik8M5V"; + private final String username = "103251113CU2C82TCA6M9"; + private final String password = "ZR56NoW%qxov$Gl4"; + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public void afterPropertiesSet() throws Exception { + startConsumer(); + } + + @Override + public void destroy() throws Exception { + stopConsumer(); + } + + private Properties createConsumerConfig() { + Properties properties = new Properties(); + + // 基础配置 + properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + + // SASL认证配置 - 修正了你的配置中的拼写错误 + properties.put("security.protocol", "SASL_PLAINTEXT"); + properties.put("sasl.mechanism", "SCRAM-SHA-256"); // 修正:sssl -> sasl + + // JAAS配置 - 修正了配置格式 + String jaasConfig = String.format( + "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", + username, password + ); + properties.put("sasl.jaas.config", jaasConfig); // 修正:sssl -> sasl + + // 其他优化配置 + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500"); + properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); + properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000"); + + return properties; + } + + private void startConsumer() { + try { + Properties config = createConsumerConfig(); + consumer = new KafkaConsumer<>(config); + consumer.subscribe(Arrays.asList(topic)); + + running.set(true); + executorService = Executors.newSingleThreadExecutor(); + executorService.submit(this::consumeMessages); + + System.out.println("Kafka消费者启动成功,开始监听主题: " + topic); + } catch (Exception e) { + System.err.println("Kafka消费者启动失败: " + e.getMessage()); + e.printStackTrace(); + } + } + + private void consumeMessages() { + int emptyPollCount = 0; + final int MAX_EMPTY_POLLS = 10; // 连续空轮询次数阈值 + + while (running.get()) { + try { + ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); + + if (records != null && !records.isEmpty()) { + emptyPollCount = 0; // 重置空轮询计数 + System.out.println("拉取到 " + records.count() + " 条消息"); + + for (ConsumerRecord record : records) { + processRecord(record); + } + + // 手动提交偏移量 + consumer.commitSync(); + System.out.println("偏移量提交成功"); + + } else { + emptyPollCount++; + // 如果连续多次拉取不到消息,记录日志(可选) + if (emptyPollCount >= MAX_EMPTY_POLLS) { + System.out.println("连续 " + MAX_EMPTY_POLLS + " 次未拉取到消息,消费者运行正常..."); + emptyPollCount = 0; // 重置计数 + } + } + + } catch (Exception e) { + System.err.println("消费消息时发生异常: " + e.getMessage()); + e.printStackTrace(); + + // 异常后短暂休眠 + try { + Thread.sleep(5000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + } + } + + private void processRecord(ConsumerRecord record) { + try { + // 这里实现你的业务逻辑 + System.out.printf("接收到消息: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n", + record.topic(), record.partition(), record.offset(), + record.key(), record.value()); + VehicleRecord vehicleRecord = objectMapper.readValue(record.value(), VehicleRecord.class); + // 示例业务处理 + handleBusinessLogic(vehicleRecord, record); + + } catch (Exception e) { + System.err.println("处理消息失败: " + e.getMessage()); + // 根据业务需求决定是否抛出异常 + // 如果抛出异常,这批消息的偏移量不会提交,会被重新消费 + } + } + + private void handleBusinessLogic(VehicleRecord record, ConsumerRecord kafkaRecord) { + try { + // 根据方向处理不同的业务逻辑 + if (record.getDirect() == 0) { + processInRecord(record); // 进场记录 + } else if (record.getDirect() == 1) { + processOutRecord(record); // 出场记录 + } else { + System.err.println("未知的方向类型: " + record.getDirect()); + } + + } catch (Exception e) { + System.err.println("业务处理失败: " + e.getMessage()); + throw new RuntimeException("业务处理异常", e); + } + } + + + + /** + * 处理进场记录 + */ + private void processInRecord(VehicleRecord record) { + System.out.println("=== 处理进场记录 ==="); + System.out.println("停车场: " + record.getParkCode()); + System.out.println("车牌: " + record.getPlateNo()); + System.out.println("进场时间: " + record.getInTime()); + System.out.println("车道: " + record.getLaneNo()); + System.out.println("进场图片数量: " + + (record.getInImages() != null ? record.getInImages().size() : 0)); + + // 这里添加具体的进场业务逻辑 + // 例如:保存到数据库、发送通知等 + } + + /** + * 处理出场记录 + */ + private void processOutRecord(VehicleRecord record) { + System.out.println("=== 处理出场记录 ==="); + System.out.println("停车场: " + record.getParkCode()); + System.out.println("车牌: " + record.getPlateNo()); + System.out.println("出场时间: " + record.getOutTime()); + System.out.println("停车时长: " + calculateParkingDuration(record)); + System.out.println("总费用: " + record.getTotalCost() + "分"); + System.out.println("实付费用: " + record.getPayCost() + "分"); + System.out.println("出场图片数量: " + + (record.getOutImages() != null ? record.getOutImages().size() : 0)); + + // 这里添加具体的出场业务逻辑 + // 例如:计算费用、更新停车记录、扣费等 + } + + /** + * 计算停车时长(分钟) + */ + private Long calculateParkingDuration(VehicleRecord record) { + try { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Date inTime = sdf.parse(record.getInTime()); + Date outTime = sdf.parse(record.getOutTime()); + return (outTime.getTime() - inTime.getTime()) / (1000 * 60); + } catch (Exception e) { + System.err.println("计算停车时长失败: " + e.getMessage()); + return null; + } + } + + private void stopConsumer() { + System.out.println("正在停止Kafka消费者..."); + running.set(false); + + if (executorService != null) { + executorService.shutdown(); + } + + if (consumer != null) { + consumer.close(); + System.out.println("Kafka消费者已关闭"); + } + } + + /** + * 手动重启消费者(可选) + */ + public void restartConsumer() { + stopConsumer(); + try { + Thread.sleep(3000); + startConsumer(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } +}