Wetts's blog

Stay Hungry, Stay Foolish.

0%

流计算概述

静态数据和流数据

科从概念上而言,流数据(或数据流)是指在时间分布和数量上无限的一系列动态数据集合体;数据记录是流数据的最小组成单元。流数据具有如下特征:

  1. 数据快速持续到达,潜在大小也许是无穷无尽的。
  2. 数据来源众多,格式复杂。
  3. 数据量大,但是不十分关注存储,一旦流数据中的某个元素经过处理,要么被丢弃,要么被归档存储。
  4. 注重数据的整体价值,不过分关注个别数据。
  5. 数据顺序颠倒,或者不完整,系统无法控制将要处理的新到达的数据元素的顺序。

批量计算和实时计算

对静态数据和流数据的处理,对应着两种截然不同的计算模式:批量计算和实时计算。

批量计算以“静态数据”为对象,可以在很充裕的时间内对海量数据进行批量处理,计算得到有价值的信息。Hadoop就是典型的批处理模型,由 HDFS 和 HBase 存放大量的静态数据,由 MapReduce 负贵对海量数据执行批量计算。

流数据则不适合采用批量计算,因为流数据不适合用传统的关系模型建模,不能把源源不断的流数据保存到数据库中,流数据被处理后,一部分进入数据库成为静态数据,其他部分则直接被丢弃。传统的关系数据库通常用于满足信息实时交互处理需求,比如零售系统和银行系统,每次有一笔业务发生,用户通过和关系数据库系统进行交互,就可以把相应记录写人磁盘,并支持对记录进行随机读写操作。但是,关系数据库并不是为存储快速、连续到达的流数据而设计的,不支持连续处理,把这类数据库用于流数据处理,不仅成本高,而且效率低。

流数据必须采用实时计算,实时计算最重要的一个需求是能够实时得到计算结果,一般要求响应时间为秒级。当只需要处理少量数据时,实时计算并不是问题;但是,在大数据时代,不仅数据格式复杂、来源众多,而且数据量巨大,这就对实时计算提出了很大的挑战。因此,针对流数据的实时计算-流计算,应运而生。

流计算的概念

总的来说,流计算秉承一个基本理念,即数据的价值随着时间的流逝而降低。因此,当事件出现时就应该立即进行处理,而不是缓存起来进行批量处理。为了及时处理流数据,就需要一个低延迟、可扩展、高可靠的处理引擎。对于一个流计算系统来说,它应达到如下需求。

  1. 高性能。处理大数据的基本要求,如每秒处理几十万条数据。
  2. 海量式。支持 TB 级甚至是 PB 级的数据规模。
  3. 实时性。必须保证一个较低的延迟时间,达到秒级别,甚至是毫秒级别。
  4. 分布式。支持大数据的基本架构,必须能够平滑扩展。
  5. 易用性。能够快速进行开发和部署。
  6. 可靠性。能可靠地处理流数据。

针对不同的应用场景,相应的流计算系统会有不同的需求,但是针对海量数据的流计算,无论在数据采集、数据处理中都应达到秒级别的要求。

流计算与 Hadoop

Hadoop 设计的初衷是面向大规模数据的批量处理,在使用 MapReduce 处理大规模文件时,一个大文件会被分解成许多个块分发到不同的机器上,每台机器并行运行 MapReduce 任务,最后对结果进行汇总输出。有时候,完成一个任务甚至要经过多轮的迭代。很显然,这种批量任务处理方式在时间延迟方面是无法满足流计算的实时响应需求的。这时,我们可能很自然地会想到一种“变通”的方案来降低批处理的时间延迟-将基于 MapReduce 的批量处理转为小批量处理,将输人数据切成小的片段,每隔一个周期就启动一次 MapReduce 作业。但是这种方案会存在以下问题。

  1. 切分成小的片段,虽然可以降低延迟,但是也增加了任务处理的附加开销,而且还要处理片段之间的依赖关系,因为一个片段可能需要用到前一个片段的计算结果。
  2. 需要对 MapReduce 进行改造以支持流式处理,Reduce 阶段的结果不能直接输出,而是保存在内存中。这种做法会大大增加 MapReduce 框架的复杂度,导致系统难以维护和扩展。
  3. 降低了用户程序的可伸缩性,因为用户必须要使用 MapReduce 接口来定义流式作业。

总之,流数据处理和批量数据处理是两种截然不同的数据处理模式,MapReduce 是专门面向静态数据的批量处理的,内部各种实现机制都为批处理做了高度优化,不适合用于处理持续到达的动态数据。

流计算的处理流程

概述

传统的数据处理流程

流计算的数据处理流程

数据实时采集

数据采集系统基本架构

  • Agent:主动采集数据,并把数据推送到 Colletor 部分。
  • Collector:接收多个 Agent 的数据,并实现有序、可靠、高性能的转发。
  • Store:存储 Collecter 转发过来的数据。

但对于流计算,一般在 Store 部分不进行数据的存储,而是将采集的数据直接发送给流计算平台进行实时计算。

数据实时计算

数据实时计算的流程

实时查询服务

流计算的第三个阶段是实时查询服务,经由流计算框架得出的结果可供用户进行实时查询、展示或储存。传统的数据处理流程,用户需要主动发出查询才能获得想要的结果。而在流处理流程中,实时查询服务可以不断更新结果,并将用户所需的结果实时推送给用户。虽然通过对传统的数据处理系统进行定时查询也可以实现不断更新结果和结果推送,但通过这样的方式获取的结果仍然是根据过去某一时 刻的数据得到的结果,与实时结果有着本质的区别。

由此可见,流处理系统与传统的数据处理系统有如下不同之处。

  1. 流处理系统处理的是实时的数据,而传统的数据处理系统处理的是预先存储好的静态数据。
  2. 用户通过流处理系统获取的是实时结果,而通过传统的数据处理系统获取的是过去某时刻的结果。并且,流处理系统无需用户主动发出查询,实时查询服务可以主动将实时结果推送给用户。

开源流计算框架 Storm

Storm 的设计思想

Streams

在 Storm 对流数据 Streams 的抽象描述中,流数据是一个无限的 Tuple 序列(Tuple 即元组,是元素的有序列表,每一个 Tuple 就是一个值列表,列表中的每个值都有一个名称,并且该值可以是基本类型、字符类型、字节数组等,也可以是其他可序列化的类型)。这些 Tuple 序列会以分布式的方式并行地创建和处理。

Streams

Spouts

Storm 认为每个 Stream 都有一个源头,并把这个源头抽象为 Spouts。Spouts 会从外部读取流数据并持续发出 Tuple。

Spouts

Bolts

Storm 将 Streams 的状态转换过程抽象为 Bolts。Bolts 既可以处理 Tuple,也可以将处理后的 Tuple 作为新的 Streams 发送给其他 Bolts 对 Tuple 的处理逻辑都被封装 在Bolts中,可执行过滤、聚合、查询等操作。

Bolts

Topology

Storm 将 Spouts 和 Bolts 组成的网络抽象成 Topology。Topology 是 Storm 中最高层次的抽象概念,它可以被提交到 Storm 集群执行。一个 Topology 就是一个流转换图,图中节点是一个 Spout 或 Bolt,图中的边则表示 Bolt 订阅了哪个 Stream。当 Spout 或者 Bolt 发送元组时,它会把元组发送到每个订阅了该 Stream 的 Bolt 上进行处理。

在 Topology 的具体实现上,Storm 中的 Topology 定义仅仅是一些 Thrift 结构体(Thrift 是基于二进制的高性能的通信中间件),而 Thrift 支持各种编程语言进行定义,这样一来就可以使用各种编程语言来创建、提交 Topology。

Topology

Stream Groupings

