Spark 流计算状态管理进化史

·206 字·1 分钟
Spark
n3xtchen
作者
n3xtchen
Sharing Funny Tech With You

状态(State)在流计算是一个宽泛概念的词汇;继续之前,我们先明确下个定义。状态(State)字面意思就是“中间信息(Intermediate Information)”。

从数据角度看,流计算主要有两种处理方法:

  • 无状态(Stateless):每一个进入的记录独立于其他记录。不同记录间没有任何关系,他们可以独立处理和持久化。例如:mapfliter、静态数据 join 等等。
  • 有状态(Stateful):处理进入的记录依赖于之前记录处理的结果。因此,我们需要维护不同数据处理之间的中间信息。每一个进入的记录都可以读取和更新这个信息。我们把这个中间信息称作状态(State)。例如,独立键的计数聚合,去重等等。

状态处理也分为两种:

  1. 过程状态:它是流计算的元数据(metadata);追踪历史至今被处理的数据。在流的世界中,我们称之为 Checkpoint 或者保存数据的偏移(offset)。为了防止重启,升级或者任务失败,它需要容错性(fault tolerance)。这个信息是任何高可靠流处理的基本,同时被无状态和状态处理需要。
  2. 数据状态:这些中间数据源自数据(目前为止处理过的),它需要在记录之间维护。这个只在 Stateful 模式下,需要处理。

状态储存方式的选择 #

为了维护流处理中的状态,我们需要选择一种存储器方式,当然有很多可选的是方式,归纳下常用的几种:

  • 内存,如 HashMap
  • 文件系统,如 HDFS
  • 分布式数据库,如 *Cassandra
  • 嵌入式存储,最流行的单属 FacebookRocksDb

方式可以多种多样,但是稳定可靠是关键,效率同样也是考量的标准之一,整个流计算的发展就是围绕着稳定和高效不断迭代进化的。

接下来,我们要进入正题了,介绍下 Spark 中几种状态的处理机制和存储方式。

老派:DStream/Spark Streaming #

Spark Streaming 或者 Structured StreamingDStream 中,每一个微批处理状态都会和 Checkpoint* 元数据中一起维护;当每个批处理任务结束的时候,同步(synchronous)完成状态的维护(即使该微批处理没有任何状态操作),同时加大了任务的延迟。

状态没有进行增量持久化,每一次都是全量快照,导致不必要的开销(主要是序列化和持久化(I/O))。

当数据很大的时候,这个设计带来问题将会更加凸显。

我们生活在一个不断进化的世界中。因为旧的东西不够好,导致有新的东西不断涌现,取而代之。

新潮:Structured Streaming #

Structured StreamingSpark 的第二代基于 SQL 的流计算)的出现,除了带来更多的新特性,同时解决了上代遗留下来的坑,状态管理就是其中之一。

状态管理从 Checkpoint* 中解藕出来,不再是 tasks/jobs 的一部分;是异步(asynchronous)的,同时支持增量持久化。

现在,让我们深入了解下 Spark 2.3 的状态管理机制。

