本文共 12692 字,大约阅读时间需要 42 分钟。
大数据计算系统
大数据计算框架的几个要素 : • 计算场景: 适用于何种任务使用? • 抽象:程序员看到的框架是什么样的? • API:程序员如何使用框架? • 系统架构:系统有哪些模块? • 基本数据操作:如何操作数据?如何高效实现? • 流程优化:如何将一个计算任务转化为基本数据操作执行过程? • 流程调度:如何有效执行基本数据操作? • 数据存储机制:如何存储数据? • 事务处理:如何确保计算过程正确地进行?分别对于几种计算系统
Hadoop是一个开源的、可靠的、可扩展的分布式并 行计算框架,主要包括:
– MapReduce 离线大数据分析计算引擎 – HDFS – 分布式文件系统 – YARN– 任务执行调度资源管理框架 – Hbase – NoSQL数据库 –Hive – 分布式数据仓库批处理
Map-Reduce 函数模型
MapReduce并行处理的基本过程 :1.有一个待处理的大数据,被划分为大 小相同的数据块(如 64MB),及与此相应 的用户作业程序
2.系统中有一个负责调度的主节点(Master), 以及数据Map和 Reduce工作节点 (Worker) 3.用户作业程序提交 给主节点 4.主节点为作业程序 寻找和配备可用的 Map节点,并将程 序传送给map节点 5.主节点也为作业程 序寻找和配备可用的 Reduce节点,并将 程序传送给Reduce 节点 6.主节点启动每个 Map节点执行程序, 每个map节点尽可 能读取本地或本机 架的数据进行计算 7.每个Map节点处理读取 的数据块,并做一些数据整 理工作(combining, sorting等)并将中间结果 存放在本地;同时通知主 节点计算任务完成并告知 中间结果数据存储位置 8.主节点等所有Map节 点计算完成后,开始 启动Reduce节点运行; Reduce节点从主节点 所掌握的中间结果数 据位置信息,远程读 取这些数据 9.Reduce节点计算结果 汇总输出到一个结果文 件即获得整个处理结果
重载MapReduce函数
可以分为客户端节点,JobTasker节点,TaskTracker节点
MapReduce框架中 进行并行计算的基本事务单元被称为任务(Task) ,分为Map 和Reduce 任务,一个作业(Job)通常 包含多个任务 (Task)
流程优化:无 (额外的DAG模型生成工具)
流程调度:基础任务调度、Map与Reduce函数的执行初始化:
然后进行作业调度
作业调度大致有先进先出(FIFO)公平(Fair)调度,能力(Capacity)调度器 三种
下面进行的就是任务分配 ,差不多就是创建TaskTracker,然后JobTracker与这些TaskTracker进行通信,保活检测等
等待前三部完成后,后面进行的就是具体的Spilt-Map-Shuffle-Reduce-Output过程了
比较值得说的就属于Shuffle Shuffle顾名思义,洗牌,做的就是Map后进行文件排序和部分的合并(合并Map产生的中间数据),如下图所示:HDFS
采用NameNode 和DataNode 的结构,NameNode 是整个文件系统的大脑,提供整个文件系统的目录信息,各个文件的分块信息,数据块的位置信息,并且 管理各个数据服务器。 DataNode 是数据服务器,分布式文件系统中的每一个文件,都被切分成若务器上HDFS中每个文件都会被切分成若干个块(Block),默认64MB,每一 块都有连续的一段文件内容是存储的基本单位。客户端写文件的时候,不是一个字节一个字节写 入文件系统的,而是累计到一定数量后,往文件写入数个数据包(Packet )。 在每一个数据包中, 都会将数据切成更小的块 (Chunk )( 512 字节 )
1、使用HDFS Client,向远程的Namenode发起RPC(远程过程调用)请求;
2、Namenode会视情况返回文件的部分或者全部block列表,对于每个block, Namenode都会返回有该block拷贝的datanode地址; 3-4、HDFS Client 选取离客户端最接近的datanode来读取block; 5、当读完列表的block后,如果文件读取还没有结束,客户端开发库会继续向Namenode 获取下一批的block列表。 6、读取完当前block的数据后,关闭与当前的datanode连接,并为读取下一个block寻找最佳的datanode
HDFS写:
1.HDFS Client 向远程的Namenode发起RPC请求;
2.Namenode会检查要创建的文件是否已经存在,创建者是否有权限进行 操作,成功则会为文件创建一个记录,否则会让客户端抛出异常; 3.当客户端开始写入文件的时候,开发库会将文件切分成多个packets,并 在内部以"data queue"的形式管理这些packets,并向Namenode申请新 的blocks,获取用来存储replicas的合适的datanodes列表,列表的大小根 据在Namenode中对replication的设置而定。 4. 开始以pipeline(管道)的形式将packet写入所有的replicas中。开发库把 packet以流的方式写入第一个datanode,该datanode把该packet存储之后, 再将其传递给在此pipeline中的下一个datanode,直到最后一个datanode,这 种写数据的方式呈流水线的形式。 5. 最后一个datanode成功存储之后会返回一个ack packet,在pipeline里传递 至客户端,在客户端的开发库内部维护着"ack queue",成功收到datanode返 回的ack packet后会从"ack queue"移除相应的packet。 6. 如果传输过程中,有某个datanode出现了故障,那么当前的pipeline会被关 闭,出现故障的datanode会从当前的pipeline中移除,剩余的block会继续剩下 的datanode中继续以pipeline的形式传输,同时Namenode会分配一个新的 datanode,保持replicas设定的数量。
HDFS优点:支持扩展
HDFS不擅长:就是讲Hadoop的容错机制
节点的容错: 主节点中会周期性地设置检查点(checkpoint),检查整个计算作业的执行情况,一旦某个任务失效,可以从最近有效 的检查点开始重新执行,避免从头开始计算的时间浪费。 工作节点失效是很普遍发生的,主节点会周期性地给工作节点发送检测命令,如果工作节点没有回应,这认为该工作节点失效,主节点将终止该工作节点的任务并把失效的任务 重新调度到其它工作节点上重新执行TaskTracker的容错:
容错的恢复
仅供参考
MapReduce 1.0存在很多缺点 :JobTracker 是 Map-reduce 的集中处理点,存在单点故障。
JobTracker 完成了太多的任务,造成了过多的资源消耗,当 map-reduce job 非常多的时候,会造成 很大的内存开销。 在 TaskTracker 端,以map/reducetask 的数目作 为资源的表示过于简单,没有考虑到 cpu/ 内存的占 用情况。 MapReduce 框架在有任何重要的或者不重要的变化 ( 例如 bug 修复,性能提升和特性化 ) 时,都会 强制进行系统级别的升级更新。强制让分布式集群系 统的每一个用户端同时更新。
因此升级到2.0,使用Yarn进行调度
Yarn里面主要可以分为三种模块: ResourceManager : 调度、启动每一个 Job 所属的 ApplicationMaster、 另外监控 ApplicationMaster ,是Client和ApplicationMaster 之间交流的中间件NodeManager :类似老版本的TaskTracker
ApplicationMaster :负责一个 Job 生命周期内的所有工作,类似老的框架 中 JobTracker
ResourceManager
– 存在单点故障; – 基于ZooKeeper实现。 NodeManager –失败后,ResourceManager 将失败任务告诉对应的ApplicationMaster; –ApplicationMaster决定如何处理失败的任务。 ApplicationMaster –失败后,由ResourceManager重启; –ApplicationMaster需处理内部任务的容错问题; –ApplicationMaster会保存已经运行完成的Task, 重启后无需重新运行已经完成的工作。
在谈Spark之前,我们看看什么是大数据的批处理:
批处理模式中使用的数据集通常符合下列特征:
批处理适合
批处理不适合
再看一下Spark与Hadoop的对比
因为要理解Spark必须先理解其中的RDD机制,spark中的很多特性都和他有关,所以先谈RDD(弹性分布式数据集)
一个RDD是一个分布式对象集合,本质上是一个只读的分区记录集合 ,提供了一个抽象的数据架构,不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换处理 。一个RDD的不同分区可以被保存到集群中不同的节点上, 从而可以在集群中的不同节点上进行并行计算
不同RDD之间的转换操作形成依赖关系,可以实现数据流水处理,避免中间数据存储
RDD提供了一种高度受限的共享内存模型
RDD的执行过程:
从上面可以看出RDD的优点:惰性调用、管道化、避免同步等待、不需要保存中间结果、每次操作变得简单
计算的中间结果持久化到内存,数据在内存中的多个RDD操作之间进行传递,避免了不必要的读写磁盘开销
存放的数据可以是Java对象,避免了不必要的对象序列化和反序列化
sc=new SparkContext rDD=sc.textfile(“ hdfs://…”) rDD.filter(…) rDD.CacherDD.Count rDD.map
还是以最简单的WordCount举例
概念:
spark中 一个应用由一个Driver(SparkContext)和若干个作业构成,一个作业(Job )由多个阶段构成,一个阶段由多个没有Shuffle关系的任务组成
这里的作业( Job )是指一个作业包含多个RDD及作用于相应RDD上的各种操作 ,和Hadoop中的类似
而阶段( Stage )是作业的基本调度单位,一个作业会分为多组任务,每组任务被称为阶段,或者也被称为任务集合, 代表了一组关联的、相互之间没有Shuffle(重新洗牌,就是在各个分布式系统上重新分布数据,为后面的Reduce过程节省一些操作)依赖关系的任务组 成的任务集
DAG:是Directed Acyclic Graph(有向无环图)的简称, 反映RDD之间的依赖关系 ,就是由初始的RDD逐个向后演化的过程
Cluster Manager:集群资源管理器,顾名思义,管理集群资源,集群资源管理器为task分配满足 要求的节点,并在节点按照要求创建Executor
Executor:是运行在工作节点(WorkerNode)的一个进程 ,负责运行Task
任务( Task ):运行在Executor上的工作单元
spark的启动流程:
- Spark的Driver Program (简称 Driver)获取来自用户的应用程序 ,完成task的解析和生成
- Driver向Cluster Manager(集 群资源管理器)申请运行task需 要的资源。
- 集群资源管理器为task分配满足 要求的节点,并在节点按照要求 创建Executor ,创建的Executor向Driver注册
- Driver将spark应用程序的代码 和文件传送给分配的executor
- executor运行task,运行完之 后将结果返回给Driver或者写入 HDFS或其他介质。
首先就要划分阶段,将Job划分为Stage,具体是根据DAG 图中的RDD 依赖关系,把一个作业分成多个阶段
阶段划分的依据是窄依赖和宽依赖:
这里不得不提窄依赖和宽依赖 (老师答疑时候也提了一下)
从理解上谈,窄依赖就是一个父分区中的信息最多只会传输到一个子分区中,类似于映射的性质
宽依赖就是相反的的,一个父分区中的信息可以存在于多个子分区
举一个形象的例子:
因为宽依赖往往对应着shuffle操作(多对一,汇总,多节点),需要在运行过程中将同一个父RDD 的分区传入到不同的子RDD分区中,中间可能涉及多个节点之间的数据传输;而窄依赖的每个父RDD的分区只会传入到一个子RDD分区中,通常可以在一个节点内完成转换,可以实现“流水线”优化,而宽依赖无法实现“流水线”优化
从上面可以得出流程优化的方法:
一个优化的实例:
就是spark的缓存机制,基于内存和磁盘的缓存
首先分区和数据块是一一对应的 在内部建立RDD分区和数据块之间的映射,需要读取缓存的RDD时, 根据映射关系取得分区对应的数据块 一个数据块对应着文件系统中的一个文件,文件名和块名称的映射 关系是通过哈希算法计算所得的RDD本身维护着可以用来重建丢失分区的信息
RDD还有优秀的容错机制:
流计算:实时获取来自不同数据源的海量数据,经过实时分析处理,实时性要求保证较低的延迟时间,达到秒级别,甚至是毫秒级别 ,相对于批处理用充裕时间处理静态数据,流数据必须采用实时计算,响应时间为秒级甚至更少
基于MapReduce模型的Hadoop很难满足时效性要求,启动本身是需要时间的:输入切分、调度、启动进程 等,在集群上共享Job也比较复杂,可能需要等待资源 ,而所有数据都需要读写磁盘 ,这些因素加一起导致了Hadoop不适合流计算
数据流处理,可用来实时处理新数据和更新数据库,兼具容错性和 可扩展性。
既然是流计算,就得和批处理的MapReduce不一样把
Storm将流数据Stream描述成一个无限的Tuple序列,这些Tuple序列会以分布式的方式并行地创建和处理 ,每个tuple是一堆值,每个值有一个名字,并且每个值可以是任何类型
Spout水龙头:Storm认为每个Stream都有一个源头,并把这个源头抽象 为Spout ,通常Spout会从外部数据源(队列、数据库等)读取数据,然后封装 成Tuple形式,发送到Stream中。Spout是一个主动的角色,在接口 内部有个nextTuple函数,Storm框架会不停的调用该函数
Bolt:Storm将Streams的状态转换过程抽象为Bolt。Bolt即可以处 理Tuple,也可以将处理后的Tuple作为新的Streams发送给其他Bolt ,可以执行过滤、函数操作、Join、操作数据库等任何操作 ,其接口中有一个execute(Tuple input)方法, 在接收到消息之后会调用此函数,用户可以在此方法中执行自己的处理逻辑
Topology:Storm将Spouts和Bolts组成的网络抽象成Topology, 它可以被提交到Storm集群执行。Topology可视为流转换图,图中 节点是一个Spout或Bolt,边则表示Bolt订阅了哪个Stream。当 Spout或者Bolt发送元组时,它会把元组发送到每个订阅了该 Stream的Bolt上进行处理
(类似于前面的有向无环图DAG和Hadoop中的Job)了解一下接即可,差不多得了
Nimbus:主节点,是一个调度中心,负责分发任务 ,类似Hadoop中的JobTracker
Zookeeper:是完成Supervisor和Nimbus之间协调的服务,来作为分布式协调组件,负责Nimbus和多个 Supervisor之间的所有协调工作,若Nimbus 进程或Supervisor进程意外终止,重启时也能读取、恢复之前的状 态并继续工作,加入分布式协调组件使得Storm极其稳定
Supervisor:从节点,任务执行的地方 ,类似于TaskTracker
Worker:任务工作进程,一个Supervisor中可以有多个Worker。
Executor:Worker进程在执行任务时,会启动多个Executor线程关键组件:
Stream Groupings: 定义了一个流在Bolt任务间该如何被切分。 这里有Storm提供 的6个Stream Grouping类型:
- 随机分组(Shuffle grouping):随机分发 tuple到Bolt的任务,保证每个任务获得相等数量 的tuple。
- 字段分组(Fields grouping):根据指定字段 分割数据流,并分组。例如,根据“user-id”字 段,相同“user-id”的元组总是分发到同一个任 务,不同“user-id”的元组可能分发到不同的任 务。
- 全部分组(All grouping):tuple被复制到bolt的所 有任务。这种类型需要谨慎使用。
- 全局分组(Global grouping):全部流都分配到 bolt的同一个任务。明确地说,是分配给ID最小的那个 task。
- 无分组(None grouping):你不需要关心流是如何 分组。目前,无分组等效于随机分组。但最终,Storm 将把无分组的Bolts放到Bolts或Spouts订阅它们的同一 线程去执行(如果可能)。
- 直接分组(Direct grouping):这是一个特别的分组 类型。元组生产者决定tuple由哪个元组处理者任务接收。
基于Topology进行持续的流式计算
一个wordcount的案例:Strom的容错可以分为多种类型:
任务级失败单机节点级故障
集群机器故障
实时计算,Spark Streaming是Spark核心API的一个扩展,可以 实现高吞吐量的、具备容错机制的实时流数据的处理
以一系列非常小的、确定的批处理作业的形式运行流计算 ,说白了还是spark模型,就是让他gkd而已
其本质来看还是spark,无法实现毫秒级的流计算,而 Storm可以实现毫秒级响应 ,但是相比于Storm,RDD数据集更容 易做高效的容错处理
Spark Streaming是将流式计算分解成一系列短小的批处理作业。
批处理引擎用Spark Core。类似于spark的容错机制, RDD可以记住从原始的容错输入创建它的操作序列 ,如果丢失可以重新计算 ;而批量输入数据被复制到多个工作节点的内存中,相互之间是容错的 ,如果是工作人员故障而丢失的数据,可以从输入的数据开始重新计算
许多实际计算机问题会涉及到大型图 ,而MapReduce不适合图处理 , 这是因为MapReduce每一阶段都利用整个图的全部状态 ,需要整合MapReduce链 ,并行处理需要多次迭代,这导致MapReduce的迭代,影响到了整体的性能
Superstep:并行结点计算 ,图并行模式
对于图中的每一个节点,都需要接受上一个superstep发出的消息 ,执行相同的用户定义函数 ,来修改它的值或者其输出边的值 ,然后将消息送到其他点(由下一个superstep接受) ,以此来改变改变大图的拓扑结构图并行模式的简要流程:
对于每个节点:基于上面的抽象,Pregel 允许将图算法写成一系列的MapReduce调用,然后在执行计算的机器上保持顶点和边 , 用网状结构传输信息
Pregel系统也使用主/从模型
主节点从节点
处理自己的任务
与其他从节点通信
从节点可以聚合它的节点报告的消息并整 合为一条消息发送 ,可以减少消息流量和硬盘占用
持久化的数据位于分布式存储系统(如GFS或BigTable) 中
临时数据存储在本地磁盘中 (一笔带过)容错
引入检查点机制: – 主节点定期指示从节点将分区的状态保存到持久化存储中 ( 例如:顶点数值,边数值,信息内容 ) 恢复 主节点将图形分区重新分配给当前可用的从节 点 所有worker都从最近可用的检查点重新加载分区状态既然是基于spark的,还是RDD那套东西
就是将属性图转化为表 将图中的顶点分割开来,构建顶点表,边表,路由表(以RDD的形式储存)spark
用 Map-Reduce三元组收集每个顶点的邻域信息
转载地址:http://xaoiz.baihongyu.com/