Storm 中的 Stream Groupings 用于告知 Topology 如何在两个组件间(如 Spout 和 Bolt 之间,或者不同的 Bolt 之间)进行 Tuple 的传送。一个 Topology 中 Tuple 的流向中,箭头表示 Tuple 的流向,而圆圈则表示任务,每一个 Spout 和 Bolt 都可以有多个分布式任务,一个任务在什么时候、以什么方式发送 Tuple 就是由 Stream Groupings 来决定的。

Stream_Groupings

目前,Storm 中的 Stream Groupings 有如下 6 种方式。

  1. ShuffleGrouping:随机分组,随机分发 Stream 中的 Tuple,保证每个 Bolt 的 Task 接收 Tuple数量大致一致。
  2. FieldsGrouping:按照字段分组,保证相同字段的 Tuple 分配到同一个 Task 中。
  3. AllGrouping:广播发送,每一个 Task 都会收到所有的 Tuple。
  4. GlobalGrouping:全局分组,所有的 Tuple 都发送到同一个 Task 中。
  5. NonGrouping:不分组,和 ShuffleGrouping 类似,当前 Task 的执行会和它的被订阅者在同一个线程中执行。
  6. DireetGrouping:直接分组,直接指定由某个 Task 来执行 Tuple 的处理。

Storm 的框架设计

Storm 运行在分布式集群中,其运行任务的方式与 Hadoop 类似:在 Hadoop 上运行的是 MapReduce 作业,而在 Storm 上运行的是”Topology”。但两者的任务大不相同,其中主要的不同是一个 MapReduce 作业最终会完成计算并结束运行,而一个 Topology 将持续处理消息(直到入为终止)。

Storm集群架构示意图

Storm 集群采用”Mater-Worker”的节点方式,其中 Master 节点运行名为”Nimbus”的后台程序(类似 Hadoop 中的”JobTracker”), 负责在集群范围内分发代码、为 Worker 分配任务和监测故障。而每个 Worker 节点运行名为”Supervisor”的后台程序,负责监听分配给它所在机器的工作,即根据 Nimbus 分配的任务来决定启动或停止 Worker 进程。

Storm 采用了 Zookeeper 来作为分布式协调组件,负责 Nimbus 和多个 Supervisor 之间的所有协调工作(一个完整的拓扑可能被分为多个子拓扑,并由多个 Supervisor 完成)。

此外,Nimbus 后台进程和 Supervisor 后台进程都是快速失败(Fail-fast)和无状态(Stateless)的,Master 节点并没有直接和 Worker 节点通信,而是借助 Zookeeper 将状态信息存放在 Zookeeper 中或本地磁盘中,以便节点故障时进行快速恢复。这意味着,若 Nimbus 进程或 Supervisor 进程终止后,一旦进程重启,它们将恢复到之前的状态并继续工作。这种设计使 Storm 极其稳定。

基于这样的架构设计,Storm 的工作流程包含4个过程。

  1. 客户端提交 Topology 到 Storm 集群中。
  2. Nimbus 将分配给 Supervisor 的任务写人 Zookeeper。
  3. Supervisor 从 Zookeeper 中获取所分配的任务,并启动 Worker 进程。
  4. Worker 进程执行具体的任务。

Storm工作流程示意图

Spark Streaming

Spark Streaming 是构建在 Spark 上的实时计算框架,它扩展了 Spark 处理大规模流式数据的能力。Spark Streaming 可结合批处理和交互查询,适合一些需要对历史数据和实时数据进行结合分析的应用场景。

Spark Streaming 设计

Spark Streaming 是 Spark 的核心组件之一,为 Spark 提供了可拓展、高吞吐、容错的流计算能力。Spark Streaming 可整合多种输人数据源,如 Kafka、Flume、HDFS,甚至是普通的 TCP 套接字。经处理后的数据可存储至文件系统、数据库,或显示在仪表盘里。

Spark_Streaming支持的输入、输出数据源

Spark Streaming 的基本原理是将实时输人数据流以时间片(秒级)为单位进行拆分,然后经 Spark 引擎以类似批处理的方式处理每个时间片数据。

Spark_Streaming执行流程

Spark Streaming 最主要的抽象是 DStream(Discretized Stream,离散化数据流),表示连续不断的数据流。在内部实现上,Spark Streaming 的输人数据按照时间片(如 1s)分成一段一段的 DStream,每一段数据转换为 Spark 中的 RDD,并且对 DStream 的操作都最终转变为对相应的 RDD的操作。

例如,进行单词统计时,每个时间片的数据(存储句子的 RDD)经 flatMap 操作,生成了存储单词的 RDD。整个流式计算可根据业务的需求对这些中间的结果进一步处理,或者存储到外部设备中。

DStream

Spark Streaming 与 Storm 的对比

Spark Streaming 和 Storm 最大的区别在于,Spark Streaming 无法实现毫秒级的流计算,而 Storm 则可以实现毫秒级响应。

Spark Streaming 无法实现毫秒级的流计算,是因为其将流数据按批处理窗口大小(通常在 0.5~2s 之间)分解为一系列批处理作业,在这个过程中会产生多个 Spark 作业,且每一段数据的处理都会经过 Spark DAG 图分解、任务调度过程,因此无法实现毫秒级相应。Spark Streaming 难以满足对实时性要求非常高(如高频实时交易)的场景,但足以胜任其他流式准实时计算场景。

相比之下,Storm 处理的单位为 Tuple,只需要极小的延迟。Spark Streaming 构建在 Spark 上,一方面是因为 Spark 的低延迟执行引擎(100ms 左右)可以用于实时计算,另一方面,相比于 Storm,RDD 数据集更容易做高效的容错处理。此外,Spark Streaming 采用的小批量处理的方式使得它可以同时兼容批量和实时数据处理的逻辑和算法,因此方便了一些需要历史数据和实时数据联合分析的特定应用场合。

神经风格迁移

神经风格迁移2

$J_{\text{content}}(C,G)$

第一部分被称作内容代价,这是一个关于内容图片和生成图片的函数,它是用来度量生成图片$G$的内容与内容图片$C$的内容有多相似。

$J_{\text{style}}(S,G)$

然后我们会把结果加上一个风格代价函数,也就是关于$S$和$G$的函数,用来度量图片$G$的风格和图片$S$的风格的相似度。

$J( G) = a J_{\text{content}}( C,G) + \beta J_{\text{style}}(S,G)$

内容代价函数(Content cost function)

内容代价函数

假如说,你用隐含层$l$来计算内容代价,如果$l$是个很小的数,比如用隐含层1,这个代价函数就会使你的生成图片像素上非常接近你的内容图片。然而如果你用很深的层,那么那就会问,内容图片里是否有狗,然后它就会确保生成图片里有一个狗。所以在实际中,这个层$l$在网络中既不会选的太浅也不会选的太深。因为你要自己做这周结束的编程练习,我会让你获得一些直觉,在编程练习中的具体例子里通常$l$会选择在网络的中间层,既不太浅也不很深,然后用一个预训练的卷积模型,可以是VGG网络或者其他的网络也可以。

内容代价函数2

现在你需要衡量假如有一个内容图片和一个生成图片他们在内容上的相似度,我们令这个$a^{[l][C]}$和$a^{[l][G]}$,代表这两个图片$C$和$G$的$l$层的激活函数值。如果这两个激活值相似,那么就意味着两个图片的内容相似。

$J_{\text{content}}( C,G) = \frac{1}{2}|| a^{[l][C]} - a^{[l][G]}||^{2}$

为两个激活值不同或者相似的程度,我们取$l$层的隐含单元的激活值,按元素相减,内容图片的激活值与生成图片相比较,然后取平方,也可以在前面加上归一化或者不加,比如$\frac{1}{2}$或者其他的,都影响不大,因为这都可以由这个超参数$a$来调整($J(G) =a J_{\text{content}}( C,G) + \beta J_{\text{style}}(S,G)$)

风格代价函数(Style cost function)

风格代价函数

风格代价函数2

风格代价函数3

Siamese 网络

Siamese网络

