流计算概述
静态数据和流数据
科从概念上而言,流数据(或数据流)是指在时间分布和数量上无限的一系列动态数据集合体;数据记录是流数据的最小组成单元。流数据具有如下特征:
- 数据快速持续到达,潜在大小也许是无穷无尽的。
- 数据来源众多,格式复杂。
- 数据量大,但是不十分关注存储,一旦流数据中的某个元素经过处理,要么被丢弃,要么被归档存储。
- 注重数据的整体价值,不过分关注个别数据。
- 数据顺序颠倒,或者不完整,系统无法控制将要处理的新到达的数据元素的顺序。
批量计算和实时计算
对静态数据和流数据的处理,对应着两种截然不同的计算模式:批量计算和实时计算。
批量计算以“静态数据”为对象,可以在很充裕的时间内对海量数据进行批量处理,计算得到有价值的信息。Hadoop就是典型的批处理模型,由 HDFS 和 HBase 存放大量的静态数据,由 MapReduce 负贵对海量数据执行批量计算。
流数据则不适合采用批量计算,因为流数据不适合用传统的关系模型建模,不能把源源不断的流数据保存到数据库中,流数据被处理后,一部分进入数据库成为静态数据,其他部分则直接被丢弃。传统的关系数据库通常用于满足信息实时交互处理需求,比如零售系统和银行系统,每次有一笔业务发生,用户通过和关系数据库系统进行交互,就可以把相应记录写人磁盘,并支持对记录进行随机读写操作。但是,关系数据库并不是为存储快速、连续到达的流数据而设计的,不支持连续处理,把这类数据库用于流数据处理,不仅成本高,而且效率低。
流数据必须采用实时计算,实时计算最重要的一个需求是能够实时得到计算结果,一般要求响应时间为秒级。当只需要处理少量数据时,实时计算并不是问题;但是,在大数据时代,不仅数据格式复杂、来源众多,而且数据量巨大,这就对实时计算提出了很大的挑战。因此,针对流数据的实时计算-流计算,应运而生。
流计算的概念
总的来说,流计算秉承一个基本理念,即数据的价值随着时间的流逝而降低。因此,当事件出现时就应该立即进行处理,而不是缓存起来进行批量处理。为了及时处理流数据,就需要一个低延迟、可扩展、高可靠的处理引擎。对于一个流计算系统来说,它应达到如下需求。
- 高性能。处理大数据的基本要求,如每秒处理几十万条数据。
- 海量式。支持 TB 级甚至是 PB 级的数据规模。
- 实时性。必须保证一个较低的延迟时间,达到秒级别,甚至是毫秒级别。
- 分布式。支持大数据的基本架构,必须能够平滑扩展。
- 易用性。能够快速进行开发和部署。
- 可靠性。能可靠地处理流数据。
针对不同的应用场景,相应的流计算系统会有不同的需求,但是针对海量数据的流计算,无论在数据采集、数据处理中都应达到秒级别的要求。
流计算与 Hadoop
Hadoop 设计的初衷是面向大规模数据的批量处理,在使用 MapReduce 处理大规模文件时,一个大文件会被分解成许多个块分发到不同的机器上,每台机器并行运行 MapReduce 任务,最后对结果进行汇总输出。有时候,完成一个任务甚至要经过多轮的迭代。很显然,这种批量任务处理方式在时间延迟方面是无法满足流计算的实时响应需求的。这时,我们可能很自然地会想到一种“变通”的方案来降低批处理的时间延迟-将基于 MapReduce 的批量处理转为小批量处理,将输人数据切成小的片段,每隔一个周期就启动一次 MapReduce 作业。但是这种方案会存在以下问题。
- 切分成小的片段,虽然可以降低延迟,但是也增加了任务处理的附加开销,而且还要处理片段之间的依赖关系,因为一个片段可能需要用到前一个片段的计算结果。
- 需要对 MapReduce 进行改造以支持流式处理,Reduce 阶段的结果不能直接输出,而是保存在内存中。这种做法会大大增加 MapReduce 框架的复杂度,导致系统难以维护和扩展。
- 降低了用户程序的可伸缩性,因为用户必须要使用 MapReduce 接口来定义流式作业。
总之,流数据处理和批量数据处理是两种截然不同的数据处理模式,MapReduce 是专门面向静态数据的批量处理的,内部各种实现机制都为批处理做了高度优化,不适合用于处理持续到达的动态数据。
流计算的处理流程
概述
数据实时采集
- Agent:主动采集数据,并把数据推送到 Colletor 部分。
- Collector:接收多个 Agent 的数据,并实现有序、可靠、高性能的转发。
- Store:存储 Collecter 转发过来的数据。
但对于流计算,一般在 Store 部分不进行数据的存储,而是将采集的数据直接发送给流计算平台进行实时计算。
数据实时计算
实时查询服务
流计算的第三个阶段是实时查询服务,经由流计算框架得出的结果可供用户进行实时查询、展示或储存。传统的数据处理流程,用户需要主动发出查询才能获得想要的结果。而在流处理流程中,实时查询服务可以不断更新结果,并将用户所需的结果实时推送给用户。虽然通过对传统的数据处理系统进行定时查询也可以实现不断更新结果和结果推送,但通过这样的方式获取的结果仍然是根据过去某一时 刻的数据得到的结果,与实时结果有着本质的区别。
由此可见,流处理系统与传统的数据处理系统有如下不同之处。
- 流处理系统处理的是实时的数据,而传统的数据处理系统处理的是预先存储好的静态数据。
- 用户通过流处理系统获取的是实时结果,而通过传统的数据处理系统获取的是过去某时刻的结果。并且,流处理系统无需用户主动发出查询,实时查询服务可以主动将实时结果推送给用户。
开源流计算框架 Storm
Storm 的设计思想
Streams
在 Storm 对流数据 Streams 的抽象描述中,流数据是一个无限的 Tuple 序列(Tuple 即元组,是元素的有序列表,每一个 Tuple 就是一个值列表,列表中的每个值都有一个名称,并且该值可以是基本类型、字符类型、字节数组等,也可以是其他可序列化的类型)。这些 Tuple 序列会以分布式的方式并行地创建和处理。
Spouts
Storm 认为每个 Stream 都有一个源头,并把这个源头抽象为 Spouts。Spouts 会从外部读取流数据并持续发出 Tuple。
Bolts
Storm 将 Streams 的状态转换过程抽象为 Bolts。Bolts 既可以处理 Tuple,也可以将处理后的 Tuple 作为新的 Streams 发送给其他 Bolts 对 Tuple 的处理逻辑都被封装 在Bolts中,可执行过滤、聚合、查询等操作。
Topology
Storm 将 Spouts 和 Bolts 组成的网络抽象成 Topology。Topology 是 Storm 中最高层次的抽象概念,它可以被提交到 Storm 集群执行。一个 Topology 就是一个流转换图,图中节点是一个 Spout 或 Bolt,图中的边则表示 Bolt 订阅了哪个 Stream。当 Spout 或者 Bolt 发送元组时,它会把元组发送到每个订阅了该 Stream 的 Bolt 上进行处理。
在 Topology 的具体实现上,Storm 中的 Topology 定义仅仅是一些 Thrift 结构体(Thrift 是基于二进制的高性能的通信中间件),而 Thrift 支持各种编程语言进行定义,这样一来就可以使用各种编程语言来创建、提交 Topology。
Stream Groupings
Storm 中的 Stream Groupings 用于告知 Topology 如何在两个组件间(如 Spout 和 Bolt 之间,或者不同的 Bolt 之间)进行 Tuple 的传送。一个 Topology 中 Tuple 的流向中,箭头表示 Tuple 的流向,而圆圈则表示任务,每一个 Spout 和 Bolt 都可以有多个分布式任务,一个任务在什么时候、以什么方式发送 Tuple 就是由 Stream Groupings 来决定的。
目前,Storm 中的 Stream Groupings 有如下 6 种方式。
- ShuffleGrouping:随机分组,随机分发 Stream 中的 Tuple,保证每个 Bolt 的 Task 接收 Tuple数量大致一致。
- FieldsGrouping:按照字段分组,保证相同字段的 Tuple 分配到同一个 Task 中。
- AllGrouping:广播发送,每一个 Task 都会收到所有的 Tuple。
- GlobalGrouping:全局分组,所有的 Tuple 都发送到同一个 Task 中。
- NonGrouping:不分组,和 ShuffleGrouping 类似,当前 Task 的执行会和它的被订阅者在同一个线程中执行。
- DireetGrouping:直接分组,直接指定由某个 Task 来执行 Tuple 的处理。
Storm 的框架设计
Storm 运行在分布式集群中,其运行任务的方式与 Hadoop 类似:在 Hadoop 上运行的是 MapReduce 作业,而在 Storm 上运行的是”Topology”。但两者的任务大不相同,其中主要的不同是一个 MapReduce 作业最终会完成计算并结束运行,而一个 Topology 将持续处理消息(直到入为终止)。
Storm 集群采用”Mater-Worker”的节点方式,其中 Master 节点运行名为”Nimbus”的后台程序(类似 Hadoop 中的”JobTracker”), 负责在集群范围内分发代码、为 Worker 分配任务和监测故障。而每个 Worker 节点运行名为”Supervisor”的后台程序,负责监听分配给它所在机器的工作,即根据 Nimbus 分配的任务来决定启动或停止 Worker 进程。
Storm 采用了 Zookeeper 来作为分布式协调组件,负责 Nimbus 和多个 Supervisor 之间的所有协调工作(一个完整的拓扑可能被分为多个子拓扑,并由多个 Supervisor 完成)。
此外,Nimbus 后台进程和 Supervisor 后台进程都是快速失败(Fail-fast)和无状态(Stateless)的,Master 节点并没有直接和 Worker 节点通信,而是借助 Zookeeper 将状态信息存放在 Zookeeper 中或本地磁盘中,以便节点故障时进行快速恢复。这意味着,若 Nimbus 进程或 Supervisor 进程终止后,一旦进程重启,它们将恢复到之前的状态并继续工作。这种设计使 Storm 极其稳定。
基于这样的架构设计,Storm 的工作流程包含4个过程。
- 客户端提交 Topology 到 Storm 集群中。
- Nimbus 将分配给 Supervisor 的任务写人 Zookeeper。
- Supervisor 从 Zookeeper 中获取所分配的任务,并启动 Worker 进程。
- Worker 进程执行具体的任务。
Spark Streaming
Spark Streaming 是构建在 Spark 上的实时计算框架,它扩展了 Spark 处理大规模流式数据的能力。Spark Streaming 可结合批处理和交互查询,适合一些需要对历史数据和实时数据进行结合分析的应用场景。
Spark Streaming 设计
Spark Streaming 是 Spark 的核心组件之一,为 Spark 提供了可拓展、高吞吐、容错的流计算能力。Spark Streaming 可整合多种输人数据源,如 Kafka、Flume、HDFS,甚至是普通的 TCP 套接字。经处理后的数据可存储至文件系统、数据库,或显示在仪表盘里。
Spark Streaming 的基本原理是将实时输人数据流以时间片(秒级)为单位进行拆分,然后经 Spark 引擎以类似批处理的方式处理每个时间片数据。
Spark Streaming 最主要的抽象是 DStream(Discretized Stream,离散化数据流),表示连续不断的数据流。在内部实现上,Spark Streaming 的输人数据按照时间片(如 1s)分成一段一段的 DStream,每一段数据转换为 Spark 中的 RDD,并且对 DStream 的操作都最终转变为对相应的 RDD的操作。
例如,进行单词统计时,每个时间片的数据(存储句子的 RDD)经 flatMap 操作,生成了存储单词的 RDD。整个流式计算可根据业务的需求对这些中间的结果进一步处理,或者存储到外部设备中。
Spark Streaming 与 Storm 的对比
Spark Streaming 和 Storm 最大的区别在于,Spark Streaming 无法实现毫秒级的流计算,而 Storm 则可以实现毫秒级响应。
Spark Streaming 无法实现毫秒级的流计算,是因为其将流数据按批处理窗口大小(通常在 0.5~2s 之间)分解为一系列批处理作业,在这个过程中会产生多个 Spark 作业,且每一段数据的处理都会经过 Spark DAG 图分解、任务调度过程,因此无法实现毫秒级相应。Spark Streaming 难以满足对实时性要求非常高(如高频实时交易)的场景,但足以胜任其他流式准实时计算场景。
相比之下,Storm 处理的单位为 Tuple,只需要极小的延迟。Spark Streaming 构建在 Spark 上,一方面是因为 Spark 的低延迟执行引擎(100ms 左右)可以用于实时计算,另一方面,相比于 Storm,RDD 数据集更容易做高效的容错处理。此外,Spark Streaming 采用的小批量处理的方式使得它可以同时兼容批量和实时数据处理的逻辑和算法,因此方便了一些需要历史数据和实时数据联合分析的特定应用场合。