kafka数据接收并存库,出入口图片生成工具修改

This commit is contained in:
hanrenchun 2025-12-11 11:17:50 +08:00
parent d467ca1d89
commit e28ed3508f
16 changed files with 1104 additions and 258 deletions

View File

@ -54,6 +54,11 @@
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.google.zxing</groupId>
<artifactId>core</artifactId>

View File

@ -0,0 +1,57 @@
package com.ruoyi.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Properties;
@Configuration
public class KafkaConfig {
private String bootstrapServers = "kafka.hikparking.com:30090,kafka.hikparking.com:30091,kafka.hikparking.com:30092";
private String groupId = "gp_32856UK2Rr9BT";
private String topic = "topic_pass32856ik8M5V";
private String username = "103251113CU2C82TCA6M9";
private String password = "ZR56NoW%qxov$Gl4";
/**
* 创建Kafka消费者配置
*/
@Bean
public Properties kafkaConsumerProperties() {
Properties properties = new Properties();
// 基础配置
properties.setProperty("bootstrap.servers", bootstrapServers);
properties.setProperty("group.id", groupId);
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000");
properties.setProperty("session.timeout.ms", "30000");
properties.setProperty("heartbeat.interval.ms", "10000");
properties.setProperty("max.poll.records", "500");
// SASL认证配置 - 特别注意SCRAM-SHA-256不是你截图中的SCRAM-SMA-256
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "SCRAM-SHA-256"); // 修正为SCRAM-SHA-256
// 注意正确的类名是scram不是scran
String jaasConfig = String.format(
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"%s\" password=\"%s\";",
username, password
);
properties.put("sasl.jaas.config", jaasConfig);
return properties;
}
}

View File