建立一个人脸识别系统的方法就是,如果你要比较两个图片的话,例如这里的第一张(编号1)和第二张图片(编号2),你要做的就是把第二张图片喂给有同样参数的同样的神经网络,然后得到一个不同的128维的向量(编号3),这个向量代表或者编码第二个图片,我要把第二张图片的编码叫做$f(x^{(2)})$。这里我用$x^{(1)}$和$x^{(2)}$仅仅代表两个输入图片,他们没必要非是第一个和第二个训练样本,可以是任意两个图片。

最后如果你相信这些编码很好地代表了这两个图片,你要做的就是定义$d$,将$x^{(1)}$和$x^{(2)}$的距离定义为这两幅图片的编码之差的范数,$d( x^{( 1)},x^{( 2)}) =|| f( x^{( 1)}) - f( x^{( 2)})||_{2}^{2}$。

Triplet 损失

Triplet损失

Triplet损失2

面部识别与二分类

面部识别与二分类

面部识别与二分类2

面部识别与二分类3

概述

Spark 简介

Spark 具有如下 4 个主要特点:

  • 运行速度快。Spark 使用先进的 DAG(Directed Acyelic Graph,有向无环图)执行引擎,以支持循环数据流与内存计算,基于内存的执行速度可比 Hadoop MapReduce 快上百倍,基于磁盘的执行速度也能快十倍。
  • 容易使用。Spark 支持使用 Scala、Java、Python 和 R 语言进行编程,简洁的 API 设计有助于用户轻松构建并行程序,并且可以通过 Spark Shell 进行交互式编程。
  • 通用性。Spark 提供了完整而强大的技术栈,包括 SQL 查询、流式计算、机器学习和图算法组件,这些组件可以无缝整合在同一个应用中,足以应对复杂的计算。
  • 运行模式多样。Spark 可运行于独立的集群模式中,或者运行于 Hadoop 中,也可运行于 Amazon EC2 等云环境中,并且可以访问 HDFS、Cassandra、HBase、Hive等多种数据源。

Spark 与 Hadoop 的对比

Hadoop 虽然已成为大数据技术的事实标准,但其本身还存在诸多缺陷,最主要的缺陷是其 MapReduce 计算模型延迟过高,无法胜任实时、快速计算的需求,因而只适用于离线批处理的应用场景。

Hadoop存在以下缺点:

  • 表达能力有限。计算都必须要转化成 Map 和 Reduce 两个操作,但这并不适合所有的情况,难以描述复杂的数据处理过程。
  • 磁盘 IO 开销大。每次执行时都需要从磁盘读取数据,并且在计算完成后需要将中间结果写人到磁盘中,IO 开销较大。
  • 延迟高。一次计算可能需要分解成一系列按顺序执行的 MapReduce 任务,任务之间的衔接由于涉及到 IO 开销,会产生较高延迟。而且,在前一个任务执行完成之前,其他任务无法开始,因此难以胜任复杂、多阶段的计算任务。

Spark 在借鉴 Hadoop MapReduce 优点的同时,很好地解决了 MapReduce 所面临的问题。相比于 MapReduce,Spark主要具有如下优点:

  • Spark 的计算模式也属于 MapReduce,但不局限于 Map 和 Reduce 操作,还提供了多种数据集操作类型,编程模型比 MapReduce 更灵活。
  • Spark 提供了内存计算,中间结果直接放到内存中,带来了更高的迭代运算效率。
  • Spark 基于 DAG 的任务调度执行机制,要优于 MapReduce 的迭代执行机制。

Hadoop与Spark的执行流程对比

Spark 最大的特点就是将计算数据、中间结果都存储在内存中,大大减少了 IO 开销,因而 Spark 更适合于迭代运算比较多的数据挖掘与机器学习运算。

使用 Hadoop 进行迭代计算非常耗资源,因为每次迭代都需要从磁盘中写人、读取中间数据,IO 开销大。而 Spark 将数据载人内存后,之后的迭代计算都可以直接使用内存中的中间结果作运算,避免了从磁盘中频繁读取数据。

Hadoop与Spark执行逻辑回归的时间对比

在实际进行开发时,使用 Hadoop 需要编写不少相对底层的代码,不够高效。相对而言,Spark 提供了多种高层次、简洁的 API。通常情况下,对于实现相同功能的应用程序,Hadoop 的代码量要比 Spark 多 2~5 倍。更重要的是,Spark 提供了实时交互式编程反馈,可以方便地验证、调整算法。

尽管 Spark 相对于 Hadoop 而言具有较大优势,但 Spark 并不能完全替代 Hadoop,主要用于替代 Hadoop 中的 MapReduce 计算模型。实际上,Spark 已经很好地融入了 Hadoop 生态圈,并成为其中的重要一员,它可以借助于 YARN 实现资源调度管理,借助于 HDFS 实现分布式存储。此外,Hadoop 可以使用廉价的、异构的机器来做分布式存储与计算,但是 Spark 对硬件的要求稍高一些,对内存与 CPU 有一定的要求。

Spark 生态系统

在实际应用中,大数据处理主要包括以下三个类型:

  • 复杂的批量数据处理:时间跨度通常在数十分钟到数小时之间。
  • 基于历史数据的交互式查询:时间跨度通常在数十秒到数分钟之间。
  • 基于实时数据流的数据处理:时间跨度通常在数百毫秒到数秒之间。

目前,已有很多相对成熟的开源软件用于处理以上三种情景。

  • 可以利用 Hadoop MapReduce 来进行批量数据处理;
  • 可以用 Impala 来进行交互式查询(Impala 与 Hive 相似,但底层引擎不同,提供了实时交互式 SQL 查询);
  • 对于流式数据处理可以采用开源流计算框架 Storm。

一些企业可能只会涉及其中部分应用场景,只需部署相应软件即可满足业务需求,但是对于互联网公司而言,通常会同时存在以上三种场景,就需要同时部署三种不同的软件,这样做难免会带来一些问题。

  • 不同场景之间输人输出数据无法做到无缝共享,通常需要进行数据格式的转换。
  • 不同的软件需要不同的开发和维护团队,带来了较高的使用成本。
  • 比较难以对同一个集群中的各个系统进行统一的资源协调和分配。

Spark 的设计遵循“一个软件栈满足不同应用场景”的理念,逐渐形成了一套完整的生态系统,既能够提供内存计算框架,也可以支持 SQL 即席查询、实时流式计算、机器学习和图计算等。Spark 可以部署在资源管理器 YARN 之上,提供一站式的大数据解决方案。因此,Spark 所提供的生态系统足以应对上述三种场景,即同时支持批处理、交互式查询和流数据处理。

Spark 生态系统主要包含了 Spark Core、Spark SQL、Spark Streaming、MLlib 和 GraphX 等组件,各个组件的具体功能如下:

  1. Spark Core

    Spark Core 包含 Spark 的基本功能,如内存计算、任务调度、部署模式、故障恢复、存储管理等,主要面向批数据处理。Spark 建立在统一的抽象 RDD 之上,使其可以以基本一致的方式应对不同的大数据处理场景。

  2. Spark SQL

    Spark SQL 允许开发人员直接处理 RDD,同时也可查询 Hive、HBase 等外部数据源。Spark SQL 的一个重要特点是其能够统一处理关系表和 RDD,使得开发人员不需要自己编写 Spark 应用程序,开发人员可以轻松地使用 SQL 命令进行查询,并进行更复杂的数据分析。

  3. Spark Streaming

    Spark Streaming 支持高吞吐量、可容错处理的实时流数据处理,其核心思路是将流数据分解成一系列短小的批处理作业,每个短小的批处理作业都可以使用 Spark Core 进行快速处理。Spark Streaming 支持多种数据输入源,如 Kafka、Flume 和 TCP 套接字等。

  4. MLlib(机器学习)

    MLlib 提供了常用机器学习算法的实现,包括聚类、分类、回归、协同过滤等,降低了机器学习的门槛,开发人员只要具备一定的理论知识就能进行机器学习的工作。

  5. GraphX(图计算)

    GraphX 是 Spark 中用于图计算的 API,可认为是 Pregel 在 Spark 上的重写及优化,GraphX 性能良好,拥有丰富的功能和运算符,能在海量数据上自如地运行复杂的图算法。

