Hadoop 超燃之路
发布时间:2024-11-15 02:52:11点击:
1 Hadoop 简介
1.1 Hadoop 由来
数据容量
大数据时代数据量超级大,数据具有如下特性:
以前的存储手段跟分析方法现在行不通了!Hadoop 就是用来解决海量数据的 存储 跟海量数据的 分析计算 问题的,创始人 Doug Cutting 在创建Hadoop 时主要思想源头是 Google 三辆马车
现在说的 Hadoop 通常指的是 Hadoop 生态圈 这样一个广义概念,如下:
大数据知识体系
1.2 Hadoop 特点
1.2.1 Hadoop 特点
高可用
Hadoop 底层对同一个数据维护这多个复本,即使Hadoop某个计算元素或者存储出现问题,也不会导致数据的丢失。
高扩展
在集群之间分配任务数据,可以方便的扩展跟删除多个节点,比如美团节点就在3K~5k 个节点
高效性
在MapReduce的思想下 Hadoop是并行工作的,以加快任务的处理速度
高容错性
如果一个子任务速度过慢或者任务失败 Hadoop会有响应策略会自动重试跟任务分配。
1.2.2 Hadoop 架构设计
Hadoop 的 1.x 跟 2.x 区别挺大,2.x 主要是将1.x MapReduce中资源调度的任务解耦合出来交 Yarn来管理了(接下来本文以2.7开展探索)。
1.x跟2.x变化
Hadoop Distributed File System 简称 HDFS,是一个分布式文件系统。HDFS有着高容错性,被设计用来部署在低廉的硬件上来提供高吞吐量的访问应用程序的数据,适合超大数据集的应用程序。
MapReduce是一种编程模型,包含Map(映射) 跟 Reduce(归约)。你可以认为是归并排序的深入化思想。
Apache Hadoop YARN (Yet Another Resource Negotiator,另一种资源协调者)是一种新的 Hadoop资源管理器,它是一个通用资源管理系统,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。
Common 组件
log组件。
独有RPC体系ipc、I/O系统、序列化、压缩。
配置文件conf。
公共方法类,比如checkSum校验。
产生背景:
随着数据量变大,数据在一个OS的磁盘无法存储了,需要将数据分配到多个OS管理的磁盘中,为了方面管理多个OS下的磁盘文件,迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统,HDFS是通过目录树定位文件。需注意 HDFS 只是分布式文件系统中的其中一种。
2.1 HDFS 优缺点
2.1.1 优点
高容错性
数据会自动保存多个副本,默认为3个,通过增加副本来提高容错性。
某个副本丢失后系统会自动恢复。
高扩展性
HDFS集群规模是可以动态伸缩的。
适合大数据处理
数据规模达到GB/TB/PB级别。
文件规模达到百万规模以上。
流式访问,它能保证数据的一致性。
低成本,部署廉价机器 提高了商业化能了。
统一对外接口,Hadoop本身用Java编写,但基于此的应用程序可以用其他语言编写调用。
2.1.1 缺点
做不到低延时
Hadoop对高吞吐做了优化,牺牲了获取数据的延迟,比如毫秒级获取数据在Hadoop上做不到。
不适合存储大量小文件
存储大量小文件的话,它会占用 NameNode 大量的内存来存储文件、目录和块信息。因此该文件系统所能存储的文件总数受限于 NameNode的内存容量,根据经验,每个文件、目录和数据块的存储信息大约占150字节。
小文件存储的寻道时间会超过读取时间,它违反了HDFS的设计目标。
无法修改文件
对于上传到HDFS上的文件,不支持修改文件,仅支持追加。HDFS适合一次写入,多次读取的场景。
无法并发写入
HDFS不支持多用户同时执行写操作,即同一时间,只能有一个用户执行写操作。
2.2 HDFS 组成架构
2.2.1 Client
客户端主要有如下功能:
2.2.2 NameNode
NameNode 简称NN,就是HDFS中的 Master,是个管理者,主要有如下功能:
映射信息:NameNode(文件路径,副本数,{Block1,Block2},[Block1:[三个副本路径],Block2:[三个副本路径]])
2.2.3>
NN跟DN对比
DataNode 的工作机制
DataNode 确保数据完整性
DN 进程死亡或无法跟 NN 通信后 NN 不会立即将 DN 判死,一般经过十分钟 + 30秒再判刑。
2.2.4 Secondary NameNode
当 NameNode 挂掉的时候,它并不能马上替换 NameNode 并提供服务。需要通过 HA等手段实现自动切换。SNN 主要提供如下功能:
2.2.5 Block
HDFS中的文件在物理上是分块 Block 存储的,在 1.x 版本中块 = 64M,2.x中块 =128M。块不是越大越好,也不是越小越好。因为用户获取数据信息时间 = 寻址块时间 + 磁盘传输时间。
块太小会增加寻址时间,程序大部分耗时在寻址上了。
快太大则会导致磁盘传输时间明显大于寻址时间,程序处理块数据时较慢。
2.3 HDFS 写流程
2.3.1 具体写流程
写流程
2.3.2 节点距离计算
在 HDFS 写数据的过程中,NameNode 会选择距离待上传数据最近距离的DataNode接收数据。
最近距离 = 两个节点到达最近的共同祖先的距离总和。
节点距离计算
2.3.3 副本节点选择
机架感知
2.4 HDFS 读流程
读流程
2.5 NameNode 和 Secondary NameNode
2.5.1 NN 和 2NN 工作机制
NameNode 中元数据单独存到磁盘不方便读写。单独存到内存时,断电会丢失。Hadoop 采用的是如下方式。
元数据序列化后在磁盘存储的地方。包含HDFS文件系统的所有目录跟文件inode序列化信息。
元数据在内存中存储的地方。
Edit 文件:
Edit 记录客户端更新元数据信息的每一步操作(可通过Edits运算出元数据)。
一旦元数据有更新跟添加,元数据修改追加到Edits中然后修改内存中的元数据,这样一旦NameNode 节点断电,通过 FsImage 跟 Edits的合并生成元数据。
Edits文件不要过大,系统会定期的由 Secondary Namenode 完成 FsImage 和 Edits 的合并。
NN跟2NN工作机制
第一阶段:NameNode 启动
第二阶段:Secondary NameNode 工作
Secondary NameNode 询问 NameNode 是否需要 CheckPoint。直接带回 NameNode是否检查结果。一般下面条件任意满足即可:
2.6 安全模式
NameNode 刚启动时候系统进入安全模式(只读),如果整个文件系统中99.9%块满足最小副本,NameNode 会30秒后退出安全模式。
2.6.1 NameNode 启动
将 FsImage 文件载入内存再执行Edits文件各种操作,最终内存生成完整的元数据镜像。
创建个新的 FsImage 跟空 Edits 文件。
NameNode 开始监听>
HA故障转移
2.7.1 HDFS-HA工作要点
元数据管理方式需要改变
内存中各自保存一份元数据。
Edits 日志只有 Active 状态的 NameNode 节点可以做写操作。
两个 NameNode 都可以读取 Edits。
共享的 Edits 放在一个共享存储中管理(qjournal 或 NFS)。
需要一个状态管理功能模块
实现了一个ZKFC,常驻在每一个namenode所在的节点,每一个ZKFC负责监控自己所在NameNode节点,利用zk进行状态标识,当需要进行状态切换时,由ZKFC来负责切换,切换时需要防止brainsplit现象的发生。
必须保证两个 NameNode 之间能够ssh无密码登录
防脑裂,同一时刻仅仅有一个 NameNode 对外提供服务。
2.7.2 ZooKeeper
ZooKeeper 提供如下功能:
2.7.3 ZKFC进程
在 NameNode 主机上有个 ZKFC(ZKFailoverController) 这样的ZK客户端,负责监视管理 NameNode状态。ZKFC负责:
3 MapReduce
MapReduce是个分布式运算程序的编程框架,是基于 Hadoop 的 数据分析计算核心框架。处理过程分为两个阶段:Map 阶段跟 Reduce阶段。
Map 负责把一个任务分解成多个任务。该阶段的 MapTask 并发实例,完全并行运行,互不相干。
Reduce 负责把多个任务处理结果汇总。该阶段的 ReduceTask 并发实例互不相干,但是他们的数据依赖于上一个阶段的所有 MapTask并发实例的输出。
MapReduce 编程模型只能包含一个 Map 阶段和一个 Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序串行运行。
用户编写MR任务时候 程序实现部分分成三个部分:Mapper、Reducer、Driver(提交运行mr程序的客户端)。
3.1 优缺点
3.1.1 优点
易于编程
简单实现了一些接口就可以完成个分布式程序,你写个分布式程序跟写个串行化程序一样,类似八股文编程。
良好的扩展
计算资源不足时可以简单的增加机器来扩展计算能力。
高容错性
MapReduce任务部署在多台机器上后如果其中一台挂了,系统会进行自动化的任务转移来保证任务正确执行。
适合PB级数据离线处理
比如 美团3K个节点的集群并发,提供超大数据处理能力。
3.1.2 缺点
不擅长实时计算
MapReduce 不会想 MySQL 一样毫秒级返回结果。
不擅长流式计算
流式计算的 输入数据是动态的,而 MapReduce 的输入数据集是静态的。
不擅长DAG计算
多个应用程序存在依赖关系,MapReduce的作业结果会落盘导致大量磁盘IO,性能贼低,此时上Spark吧!
3.2 序列化
序列化
把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)和网络传输。
反序列化
将收到字节序列(或其他数据传输协议)或者是硬盘的持久化数据,转换成内存中的对象。
因为 Hadoop 在集群之间进行通讯或者 RPC调用时是需要序列化的,而且要求序列化要快、且体积要小、占用带宽要小。而Java自带的序列化是重量级框架,对象序列化后会附带额外信息,比如各种校验信息,header,继承体系等。所以Hadoop 自研了序列化框架。
Java类型 | Hadoop Writable类型 |
---|---|
BooleanWritable | |
ByteWritable | |
IntWritable | |
FloatWritable | |
LongWritable | |
DoubleWritable | |
MapWritable | |
ArrayWritable |
3.3 MapTask 并行度
数据块:Block 是 HDFS 物理上把数据分成一块一块。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。
切片核心注意点:
3.3.1 FileInputFormat 切片源码追踪
3.3.2 切片大小计算
SplitSize= Math.max(minSize,Math.min(maxSize,blockSize))
3.3.3 切片举例
切片举例
3.4 FileInputFormat
3.4.1 实现类简介
MR任务输入文件个数各有不同,针对不同类型MR定义了一个接口跟若干实现类来读取不同的数据。
input继承关系
TextInputFormat
默认使用类,按行读取每条数据,Key是该行数据的 offset,Value = 行内容。
KeyValueTExtInputFormat
每行都是一条记录,被指定分隔符分割为Key跟Value,默认是 \t 。
NLineInputFormat
该模型下每个 map 处理 InputSplit 时不再按照 Block 块去划分,而是按照指定的行数N来划分文件。
自定义InputFormat
基础接口,改写 RecordReader,实现一次读取一个完整文件封装为 KV,使用 SequenceFileOutPutFormat输出合并文件。
CombineTextInputFormat
用于小文件过多场景,逻辑上合并多个小文件个一个切片任务。较重要 中
3.4.2 CombineTextInputFormat
默认框架 TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。CombineTextInputFormat可以将多个小文件从逻辑上规划到一个切片中,这样多个小文件就可以交给一个MapTask处理。主要包含 虚拟存储过程 跟 切片过程。
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4m
虚拟存储过程:
切片过程:
判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。
如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
切片过程
3.6 OutputFormat
OutputFormat 是 MapReduce 输出的基类,常见的实现类如下:
3.5.1 TextOutputFormat
系统默认输出格式,把每条记录写为文本行,他的K跟V是任意类型,系统在写入时候会统一转化为字符串。
3.5.2 SequenceFileOutputFormat
此模式下的输出结果作为后续MapReduce任务的输入,该模式下数据格式紧凑,很容易被压缩。
3.5.3 自定义OutputFormat
如果需求不满足可按需求进行自定义。
3.6 MapReduce 流程
3.6.1 整体流程图
MapReduce流程
MapTask 工作机制
ReduceTask 工作机制
3.6.2 Shuffle
Shuffle机制
MapReduce 的核心就是 Shuffle 过程,Shuffle 过程是贯穿于 map 和 reduce两个过程的!在Map端包括Spill过程,在Reduce端包括copy和sort过程。 具体Shuffle过程如下:
3.6.3 Partition
MapReduce 默认的分区方式是hashPartition,在这种分区方式下,KV 对根据 key 的 hashcode值与reduceTask个数进行取模,决定该键值对该要访问哪个ReduceTask。
//numReduceTasks默认=1所以导致默认的reduce结果=1
自定义的时候一般就是类继承Partitioner然后重写getPartition方法。用户也可以设置ReduceTask数量,不过会遵循如下规则。
比如 假设自定义分区数为5。
3.6.4 环形缓冲区
Map 的输出结果由 Collector 处理,每个 Map任务不断地将键值对输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据。
环形数据结构其实就是个字节数组byte[],叫kvbuffer,默认值100M。里面主要存储 数据 跟元数据。中间有个分界点,并且分界点是变化的。当环形缓冲区写入的buffer的大小达到 80%满足溢写条件的时候,开始溢写spill。系统有两个线程一个负责写入数据,一个负责spill数据。
数据:
存储 Key + Value + bufindex。其中bufindex(即数据的存储方向)是一直闷着头地向上增长,比如bufindex初始值为0,一个Int型的key写完之后,bufindex增长为4,一个Int型的value写完之后,bufindex增长为8。
元数据:
元数据是为了排序而生,是关于数据描述的数据。
Kvmeta = Partition + keystart + valstart + valLength , 共占用4个Int长度,其中K的长度 =V的起点 - K的起点。
Kvmeta 的存放指针 Kvindex 每次都是向下跳四个格子,然后再向上一个格子一个格子地填充四元组的数据。比如Kvindex初始位置是-4,当第一个键值对写完之后,(Kvindex+0)的位置存放partition的起始位置、(Kvindex+1)的位置存放keystart、(Kvindex+2)的位置存放valstart、(Kvindex+3)的位置存放valuelength,然后Kvindex跳到 -8位置,等第二个键值对和索引写完之后,Kvindex跳到-12位置。
环形缓冲区
3.6.5 Combiner 合并
3.6.6 关于 MapReduce 排序
MapReduce框架最重要的操作就是排序,MapTask 跟 ReduceTask 都会根据key进行按照字典顺序进行快排。
MapTask 将缓冲区数据快排后写入到磁盘,然后磁盘文件会进行归并排序。
ReduceTask统一对内存跟磁盘所有数据进行归并排序。
3.6.7 ReduceJoin 跟 MapJoin
Reducejoin
思路:通过将关联条件作为Map 输出的 Key,将两表满足Join条件的数据并携带数据源文件发送同一个ReduceTask,在Reduce端进行数据串联信息合并。
缺点:合并操作在Reduce端完成,Reduce 端处理压力太大,并且Reduce端易产生数据倾斜。
适用:适用于一张表十分小、一张表很大的场景。
思路:在 Map 端缓存多张表,提前处理业务逻辑,这样增加 Map 端业务,减少 Reduce 端数据的压力,尽可能的减少数据倾斜。
3.6.8 注意点
ReduceTask = 0 说明没有Reduce节点,输出文件个数和 Map 个数一样。
ReduceTask 默认= 1,所以结果是一个文件。
ReduceTask 的个数不是任意设置的,需跟集群性能还有结果需求而定。
逻辑处理 Mapper 时候可根据业务需求实现其中三个方法,map、setup、cleanup。
3.7 压缩
压缩是提高Hadoop运行效率的一种优化策略,通过在Mapper、Reducer运行过程的数据进行压缩来减少磁盘空间跟网络传输,最终实现提高MR运行速度。但需注意压缩也给CPU运算带来了负担。
压缩的基本原则:
运算密集型任务 ,少压缩。
IO密集型任务,多压缩。
压缩格式 | 自带 | 算法 | 扩展名 | 可切分吗 | 压缩后,代码修改 |
---|---|---|---|---|---|
是 | 否 | 不需要修改 | |||
是 | 否 | 不需要修改 | |||
是 | 是 | 不需要修改 | |||
否 | 否 | 不需要修改 | |||
否 | 是 | 需要建索引还需要指定输入格式 |
Yarn 是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而 MapReduce等运算程序则相当于运行于操作系统之上的应用程序。
4.1 基本组成
Yarn架构
YARN主要由 ResourceManager、NodeManager、ApplicationMaster 和 Container 等组件构成。
ResourceManager
处理客户端请求
监控NodeMananger
启动或监控ApplicationMaster
计算资源的分配跟调度
NodeManager
管理单个节点上资源
处理来着ResourceManager的命令
处理来自ApplicationMaster的命令
ApplicationMaster
负责数据切分。
为应用程序申请资源并分配给内部任务。
任务的监控跟容错。
Container 是 YARN 中资源的抽象,封装了某个节点上的多维度资源,比如内存、CPU、磁盘、网络等。
YarnChild 其实它就是一个运行程序的进程。MrAppMaster 运行程序时向 Resouce Manager 请求的 Maptask /ReduceTask。
4.2 Yarn 调度 MapReduce 任务
Yarn调度流程
当 MR 程序提交到客户端所在的节点时后 大致运行流程如下:
作业提交
Client 调用 job.waitForCompletion 方法 YarnRunner ,向整个集群提交MapReduce作业。Client 向 RM申请一个作业id。
RM 给 Client 返回该 job 资源的提交路径和作业 id。
Client 提交jar包、切片信息和配置文件到指定的资源提交路径。
Client 提交完资源后,向 RM 申请运行 MrAppMaster。
作业初始化
当 RM 收到 Client 的请求后,将该 Task 添加到容量调度器中。
某一个空闲的 NodeManager 领取到该 Task 。
该 NodeManager 创建 Container,并产生 MRAppMaster。
下载 Client 提交的资源 到本地。
任务分配
MRAppMaster 向 RM 申请运行多个 MapTask 任务资源。
RM 将运行 MapTask 任务分配给俩 NodeManager。其中分配原则 是优先 jar 跟 数据在一台机器上,其次就尽可能在一个机房。最后随便来个空闲机器。
任务运行
MR 向两个接收到任务的 NodeManager 发送程序启动脚本,这两个 NodeManager 分别启动MapTask,MapTask对数据分区排序。
MrAppMaster 等待所有 MapTask 运行完毕后,向RM申请容器 运行ReduceTask。
ReduceTask 向 MapTask 获取相应分区的数据。
程序运行完毕后,MR会向RM申请注销自己。
进度和状态更新
YARN 中的任务将其进度和状态(包括counter)返回给应用管理器, 客户端每秒向应用管理器请求进度更新来展示给用户。
作业完成
除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用 waitForCompletion() 来检查作业是否完成。作业完成之后,应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。
4.3 资源调度器
目前,Hadoop作业调度器主要有三种:FIFO、Capacity Scheduler 和 FairScheduler。Hadoop2.7.2默认的资源调度器是Capacity Scheduler。
FIFO调度
4.3.2 容量调度器 Capacity Scheduler
容量调度器
4.3.3 公平调度器 Fair Scheduler
支持多队列多用户,每个队列中资源可以配置,同一队列中作业公平共享队列中所有资源。
公平调度器
比如有queue1、queue2、queue3三个任务队列,每个队列中的job按照优先级分配资源,优先级高获得资源多,但会确保每个任务被分配到资源。
每个任务理想所需资源跟实际获得资源的差距叫缺额,同一个队列中是按照缺额高低来先后执行的,缺额越大越优先获得资源。
4.4 任务推测执行
作业完成时间取决于最慢的任务完成时间。系统中有99%的Map任务都完成了,只有少数几个Map老是进度很慢,此时系统会发现拖后腿的任务,比如某个任务运行速度远慢于任务平均速度。为拖后腿任务启动一个备份任务,同时运行。谁先运行完,则采用谁的结果。
5 MapReduce 优化方法
MapReduce优化方法主要从六个方面考虑:数据输入、Map阶段、Reduce阶段、IO传输、数据倾斜问题和常用的调优参数。
5.1 数据输入
数据采集时,用 Hadoop Archive 将多个小文件打包成一个Har文件。
业务处理前,SequenceFile 由一系列KV组成,key=文件名,value=文件内容,将大批小文件合并成大文件。
在 MapReduce 处理时,采用CombineTextInputFormat来作为输入,解决输入端大量小文件场景。
对于大量小文件任务开启JVM 重用可提速,JVM 重用可以使得 JVM 实例在同一个 job中重新使用N次。N的值可以在Hadoop的mapred-site.xml文件中进行配置,通常在10-20之间。
5.2 Map 阶段
减少溢写 Spill 次数,调整循环缓存区大小,减少磁盘IO。
减少合并 Merge 次数,增大Merge文件大小减少次数。
在不影响业务的情况下在Map端进行Combine处理。
5.3 Reduce 阶段
设置合理的Map跟REduce数,太少会导致Task等待。太多会导致竞争资源激烈。
设置Map跟Reduce阶段共存,map运行一定程度后Reduce 也可以运行。
规避使用Reduce,Reduce 端的Buffer也要合理设置,尽量防止溢写到磁盘。
5.4 IO 传输
采用数据压缩方式来减少网络IO时间。
使用SequenceFile二进制文件。
5.5 数据倾斜
通过对数据抽样得到结果集来设置分区边界值。
自定义分区。
使用Combine来减少数据倾斜。
采用MapJoin,尽量避免ReduceJoin。
参考
HDFS-Shell 指令: