//package com.bootdo.util.kafka; // //import java.util.Properties; // //import com.alibaba.fastjson.JSONObject; //import com.bootdo.dataaccess.dianxin.domain.dfwl.DfwlVehicleRecord; //import org.apache.kafka.clients.producer.KafkaProducer; //import org.apache.kafka.clients.producer.ProducerRecord; // //public class ProducerDemo { // // private final KafkaProducer producer; // // public final static String TOPIC = "third_vehicle_access_log"; // // private ProducerDemo() { // Properties props = new Properties(); // props.put("bootstrap.servers", "32.132.2.71:9092");//xxx服务器ip //// props.put("bootstrap.servers", "xxx:9092,1xxx:9092,xxx:9092");//xxx服务器ip // props.put("acks", "all");//所有follower都响应了才认为消息提交成功,即"committed" // props.put("retries", 0);//retries = MAX 无限重试,直到你意识到出现了问题:) // props.put("batch.size", 16384);//producer将试图批处理消息记录,以减少请求次数.默认的批量处理消息字节数 // //batch.size当批量的数据大小达到设定值后,就会立即发送,不顾下面的linger.ms // props.put("linger.ms", 1);//延迟1ms发送,这项设置将通过增加小的延迟来完成--即,不是立即发送一条记录,producer将会等待给定的延迟时间以允许其他消息记录发送,这些消息记录可以批量处理 // props.put("buffer.memory", 33554432);//producer可以用来缓存数据的内存大小。 // props.put("key.serializer", // "org.apache.kafka.common.serialization.IntegerSerializer"); // props.put("value.serializer", // "org.apache.kafka.common.serialization.StringSerializer"); // // producer = new KafkaProducer(props); // } // // public void produce() { // int messageNo = 1; // final int COUNT = 5; // // while(messageNo < COUNT) { // String key = String.valueOf(messageNo); // // DfwlVehicleRecord newBean = new DfwlVehicleRecord() ; // newBean.setEquipmentIP("192.168.11.2"); // newBean.setVillageCode("WT_320903_LYXC_2021052615"); // newBean.setEnterOrOut("0"); // newBean.setPlatePicUrl("url"); // newBean.setCarUrl("url"); // newBean.setSenceUrl("url"); // newBean.setImageFormat("base64"); // newBean.setPassTime("2021-01-01 00:00:00"); // newBean.setPlateNo("1"); // newBean.setAvObj("1"); // newBean.setAvObjName("test"); // newBean.setSource("4"); // // String data = String.format(JSONObject.toJSONString(newBean), key); // // try { // producer.send(new ProducerRecord(TOPIC, data)); // } catch (Exception e) { // e.printStackTrace(); // } // messageNo++; // } // // producer.close(); // } // //}