需要说明的是,无论是 Spark SQL、Spark Streaming、MLlib 还是 GraphX,都可以使用 Spark Core 的 API 处理问题,它们的方法几乎是通用的,处理的数据也可以共享,不同应用之间的数据可以无缝集成。

Spark的应用场景

Spark 运行架构

基本概念

  • RDD:是弹性分布式数据集(Resilient Distributed Dataset)的英文缩写,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型。
  • DAG:是 Directed Acyclic Graph(有向无环图)的英文缩写,反映 RDD 之间的依赖关系。
  • Executor:是运行在工作节点(Worker Node)上的一个进程,负责运行任务,并为应用程
    序存储数据。
  • 应用:用户编写的 Spark 应用程序。
  • 任务:运行在 Executor 上的工作单元。
  • 作业:一个作业包含多个 RDD 及作用于相应 RDD 上的各种操作。
  • 阶段:是作业的基本调度单位,一个作业会分为多组任务,每组任务被称为“阶段”,或者
    也被称为“任务集”。

架构设计

Spark 运行架构包括集群资源管理器(Cluster Manager)、运行作业任务的工作节点(WorkerNode)、每个应用的任务控制节点(Driver)和每个工作节点上负责具体任务的执行进程(Executor)。其中,集群资源管理器可以是 Spark 自带的资源管理器,也可以是 YARN 或 Mesos 等资源管理框架。

Spark运行架构

与 Hadoop MapReduce 计算框架相比,Spark 所采用的 Executor 有两个优点:

  • 一是利用多线程来执行具体的任务(Hadoop MapReduce 采用的是进程模型),减少任务的启动开销;
  • 二是 Executor 中有一个 BlockManager 存储模块,会将内存和磁盘共同作为存储设备,当需要多轮迭代计算时,可以将中间结果存储到这个存储模块里,下次需要时就可以直接读该存储模块里的数据,而不需要读写到 HDFS 等文件系统里,因而有效减少了 IO 开销;或者在交互式查询场景下,预先将表缓存到该存储系统上,从而可以提高读写 IO 性能。

在 Spark 中,一个应用(Application)由一个任务控制节点(Driver)和若干个作业(Job)构成,一个作业由多个阶段(Stage)构成,一个阶段由多个任务(Task)组成。当执行一个应用时,任务控制节点会向集群管理器(Cluster Manager)申请资源,启动 Executor,并向 Executor 发送应用程序代码和文件,然后在 Executor 上执行任务,运行结束后执行结果会返回给任务控制节点,或者写到 HDFS 或者其他数据库中。

Spark中各种概念之间定的相互关系

Spark 运行基本流程

Spark 运行基本流程如下:

  1. 当一个 Spark 应用被提交时,首先需要为这个应用构建起基本的运行环境,即由任务控制节点(Driver)创建一个 SparkContext,由 SparkContext 负责和资源管理器(Cluster Manager)的通信以及进行资源的申请、任务的分配和监控等。SparkContext 会向资源管理器注册并申请运行 Executor 的资源。
  2. 资源管理器为 Executor 分配资源,并启动 Executor 进程,Executor 运行情况将随着“心跳”发送到资源管理器上。
  3. SparkContext 根据 RDD 的依赖关系构建 DAG 图,DAG 图提交给 DAG 调度器(DAGScheduler)进行解析,将 DAG 图分解成多个“阶段”(每个阶段都是一个任务集),并且计算出各个阶段之间的依赖关系,然后把一个个“任务集”提交给底层的任务调度器(TaskScheduler)进行处理;Executor 向 SparkContext 申请任务,任务调度器将任务分发给 Executor 运行,同时 SparkContext 将应用程序代码发放给 Executor。
  4. 任务在 Executor 上运行,把执行结果反馈给任务调度器,然后反馈给 DAG 调度器,运行完毕后写人数据并释放所有资源。

Spark运行基本流程图

Spark 运行架构具有以下特点:

  1. 每个应用都有自己专属的 Executor 进程,并且该进程在应用运行期间一直驻留。Executor 进程以多线程的方式运行任务,减少了多进程任务频繁的启动开销,使得任务执行变得非常高效和可靠。
  2. Spark 运行过程与资源管理器无关,只要能够获取 Executor 进程并保持通信即可。
  3. Executor 上有一个 BlockManager 存储模块,类似于键值存储系统(把内存和磁盘共同作为存储设备),在处理迭代计算任务时,不需要把中间结果写人到 HDFS 等文件系统,而是直接放在这个存储系统上,后续有需要时就可以直接读取;在交互式查询场景下,也可以把表提前缓存到这个存储系统上,提高读写 IO 性能。
  4. 任务采用了数据本地性和推测执行等优化机制。数据本地性是尽量将计算移到数据所在的节点上进行,即“计算向数据靠拢”,因为移动计算比移动数据所占的网络资源要少得多。而且,Spark 采用了延时调度机制,可以在更大的程度上实现执行过程优化。比如,拥有数据的节点当前正被其他的任务占用,那么在这种情况下是否需要将数据移动到其他的空闲节点上呢?答案是不一定。因为,如果经过预测发现当前节点结束当前任务的时间要比移动数据的时间还要少,那么调度就会等待,直到当前节点可用。

RDD 的设计与运行原理

RDD 设计背景

在实际应用中,存在许多迭代式算法(比如机器学习、图算法等)和交互式数据挖掘工具,这些应用场景的共同之处是,不同计算阶段之间会重用中间结果,即一个阶段的输出结果会作为下一个阶段的输入。但是,目前的 MapReduce 框架都是把中间结果写人到 HDFS 中,带来了大量的数据复制、磁盘 IO 和序列化开销。虽然类似 Pregel 等图计算框架也是将结果保存在内存当中,但是这些框架只能支持一些特定的计算模式,并没有提供种通用的数据抽象。RDD 就是为了满足这种需求而出现的,它提供了一个抽象的数据架构,我们不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换处理,不同 RDD 之间的转换操作形成依赖关系,可以实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘 IO 和序列化开销。

RDD 概念

一个 RDD 就是一个分布式对象集合,本质上是一个只读的分区记录集合,每个 RDD 可以分成多个分区,每个分区就是一个数据集片段,并且一个 RDD 的不同分区可以被保存到集群中不同的节点上,从而可以在集群中的不同节点上进行并行计算。RDD 提供了一种高度受限的共享内存模型,即 RDD 是只读的记录分区的集合,不能直接修改,只能基于稳定的物理存储中的数据集来创建 RDD,或者通过在其他 RDD 上执行确定的转换操作(如 map、join 和 groupBy)而创建得到新的 RDD。

RDD 提供了一组丰富的操作以支持常见的数据运算,分为“行动”(Action)和“转换”(Transformation)两种类型:

  • 前者用于执行计算并指定输出的形式
  • 后者指定 RDD 之间的相互依赖关系。

两类操作的主要区别是,转换操作(如map、filter、 groupBy、 join等)接受 RDD 并返回 RDD,而行动操作(如 count、collect 等)接受 RDD 但是返回非 RDD(即输出一个值或结果)。

RDD提供的转换接口都非常简单,都是类似 map、filter、 groupBy、 join 等粗粒度的数据转换操作,而不是针对某个数据项的细粒度修改。因此,RDD 比较适合对于数据集中元素执行相同操作的批处理式应用,而不适合用于需要异步、细粒度状态的应用,比如 Web 应用系统、增量式的网页爬虫等。正因为这样,这种粗粒度转换接口设计,会使人直觉上认为 RDD 的功能很受限、不够强大。但是,实际上 RDD 已经被实践证明可以很好地应用于许多并行计算应用中,可以具备很多现有计算框架(如 MapReduce、SQL、 Pregel 等)的表达能力,并且可以应用于这些框架处理不了的交互式数据挖掘应用。

