Kafka Java API
Kafka是一个基于ZooKeeper的高吞吐量低延迟的分布式的发布与订阅消息系统
Kafka提供了Java客户端API进行消息的创建与接收。
依赖:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.0.0</version> </dependency>
创建生产者:
public class MyProducer2 {
public static void main(String[] args) {
//1. 使用properties定义kafka环境属性
Properties props = new Properties();
//设置生产者 broker服务器连接地址
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos01:9092,centos02:9092,centos03:9092");
//设置序列化key程序类
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//设置序列化value程序类,此处不一定非得是Integer,也可以是String
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
//2. 定义消息生产者对象,依靠此对象可以进行消息的传递
Producer<String, Integer> producer = new KafkaProducer<String, Integer>(props);
//3. 循环发送10条消息
for (int i = 0; i < 10; i++) {
//发送消息,第一个参数为主题名称,第二个参数为消息的key值,第三个参数为消息的value值
//此方式只负责发送消息,不知道是否发送成功
// producer.send(new ProducerRecord<String, Integer>("topictest", "hello kafka " + i, i));
//同步方式发送消息
//使用生产者对象的send()方法发送消息,会返回一个Future对象,然后调用Future对象的get()方法进行等待,就可以知道消息是否发送成功。如果服务器返回错误,get()方法会抛出异常。如果没有发生错误,我们会得到一个RecordMetadata对象,可以用它获取消息的偏移量。最简单的同步发送消息的代码如下:
/*try {
producer.send(new ProducerRecord<String, Integer>("topictest", "hello kafka " + i, i)).get();
} catch (Exception e) {
e.printStackTrace();
}*/
//异步方式发送消息,可以指定一个回调函数,服务器返回响应时会调用该函数。我们可以在该函数中对一些异常信息进行处理,比如记录错误日志或者把消息写入“错误消息”文件以便日后分析。
producer.send(new ProducerRecord<String, Integer>("topictest", "hello kafka " + i, i), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println(recordMetadata.toString());
if (e != null) {
e.printStackTrace();
}
}
});
}
//4.关闭生产者
producer.close();
}
}
创建消费者:
public class MyConsumer {
public static void main(String[] args) {
//1. 使用Properties定义配置属性
Properties props = new Properties();
//设置消费者 Broker服务器的连接地址
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos01:9092,centos02:9092,centos03:9092");
//设置反序列化key的程序类,与生产者对应
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//设置反序列化value的程序类,与生产者对应
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
//设置消费者组ID,即组名称,值可自定义。组名称相同的消费者进程属于同一个消费者组。
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "groupid-1");
//2. 定义消费者对象
Consumer<String, Integer> consumer = new KafkaConsumer<String, Integer>(props);
//3. 设置消费者读取的主题名称,可以设置多个
consumer.subscribe(Arrays.asList("topictest"));
//4. 不停的读取消息
while (true) {
//拉取消息,并设置超时时间为10秒
ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<String, Integer> record : records) {
//打印消息关键信息
System.out.println("key:" + record.key() + ",value:" + record.value() + ",partition:" + record.partition() + ",offset:" + record.offset());
}
}
}
}
书籍:Hadoop大数据技术开发实战 11.9 Java API操作
https://gitee.com/caoyeoo0/xc-springboot/blob/kafka/src/main/java/com/xc/xcspringboot/test/MyProducer2.java

浙公网安备 33010602011771号