首页 技术 正文
技术 2022年11月14日
0 收藏 703 点赞 3,072 浏览 4371 个字

Flink–time-window 的高级用法

Flink–time-window 的高级用法

1.现实世界中的时间是不一致的,在 flink 中被划分为事件时间,提取时间,处理时间三种。
2.如果以 EventTime 为基准来定义时间窗口那将形成 EventTimeWindow,要求消息本身就应该携带 EventTime
3.如果以 IngesingtTime 为基准来定义时间窗口那将形成 IngestingTimeWindow,以 source 的 systemTime 为准。
4.如果以 ProcessingTime 基准来定义时间窗口那将形成 ProcessingTimeWindow,以 operator 的 systemTime 为准。

EventTime

1.要求消息本身就应该携带 EventTime2.时间对应关系如下 

Flink–time-window 的高级用法

需求:

EventTime 3 数据:

1527911155000,boos1,pc1,100.0 1527911156000,boos2,pc1,200.0 1527911157000,boos1,pc1,300.0 1527911158000,boos2,pc1,500.0 1527911159000,boos1,pc1,600.0 1527911160000,boos1,pc1,700.0 1527911161000,boos2,pc2,700.0 1527911162000,boos2,pc2,900.0 1527911163000,boos2,pc2,1000.0 1527911164000,boos2,pc2,1100.0 1527911165000,boos1,pc2,1100.0 1527911166000,boos2,pc2,1300.0 1527911167000,boos2,pc2,1400.0 1527911168000,boos2,pc2,1600.0
1527911169000,boos1,pc2,1300.0

代码实现:

object EventTimeExample {
def main(args: Array[String]) {
//1.创建执行环境,并设置为使用 EventTime
val env = StreamExecutionEnvironment.getExecutionEnvironment
//置为使用 EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//2.创建数据流,并进行数据转化
val source = env.socketTextStream("localhost", 9999)
case class SalePrice(time: Long, boosName: String, productName: String, price: Double)
val dst1: DataStream[SalePrice] = source.map(value => {
val columns = value.split(",")
SalePrice(columns(0).toLong, columns(1), columns(2), columns(3).toDouble)
})
//3.使用 EventTime 进行求最值操作
val dst2: DataStream[SalePrice] = dst1
//提取消息中的时间戳属性
.assignAscendingTimestamps(_.time)
.keyBy(_.productName)
.timeWindow(Time.seconds(3))//设置 window 方法一
.max("price")
//4.显示结果
dst2.print()
//5.触发流计算 env.execute()
}
}

当前代码理论上看没有任何问题,在实际使用的时候就会出现很多问题,甚至接 收不到数据或者接收到的数据是不准确的;这是因为对于 flink 最初设计的时 候,就考虑到了网络延迟,网络乱序等问题,所以提出了一个抽象概念基座水印

(WaterMark);

Flink–time-window 的高级用法

水印分成两种形式:
第一种:

Flink–time-window 的高级用法

第二种:

Flink–time-window 的高级用法

所以,我们需要考虑到网络延迟的状况,那么代码中就需要添加水印操作:
object EventTimeOperator {
def main(args: Array[String]): Unit = {
//创建执行环境,并设置为使用EventTime
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)//注意控制并发数
//置为使用EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val source = env.socketTextStream("localhost", 9999)
val dst1: DataStream[SalePrice] = source.map(value => {
val columns = value.split(",")
SalePrice(columns(0).toLong, columns(1), columns(2), columns(3).toDouble)
})
//todo 水印时间 assignTimestampsAndWatermarks
val timestamps_data = dst1.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SalePrice]{ var currentMaxTimestamp:Long = 0
val maxOutOfOrderness = 2000L //最大允许的乱序时间是2s
var wm : Watermark = null
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
override def getCurrentWatermark: Watermark = {
wm = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
wm
} override def extractTimestamp(element: SalePrice, previousElementTimestamp: Long): Long = {
val timestamp = element.time
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp) }
})
val data: KeyedStream[SalePrice, String] = timestamps_data.keyBy(line => line.productName)
val window_data: WindowedStream[SalePrice, String, TimeWindow] = data.timeWindow(Time.seconds(3))
val apply: DataStream[SalePrice] = window_data.apply(new MyWindowFunc)
apply.print()
env.execute() }
}
case class SalePrice(time: Long, boosName: String, productName: String, price: Double)
class MyWindowFunc extends WindowFunction[SalePrice , SalePrice , String, TimeWindow]{
override def apply(key: String, window: TimeWindow, input: Iterable[SalePrice], out: Collector[SalePrice]): Unit = {
val seq = input.toArray
val take: Array[SalePrice] = seq.sortBy(line => line.price).reverse.take(1)
for(info <- take){
out.collect(info)
}
}
}

Flink–time-window 的高级用法

ProcessingTime

对于 processTime 而言,是 flink 处理数据的时间,所以就不关心发过来的数据 是不是有延迟操作,只关心数据具体的处理时间,所以不需要水印处理,操作相 对来说简单了很多

object ProcessingTimeExample {
def main(args: Array[String]) {
//创建执行环境,并设置为使用EventTime
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)//注意控制并发数
//置为使用ProcessingTime
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.socketTextStream("localhost", 9999)
case class SalePrice(time: Long, boosName: String, productName: String, price: Double) val dst1: DataStream[SalePrice] = source.map(value => {
val columns = value.split(",")
SalePrice(columns(0).toLong, columns(1), columns(2), columns(3).toDouble)
})
//processTime不需要提取消息中的时间
// val timestamps_data: DataStream[SalePrice] = dst1.assignAscendingTimestamps(line => line.time)
val keyby_data: KeyedStream[SalePrice, String] = dst1.keyBy(line => line.productName)
//TODO 窗口事件是:TumblingProcessingTimeWindows
val window_data: WindowedStream[SalePrice, String, TimeWindow] = keyby_data.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
val max_price: DataStream[SalePrice] = window_data.max("price")
max_price.print()
env.execute()
}
}
相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:9,083
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,558
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,407
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,180
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:7,816
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:4,899