Spark的转换和行动操作

RDD 特性

  1. 高效的容错性。现有的分布式共享内存、键值存储、内存数据库等,为了实现容错,必须在集群节点之间进行数据复制或者记录日志,也就是在节点之间会发生大量的数据传输,这对于数据密集型应用而言会带来很大的开销。在 RDD 的设计中,数据只读,不可修改,如果需要修改数据,必须从父 RDD 转换到子 RDD,由此在不同 RDD 之间建立了血缘关系。所以,RDD 是一种天生具有容错机制的特殊集合,不需要通过数据冗余的方式(比如检查点)实现容错,而只需通过 RDD 父子依赖(血缘)关系重新计算得到丢失的分区来实现容错,无需回滚整个系统,这样就避免了数据复制的高开销,而且重算过程可以在不同节点之间并行进行,实现了高效的容错。此外,RDD 提供的转换操作都是一些粗粒度的操作(比如 map、filter 和 join),RDD 依赖关系只需要记录这种粗粒度的转换操作,而不需要记录具体的数据和各种细粒度操作的日志(比如对哪个数据项进行了修改),这就大大降低了数据密集型应用中的容错开销。
  2. 中间结果持久化到内存。数据在内存中的多个 RDD 操作之间进行传递,不需要“落地”到磁盘上,避免了不必要的读写磁盘开销。
  3. 存放的数据可以是 Java 对象,避免了不必要的对象序列化和反序列化开销。

RDD 之间的依赖关系

RDD 中不同的操作会使得不同 RDD 中的分区产生不同的依赖。RDD 中的依赖关系分为窄依赖(Narrow Dependency)与宽依赖(Wide Dependency)。

RDD的宽依赖和窄依赖

窄依赖表现为一个父 RDD 的分区对应于一个子 RDD 的分区,或多个父 RDD 的分区对应于一个子 RDD 的分区。比如图中,RDD1 是 RDD2 的父 RDD,RDD2 是子 RDD,RDD1 的分区 1 对应于 RDD2 的一个分区(即分区 4);再比如,RDD6 和 RDD7 都是RDD8的父RDD, RDD6 中的分区(分区 15)和 RDD7 中的分区(分区18),两者都对应于 RDD8 中的一个分区(分区 21)。

宽依赖则表现为存在一个父 RDD 的一个分区对应一个子 RDD 的多个分区。比如中,RDD9 是 RDD12 的父 RDD,RDD9 中的分区 24 对应了 RDD12 中的两个分区(即分区 27 和分区 28)。

总体而言,如果父 RDD 的一个分区只被一个子 RDD 的一个分区所使用就是窄依赖,否则就
是宽依赖。窄依赖典型的操作包括 map、filter、 union 等,宽依赖典型的操作包括groupByKey、sortByKey 等。对于连接(Join)操作,可以分为两种情况。

  1. 对输人进行协同划分,属于窄依赖,如图所示。所谓协同划分(Co-partitioned)是指多个父 RDD 的某一分区的所有“键(Key)”落在子 RDD的同一个分区内,不会产生同一个父 RDD 的某一分区落在子 RDD 的两个分区的情况。
  2. 对输人做非协同划分,属于宽依赖,如图所示。

对于窄依赖的 RDD,可以以流水线的方式计算所有父分区,不会造成网络之间的数据混合。对于宽依赖的 RDD,则通常伴随着 Shuffle 操作,即首先需要计算好所有父分区数据,然后在节
点之间进行 Shuffle。

Spark 的这种依赖关系设计,使其具有了天生的容错性,大大加快了 Spark 的执行速度。因为,RDD 数据集通过“血缘关系”记住了它是如何从其他 RDD 中演变过来的,血缘关系记录的是粗颗粒度的转换操作行为,当这个 RDD 的部分分区数据丢失时,它可以通过血缘关系获取足够的信息来重新运算和恢复丢失的数据分区,由此带来了性能的提升。相对而言,在两种依赖关系中,窄依赖的失败恢复更为高效,它只需要根据父 RDD 分区重新计算丢失的分区即可(不需要重新计算所有分区),而且可以并行地在不同节点上进行重新计算。而对于宽依赖而言,单个节点失效通常意味着重新计算过程会涉及多个父 RDD 分区,开销较大。此外,Spark 还提供了数据检查点和记录日志,用于持久化中间 RDD,从而使得在进行失败恢复时不需要追溯到最开始的阶段。在进行故障恢复时,Spark 会对数据检查点开销和重新计算 RDD 分区的开销进行比较,从而自动选择最优的恢复策略。

阶段的划分

Spark 通过分析各个 RDD 的依赖关系生成了 DAG,再通过分析各个 RDD 中的分区之间的依赖关系来决定如何划分阶段,具体划分方法是:在DAG中进行反向解析,遇到宽依赖就断开,遇到窄依赖就把当前的 RDD 加入到当前的阶段中;将窄依赖尽量划分在同一个阶段中,可以实现流水线计算

RDD阶段划分

例如,根据 RDD 分区的依赖关系划分阶段,假设从 HDFS 中读入数据生成 3 个不同的 RDD(即 A、C 和 E),通过一系列转换操作后再将计算结果保存回 HDFS。对 DAG 进行解析时,在依赖图中进行反向解析,由于从 RDD A到 RDD B 的转换以及从 RDD B 和 RDD F 到 RDD G 的转换都属于宽依赖,因此在宽依赖处断开后可以得到 3 个阶段,即阶段 1、阶段 2和阶段 3。由图可以看出,在阶段 2 中,从 map 到 union 都是窄依赖,这两步操作可以形成一个流水线操作。比如,分区 7 通过 map 操作生成的分区 9,可以不用等待分区 8 到分区 10 这个转换操作的计算结束,而是继续进行 union 操作,转换得到分区 13,这样流水线执行大大提高了计算的效率。

由上述论述可知,把一个 DAG 图划分成多个阶段以后,每个阶段都代表了一组关联的、相互之间没有 Shuffle 依赖关系的任务组成的任务集合。每个任务集合会被提交给任务调度器(TaskScheduler)进行处理,由任务调度器将任务分发给 Executor 运行。

RDD 运行过程

  1. 创建 RDD 对象。
  2. SparkContext 负责计算 RDD 之间的依赖关系,构建 DAG。
  3. DAGScheduler 负责把 DAG 图分解成多个阶段,每个阶段中包含了多个任务,每个任务会被任务调度器分发给各个工作节点(Worker Node)上的 Executor 去执行。

RDD运行过程

  • 重命名并求改引用:F2

  • 格式化代码:shift + alt + f (Win、Mac)

  • workbench.files.action.showActiveFileInExplorer:cmd + m (Mac)

  • 光标上一个位置:ctrl + - (Mac)

  • 光标下一个位置:ctrl + shift + - (Mac)

  • code-runner 快速执行:ctrl + alt + n

  • 转换成大写:shift + ctrl + u (Mac)

  • 转换成小写:shift + ctrl + l (Mac)

  • 滑动窗口目标检测
    • 缺点:计算成本

评价对象检测算法

交并比

一般约定,在计算机检测任务中,如果 $IoU \geq 0.5$,就说检测正确,如果预测器和实际边界框完美重叠,$IoU$ 就是 1,因为交集就等于并集。但一般来说只要 $IoU \geq 0.5$,那么结果是可以接受的,看起来还可以。一般约定,0.5 是阈值,用来判断预测的边界框是否正确。

非极大值抑制(Non-max suppression)

算法可能对同一个对象做出多次检测,所以算法不是对某个对象检测出一次,而是检测出多次。非极大值抑制这个方法可以确保你的算法对每个对象只检测一次。

非极大抑制

