您当前的位置: 首页 >  ar

Bulut0907

暂无认证

  • 1浏览

    0关注

    346博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Apache Pulsar的Function流式计算框架使用

Bulut0907 发布时间:2022-05-07 12:27:38 ,浏览量:1

目录
  • 1. 背景
  • 2. 介绍
  • 3. Pulsar Function的使用
    • 3.1 Pulsar Function的启用
    • 3.2 使用Pulsar Function
  • 4. 自己编写一个Function

1. 背景

当从Pulsar中的一个topic消费数据,进行一些简单的ETL/聚合计算,然后将数据保存到另一个topic时。这个就可以使用Funtion流式计算框架。但是复杂的计算还是需要使用Spark/Flink等计算框架

2. 介绍

Function架构从多个Input Topic中消费数据,然后将计算后的数据发送到Output Topic。同时可以将日志写入到Log Topic中,主要用于Funtion出现问题时,定位错误并调试

Pulsar Funtion的编写方式有两种

  1. 本地模式:在集群外部进行本地运行
  2. 集群模式:在集群内部运行,支持独立模式和集成模式
3. Pulsar Function的使用 3.1 Pulsar Function的启用

修改Pulsar集群所有服务器的conf/broker.conf,如下内容

functionsWorkerEnabled=true

修改Pulsar集群所有服务器的conf/functions_worker.yml,如下内容

pulsarFunctionsCluster: pulsar-cluster

然后重启broker服务

3.2 使用Pulsar Function

运行官网提供的example包,先在集群模式下创建Function,创建完成的Function是运行的

[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions create \
>   --jar examples/api-examples.jar \
>   --classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
>   --inputs persistent://public/default/exclamation-input \
>   --output persistent://public/default/exclamation-output \
>   --tenant public \
>   --namespace default \
>   --name exclamation
"Created successfully"
[root@bigdata001 apache-pulsar-2.9.1]#

然后触发Function运行,得到结果。原理是向exclamation-input这个topic发送消息,然后消费exclamation-output这个topic的消息

[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions trigger --name exclamation --trigger-value "hello world"
hello world!
[root@bigdata001 apache-pulsar-2.9.1]#

查看Function状态

[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions status --name exclamation 
{
  "numInstances" : 1,
  "numRunning" : 1,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : true,
      "error" : "",
      "numRestarts" : 0,
      "numReceived" : 0,
      "numSuccessfullyProcessed" : 0,
      "numUserExceptions" : 0,
      "latestUserExceptions" : [ ],
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "averageLatency" : 0.0,
      "lastInvocationTime" : 0,
      "workerId" : "c-pulsar-cluster-fw-bigdata003-8086"
    }
  } ]
}
[root@bigdata001 apache-pulsar-2.9.1]#

stop Function

[root@bigdata001 apache-pulsar-2.9.1]# 
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions stop --name exclamation
Stopped successfully
[root@bigdata001 apache-pulsar-2.9.1]# 

start Function

[root@bigdata001 apache-pulsar-2.9.1]# 
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions start --name exclamation
Started successfully
[root@bigdata001 apache-pulsar-2.9.1]# 

delete Function

[root@bigdata001 apache-pulsar-2.9.1]# 
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions delete --name exclamation
"Deleted successfully"
[root@bigdata001 apache-pulsar-2.9.1]#

**其它Function使用说明

[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions [command]

command可选值:

  1. localrun:创建本地function运行
  2. get:获取function相关信息
  3. restart:重启
  4. stats:查看状态
  5. list:查看特点tenant和namespace下的所有function
4. 自己编写一个Function

需求:读取input topic,其中日期格式为yyyy/MM/dd HH/mm/ss,转换为格式yyyy-MM-dd HH:mm:ss,然后发送到output topic

  1. 添加依赖

    org.apache.pulsar
    pulsar-functions-api
    2.9.1

  1. 编写程序
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.pulsar.functions.api.{Context, Function}

import java.util.Date

class DateTransfromFunction extends Function[String, String] {

  private val oldDateFormat: FastDateFormat =
    FastDateFormat.getInstance("yyyy/MM/dd HH/mm/ss")
  private val newDateFormat: FastDateFormat =
    FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  // 每来一条消息,都会调用process进行处理
  // context表示上下文对象,用于执行一些相关的统计计算操作,以及获取相关的对象和元数据信息
  override def process(input: String, context: Context): String = {
    
    val oldDate: Date = oldDateFormat.parse(input)
    val newStringDate = newDateFormat.format(oldDate)

    newStringDate
  }

}

  1. 然后将程序进行打包,上传到Pulsar集群中的一台服务器
  2. 创建Function
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions create \
> --jar /opt/pulsar_dev-1.0-SNAPSHOT.jar \
> --classname DateTransfromFunction \
> --inputs persistent://public/default/dateTransfrom-input \
> --output persistent://public/default/dateTransfrom-output \
> --tenant public \
> --namespace default \
> --name dateTransfrom
"Created successfully"
[root@bigdata001 apache-pulsar-2.9.1]#
  1. 触发Function
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions trigger --name dateTransfrom --trigger-value "2022/04/10 16/32/18"
2022-04-10 16:32:18
[root@bigdata001 apache-pulsar-2.9.1]#
关注
打赏
1664501120
查看更多评论
立即登录/注册

微信扫码登录

0.0365s