由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

Pulsar function

中间件 西门飞冰 2970℃
[隐藏]

1.function 背景

当我们进行流式处理的时候,很多情况下,我们的需求可能只是下面这些简单的操作:简单的ETL 操作\聚合计算操作等相关服务。

但为了实现这些功能,我们不得不去部署一整套 流处理服务(spark、flink等)。但是我们仅仅需要这些服务的一小部分功能,部署流处理引擎的成本可能比用户开发这个功能本身更困难。

基于这个原因,pulsar设计并实现了Pulsar Functions,在Pulsar functions中,用户只需要关心计算逻辑本身,而不需要去了解或者部署流处理服务,当然也可以将pulsar function与现有的流处理服务一起使用。

也就是说,在Pulsar function中,无需部署流处理服务,就可以让用户完成轻量化的流计算任务,并且部署简单,运维简单。

2.function 介绍

Pulsar Functions 的设计灵感来自于 Heron 这样的流处理引擎,Pulsar Functions 将会拓展Pulsar和整个消息领域的未来。使用 Pulsar Functions,用户可以轻松地部署和管理 function,通过function 从Pulsartopic 读取数据或者生产新数据到 Pulsar topic。

引入 Pulsar Functions 后,Pulsar 成为统一的消息投递/计算/存储平台。只需部署一套Pulsar 集群,便可以实现一个计算引擎,页面简单,操作便捷。

image-20221014095209077

Input topic 是数据的来源,在 Pulsar Functions 中,所有的数据均来自 input topic。当数据进入inputtopic 中,Pulsar Functions 充当消费者的角色,去 input topic 中消费消息;当从input topic 中拿到需要处理的消息时,Pulsar Functions 充当生产者的角色往 output topic 或者 log topic 中生产消息。

Output topic 和 log topic 都可以看作是 Pulsar Functions 的输出。从是否会有output 这个点来看,我们可以将 Pulsar Functions 分为两类,当有输出的时候 Pulsar Functions 会将相应的output 输出到outputtopic中。log topic 主要存储用户的日志信息,当 Pulsar Functions 出现问题时,方便用户定位错误并调试。

综上所述:我们不难看出 Pulsar Functions 充当了一个消息处理和转运的角色。

在使用Pulsar Functions, 可以使用不同的语言来编写, 比如Python,Java,Go等. 编写方式主要两种

  • 本地模式: 集群外部, 进行本地运行
  • 集群模式: 集群内部运行(支持独立模式和集成模式)

3.function的基本使用操作

3.1.开启function功能

pulsar 默认没有开启function功能,需要手动开启并重启集群

# vim conf/broker.conf 
functionsWorkerEnabled=true
# ./bin/pulsar-daemon stop broker
# ./bin/pulsar-daemon start broker

注意:所有broker节点都需要调整配置并重启

3.2.测试function使用

1、导入自带的测试jar包

./bin/pulsar-admin functions create \
--jar examples/api-examples.jar \
--classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
--inputs persistent://public/default/exclamation-input \
--output persistent://public/default/exclamation-output \
--tenant public \
--namespace default \
--name exclamation

2、检查是否按照预期触发函数运行

./bin/pulsar-admin functions trigger --name exclamation --trigger-value "hello world"

3、functions属性说明

bin/pulsar-admin functions
属性说明:
	functions:
		可选值:
				localrun: 创建本地function进行运行
				create: 在集群模式下创建
				delete: 删除在集群中运行的function
				get: 获取function的相关信息
				restart: 重启
				stop : 停止运行
				start: 启动
				status: 检查状态
				stats: 查看状态
				list: 查看特定租户和名称空间下的所有的function
--classname: 设置function执行类
--jar 设置function对应的jar包
--inputs : 输入的topic
--output : 输出的topic
--tenant : 设置function运行在那个租户中
--namespace: 设置function运行在那个名称空间中
--name : 定义function的名称

4.function的案例

案例需求: 使用Pulsar Function 读取某一个Topic中日期(格式为: yyyy/MM/dd HH/mm/ss)数据, 读取后, 对数据进行日期转换(格式为:yyyy-MM-ddHH:mm:ss )

1、创建一个maven工程,并引入依赖

    <build>
        <finalName>Pulsar-FunctionData</finalName>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-functions-api</artifactId>
            <version>2.8.1</version>
        </dependency>
    </dependencies>

2、编写逻辑代码

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

import java.text.SimpleDateFormat;
import java.util.Date;

public class FunctionDate implements Function<String,String> {
    //  此方法, 没接收到一条数据, 就会调用一次  process方式, 其中
    //  参数1: 输入的消息数据
    //  参数2: Context 表示上下文对象,用于执行一些相关的统计计算操作, 以及获取到相关的对象以及元数据信息
    private SimpleDateFormat format1 = new SimpleDateFormat("yyyy/MM/dd HH/mm/ss");
    private SimpleDateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Override
    public String process(String input, Context context) throws Exception {
        Date oldDate = format1.parse(input);
        return format2.format(oldDate);
    }
}

3、打包上传到服务器pulsar 安装目录的functions目录下

4、构建测试

./bin/pulsar-admin functions create \
--jar functions/Pulsar-FunctionData.jar \
--classname com.fblinux.functions.FunctionDate \
--inputs persistent://public/default/fm-input \
--output persistent://public/default/fm-output \
--tenant public \
--namespace default \
--name FunctionData

启动function测试

./bin/pulsar-admin functions trigger --name FunctionData --trigger-value "2022/10/14 19/30/30"

image-20221014105852901

也可以通过代码向input对应的Topic发送消息, 并消费output对应的Topic中数据, 也是可以看到function可以正常处理的

转载请注明:西门飞冰的博客 » Pulsar function

喜欢 (0)or分享 (0)