这个 $p_c$ 检测概率,首先看概率最大的那个,这个例子(右边车辆)中是 0.9,然后就说这是最可靠的检测,所以我们就用高亮标记,就说我这里找到了一辆车。这么做之后,非极大值抑制就会逐一审视剩下的矩形,所有和这个最大的边框有很高交并比,高度重叠的其他边界框,那么这些输出就会被抑制。所以这两个矩形 $p_c$ 分别是 0.6 和 0.7,这两个矩形和淡蓝色矩形重叠程度很高,所以会被抑制,变暗,表示它们被抑制了。

非极大抑制2

接下来,逐一审视剩下的矩形,找出概率最高,$p_c$ 最高的一个,在这种情况下是 0.8,我们就认为这里检测出一辆车(左边车辆),然后非极大值抑制算法就会去掉其他 $IoU$ 值很高的矩形。所以现在每个矩形都会被高亮显示或者变暗,如果你直接抛弃变暗的矩形,那就剩下高亮显示的那些,这就是最后得到的两个预测结果。

非极大抑制3

非极大抑制算法

Anchor Boxes

对象检测中存在的一个问题是每个格子只能检测出一个对象,如果你想让一个格子检测出多个对象,你可以这么做,就是使用 anchor box 这个概念。

AnchorBoxes

对于这个例子,我们继续使用3×3网格,注意行人的中点和汽车的中点几乎在同一个地方,两者都落入到同一个格子中。所以对于那个格子,如果 $y$ 输出这个向量$y= \ \begin{bmatrix} p_{c} \ b_{x} \ b_{y} \ b_{h} \ b_{w} \ c_{1} \ c_{2}\ c_{3} \\end{bmatrix}$,你可以检测这三个类别,行人、汽车和摩托车,它将无法输出检测结果,所以我必须从两个检测结果中选一个。

anchor box的思路是,这样子,预先定义两个不同形状的anchor box,或者anchor box形状,你要做的是把预测结果和这两个anchor box关联起来。一般来说,你可能会用更多的anchor box,可能要5个甚至更多,但对于这个视频,我们就用两个anchor box,这样介绍起来简单一些。

AnchorBoxes2

AnchorBoxes3

你要做的是定义类别标签,用的向量不再是上面这个$\begin{bmatrix} p_{c} & b_{x} &b_{y} & b_{h} & b_{w} & c_{1} & c_{2} & c_{3} \\end{bmatrix}^{T}$,而是重复两次,$y= \begin{bmatrix} p_{c} & b_{x} & b_{y} &b_{h} & b_{w} & c_{1} & c_{2} & c_{3} & p_{c} & b_{x} & b_{y} & b_{h} & b_{w} &c_{1} & c_{2} & c_{3} \\end{bmatrix}^{T}$,前面的$p_{c},b_{x},b_{y},b_{h},b_{w},c_{1},c_{2},c_{3}$(绿色方框标记的参数)是和anchor box 1关联的8个参数,后面的8个参数(橙色方框标记的元素)是和anchor box 2相关联。因为行人的形状更类似于anchor box 1的形状,而不是anchor box 2的形状,所以你可以用这8个数值(前8个参数),这么编码$p_{c} =1$,是的,代表有个行人,用$b_{x},b_{y},b_{h}$和$b_{w}$来编码包住行人的边界框,然后用$c_{1},c_{2},c_{3}$($c_{1}= 1,c_{2} = 0,c_{3} = 0$)来说明这个对象是个行人。

然后是车子,因为车子的边界框比起anchor box 1更像anchor box 2的形状,你就可以这么编码,这里第二个对象是汽车,然后有这样的边界框等等,这里所有参数都和检测汽车相关($p_{c}= 1,b_{x},b_{y},b_{h},b_{w},c_{1} = 0,c_{2} = 1,c_{3} = 0$)。

AnchorBoxes算法

总结一下,用anchor box之前,你做的是这个,对于训练集图像中的每个对象,都根据那个对象中点位置分配到对应的格子中,所以输出$y$就是3×3×8,因为是3×3网格,对于每个网格位置,我们有输出向量,包含$p_{c}$,然后边界框参数$b_{x},b_{y},b_{h}$和$b_{w}$,然后$c_{1},c_{2},c_{3}$。

现在用到anchor box这个概念,是这么做的。现在每个对象都和之前一样分配到同一个格子中,分配到对象中点所在的格子中,以及分配到和对象形状交并比最高的anchor box中。所以这里有两个anchor box,你就取这个对象,如果你的对象形状是这样的(编号1,红色框),你就看看这两个anchor boxanchor box 1形状是这样(编号2,紫色框),anchor box 2形状是这样(编号3,紫色框),然后你观察哪一个anchor box和实际边界框(编号1,红色框)的交并比更高,不管选的是哪一个,这个对象不只分配到一个格子,而是分配到一对,即(grid cell,anchor box)对,这就是对象在目标标签中的编码方式。所以现在输出 $y$ 就是3×3×16,上一张幻灯片中你们看到 $y$ 现在是16维的,或者你也可以看成是3×3×2×8,因为现在这里有2个anchor box,而 $y$ 是8维的。$y$ 维度是8,因为我们有3个对象类别,如果你有更多对象,那么$y$ 的维度会更高。

YOLO 算法(Putting it together: YOLO algorithm)

候选区域(Region proposals)

滑动窗法,使用训练过的分类器,在这些窗口中全部运行一遍,然后运行一个检测器,看看里面是否有车辆,行人和摩托车。现在也可以运行一下卷积算法,这个算法的其中一个缺点是,它在显然没有任何对象的区域浪费时间。

候选区域

所以这里这个矩形区域(编号1)基本是空的,显然没有什么需要分类的东西。也许算法会在这个矩形上(编号2)运行,而你知道上面没有什么有趣的东西。

R-CNN的算法,意思是带区域的卷积网络,或者说带区域的CNN。这个算法尝试选出一些区域,在这些区域上运行卷积网络分类器是有意义的,所以这里不再针对每个滑动窗运行检测算法,而是只选择一些窗口,在少数窗口上运行卷积网络分类器。

选出候选区域的方法是运行图像分割算法,分割的结果是下边的图像,为了找出可能存在对象的区域。比如说,分割算法在这里得到一个色块,所以你可能会选择这样的边界框(编号1),然后在这个色块上运行分类器,就像这个绿色的东西(编号2),在这里找到一个色块,接下来我们还会在那个矩形上(编号2)运行一次分类器,看看有没有东西。在这种情况下,如果在蓝色色块上(编号3)运行分类器,希望你能检测出一个行人,如果你在青色色块(编号4)上运行算法,也许你可以发现一辆车,我也不确定。

候选区域2

RCNN

临时使用

清华源:-i https://pypi.tuna.tsinghua.edu.cn/simple

1
2
3
4
https://pypi.tuna.tsinghua.edu.cn/simple/	# 清华大学
https://mirrors.aliyun.com/pypi/simple/ # 阿里云
https://pypi.douban.com/simple/ # 豆瓣
https://pypi.mirrors.ustc.edu.cn/simple/ # 中国科学技术大学

设为默认

修改 ~/.config/pip/pip.conf(Linux),%APPDATA%\pip\pip.ini(Windows 10)或 $HOME/Library/Application Support/pip/pip.conf(macOS)(没有就创建一个),修改 index-url 至 tuna,例如

1
2
[global]
index-url = https://pypi.tuna.tsinghua.edu.cn/simple

pip 和 pip3 并存时,只需修改 ~/.pip/pip.conf

转自:https://blog.csdn.net/qq_21729419/article/details/113733500

中心思想

使普通全局变量的写对其他线程立即可见(使用volatile有序性来传递)

内存屏障