Structured Streaming 有且只有提供了一种默认的状态存储:基于 HDFS 的状态管理(其实,Databrick(商业)、Quole(开源)已经提供了基于 RocksDB 的实现,大家可以了解下)。

  • 每一个聚合的 RDD 在各自执行器(Executor)的内存中维护一个版本化键值存储结构(内存中的 HashMap)。这个存储是唯一的:checkpointPath + operatorId + partitionId

    • checkpointPath:流查询的 Checkpoint* 路径
    • operatorId:流查询中的每一个聚合操作(如 groupBy)内部会被分配一个整型值
    • partitionId:聚合操作之后会生成聚合 RDD 分区 ID
  • 版本的值就是 batchId

  • 第一个之外的每一个微批(micro-batch)的分区都会先从前一个微批处理器拷贝 HashMap (同一分区的最后一个微批(micro-batch)),并更新它。微批处理结束后会更新后的 HashMap 将会传给下一个微批(micro-batch),就这样不断重复的执行下去知道处理结束;

  • 同样,某个微批(micro-batch)的某个分区,会有一个文件以容错(fault-tolerant)的方式记录该微批处理的变更。这个文件称之为 版本化增量文件。它只包含相关分区的特定批次的状态变更。因此每个批次的分区数和 增量文件 数是相等的;它对应的路径:checkpointPath/state/operatorId/partitionId/${版本}.delta

  • 分区任务计划在执行器(Executor) 上执行,在该执行程序中存在与以前的 microBatch 相同的分区的 HashMap。这个是由 Driver 决定的,在执行器(Executor)上保存有关于状态存储的足量数据;

  • 在微批处理的任务中,键的变更异步执行的,并且具有事务,同时会输出版本化的增量文件;

  • 关于状态管理的其他操作(如快照,清楚、删除,文件的管理等等)在 执行器(Executor) 的隔离守护线程(称之为 MaintenanceTask)中异步完成的。一个 执行器(Executor) 一个线程;

  • 如果任务成功了,输出流将会关闭,版本增量文件将会提交并持久化到文件系统(如HDFS)中。内存中版本化的 HashMap 会被加到提交过 HashMap 列表中,该分区的版本号会加1。新的版本ID将会在该分区的下一个批次中使用;

  • 如果分区任务失败了,相关内存中 HashMap 会被抛弃,增量文件输出流会被切削。这样,不会有任何的变化会在内存或者文件中被记录。整个任务将会重试;

  • 就像之前说的,每一个 执行器(Executor) 都有一个独立线程(MaintanenceTask),他会在等间隔时长(默认 60 秒)执行,为每个分区完成的状态进行异步地快照,将最新的版本 HashMap 持久化到磁盘中(文件名:version.snapshot,路径:checkpointLocation/state/operatorId/partitionId/${version}.snapshot)。一次没几个批次,就有一个分区的快照文件被这个线程创建,代表该版本的完整状态。这个线程会删掉比这个版本旧的增量和快照文件;

  • 注意:相同的执行器(Executor)不会有多线程来把状态写到增量文件中。但是在特定场景(如果推测执行)下可以有多个执行器(Executor)同时将同一个状态载入到内存中。这个意味着只能有一个线程写内存中的 HashMap,但是可以有不同 执行器(Executor) 的多个线程写到同一个增量文件中。

当前实现的优点和缺点 #

如大家所知,软件开发没有银弹。每一种设计都有他们的优缺点。

优点:

  • 有更好拓展性的抽象和接口。可以实现自己想要的任何存储方式,如现在流行的 RocksDB 取代内存的方式已经在付费版的 Spark 中得到实现;
  • 不像早期的 DStream,高效,没有和执行器(Executor)任务强绑定;
  • 增量状态的 Checkpoint

缺点:

  • 默认的状态存储方式占用了执行器(Executor) 内存。执行器(Executor)任务的内存没有和状态存储隔离。当运行任务、状态数据成倍增长,超过执行器(Executor)的可用内存时,将会导致垃圾回收(GC),甚至内存溢出;
  • 每一个执行器(Executor)只有单线程负责快照和数据清理。对于状态数据量过大或者单执行器(Executor)分区数过多的时候,这个线程将会不堪重负,很有可能会导致延迟。

和其他流系统对比 #

如果不和其他的流计算框架对比,这篇文章显得不完整。像 FlinkKafka Streams 这样的开源流计算框架使用 RocksDB 来放开内存限制。RocksDB 解决了内存问题,但是在节点失败的时候,没有容错性。

Kafka Streams 使用 RocksDB 作为无限制的本地存储。对于容错性,Kafka Streams 依赖于 Kafka。他们为每一个更新写变更日志到内部的 Kafka 主题中,它会进行压缩,最终变成单个快照日志文件。防止失败和重启,RocksDB 可以从这个 Kafka 主题中恢复数据。

Flink 则使用另一种方式,为了容错,独立实现了快照策略。 Flink 会定时快照 RocksDB 的数据,并拷贝到可靠的文件系统中(如 HDFS)。RocksDB 会在失败的时候从最新的快照还原。最后一次快照和失败之间将会存在一些数据将不会固化到快照中。为了还原,Flink 将从快照的时间点开始处理数据,保证未考虑的数据会被重新处理。记住,只有像 KafkaKinesis 这类可回放的数据源才能实现这些。

Storm/Triden 依赖外部的存储(如 Cassandra/Redis)作为状态管理,来解决可依赖额容错,但是规模化后会影响性能。外部存储会大量的网络调用,将会对流处理增加延迟。这就是为什么大部分流系统使用嵌入式本地存储。

结语 #

对比旧的 DStream 实现,Structured Streaming 当前状态管理的实现已经有很大的进步。他解决的早期的问题,是一个很成熟的设计。为了和其他流系统一较高下,就需要一个稳定的状态存储的实现。

引用自: State Management in Spark Structured Streaming