目录
1. 背景
- 1. 背景
- 2. 介绍
- 3. Pulsar Function的使用
- 3.1 Pulsar Function的启用
- 3.2 使用Pulsar Function
- 4. 自己编写一个Function
当从Pulsar中的一个topic消费数据,进行一些简单的ETL/聚合计算,然后将数据保存到另一个topic时。这个就可以使用Funtion流式计算框架。但是复杂的计算还是需要使用Spark/Flink等计算框架
2. 介绍从多个Input Topic中消费数据,然后将计算后的数据发送到Output Topic。同时可以将日志写入到Log Topic中,主要用于Funtion出现问题时,定位错误并调试
Pulsar Funtion的编写方式有两种
- 本地模式:在集群外部进行本地运行
- 集群模式:在集群内部运行,支持独立模式和集成模式
修改Pulsar集群所有服务器的conf/broker.conf,如下内容
functionsWorkerEnabled=true
修改Pulsar集群所有服务器的conf/functions_worker.yml,如下内容
pulsarFunctionsCluster: pulsar-cluster
然后重启broker服务
3.2 使用Pulsar Function运行官网提供的example包,先在集群模式下创建Function,创建完成的Function是运行的
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions create \
> --jar examples/api-examples.jar \
> --classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
> --inputs persistent://public/default/exclamation-input \
> --output persistent://public/default/exclamation-output \
> --tenant public \
> --namespace default \
> --name exclamation
"Created successfully"
[root@bigdata001 apache-pulsar-2.9.1]#
然后触发Function运行,得到结果。原理是向exclamation-input这个topic发送消息,然后消费exclamation-output这个topic的消息
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions trigger --name exclamation --trigger-value "hello world"
hello world!
[root@bigdata001 apache-pulsar-2.9.1]#
查看Function状态
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions status --name exclamation
{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReceived" : 0,
"numSuccessfullyProcessed" : 0,
"numUserExceptions" : 0,
"latestUserExceptions" : [ ],
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"averageLatency" : 0.0,
"lastInvocationTime" : 0,
"workerId" : "c-pulsar-cluster-fw-bigdata003-8086"
}
} ]
}
[root@bigdata001 apache-pulsar-2.9.1]#
stop Function
[root@bigdata001 apache-pulsar-2.9.1]#
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions stop --name exclamation
Stopped successfully
[root@bigdata001 apache-pulsar-2.9.1]#
start Function
[root@bigdata001 apache-pulsar-2.9.1]#
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions start --name exclamation
Started successfully
[root@bigdata001 apache-pulsar-2.9.1]#
delete Function
[root@bigdata001 apache-pulsar-2.9.1]#
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions delete --name exclamation
"Deleted successfully"
[root@bigdata001 apache-pulsar-2.9.1]#
**其它Function使用说明
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions [command]
command可选值:
- localrun:创建本地function运行
- get:获取function相关信息
- restart:重启
- stats:查看状态
- list:查看特点tenant和namespace下的所有function
需求:读取input topic,其中日期格式为yyyy/MM/dd HH/mm/ss,转换为格式yyyy-MM-dd HH:mm:ss,然后发送到output topic
- 添加依赖
org.apache.pulsar
pulsar-functions-api
2.9.1
- 编写程序
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.pulsar.functions.api.{Context, Function}
import java.util.Date
class DateTransfromFunction extends Function[String, String] {
private val oldDateFormat: FastDateFormat =
FastDateFormat.getInstance("yyyy/MM/dd HH/mm/ss")
private val newDateFormat: FastDateFormat =
FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
// 每来一条消息,都会调用process进行处理
// context表示上下文对象,用于执行一些相关的统计计算操作,以及获取相关的对象和元数据信息
override def process(input: String, context: Context): String = {
val oldDate: Date = oldDateFormat.parse(input)
val newStringDate = newDateFormat.format(oldDate)
newStringDate
}
}
- 然后将程序进行打包,上传到Pulsar集群中的一台服务器
- 创建Function
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions create \
> --jar /opt/pulsar_dev-1.0-SNAPSHOT.jar \
> --classname DateTransfromFunction \
> --inputs persistent://public/default/dateTransfrom-input \
> --output persistent://public/default/dateTransfrom-output \
> --tenant public \
> --namespace default \
> --name dateTransfrom
"Created successfully"
[root@bigdata001 apache-pulsar-2.9.1]#
- 触发Function
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions trigger --name dateTransfrom --trigger-value "2022/04/10 16/32/18"
2022-04-10 16:32:18
[root@bigdata001 apache-pulsar-2.9.1]#