先来一堆有必要的废话

  • LoadLoad屏障:对于这样的语句Load1; LoadLoad; Load2,在Load2及后续读取操作要读取的数据被访问前,保证Load1要读取的数据被读取完毕。
  • StoreStore屏障:对于这样的语句Store1; StoreStore; Store2,在Store2及后续写入操作执行前,保证Store1的写入操作对其它处理器可见。
  • LoadStore屏障:对于这样的语句Load1; LoadStore; Store2,在Store2及后续写入操作被刷出前,保证Load1要读取的数据被读取完毕。
  • StoreLoad屏障:对于这样的语句Store1; StoreLoad; Load2,在Load2及后续所有读取操作执行前,保证Store1的写入对所有处理器可见。

Java内存模型允许编译器和处理器对指令重排序以提高运行性能,并且只会对不存在数据依赖性的指令重排序。

为了实现volatile的有序性内存语义(jdk5之后),编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。

  • 在每个volatile写操作的前面插入一个StoreStore屏障。
  • 在每个volatile写操作的后面插入一个StoreLoad屏障。
  • 在每个volatile读操作的前面插入一个LoadLoad屏障。
  • 在每个volatile读操作的后面插入一个LoadStore屏障

volatile写操作插入内存屏障后生成的指令序列如下图所示。
1

volatile读操作插入内存屏障后生成的指令序列如下图所示。
2

传递性 :如果操作A先行发生于操作B,操作B先行发生于操作C,那就可以得出操作A先行发生于操作C的结论。

相信大家对上面的内容很熟悉了,但是代表啥意思呢,估计你还是一头雾水。我们举个例子

1
2
3
4
5
6
7
8
9
10
11
12
int e = 0;
volatile f = 0;


//thread 1
e = 1; //操作 A
f = 1; //操作 B

//thread 2

int j = f; //操作 C
int k = e; //操作 D

我们假设 thread 1 执行结束之后thread 2执行,由于f是volatile变量,那么可知B操作的写,对于C操作的读是可见的。那么可得B先发生与C,每个线程单独来看, A先发生与B, C先发生与D,最后我们再加上传递性可知。

A->B->C->D

至此我们可以得到普通变量e的的写入对于其他线程立即可见(注意变量f在其中起到的作用)

Reentrantlock 也借助了volatile的这个特性

总结一下有序性的含义

  1. 禁止指令重排
  2. volatile写会将线程工作缓存(cpu缓存)中的所有数据写入主存
  3. volatile读会将线程工作缓存(cpu缓存)中的所有数据失效,读的时候需要从主存中取。

测试代码

前面举的例子比较难测试出反例(对普通变量的写,不通过volatile的有序性保证,其他线程不是立即可见),所以写了下面的测试代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static int a = 0;
static volatile int b = 0; //去掉volatile修饰会发生死循环,即变量a对于其他线程不是立即可见


public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
while(a==0) {
int c = b;
}
});
thread.start();

Thread.sleep(200);

new Thread(() -> {
a = 1;
b = 1;
}).start();

thread.join();
}

转自:https://blog.csdn.net/zzti_erlie/article/details/86355477

极简计算机发展史

我们知道,计算机CPU和内存的交互是最频繁的,内存是我们的高速缓存区。而刚开始用户磁盘和CPU进行交互,CPU运转速度越来越快,磁盘远远跟不上CPU的读写速度,才设计了内存,但是随着CPU的发展,内存的读写速度也远远跟不上CPU的读写速度,因此,为了解决这一纠纷,CPU厂商在每颗CPU上加入了高速缓存,用来缓解这种症状,因此,现在CPU同内存交互就变成了下面的样子。

1

单核CPU的性能不可能无限制的增长,要想很多的提升新能,需要多个处理器协同工作。 基于高速缓存的存储交互很好的解决了处理器与内存之间的矛盾,也引入了新的问题:缓存一致性问题。在多处理器系统中,每个处理器有自己的高速缓存,而他们又共享同一块内存(下文成主存,main memory 主要内存),当多个处理器运算都涉及到同一块内存区域的时候,就有可能发生缓存不一致的现象。为了解决这一问题,需要各个处理器运行时都遵循一些协议,在运行时需要用这些协议保证数据的一致性。

2

缓存一致性协议中最出名的就是Intel 的MESI协议,MESI协议保证了每个缓存中使用的共享变量的副本是一致的。它核心的思想是:当CPU写数据时,如果发现操作的变量是共享变量,即在其他CPU中也存在该变量的副本,会发出信号通知其他CPU将该变量的缓存设置为无效状态,因此当其他CPU需要读取这个变量时,发现自己缓存中该变量是无效状态,那么它就会从内存重新读取

Java内存模型

Java的内存模型和上面的结构还是挺相似的,此时在看工作内存和主内存关系,从逻辑上,高速缓存对应工作内存,每个线程分配到CPU时间片时,独自享有高速缓存的使用能力。主内存对应存储的物理内存。特别注意,这只是逻辑上的对等关系,物理的上具体对应关系十分复杂,这里不讨论。

3

volatile的作用是什么?

volatile可以保证可见性,有序性,但不能保证原子性

可见性

可见性是指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值

假如说有2个线程对一个变量data进行操作,线程先会把主内存中的值缓存到工作内存,这样做的原因和上面提到的高速缓存类似,提高效率

4

但是这样会引入新的问题,假如说线程A把data修改为1,线程A的工作内存data值为1,但是主内存和线程B的工作内存data值为0,此时就有可能出现Java并发编程中的可见性问题

5

举个例子,如下面代码,线程A已经将flag的值改变,但是线程B并没有及时的感知到,导致一直进行死循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Test {

public static boolean flag = false;

public static void main(String[] args) {

new Thread(()->{
while(!flag) {
}
System.out.println("threadB end");
}).start();

try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}

new Thread(()->{
flag = true;
System.out.println("threadA end");
}).start();
}

}

输出为,线程B一直没有结束

1
threadA end

但是如果将data定义为如下形式,线程A对data的变更,线程B立马能感知到

1
public static volatile boolean flag = false;

输出为

1
2
threadA end
threadB end

那么是如何实现的呢?其实volatile保证可见性的方式和上面提到的缓存一致性协议的原理很类似

  1. 线程A将工作内存的data更改后,强制将data值刷回主内存
  2. 如果线程B的工作内存中有data变量的缓存时,会强制让这个data变量缓存失效
  3. 当线程B需要读取data变量的值时,先从工作内存中读,发现已经过期,就会从主内存中加载data变量的最新值了

6

有序性

有序性即程序执行的顺序按照代码的先后顺序执行

1
2
3
4
int i = 0;              
boolean flag = false;
i = 1; //语句1
flag = true; //语句2

上面代码定义了一个int型变量,定义了一个boolean类型变量,然后分别对两个变量进行赋值操作。从代码顺序上看,语句1是在语句2前面的,那么JVM在真正执行这段代码的时候会保证语句1一定会在语句2前面执行吗?不一定,为什么呢?这里可能会发生指令重排序(Instruction Reorder)。

下面解释一下什么是指令重排序,一般来说,处理器为了提高程序运行效率,可能会对输入代码进行优化,它不保证程序中各个语句的执行先后顺序同代码中的顺序一致,但是它会保证程序最终执行结果和代码顺序执行的结果是一致的。

比如上面的代码中,语句1和语句2谁先执行对最终的程序结果并没有影响,那么就有可能在执行过程中,语句2先执行而语句1后执行。

但是有依赖关系的语句不会进行重排序,如下面求圆面积的代码

1
2
3
double pi = 4.14   //A
double r = 1.0 //B
double area = pi * r * r //c

程序的执行顺序只有下面这2个形式:A->B->C和B->A->C,因为A和C之间存在依赖关系,同时B和C之间也存在依赖关系。因此最终执行的指令序列中C不能被重排序到A和B前面。

虽然重排序不会影响单个线程内程序执行的结果,但是多线程呢?下面看一个例子

1
2
3
4
5
6
7
8
9
//线程1:
context = loadContext(); //语句1
inited = true; //语句2

//线程2:
while(!inited ){
sleep()
}
doSomethingwithconfig(context);

