Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。并且 Flink 提供了数据分布、容错机制以及资源管理等核心功能。
Flink提供了诸多高抽象层的API以便用户编写分布式任务:
打开百度语音模式,反复熟记Flink流计算常用DataSet算子DataSet API, 对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理,支持Java、Scala和Python。
DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持Java和Scala。
Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类SQL的DSL对关系表进行各种查询操作,支持Java和Scala。
DataStream是Flink的较低级API,用于进行数据的实时处理任务,可以将该编程模型分为DataSource、Transformation、Sink三个部分;
Flink可以使用 StreamExecutionEnvironment.addSource(source) 来为我们的程序添加数据来源。
Flink 已经提供了若干实现好了的 source functions,当然我们也可以通过实现 SourceFunction ,来自定义非并行的source,或者实现 ParallelSourceFunction 接口,或者扩展 RichParallelSourceFunction 来自定义并行的 source。
Flink在流处理上的source和在批处理上的source基本一致。大致有4大类:
1.基于 本地集合的source(Collection-based-source)。
2.基于 文件的source(File-based-source)- 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回。
3.基于 网络套接字的source(Socket-based-source)- 从 socket 读取,元素可以用分隔符切分。
4.自定义的source(Custom-source)。
下面使用addSource将Kafka数据写入Flink为例:
如果需要外部数据源对接,可使用addSource,如将Kafka数据写入Flink, 先引入依赖,
将Kafka数据写入Flink,
或者基于网络套接字的,
val source = env.socketTextStream("IP", PORT)
Transformation操作将1个或多个DataStream,转换为新的DataStream,多个转换组合成复杂的数据流拓扑,如下图所示,DataStream会由不同的Transformation操作、转换、过滤、聚合成其他不同的流,从而完成业务要求;
1. map:DataStream -> DataStream。
一个数据元生成一个新的数据元。将输入流的元素翻倍:
dataStream.map { x => x * 2 }。
2. FlatMap:DataStream -> DataStream。
一个数据元DataStream 生成多个数据元DataStream (可以为0)。如将句子分割为单词的flatmap函数:
dataStream.flatMap { str => str.split(" ") }
3. Filter:DataStream -> DataStream。
计算每个数据元的布尔函数,并保存函数返回true的数据元。过滤掉零值的过滤器:
dataStream.filter { _ != 0 }
4. KeyBy:DataStream -> KeyedStream。
逻辑上将流分区为不相交的分区。具有相同Keys的所有记录都分配给同一分区。
在内部,keyBy()是使用散列分区实现的。指定键有不同的方法:
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
5. Reduce:KeyedStream -> DataStream。
KeyedStream 元素滚动执行 Reduce。将当前数据元与最新的一个 Reduce 值组合,作为新值发送。创建 key 的值求和:
keyedStream.reduce { _ + _ }
6. Fold(废弃):KeyedStream -> DataStream。
带有初始值的键控数据流上的“滚动”折叠。将当前元素与上一个折叠值组合在一起并发出新值。
折叠函数,应用于序列(1,2,3,4,5)时,会发出序列“ start-1”,“ start-1-2”,“ start-1-2-3”,...根据相同的Key进行不断的折叠,新的key会进行新的折叠。
val result: DataStream[String] = keyedStream.fold("start")((str, i) => { str + "-" + i })
// 解释:当上述代码应用于序列(1,2,3,4,5)时,输出结果“start-1”,“start-1-2”,“start-1-2-3”,...
7. Aggregations:KeyedStream → DataStream。
在被Keys化数据流上滚动聚合。min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的数据元(max和maxBy相同):
keyedStream.sum(0);
keyedStream.min(0);
keyedStream.max(0);
keyedStream.minBy(0);
keyedStream.maxBy(0);
8.Window:KeyedStream → WindowedStream。
可以在已经分区的KeyedStream上定义Windows。Windows根据某些特征(例如,在最后5秒内到达的数据)对每个Keys中的数据进行分组,其中可以搜索“Flink 中极其重要的 Time 与 Window 详细解析”。
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)))
9. WindowAll:DataStream → AllWindowedStream。
Windows可以在常规DataStreams上定义。Windows会根据某些特征(例如,最近5秒钟内到达的数据)对所有流事件进行分组。警告:*在许多情况下,这是非并行转换。所有记录将被收集到windowAll运算符的一项任务中。
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
10. Window Apply:WindowedStream → DataStream AllWindowedream → DataStream。
将一般功能应用于整个窗口。
下面是一个手动求和窗口元素的函数,注意:如果使用windowAll转换,则需要使用AllWindowFunction代替。
windowedStream.apply { WindowFunction }
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }
11. Window Reduce:WindowedStream → DataStream。
将功能化约简函数应用于窗口,并返回缩减后的值。
windowedStream.reduce { _ + _ }
12. Window Fold:WindowedStream → DataStream。
将折叠函数应用于窗口并返回折叠值:
Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5":
val result: DataStream[String] =
windowedStream.fold("start", (str, i) => { str + "-" + i })
// 上述代码应用于序列(1,2,3,4,5)时,将序列折叠为字符串“start-1-2-3-4-5”
13. Union:DataStream* → DataStream。
两个或多个数据流的并集,创建一个包含所有流中所有元素的新流。注意:如果您将数据流与其自身合并,则在结果流中每个元素将获得两次。
dataStream.union(otherStream1, otherStream2, ...)
14. Window Join:DataStream,DataStream → DataStream。
在给定Keys和公共窗口上连接两个数据流:
dataStream.join(otherStream)
.where(
).equalTo( ) .window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...})
15. Interval Join
在给定的时间间隔内使用公共Keys关联两个被Key化的数据流的两个数据元e1和e2,以便e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound。
am.intervalJoin(otherKeyedStream)
.between(Time.milliseconds(-2), Time.milliseconds(2))
.upperBoundExclusive(true)
.lowerBoundExclusive(true)
.process(new IntervalJoinFunction() {...})
16. Window CoGroup:DataStream,DataStream → DataStream。
在给定Keys和公共窗口上对两个数据流进行Cogroup:
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new CoGroupFunction () {...})
17. Connect:DataStream,DataStream → ConnectedStreams。
“连接”两个保持其类型的数据流,从而允许两个流之间共享状态:
注意:1. Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。
2. Connect只能操作两个流,Union可以操作多个。
someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...
val connectedStreams = someStream.connect(otherStream)
18. CoMap,CoFlatMap:ConnectedStreams → DataStream。
与连接的数据流上的map和flatMap相似.
connectedStreams.map(
(_ : Int) => true,
(_ : String) => false
)
connectedStreams.flatMap(
(_ : Int) => true,
(_ : String) => false
)
19. Split:DataStream → SplitStream。
Split the stream into two or more streams according to some criterion.
根据某些标准将流拆分为两个或更多个流:
val split = someDataStream.split(
(num: Int) =>
(num % 2) match {
case 0 => List("even")
case 1 => List("odd")
}
)
20. Select:SplitStream → DataStream。
从拆分流中选择一个或多个流.
21.Iterate:DataStream → IterativeStream → DataStream
通过将一个运算符的输出重定向到某个先前的运算符,在流中创建“反馈”循环。
这对于定义不断更新模型的算法特别有用。以下代码从流开始,并连续应用迭代主体。
大于0的元素将被发送回反馈通道,其余元素将被转发到下游。
initialStream.iterate {
iteration => {
val iterationBody = iteration.map {/*do something*/}
(iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
}
}
22.Custom partitioning:DataStream → DataStream。
使用用户定义的分区程序为每个元素选择目标任务。
dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)
23.Random partitioning:DataStream → DataStream。
根据均匀分布对元素进行随机划分.
dataStream.shuffle()
24.Rebalancing (Round-robin partitioning):DataStream → DataStream。
分区元素轮循,每个分区创建相等的负载。在存在数据偏斜的情况下对性能优化有用。
dataStream.rebalance()
25.Rescaling:DataStream → DataStream。
将元素循环地分区到下游操作的子集。
如果您希望拥有管道,例如,从源的每个并行实例散开到几个映射器的子集,以分配负载,但又不希望 rebalance() ,引起完全的重新平衡,则这很有用。
这将仅需要本地数据传输,而不需要通过网络传输数据,这取决于其他配置值,例如TaskManager的插槽数。
上游操作向其发送元素的下游操作的子集取决于两个上游操作的并行度和下游操作。例如,如果上游操作具有并行性2,而下游操作具有并行性4,则一个上游操作将元素分配给两个下游操作,而另一个上游操作将分配给另外两个下游操作。
另一方面,如果下游操作具有并行性2而上游操作具有并行性4,则两个上游操作将分配给一个下游操作,而其他两个上游操作将分配给其他下游操作。
彼此不是整数倍,一个或几个下游操作将具有与上游操作不同的输入数量。请参见此图以查看上例中的连接模式:
dataStream.rescale()
Task chaining and resource groups
26.Broadcasting:DataStream → DataStream。
向每个分区广播元素。
dataStream.broadcast()
27.Extract Timestamps:DataStream → DataStream。
从记录中提取时间戳,以便与使用事件时间语义的窗口一起使用。
stream.assignTimestamps { timestampExtractor }
dataStream支持将数据输出到:本地文件,本地集合,HDFS。
除此之外,还支持:sink到kafka,sink到mysql,sink到redis。
下面以sink到kafka为例:
Flink 并不是将大量对象存在堆上,而是将对象都序列化到一个预分配的内存块上。此外,Flink大量的使用了堆外内存。如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。Flink 为了直接操作二进制数据实现了自己的序列化框架。
理论上Flink的内存管理分为三部分:
Network Buffers:这个是在TaskManager启动的时候分配的,这是一组用于缓存网络数据的内存,每个块是32K,默认分配2048个,可以通过“taskmanager.network.numberOfBuffers”修改
Memory Manage pool:大量的Memory Segment块,用于运行时的算法(Sort/Join/Shuffle等),这部分启动的时候就会分配。下面这段代码,根据配置文件中的各种参数来计算内存的分配方法。(heap or off-heap,这个放到下节谈),内存的分配支持预分配和lazy load,默认懒加载的方式。
User Code,这部分是除了Memory Manager之外的内存用于User code和TaskManager本身的数据结构。