由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

Kafka on Pulsar(KOP)

中间件 西门飞冰 1349℃
[隐藏]

1.KOP 介绍

KoP(Kafka on Pulsar)通过在 Pulsar 代理上引入 Kafka 协议处理程序,为 Apache Pulsar 带来了原生的Apache Kafka协议支持。通过将 KoP 协议处理程序添加到您现有的 Pulsar 集群,您可以将现有的 Kafka 应用程序和服务迁移到Pulsar,而无需修改代码。这使 Kafka 应用程序能够利用 Pulsar 的强大功能.

KoP 作为 Pulsar协议处理程序插件实现,协议名称为“kafka”,在 Pulsar broker 启动时加载。它通过在Apache Pulsar上提供原生 Kafka 协议支持,这样可以大大降低学习Pulsar的成本。基于KOP方案, 整合两个流行的事件流生态系统软件。使用ApachePulsar 构建真正统一的事件流平台,以加速实时应用程序和服务的开发。

image-20221014154148377

2.KOP 配置

1、下载KOP包

wget https://github.com/streamnative/kop/releases/download/v2.8.1.30/pulsar-protocol-handler-kafka-2.8.1.30.nar

2、将KOP NAR包上传到Pulsar的protocols目录中, 如果没有此目录, 直接创建即可

3、在broker配置中设置KOP相关信息

]# vim conf/broker.conf
# 添加以下内容
messagingProtocols=kafka
protocolHandlerDirectory=./protocols
kafkaListeners=PLAINTEXT://pulsar01:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
# 修改以下配置
allowAutoTopicCreationType=partitioned
brokerDeleteInactiveTopicsEnabled=false

4、剩余broker节点也需要上传KOP nar包并修改配置文件,其中kafkaListeners需要修改为自己的IP地址

5、重启各个Broker节点

3.KOP测试

当pulsar基于Kafka协议后,此时我们完全可以使用Kafka的相关命令或者API来进行生产和消费操作,可以无痕迁移到pulsar

Kafka生产者发送数据:

image-20221014162346804

Kafka 消费者消费数据:

image-20221014162816814

4.KOP 使用

1、maven依赖导入:有了KOP直接导入Kafka的jar包即可

 <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.0.0</version>
</dependency>

2、生产者代码

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducer_KOP {
    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "pulsar01:9092,pulsar02:9092,pulsar03:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("kop-topic", Integer.toString(i), Integer.toString(i)));
        }

        producer.close();
    }
}

3、消费者代码

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumer_KOP {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "pulsar01:9092,pulsar02:9092,pulsar03:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("kop-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

4、输出结果:

image-20221014164504911

转载请注明:西门飞冰的博客 » Kafka on Pulsar(KOP)

喜欢 (1)or分享 (0)