上面代码中,由于语句1和语句2没有数据依赖性,因此可能会被重排序。假如发生了重排序,在线程1执行过程中先执行语句2,而此是线程2会以为初始化工作已经完成,那么就会跳出while循环,去执行doSomethingwithconfig(context)方法,而此时context并没有被初始化,就会导致程序出错。

从上面可以看出,指令重排序不会影响单个线程的执行,但是会影响到线程并发执行的正确性

当写双重检测锁定版本的单例模式时,就要用到volatile来保证可见性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Singleton {

private volatile static Singleton uniqueInstance;

private Singleton() {}

public static Singleton getInstance() {
if (uniqueInstance == null) {
synchronized (Singleton.class) {
if (uniqueInstance == null) {
uniqueInstance = new Singleton();
}
}
}
return uniqueInstance;
}
}

原子性

原子性即一个操作或者多个操作 要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Test {

public static volatile int inc = 0;

public static void main(String[] args) {

//新建一个线程池
ExecutorService service = Executors.newCachedThreadPool();
//Java8 lambda表达式执行runnable接口
for (int i = 0; i < 5; i++) {
service.execute(() -> {
for (int j = 0; j < 1000; j++) {
inc++;
}
});
}
//关闭线程池
service.shutdown();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("inc = " + inc);
}

}

执行上述代码结果并不是每次都是5000,表明volatile并不能保证原子

可能有的朋友就会有疑问,不对啊,上面是对变量inc进行自增操作,由于volatile保证了可见性,那么在每个线程中对inc自增完之后,在其他线程中都能看到修改后的值啊,所以有5个线程分别进行了1000次操作,那么最终inc的值应该是1000*5=5000。

这里面就有一个误区了,volatile关键字能保证可见性没有错,但是上面的程序错在没能保证原子性。可见性只能保证每次读取的是最新的值,但是volatile没办法保证对变量的操作的原子性。

在前面已经提到过,自增操作是不具备原子性的,它包括读取变量的原始值、进行加1操作、写入工作内存。那么就是说自增操作的三个子操作可能会分割开执行,就有可能导致下面这种情况出现:

假如某个时刻变量inc的值为10,

线程1对变量进行自增操作,线程1先读取了变量inc的原始值,然后线程1被阻塞了;然后线程2对变量进行自增操作,线程2也去读取变量inc的原始值,由于线程1只是对变量inc进行读取操作,而没有对变量进行修改操作,所以不会导致线程2的工作内存中缓存变量inc的缓存行无效,也不会导致主存中的值刷新,所以线程2会直接去主存读取inc的值(这个部分小编感觉是海子大佬的笔误,应该是线程2会直接去工作内存读取inc的值,因为工作内存中inc并没有失效),发现inc的值时10,然后进行加1操作,并把11写入工作内存,最后写入主存。

然后线程1接着进行加1操作,由于已经读取了inc的值(inc++,包括3个操作,1.读取inc的值,2.进行加1操作,3.写入新的值),注意此时在线程1的工作内存中inc的值仍然为10,所以线程1对inc进行加1操作后inc的值为11,然后将11写入工作内存,最后写入主存。

那么两个线程分别进行了一次自增操作后,inc只增加了1。

根源就在这里,自增操作不是原子性操作,而且volatile也无法保证对变量的任何操作都是原子性的。

解决方案:可以通过synchronized或lock,进行加锁,来保证操作的原子性。也可以通过使用AtomicInteger

应用

  1. 状态标记量
  2. 单例模式中的double check

转自:https://www.cnblogs.com/lliuye/p/9549881.html


损失函数、代价函数与目标函数

  • 损失函数(Loss Function):是定义在单个样本上的,是指一个样本的误差。
  • 代价函数(Cost Function):是定义在整个训练集上的,是所有样本误差的平均,也就是所有损失函数值的平均。
  • 目标函数(Object Function):是指最终需要优化的函数,一般来说是经验风险+结构风险,也就是(代价函数+正则化项)。

常用的损失函数

0-1损失函数(0-1 loss function)

$$
L(y, f(x))=\left{\begin{array}{ll}
1, & y \neq f(x) \
0, & y=f(x)
\end{array}\right.
$$

也就是说,当预测错误时,损失函数为 1,当预测正确时,损失函数值为 0。该损失函数不考虑预测值和真实值的误差程度。只要错误,就是 1。

平方损失函数(quadratic loss function)

$$
L(y, f(x))=(y-f(x))^{2}
$$

是指预测值与实际值差的平方。

绝对值损失函数(absolute loss function)

$$
L(y, f(x))=|y-f(x)|
$$

该损失函数的意义和上面差不多,只不过是取了绝对值而不是求绝对值,差距不会被平方放大。

对数损失函数(logarithmic loss function)

$$
L(y, p(y | x))=-\log p(y | x)
$$

这个损失函数就比较难理解了。事实上,该损失函数用到了极大似然估计的思想。P(Y|X)通俗的解释就是:在当前模型的基础上,对于样本X,其预测值为Y,也就是预测正确的概率。由于概率之间的同时满足需要使用乘法,为了将其转化为加法,我们将其取对数。最后由于是损失函数,所以预测正确的概率越高,其损失值应该是越小,因此再加个负号取个反。


对数损失是用于最大似然估计的。

一组参数在一堆数据下的似然值,等于每一条数据在这组参数下的条件概率之积。

而损失函数一般是每条数据的损失之和,为了把积变为和,就取了对数。

再加个负号是为了让最大似然值和最小损失对应起来。

Hinge loss

Hinge loss一般分类算法中的损失函数,尤其是SVM,其定义为:
$$
L(w, b)=\max {0,1-y f(x)}
$$
其中 $𝑦=+1$ 或 $𝑦=−1$,$f(𝑥)=wx+b$,当为SVM的线性核时。

常用的代价函数

均方误差(Mean Squared Error)

$$
M S E=\frac{1}{N} \sum_{i=1}^{N}\left(y^{(i)}-f\left(x^{(i)}\right)\right)^{2}
$$
均方误差是指参数估计值与参数真值之差平方的期望值;MSE 可以评价数据的变化程度,MSE 的值越小,说明预测模型描述实验数据具有更好的精确度。($i$ 表示第 $i$ 个样本,$N$ 表示样本总数)

通常用来做回归问题的代价函数。

均方根误差

$$
R M S E=\sqrt{\frac{1}{N} \sum_{i=1}^{N}\left(y^{(i)}-f\left(x^{(i)}\right)\right)^{2}}
$$

均方根误差是均方误差的算术平方根,能够直观观测预测值与实际值的离散程度。

通常用来作为回归算法的性能指标。

平均绝对误差(Mean Absolute Error)

$$
M A E=\frac{1}{N} \sum_{i=1}^{N}\left|y^{(i)}-f\left(x^{(i)}\right)\right|
$$

平均绝对误差是绝对误差的平均值,平均绝对误差能更好地反映预测值误差的实际情况。

通常用来作为回归算法的性能指标。

交叉熵代价函数(Cross Entry)

$$
H(p, q)=-\sum_{i=1}^{N} p\left(x^{(i)}\right) \log q\left(x^{(-i)}\right)
$$

交叉熵是用来评估当前训练得到的概率分布与真实分布的差异情况,减少交叉熵损失就是在提高模型的预测准确率。其中 $p(x)$ 是指真实分布的概率,$q(x)$ 是模型通过数据计算出来的概率估计。

比如对于二分类模型的交叉熵代价函数:
$$
L(w, b)=-\frac{1}{N} \sum_{i=1}^{N}\left(y^{(i)} \log f\left(x^{(i)}\right)+\left(1-y^{(i)}\right) \log \left(1-f\left(x^{(i)}\right)\right)\right)
$$

其中 $f(x)$ 可以是sigmoid函数。或深度学习中的其它激活函数。而 $y^(i) \in 0, 1$。

通常用做分类问题的代价函数。