@ -48,15 +48,21 @@ public class SmallProgramController extends BaseController {
@ApiOperation("根据车牌号查询缴费订单")
@Transactional
public AjaxResult getOrder(String plateNo) {
ParkingBillInfo b = parkingBillInfoService.saveOrUpdateParkingBillInfo(plateNo);
if (b == null) {
// ParkingBillInfo b = parkingBillInfoService.saveOrUpdateParkingBillInfo(plateNo);
ParkingBillInfo one = parkingBillInfoService.lambdaQuery()
.eq(ParkingBillInfo::getPlateNo, plateNo)
.orderByDesc(ParkingBillInfo::getCreateTime)
.last("limit 1")
.one();
if (one == null) {
return AjaxResult.error("未查询到订单");
}else {
ParkingBillInfoVO parkingBillInfoVO = new ParkingBillInfoVO();
BeanUtils.copyProperties(b, parkingBillInfoVO);
parkingBillInfoVO.setTotalCostYuan(b.getTotalCostYuan().toString() + "");
parkingBillInfoVO.setPayMoneyYuan(b.getPayMoneyYuan().toString() + "");
parkingBillInfoVO.setDeductMoneyYuan(b.getDeductMoneyYuan().toString() + "");
BeanUtils.copyProperties(one, parkingBillInfoVO);
parkingBillInfoVO.setTotalCostYuan(one.getTotalCostYuan().toString() + "");
parkingBillInfoVO.setPayMoneyYuan(one.getPayMoneyYuan().toString() + "");
parkingBillInfoVO.setDeductMoneyYuan(one.getDeductMoneyYuan().toString() + "");
return AjaxResult.success(parkingBillInfoVO);
}
}
@ -174,10 +180,10 @@ public class SmallProgramController extends BaseController {
if (one == null) {
return AjaxResult.error("订单查询失败");
}
one = parkingBillInfoService.saveOrUpdateParkingBillInfo(one.getPlateNo());
if (one == null) {
return AjaxResult.error("订单查询失败");
}
// one = parkingBillInfoService.saveOrUpdateParkingBillInfo(one.getPlateNo());
// if (one == null) {
// return AjaxResult.error("订单查询失败");
// }
String generate = generate(loginUserByPhone.getPhone());
String openId = getOpenId(payFees);
String clientIp = wechatMiniProgramPayService.getClientIpAddress(request);

View File

@ -0,0 +1,130 @@
package com.ruoyi.database.domain;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.ruoyi.common.annotation.Excel;
import com.ruoyi.common.core.domain.BaseEntity;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.Date;
/**
* 停车场过车消息表(PassVehicleInfo)实体类
*
* @author makejava
* @since 2025-12-11 10:14:26
*/
@Data
@EqualsAndHashCode(callSuper = false)
@TableName(value = "pass_vehicle_info")
@ApiModel(value = "PassVehicleInfo", description = "停车场过车消息表")
public class PassVehicleInfo {
/**
* 主键
*/
@ApiModelProperty("主键")
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 创建时间
*/
@ApiModelProperty("创建时间")
@Excel(name = "创建时间")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date createTime;
/**
* 创建者
*/
@ApiModelProperty("创建者")
@Excel(name = "创建者")
private Integer createBy;
/**
* 更新时间
*/
@ApiModelProperty("更新时间")
@Excel(name = "更新时间")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date updateTime;
/**
* 更新者
*/
@ApiModelProperty("更新者")
@Excel(name = "更新者")
private Integer updateBy;
/**
* 出入方向
*/
@ApiModelProperty("出入方向")
@Excel(name = "出入方向")
private Integer direct;
/**
* 车牌号
*/
@ApiModelProperty("车牌号")
@Excel(name = "车牌号")
private String plateNo;
/**
* 停车场编号
*/
@ApiModelProperty("停车场编号")
@Excel(name = "停车场编号")
private String parkCode;
/**
* 出入口编号
*/
@ApiModelProperty("出入口编号")
@Excel(name = "出入口编号")
private String gateCode;
/**
* 车道编号
*/
@ApiModelProperty("车道编号")
@Excel(name = "车道编号")
private String laneNo;
/**
* 入车唯一编号
*/
@ApiModelProperty("入车唯一编号")
@Excel(name = "入车唯一编号")
private String inUnid;
/**
* 出车唯一编号
*/
@ApiModelProperty("出车唯一编号")
@Excel(name = "出车唯一编号")
private String outUnid;
/**
* 进场时间
*/
@ApiModelProperty("进场时间")
@Excel(name = "进场时间")
private String inTime;
/**
* 出场时间
*/
@ApiModelProperty("出场时间")
@Excel(name = "出场时间")
private String outTime;
}

View File

@ -0,0 +1,151 @@
package com.ruoyi.database.domain;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.ruoyi.common.annotation.Excel;
import com.ruoyi.common.core.domain.BaseEntity;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.Date;
/**
* 车辆请求放行表(ReleaseRequestInfo)实体类
*
* @author makejava
* @since 2025-12-11 10:16:48
*/
@Data
@EqualsAndHashCode(callSuper = false)
@TableName(value = "release_request_info")
@ApiModel(value = "ReleaseRequestInfo", description = "车辆请求放行表")
public class ReleaseRequestInfo {
/**
* 主键
*/
@ApiModelProperty("主键")
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 创建时间
*/
@ApiModelProperty("创建时间")
@Excel(name = "创建时间")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date createTime;
/**
* 创建者
*/
@ApiModelProperty("创建者")
@Excel(name = "创建者")
private Integer createBy;
/**
* 更新时间
*/
@ApiModelProperty("更新时间")
@Excel(name = "更新时间")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date updateTime;
/**
* 更新者
*/
@ApiModelProperty("更新者")
@Excel(name = "更新者")
private Integer updateBy;
/**
* 出入方向
*/
@ApiModelProperty("出入方向")
@Excel(name = "出入方向")
private Integer direct;
/**
* 车牌号
*/
@ApiModelProperty("车牌号")
@Excel(name = "车牌号")
private String plateNo;
/**
* 过车时间
*/
@ApiModelProperty("过车时间")
@Excel(name = "过车时间")
private String passTime;
/**
* 停车场编号
*/
@ApiModelProperty("停车场编号")
@Excel(name = "停车场编号")
private String parkCode;
/**
* 出入口编号
*/
@ApiModelProperty("出入口编号")
@Excel(name = "出入口编号")
private String gateCode;
/**
* 车道编号
*/
@ApiModelProperty("车道编号")
@Excel(name = "车道编号")
private String laneNo;
/**
* 入车唯一编号
*/
@ApiModelProperty("入车唯一编号")
@Excel(name = "入车唯一编号")
private String inUnid;
/**
* 出车唯一编号
*/
@ApiModelProperty("出车唯一编号")
@Excel(name = "出车唯一编号")
private String outUnid;
/**
* 停车类型
*/
@ApiModelProperty("停车类型")
@Excel(name = "停车类型")
private Integer parkType;
/**
* 车牌类型
*/
@ApiModelProperty("车牌类型")
@Excel(name = "车牌类型")
private Integer plateType;
/**
* 车牌颜色
*/
@ApiModelProperty("车牌颜色")
@Excel(name = "车牌颜色")
private Integer plateColor;
/**
* 车辆类型
*/
@ApiModelProperty("车辆类型")
@Excel(name = "车辆类型")
private Integer carType;
}

View File

@ -0,0 +1,16 @@
package com.ruoyi.database.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.ruoyi.database.domain.PassVehicleInfo;
import org.apache.ibatis.annotations.Mapper;
/**
* 停车场过车消息表(PassVehicleInfo) Mapper 接口
*
* @author makejava
* @since ${date}
*/
@Mapper
public interface PassVehicleInfoMapper extends BaseMapper<PassVehicleInfo> {
}

View File

@ -0,0 +1,16 @@
package com.ruoyi.database.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.ruoyi.database.domain.ReleaseRequestInfo;
import org.apache.ibatis.annotations.Mapper;
/**
* 车辆请求放行表(ReleaseRequestInfo) Mapper 接口
*
* @author makejava
* @since ${date}
*/
@Mapper
public interface ReleaseRequestInfoMapper extends BaseMapper<ReleaseRequestInfo> {
}

View File

@ -1,248 +0,0 @@
package com.ruoyi.database.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ruoyi.database.domain.VehicleRecord;
import org.apache.kafka.clients.consumer.*;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
@Component
public class HikKafkaConsumerService implements InitializingBean, DisposableBean {
private final AtomicBoolean running = new AtomicBoolean(false);
private ExecutorService executorService;
private KafkaConsumer<String, String> consumer;
// 配置参数 - 建议放到配置文件中
private final String bootstrapServers = "kafka.hikparking.com:38999,kafka.hikparking.com:38991,kafka.hikparking.com:38992";
private final String groupId = "gp_32856UK2Rr9BT";
private final String topic = "topic_pass32856ik8M5V";
private final String username = "103251113CU2C82TCA6M9";
private final String password = "ZR56NoW%qxov$Gl4";
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void afterPropertiesSet() throws Exception {
startConsumer();
}
@Override
public void destroy() throws Exception {
stopConsumer();
}
private Properties createConsumerConfig() {
Properties properties = new Properties();
// 基础配置
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// SASL认证配置 - 修正了你的配置中的拼写错误
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "SCRAM-SHA-256"); // 修正sssl -> sasl
// JAAS配置 - 修正了配置格式
String jaasConfig = String.format(
"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";",
username, password
);
properties.put("sasl.jaas.config", jaasConfig); // 修正sssl -> sasl
// 其他优化配置
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
return properties;
}
private void startConsumer() {
try {
Properties config = createConsumerConfig();
consumer = new KafkaConsumer<>(config);
consumer.subscribe(Arrays.asList(topic));
running.set(true);
executorService = Executors.newSingleThreadExecutor();
executorService.submit(this::consumeMessages);
System.out.println("Kafka消费者启动成功开始监听主题: " + topic);
} catch (Exception e) {
System.err.println("Kafka消费者启动失败: " + e.getMessage());
e.printStackTrace();
}
}
private void consumeMessages() {
int emptyPollCount = 0;
final int MAX_EMPTY_POLLS = 10; // 连续空轮询次数阈值
while (running.get()) {
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
if (records != null && !records.isEmpty()) {
emptyPollCount = 0; // 重置空轮询计数
System.out.println("拉取到 " + records.count() + " 条消息");
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
// 手动提交偏移量
consumer.commitSync();
System.out.println("偏移量提交成功");
} else {
emptyPollCount++;
// 如果连续多次拉取不到消息记录日志可选
if (emptyPollCount >= MAX_EMPTY_POLLS) {
System.out.println("连续 " + MAX_EMPTY_POLLS + " 次未拉取到消息,消费者运行正常...");
emptyPollCount = 0; // 重置计数
}
}
} catch (Exception e) {
System.err.println("消费消息时发生异常: " + e.getMessage());
e.printStackTrace();
// 异常后短暂休眠
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
private void processRecord(ConsumerRecord<String, String> record) {
try {
// 这里实现你的业务逻辑
System.out.printf("接收到消息: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
VehicleRecord vehicleRecord = objectMapper.readValue(record.value(), VehicleRecord.class);
// 示例业务处理
handleBusinessLogic(vehicleRecord, record);
} catch (Exception e) {
System.err.println("处理消息失败: " + e.getMessage());
// 根据业务需求决定是否抛出异常
// 如果抛出异常这批消息的偏移量不会提交会被重新消费
}
}
private void handleBusinessLogic(VehicleRecord record, ConsumerRecord<String, String> kafkaRecord) {
try {
// 根据方向处理不同的业务逻辑
if (record.getDirect() == 0) {
processInRecord(record); // 进场记录
} else if (record.getDirect() == 1) {
processOutRecord(record); // 出场记录
} else {
System.err.println("未知的方向类型: " + record.getDirect());
}
} catch (Exception e) {
System.err.println("业务处理失败: " + e.getMessage());
throw new RuntimeException("业务处理异常", e);
}
}
/**
* 处理进场记录
*/
private void processInRecord(VehicleRecord record) {
System.out.println("=== 处理进场记录 ===");
System.out.println("停车场: " + record.getParkCode());
System.out.println("车牌: " + record.getPlateNo());
System.out.println("进场时间: " + record.getInTime());
System.out.println("车道: " + record.getLaneNo());
System.out.println("进场图片数量: " +
(record.getInImages() != null ? record.getInImages().size() : 0));
// 这里添加具体的进场业务逻辑
// 例如保存到数据库发送通知等
}
/**
* 处理出场记录
*/
private void processOutRecord(VehicleRecord record) {
System.out.println("=== 处理出场记录 ===");
System.out.println("停车场: " + record.getParkCode());
System.out.println("车牌: " + record.getPlateNo());
System.out.println("出场时间: " + record.getOutTime());
System.out.println("停车时长: " + calculateParkingDuration(record));
System.out.println("总费用: " + record.getTotalCost() + "");
System.out.println("实付费用: " + record.getPayCost() + "");
System.out.println("出场图片数量: " +
(record.getOutImages() != null ? record.getOutImages().size() : 0));
// 这里添加具体的出场业务逻辑
// 例如计算费用更新停车记录扣费等
}
/**
* 计算停车时长分钟
*/
private Long calculateParkingDuration(VehicleRecord record) {
try {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date inTime = sdf.parse(record.getInTime());
Date outTime = sdf.parse(record.getOutTime());
return (outTime.getTime() - inTime.getTime()) / (1000 * 60);
} catch (Exception e) {
System.err.println("计算停车时长失败: " + e.getMessage());
return null;
}
}
private void stopConsumer() {
System.out.println("正在停止Kafka消费者...");
running.set(false);
if (executorService != null) {
executorService.shutdown();
}
if (consumer != null) {
consumer.close();
System.out.println("Kafka消费者已关闭");
}
}
/**
* 手动重启消费者可选
*/
public void restartConsumer() {
stopConsumer();
try {
Thread.sleep(3000);
startConsumer();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

View File

@ -0,0 +1,14 @@
package com.ruoyi.database.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.ruoyi.database.domain.PassVehicleInfo;
/**
* (停车场过车消息表)Service
*
* @author makejava
* @since 2025-12-11 10:14:27
*/
public interface PassVehicleInfoService extends IService<PassVehicleInfo> {
}

View File

@ -0,0 +1,14 @@
package com.ruoyi.database.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.ruoyi.database.domain.ReleaseRequestInfo;
/**
* (车辆请求放行表)Service
*
* @author makejava
* @since 2025-12-11 10:16:48
*/
public interface ReleaseRequestInfoService extends IService<ReleaseRequestInfo> {
}

View File

@ -0,0 +1,55 @@
package com.ruoyi.database.service;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.database.domain.PassVehicleInfo;
import com.ruoyi.database.domain.ReleaseRequestInfo;
import com.ruoyi.database.util.SimpleJsonConverter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* 简单消息处理服务
*/
@Slf4j
@Service
public class SimpleKafkaMessageService {
@Autowired
private SimpleJsonConverter jsonConverter;
@Autowired
private PassVehicleInfoService passVehicleInfoService;
@Autowired
private ReleaseRequestInfoService releaseRequestInfoService;
/**
* 处理Kafka消息
*/
public void processKafkaMessage(String topic, String message) {
try {
JSONObject json = JSON.parseObject(message);
if (topic.contains("topic_pass32856ik8M5V")) {
// 过车消息
PassVehicleInfo info = jsonConverter.toPassVehicleInfo(json);
if (info != null) {
passVehicleInfoService.save(info);
log.info("保存过车消息成功, 车牌: {}", info.getPlateNo());
}
} else if (topic.contains("topic_releaseRequest32856dSpuPV")) {
// 放行请求消息
ReleaseRequestInfo info = jsonConverter.toReleaseRequestInfo(json);
if (info != null) {
releaseRequestInfoService.save(info);
log.info("保存放行请求成功, 车牌: {}", info.getPlateNo());
}
}
} catch (Exception e) {
log.error("处理Kafka消息失败, topic: {}, message: {}", topic, message, e);
}
}
}

View File

@ -0,0 +1,19 @@
package com.ruoyi.database.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ruoyi.database.domain.PassVehicleInfo;
import com.ruoyi.database.mapper.PassVehicleInfoMapper;
import com.ruoyi.database.service.PassVehicleInfoService;
import org.springframework.stereotype.Service;
/**
* (停车场过车消息表)ServiceImpl
*
* @author makejava
* @since 2025-12-11 10:14:27
*/
@Service
public class PassVehicleInfoServiceImpl extends ServiceImpl<PassVehicleInfoMapper, PassVehicleInfo> implements PassVehicleInfoService {
}

View File

@ -0,0 +1,19 @@
package com.ruoyi.database.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ruoyi.database.domain.ReleaseRequestInfo;
import com.ruoyi.database.mapper.ReleaseRequestInfoMapper;
import com.ruoyi.database.service.ReleaseRequestInfoService;
import org.springframework.stereotype.Service;
/**
* (车辆请求放行表)ServiceImpl
*
* @author makejava
* @since 2025-12-11 10:16:48
*/
@Service
public class ReleaseRequestInfoServiceImpl extends ServiceImpl<ReleaseRequestInfoMapper, ReleaseRequestInfo> implements ReleaseRequestInfoService {
}

View File

@ -0,0 +1,278 @@
package com.ruoyi.database.util;
import com.ruoyi.database.service.SimpleKafkaMessageService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
@Component
public class HikKafkaConsumer implements CommandLineRunner {
private static final Logger logger = LoggerFactory.getLogger(HikKafkaConsumer.class);
private final AtomicBoolean running = new AtomicBoolean(true);
private ExecutorService executorService;
private KafkaConsumer<String, String> consumer;
@Autowired
private Properties kafkaConsumerProperties;
@Autowired
private SimpleKafkaMessageService simpleKafkaMessageService;
private String topic = "topic_releaseRequest32856dSpuPV,topic_pass32856ik8M5V";
private boolean autoStart = true;
private int consumerThreads = 1;
@Override
public void run(String... args) throws Exception {
if (autoStart) {
startConsumer();
}
}
/**
* 启动消费者
*/
public synchronized void startConsumer() {
if (executorService != null && !executorService.isShutdown()) {
logger.warn("消费者已经在运行");
return;
}
try {
// 创建消费者实例
consumer = new KafkaConsumer<>(kafkaConsumerProperties);
consumer.subscribe(Arrays.asList(topic.split(",")));
executorService = Executors.newFixedThreadPool(consumerThreads);
for (int i = 0; i < consumerThreads; i++) {
final int threadId = i;
executorService.submit(() -> consumeMessages(threadId));
}
logger.info("海康Kafka消费者启动成功");
printStartupInfo();
} catch (Exception e) {
logger.error("启动Kafka消费者失败", e);
System.err.println("❌ 启动Kafka消费者失败: " + e.getMessage());
}
}
/**
* 消费消息
*/
private void consumeMessages(int threadId) {
String threadName = "kafka-consumer-" + threadId;
Thread.currentThread().setName(threadName);
logger.info("消费者线程 {} 启动", threadName);
while (running.get()) {
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
if (records.isEmpty()) {
// 没有消息时稍微休眠避免CPU空转
Thread.sleep(100);
continue;
}
for (ConsumerRecord<String, String> record : records) {
processRecord(record, threadId);
}
// 异步提交偏移量如果开启了自动提交则不需要
// consumer.commitAsync();
} catch (Exception e) {
logger.error("消费者线程 {} 消费消息异常", threadName, e);
// 发生异常时等待一段时间再重试
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
logger.info("消费者线程 {} 停止", threadName);
}
/**
* 处理单条记录
*/
private void processRecord(ConsumerRecord<String, String> record, int threadId) {
try {
// 格式化输出到控制台
String consoleOutput = formatConsoleOutput(record, threadId);
System.out.println(consoleOutput);
// 记录到日志文件INFO级别
logger.info("消费者线程 {} - 主题: {}, 分区: {}, 偏移量: {}, 键: {}, 值: {}",
threadId, record.topic(), record.partition(),
record.offset(), record.key(), record.value());
// 如果需要这里可以添加业务处理逻辑
simpleKafkaMessageService.processKafkaMessage(record.topic(), record.value());
} catch (Exception e) {
logger.error("处理消息记录失败", e);
System.err.println("处理消息失败: " + e.getMessage());
}
}
/**
* 格式化控制台输出
*/
private String formatConsoleOutput(ConsumerRecord<String, String> record, int threadId) {
StringBuilder sb = new StringBuilder();
sb.append("\n");
sb.append("╔══════════════════════════════════════════════════════════════════╗\n");
sb.append(String.format("║ 【海康Kafka消息】线程: %-6d ║\n", threadId));
sb.append("╠══════════════════════════════════════════════════════════════════╣\n");
sb.append(String.format("║ 主题 : %-50s ║\n", record.topic()));
sb.append(String.format("║ 分区 : %-50d ║\n", record.partition()));
sb.append(String.format("║ 偏移量 : %-50d ║\n", record.offset()));
sb.append(String.format("║ 时间戳 : %-50s ║\n", new java.util.Date(record.timestamp())));
sb.append(String.format("║ 键 : %-50s ║\n",
record.key() != null ? record.key() : "null"));
// 处理消息值如果太长则截断
String value = record.value();
if (value.length() > 50) {
value = value.substring(0, 47) + "...";
}
sb.append(String.format("║ 值 : %-50s ║\n", value));
sb.append("╚══════════════════════════════════════════════════════════════════╝\n");
return sb.toString();
}
/**
* 打印启动信息
*/
private void printStartupInfo() {
String servers = kafkaConsumerProperties.getProperty("bootstrap.servers");
String groupId = kafkaConsumerProperties.getProperty("group.id");
String username = extractUsernameFromJaasConfig();
String startupInfo = String.format(
"\n" +
"██████╗ ███████╗██╗ ██╗███████╗██╗ ██╗ ██╗ ██╗██╗██╗ ██╗\n" +
"██╔══██╗██╔════╝██║ ██║██╔════╝██║ ██╔╝ ██║ ██╔╝██║██║ ██╔╝\n" +
"██████╔╝█████╗ ██║ ██║███████╗█████╔╝ █████╔╝ ██║█████╔╝ \n" +
"██╔══██╗██╔══╝ ╚██╗ ██╔╝╚════██║██╔═██╗ ██╔═██╗ ██║██╔═██╗ \n" +
"██║ ██║███████╗ ╚████╔╝ ███████║██║ ██╗ ██║ ██╗██║██║ ██╗\n" +
"╚═╝ ╚═╝╚══════╝ ╚═══╝ ╚══════╝╚═╝ ╚═╝ ╚═╝ ╚═╝╚═╝╚═╝ ╚═╝\n" +
"\n" +
"════════════════════ Kafka消费者启动成功 ═════════════════════\n" +
"服务器地址 : %s\n" +
"消费主题 : %s\n" +
"消费者组 : %s\n" +
"认证用户 : %s\n" +
"认证机制 : SCRAM-SHA-256\n" +
"线程数量 : %d\n" +
"═══════════════════════════════════════════════════════════\n",
servers, topic, groupId, username, consumerThreads
);
System.out.println(startupInfo);
logger.info(startupInfo);
}
/**
* 从JAAS配置中提取用户名
*/
private String extractUsernameFromJaasConfig() {
String jaasConfig = (String) kafkaConsumerProperties.get("sasl.jaas.config");
if (jaasConfig != null) {
// 从类似 "username=\"userxxxxx\"" 的字符串中提取用户名
int start = jaasConfig.indexOf("username=\"");
if (start != -1) {
start += "username=\"".length();
int end = jaasConfig.indexOf("\"", start);
if (end != -1) {
return jaasConfig.substring(start, end);
}
}
}
return "未知";
}
/**
* 停止消费者
*/
public synchronized void stopConsumer() {
running.set(false);
if (executorService != null) {
executorService.shutdown();
logger.info("正在停止消费者线程池...");
}
if (consumer != null) {
consumer.close();
logger.info("Kafka消费者已关闭");
}
System.out.println("Kafka消费者已停止");
}
/**
* 重新启动消费者
*/
public void restartConsumer() {
stopConsumer();
running.set(true);
try {
Thread.sleep(2000); // 等待2秒再重启
startConsumer();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
/**
* 测试消费一条消息用于验证连接
*/
public void testConsumeOne() {
try (KafkaConsumer<String, String> testConsumer = new KafkaConsumer<>(kafkaConsumerProperties)) {
testConsumer.subscribe(Arrays.asList(topic.split(",")));
ConsumerRecords<String, String> records = testConsumer.poll(Duration.ofMillis(5000));
if (records.isEmpty()) {
System.out.println("测试消费:未收到任何消息(可能没有新消息)");
} else {
System.out.println("测试消费:收到 " + records.count() + " 条消息");
for (ConsumerRecord<String, String> record : records) {
System.out.println("主题: " + record.topic() + ", 值: " + record.value());
}
}
} catch (Exception e) {
System.err.println("测试消费失败: " + e.getMessage());
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,252 @@
package com.ruoyi.database.util;
import com.google.zxing.BarcodeFormat;
import com.google.zxing.EncodeHintType;
import com.google.zxing.MultiFormatWriter;
import com.google.zxing.WriterException;
import com.google.zxing.client.j2se.MatrixToImageConfig;
import com.google.zxing.client.j2se.MatrixToImageWriter;
import com.google.zxing.common.BitMatrix;
import com.google.zxing.qrcode.QRCodeWriter;
import com.google.zxing.qrcode.decoder.ErrorCorrectionLevel;
import javax.imageio.ImageIO;
import java.awt.*;
import java.awt.image.BufferedImage;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class QRCodeGenerator1080x640 {
public static void generateHighQualityQRCode(String content, String filePath) throws Exception {
Map<EncodeHintType, Object> hints = new HashMap<>();
// === 关键参数配置 ===
hints.put(EncodeHintType.CHARACTER_SET, "UTF-8");
hints.put(EncodeHintType.MARGIN, 4); // 边距
hints.put(EncodeHintType.ERROR_CORRECTION, ErrorCorrectionLevel.H); // 高容错
hints.put(EncodeHintType.QR_VERSION, 10); // 版本
// 生成大小根据内容动态调整
int qrSize = 500; // 固定大小确保清晰度
QRCodeWriter writer = new QRCodeWriter();
BitMatrix matrix = writer.encode(content, BarcodeFormat.QR_CODE, qrSize, qrSize, hints);
// === 提高图像质量 ===
int scale = 4; // 缩放因子提高分辨率
BufferedImage qrImage = new BufferedImage(
qrSize * scale,
qrSize * scale,
BufferedImage.TYPE_INT_RGB
);
Graphics2D g = qrImage.createGraphics();
g.setColor(Color.WHITE);
g.fillRect(0, 0, qrSize * scale, qrSize * scale);
g.setColor(Color.BLACK);
// 绘制高分辨率二维码
for (int x = 0; x < qrSize; x++) {
for (int y = 0; y < qrSize; y++) {
if (matrix.get(x, y)) {
g.fillRect(x * scale, y * scale, scale, scale);
}
}
}
g.dispose();
// === 创建1080x640画布 ===
BufferedImage finalImage = new BufferedImage(1080, 640, BufferedImage.TYPE_INT_RGB);
Graphics2D finalG = finalImage.createGraphics();
finalG.setColor(Color.WHITE);
finalG.fillRect(0, 0, 1080, 640);
// 居中放置
int x = (1080 - qrImage.getWidth()) / 2;
int y = (640 - qrImage.getHeight()) / 2;
finalG.drawImage(qrImage, x, y, null);
finalG.dispose();
// 保存
ImageIO.write(finalImage, "PNG", new File(filePath));
}
/**
* 简单的调试工具检查二维码内容
*/
public static void testQRCode(String filePath) {
try {
System.out.println("=== 二维码生成配置 ===");
System.out.println("1. 确保二维码核心部分是正方形");
System.out.println("2. 有足够的边距margin >= 2");
System.out.println("3. 使用高容错等级ErrorCorrectionLevel.H");
System.out.println("4. 像素点清晰,无模糊");
System.out.println("5. 背景纯白,前景纯黑(对比度高)");
// 加载并检查图像
BufferedImage img = ImageIO.read(new File(filePath));
System.out.println("\n图像信息");
System.out.println("尺寸: " + img.getWidth() + "x" + img.getHeight());
System.out.println("类型: " + img.getType());
} catch (Exception e) {
e.printStackTrace();
}
}
// public static void main(String[] args) {
// String simpleContent = "https://tingche.csckl.com"; // 先用简单内容测试
//
// // 方法1直接生成正方形先确保能扫描
// Map<EncodeHintType, Object> hints = new HashMap<>();
// hints.put(EncodeHintType.CHARACTER_SET, "UTF-8");
// hints.put(EncodeHintType.MARGIN, 4);
//
// QRCodeWriter writer = new QRCodeWriter();
// BitMatrix matrix = null;
// try {
// matrix = writer.encode(simpleContent, BarcodeFormat.QR_CODE, 600, 1024, hints);
// // 直接保存正方形二维码
// File squareFile = new File("C:\\Users\\28758\\Desktop\\test_square.jpg");
// MatrixToImageWriter.writeToPath(matrix, "JPG", squareFile.toPath());
//
// System.out.println("请先扫描 test_square.png");
// System.out.println("如果能扫描,再尝试矩形版本");
// } catch (WriterException e) {
// throw new RuntimeException(e);
// } catch (IOException e) {
// throw new RuntimeException(e);
// }
//
//
// }
// public static void main(String[] args) {
// int width = 600;
// int height = 1024;
// String outputPath = "hikvision_control.jpg";
//
// try {
// createHikvisionImage(width, height, outputPath);
// System.out.println("图片生成成功: " + outputPath);
// } catch (IOException e) {
// System.err.println("图片生成失败: " + e.getMessage());
// }
// }
public static void createHikvisionImage(int width, int height, String outputPath) throws IOException {
// 创建BufferedImage对象
BufferedImage image = new BufferedImage(width, height, BufferedImage.TYPE_INT_RGB);
// 获取Graphics2D对象
Graphics2D g2d = image.createGraphics();
// 设置渲染质量
g2d.setRenderingHint(RenderingHints.KEY_ANTIALIASING, RenderingHints.VALUE_ANTIALIAS_ON);
// 设置蓝色背景海康常用蓝色
Color hikBlue = new Color(0, 123, 184); // 海康威视标准蓝色
g2d.setColor(hikBlue);
g2d.fillRect(0, 0, width, height);
// 可选添加文字或logo
addHikvisionText(g2d, width, height);
// 释放资源
g2d.dispose();
// 保存图片
ImageIO.write(image, "jpg", new File(outputPath));
}
private static void addHikvisionText(Graphics2D g2d, int width, int height) {
// 设置文字颜色为白色
// g2d.setColor(Color.WHITE);
// 设置字体
Font font = new Font("Microsoft YaHei", Font.BOLD, 48);
g2d.setFont(font);
// 计算文字位置使其居中
String text = "出入口控制";
FontMetrics fm = g2d.getFontMetrics();
int textWidth = fm.stringWidth(text);
int textHeight = fm.getHeight();
int x = (width - textWidth) / 2;
int y = (height - textHeight) / 2 + fm.getAscent();
// 绘制文字
// g2d.drawString(text, x, y);
// 添加副标题
Font subFont = new Font("Microsoft YaHei", Font.PLAIN, 24);
g2d.setFont(subFont);
String subText = "海康威视 HIKVISION";
FontMetrics subFm = g2d.getFontMetrics();
// int subTextWidth = subFm.stringWidth(subText);
// int subX = (width - subTextWidth) / 2;
int subY = y + 80;
// g2d.drawString(subText, subX, subY);
}
public static void main(String[] args) {
try {
createSimpleParkingQR(
"请扫码缴费",
"https://tingche.csckl.com/wisdomPark/doc.html",
"simple_qr.jpg"
);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void createSimpleParkingQR(String text, String qrUrl, String outputPath) throws Exception {
int width = 600;
int height = 1024;
// 创建主图像
BufferedImage image = new BufferedImage(width, height, BufferedImage.TYPE_INT_RGB);
Graphics2D g = image.createGraphics();
// 设置背景色
g.setColor(new Color(0, 123, 184));
g.fillRect(0, 0, width, height);
// 绘制文字
g.setColor(Color.WHITE);
g.setFont(new Font("宋体", Font.BOLD, 50));
FontMetrics fm = g.getFontMetrics();
int textWidth = fm.stringWidth(text);
g.drawString(text, (width - textWidth) / 2, 200);
// 生成二维码
QRCodeWriter qrCodeWriter = new QRCodeWriter();
BitMatrix bitMatrix = qrCodeWriter.encode(qrUrl, BarcodeFormat.QR_CODE, 400, 400);
BufferedImage qrImage = MatrixToImageWriter.toBufferedImage(bitMatrix);
// 绘制二维码
g.drawImage(qrImage, (width - 400) / 2, 300, null);
// 保存
ImageIO.write(image, "JPG", new File(outputPath));
g.dispose();
}
}

View File

@ -0,0 +1,62 @@
package com.ruoyi.database.util;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.database.domain.PassVehicleInfo;
import com.ruoyi.database.domain.ReleaseRequestInfo;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 简单JSON转换工具
*/
@Component
public class SimpleJsonConverter {
/**
* 将JSON转换为PassVehicleInfo
*/
public PassVehicleInfo toPassVehicleInfo(JSONObject json) {
if (json == null) return null;
PassVehicleInfo info = new PassVehicleInfo();
// 直接赋值
info.setDirect(json.getInteger("direct"));
info.setPlateNo(json.getString("plateNo"));
info.setParkCode(json.getString("parkCode"));
info.setGateCode(json.getString("gateCode"));
info.setLaneNo(json.getString("laneNo"));
info.setInUnid(json.getString("inUnid"));
info.setOutUnid(json.getString("outUnid"));
info.setInTime(json.getString("inTime"));
info.setOutTime(json.getString("outTime"));
return info;
}
/**
* 将JSON转换为ReleaseRequestInfo
*/
public ReleaseRequestInfo toReleaseRequestInfo(JSONObject json) {
if (json == null) return null;
ReleaseRequestInfo info = new ReleaseRequestInfo();
// 直接赋值
info.setDirect(json.getInteger("direct"));
info.setPlateNo(json.getString("plateNo"));
info.setPassTime(json.getString("passTime"));
info.setParkCode(json.getString("parkCode"));
info.setGateCode(json.getString("gateCode"));
info.setLaneNo(json.getString("laneNo"));
info.setInUnid(json.getString("inUnid"));
info.setOutUnid(json.getString("outUnid"));
info.setParkType(json.getInteger("parkType"));
info.setPlateType(json.getInteger("plateType"));
info.setPlateColor(json.getInteger("plateColor"));
info.setCarType(json.getInteger("carType"));
return info;
}
}