Flink 1.14: the DataSet API is deprecated; Streaming and Batch are unified on DataStream

Bulut0907, published 2022-02-11 17:15:54

Contents
  • 1. Streaming mode
    • 1.1 Code demo
    • 1.2 flink run command
  • 2. Batch mode
    • 2.1 Code demo
    • 2.2 flink run command

1. Streaming mode

1.1 Code demo
package apiTest

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}


object StreamWordCount {

  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setRuntimeMode(RuntimeExecutionMode.STREAMING)

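    // WordSourceFunction: a user-defined source of words (its definition is not part of this demo listing)
    val ds: DataStream[(String, Int)] = senv.addSource(new WordSourceFunction())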
      .map(word => (word, 1))
      .keyBy(_._1)
      .sum(1)

    ds.print("ds")

    // Lazy evaluation: the job only runs once execute() is called
    senv.execute("StreamWordCount")
  }

}
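
The demo above uses `WordSourceFunction` without listing it. The following is a minimal, hypothetical sketch of what such a source could look like; it is not the author's implementation. The word list is inferred from the sample output, and the one-second pause and the use of a random pick are assumptions made purely for illustration. It relies on Flink's `SourceFunction` interface and needs the Flink streaming dependency on the classpath.

```scala
package apiTest

import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.util.Random

// Hypothetical unbounded source: emits one random word per second until cancelled.
class WordSourceFunction extends SourceFunction[String] {

  // Flipped by cancel() so the emit loop in run() can stop.
  @volatile private var running = true

  // Assumed vocabulary, taken from the words visible in the sample output.
  private val words = Array("hello", "flink", "table", "batch")

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val random = new Random()
    while (running) {
      ctx.collect(words(random.nextInt(words.length)))
      Thread.sleep(1000) // assumed emit interval
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}
```

Because this source never finishes on its own, it is unbounded, which fits STREAMING mode; a job built on it keeps printing updated counts until it is cancelled.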

Output:

ds:7> (flink,1)
ds:3> (hello,1)
ds:1> (table,1)
ds:1> (table,2)
ds:7> (batch,1)
ds:7> (flink,2)
ds:3> (hello,2)
ds:3> (hello,3)
... (remaining output omitted) ...

1.2 flink run command
bin/flink run -Dexecution.runtime-mode=STREAMING examples/streaming/WordCount.jar
2. Batch mode

2.1 Code demo
package apiTest

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}

object BatchWordCount {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)

    val text: DataStream[String] = env.fromElements("flink batch demo", "batch demo", "demo")
    val ds: DataStream[(String, Int)] = text.flatMap(_.split(" "))
      .map(word => (word, 1))
      .keyBy(_._1)
      .sum(1)

    ds.print("ds")

    env.execute("BatchWordCount")
  }

}

Output:

ds:7> (batch,2)
ds:7> (flink,1)
ds:3> (demo,3)
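
The two sample outputs differ in shape: the streaming run prints an updated count for every incoming record, while the batch run prints a single final count per key. The sketch below mimics that difference using plain Scala collections only. It does not use Flink, and the names `RuntimeModeSketch`, `incrementalCounts`, and `finalCounts` are hypothetical, introduced only for this illustration.

```scala
object RuntimeModeSketch {

  // Streaming-style: emit the updated count after every incoming word.
  def incrementalCounts(words: Seq[String]): Seq[(String, Int)] = {
    val counts = scala.collection.mutable.Map.empty[String, Int]
    words.map { w =>
      val c = counts.getOrElse(w, 0) + 1
      counts(w) = c
      (w, c)
    }
  }

  // Batch-style: emit one final count per word once all input has been read.
  def finalCounts(words: Seq[String]): Map[String, Int] =
    words.groupBy(identity).map { case (w, ws) => (w, ws.size) }

  def main(args: Array[String]): Unit = {
    // Same input as the BatchWordCount demo.
    val words = Seq("flink batch demo", "batch demo", "demo").flatMap(_.split(" "))
    incrementalCounts(words).foreach(println) // one line per input word
    finalCounts(words).foreach(println)       // one line per distinct word
  }
}
```

With the batch demo's input, `finalCounts` yields the same three pairs as the output above, whereas `incrementalCounts` yields six pairs, one per input word.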

2.2 flink run command
bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar