大數(shù)據(jù)技術(shù)之Hadoop(MapReduce)詳細(xì)介紹_第1頁
大數(shù)據(jù)技術(shù)之Hadoop(MapReduce)詳細(xì)介紹_第2頁
大數(shù)據(jù)技術(shù)之Hadoop(MapReduce)詳細(xì)介紹_第3頁
大數(shù)據(jù)技術(shù)之Hadoop(MapReduce)詳細(xì)介紹_第4頁
大數(shù)據(jù)技術(shù)之Hadoop(MapReduce)詳細(xì)介紹_第5頁
已閱讀5頁,還剩99頁未讀 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡介

大數(shù)據(jù)技術(shù)之Hadoop(MapReduce)

詳細(xì)介紹

目錄

一MapReduce概念..............................................................4

1.1為什么要MapReduce...........................................................................................................4

1.2MapReduce核心思想.....................................................5

1.3MapReduce進(jìn)程.........................................................6

1.4MapReduce編程規(guī)范(八股文)...........................................6

1.5MapReduce程序運(yùn)行流程分析.............................................7

二MapReduce理論篇............................................................8

2.1Writable序歹ij化..........................................................8

2.1.1常用數(shù)據(jù)序列化類型................................................8

2.1.2自定義bean對象實(shí)現(xiàn)序列化接口.....................................8

2.2InputFormat數(shù)據(jù)切片機(jī)制...............................................10

2.2.1FilelnputFormat切片機(jī)制...........................................10

2.2.2CombineTextlnputFormat切片機(jī)制...................................13

2.2.3自定義InputFormat.................................................................................................13

2.3MapTask工作機(jī)制.......................................................14

2.4Shuffle機(jī)制.............................................................15

2.4.1Shuffle機(jī)制.......................................................15

2.4.2MapReduce工作流程..............................................15

2.4.3partition分區(qū)......................................................17

2.4.4排序..............................................................18

2.4.5Groupingcomparator分組...........................................20

2.4.6Combiner合并.....................................................20

2.4.7數(shù)據(jù)傾斜&Distributedcache....................................................................................21

2.5ReduceTask工作機(jī)制.....................................................21

2.6自定義OutputFormat.........................................................................................................22

2.7MapReduce數(shù)據(jù)壓縮....................................................23

2.7.1概述..............................................................23

2.7.2MR支持的壓縮編碼...............................................23

2.7.3采用壓縮的位置...................................................24

2.7.4壓縮配置參數(shù).....................................................25

2.8計(jì)數(shù)器應(yīng)用.............................................................26

2.9數(shù)據(jù)清洗...............................................................26

2.10MapReduce與Yarn...........................................................................................................26

2.10.1Yarn概述........................................................26

2.10.2Yarn的重要概念..................................................26

2.10.3Yarn工作機(jī)制....................................................27

2.11作業(yè)提交全過程........................................................28

2.12MapReduce開發(fā)總結(jié)...................................................30

2.13MapReduce參數(shù)優(yōu)化...................................................31

2.13.1資源相關(guān)參數(shù)....................................................31

2.13.2容錯(cuò)相關(guān)參數(shù)....................................................32

三MapReduce實(shí)戰(zhàn)篇............................................................33

3.1WordCount案例.........................................................33

3.1.1需求1:統(tǒng)計(jì)一堆文件中單詞出現(xiàn)的個(gè)數(shù)(WordCount案例)..........33

3.1.2需求2:把單詞按照ASCII碼奇偶分區(qū)(Partitioner)....................................37

3.1.3需求3:對每一個(gè)maptask的輸出局部匯總(Combiner)............................38

3.1.4需求4:大量小文件的切片優(yōu)化(CombineTextlnputFormat)......................40

3.2流量匯總程序案例.......................................................41

3.2.1需求1:統(tǒng)計(jì)手機(jī)號(hào)耗費(fèi)的總上行流量、下行流量、總流量(序列化)....41

3.2.2需求2:將統(tǒng)計(jì)結(jié)果按照手機(jī)歸屬地不同省份輸出到不同文件中(Partitioner)

46

3.2.3需求3:將統(tǒng)計(jì)結(jié)果按照總流量倒序排序(排序)....................49

3.3求每個(gè)訂單中最貴的商品(Groupingcomparator)....................................................54

3.4MapReduce中多表合并案例..............................................59

3.4.1需求1:reduce端表合并(數(shù)據(jù)傾斜)...............................60

3.4.2需求2:map端表合并(Distributedcache).....................................................66

3.5小文件處理(自定義InputFormat)...............................................................................69

3.6過濾日志及自定義日志輸出路徑(自定義OutputFormat)......................................74

3.7日志清洗(數(shù)據(jù)清洗)...................................................78

3.7.1簡單解析版.......................................................78

3.7.2復(fù)雜解析版.......................................................81

3.8倒排索引實(shí)戰(zhàn)(多job串聯(lián)).............................................86

3.9找微信共同好友分析實(shí)戰(zhàn)................................................91

3.10壓縮/解壓縮...........................................................98

3.10.1對數(shù)據(jù)流的壓縮和解壓縮..........................................98

3.10.2在M叩輸出端采用壓縮..........................................100

3.10.3在Reduce輸出端采用壓縮........................................102

四、常見錯(cuò)誤...................................................................104

一MapReduce概念

Mapreduce是一個(gè)分布式運(yùn)算程序的編程框架,是用戶開發(fā)“基于hado叩的數(shù)據(jù)分析

應(yīng)用”的核心框架;

Mapreduce核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認(rèn)組件整合成一個(gè)完整的

分布式運(yùn)算程序,并發(fā)運(yùn)行在一個(gè)hadoop集群上。

1.1為什么要M叩Reduce

1)海量數(shù)據(jù)在單機(jī)上處理因?yàn)橛布Y源限制,無法勝任

2)而一旦將單機(jī)版程序擴(kuò)展到集群來分布式運(yùn)行,將極大增加程序的復(fù)雜度和開發(fā)難

3)引入mapreduce框架后,開發(fā)人員可以將絕大部分工作集中在業(yè)務(wù)邏輯的開發(fā)上,

而將分布式計(jì)算中的復(fù)雜性交由框架來處理。

4)nuipreduce分布式方案考慮的問題

(1)運(yùn)算邏輯要不要先分后合?

(2)程序如何分配運(yùn)算任務(wù)(切片)?

(3)兩階段的程序如何啟動(dòng)?如何協(xié)調(diào)?

(4)整個(gè)程序運(yùn)行過程中的監(jiān)控?容錯(cuò)?重試?

分布式方案需要考慮很多問題,但是我們可以將分布式程序中的公共功能封裝成框架,

讓開發(fā)人員將精力集中于業(yè)務(wù)邏輯上。而m叩reduce就是這樣一個(gè)分布式程序的通用框架。

1.2MapReduce核心思想

上圖簡單的闡明了map和reduce的兩個(gè)過程或者作用,雖然不夠嚴(yán)謹(jǐn),但是足以提供

一個(gè)大概的認(rèn)知,map過程是?個(gè)蔬菜到制成食物前的準(zhǔn)備工作,reduce將準(zhǔn)備好的材料合

并進(jìn)而制作出食物的過程

向硅谷

ujuiuj.atguigu.com

MapReduce核心編程思想

需求:統(tǒng)計(jì)其中每Map和Reduce階段

一個(gè)單詞出現(xiàn)的總1)分布式的運(yùn)算程序往往需要分成至少2個(gè)階段

1)讀數(shù)據(jù)

次數(shù)(查詢結(jié)果:a-2)第?個(gè)階段的maptask并發(fā)實(shí)例,完全并行運(yùn)行,

)按行處理

P一個(gè)文件,q-z-2互不相干

個(gè)文件)maptask"心

4)Hashmap<單向.value+1)3)第二個(gè)階段的reducetask并發(fā)實(shí)例互不相干,但是

他們的數(shù)據(jù)依穎于個(gè)階段的所仃并發(fā)實(shí)

Hadoopspark等到分給白己的數(shù)甥片全曲:讀完之后I.maptask

格按照首個(gè)字母范例的輸出

hive5)hashmap

、用分成2個(gè)hashmap

Hbase4)MapReduce編程模型只能包含一個(gè)map階段和一個(gè)

Hadoopreduce階段,如果用戶的業(yè)務(wù)邏輯非常復(fù)雜,那就只

能多個(gè)程序,中行運(yùn)行

sparkmapreduce

統(tǒng)計(jì)a-p開頭的單詞

maptask

輸出數(shù)據(jù)

輸出結(jié)果若干問題細(xì)節(jié)

Oreducetask;>

Hashmap(a-p)一一一到文件1)maptask如何進(jìn)行任務(wù)分配

Hashmap(q-z)2)Reducetask如何進(jìn)行任務(wù)分配

3)MaptaskfllReducetask之間如何銜接

統(tǒng)計(jì)中z開頭的單詞4)如果某maptask運(yùn)行失敗,如何處理

maptask

5)Maptask如果都要自己負(fù)費(fèi)輸出數(shù)

輸出結(jié)果據(jù)的分區(qū),很麻煩

reducetask

輸入數(shù)據(jù)到文件

Hashmab(q-z).

1MrAppMaster負(fù)責(zé)整個(gè)程序的過程調(diào)度邈燙媽

1)分布式的運(yùn)算程序往往需要分成至少2個(gè)階段

2)第一個(gè)階段的maptask并發(fā)實(shí)例,完全并行運(yùn)行,互不相干

3)第二個(gè)階段的reducetask并發(fā)實(shí)例互不相干,但是他們的數(shù)據(jù)依賴于上一個(gè)階段的所有

maptask并發(fā)實(shí)例的輸出

4)MapReduce編程模型只能包含一個(gè)map階段和一個(gè)reduce階段,如果用戶的業(yè)務(wù)邏輯非

常復(fù)雜,那就只能多個(gè)mapreduce程序,串行運(yùn)行

1.3MapReduce進(jìn)程

一個(gè)完整的mapreduce程序在分布式運(yùn)行時(shí)有三類實(shí)例進(jìn)程:

1)MrAppMaster:負(fù)責(zé)整個(gè)程序的過程調(diào)度及狀態(tài)協(xié)調(diào)

2)MapTask:負(fù)責(zé)map階段的整個(gè)數(shù)據(jù)處理流程

3)ReduceTask:負(fù)責(zé)reduce階段的整個(gè)數(shù)據(jù)處理流程

1.4MapReduce編程規(guī)范(八股文)

用戶編寫的程序分成三個(gè)部分:Nipper,Reducer,Driver(提交運(yùn)行mr程序的客戶端)

1)Mapper階段

(1)用戶自定義的Mapper要繼承自己的父類

(2)Mapper的輸入數(shù)據(jù)是KV對的形式(KV的類型可自定義)

(3)Mapper中的業(yè)務(wù)邏輯寫在map()方法中

(4)Mapper的輸出數(shù)據(jù)是KV對的形式(KV的類型可自定義)

(5)map()方法(maptask進(jìn)程)對每一個(gè)<1<、>調(diào)用一次

2)Reducer階段

(1)用戶自定義的Reducer要繼承自己的父類

(2)Reducer的輸入數(shù)據(jù)類型對應(yīng)Mapper的輸出數(shù)據(jù)類型,也是KV

(3)Reducer的業(yè)務(wù)邏輯寫在reduce。方法中

(4)Reducetask進(jìn)程對每一組相同k的<k,v>組調(diào)用一次reduce。方法

3)Driver階段

整個(gè)程序需要一個(gè)Drvier來進(jìn)行提交,提交的是一個(gè)描述了各種必要信息的job對象

4)案例實(shí)操

詳見3.1.1統(tǒng)ii?堆文件中單詞出現(xiàn)的個(gè)數(shù)(WordCount案例)。

1.5MapReduce程序運(yùn)行流程分析

他尚理看

M叩Reduce程序運(yùn)行流程分析

1待處理文本s讀取數(shù)據(jù)「Maptaskss.tx,0-128一7收柒kv到媛存

/user/atguigu/inputInputFormat------?K,voutputcollector

8按照k分區(qū)排

atguiguss.txt序后寫入磁盤

bigdata200mwordcount

Mapper二理a-g

邏輯其克

Javaphp6reducetask11輸出結(jié)果到文件

map(K,v)oteui[u,l><Hbsse,l><$pak.l>

Android<btgdita,l><hivt1>...

Context.write(k,v)wordcountReduce{OutputFormat

Html5

reduce(k,v)

Bigdatabb.txt

Context.wrlte(k,v);

python100mPart-r-00000

Maptaskss.txt128-200}

2客戶斯submit。前,獲取reducetask

行處理數(shù)據(jù)的信息,然后

OutputFormat、

根據(jù)參數(shù)配置,形成一個(gè)wordcountReducef

他務(wù)分配的規(guī)劃.reduce(k,v)

Maptaskbb.txt0-100

10reducet>sk^Context.write{k,v);

ss.txt0-128Part-r-00001

取數(shù)據(jù),月運(yùn)算}

ss.txt128-2003提交切片信息

bb.txtJob.split

wc.jar4計(jì)算出maptask數(shù)星reducetask

Job.xml

9所有maptask任務(wù)完wordcountReducefOutputFormat

Mrappmaster成后,啟動(dòng)相應(yīng)數(shù)量的reduce(k,v)

reducetask,并告知Contextwrite(k,v);

Part-r-00002

NodeManagerreducetask處理數(shù)抿范)

圍(數(shù)據(jù)分區(qū))

1)在MapReduce程序讀取文件的輸入目錄上存放相應(yīng)的文件。

2)客戶端程序在submit。方法執(zhí)行前,獲取待處理的數(shù)據(jù)信息,然后根據(jù)集群中參數(shù)的配

置形成一個(gè)任務(wù)分配規(guī)劃。

3)客戶端提交job.split、jar包、job.xml等文件給yarn,yarn中的resourcemanager啟動(dòng)

MRAppMaster?

4)MRAppMaster啟動(dòng)后根據(jù)本次job的描述信息,計(jì)算出需要的maplask實(shí)例數(shù)量,然后

向集群申請機(jī)器啟動(dòng)相應(yīng)數(shù)量的maptask進(jìn)程。

5)maptask利用客戶指定的inputformat來讀取數(shù)據(jù),形成輸入KV對。

6)maptask將輸入KV對傳遞給客戶定義的map()方法,做邏輯運(yùn)算

7)map()運(yùn)算完畢后將KV對收集到maptask緩存。

8)maptask緩存中的KV對按照K分區(qū)排序后不斷寫到磁盤文件

9)MRAppMaster監(jiān)控到所有maptask進(jìn)程任務(wù)完成之后,會(huì)根據(jù)客戶指定的參數(shù)啟動(dòng)相應(yīng)

數(shù)量的reducetask進(jìn)程,并告知reducetask進(jìn)程要處理的數(shù)據(jù)分區(qū)。

10)Reducetask進(jìn)程啟動(dòng)之后,根據(jù)MRAppMaster告知的待處理數(shù)據(jù)所在位置,從若干臺(tái)

maptask運(yùn)行所在機(jī)器上獲取到若干個(gè)maptask輸出結(jié)果文件,并在本地進(jìn)行重新歸并排序,

然后按照相同key的KV為一個(gè)組,調(diào)用客戶定義的reduce。方法進(jìn)行邏輯運(yùn)算。

11)Reducetask運(yùn)算完畢后,調(diào)用客戶指定的outputformat將結(jié)果數(shù)據(jù)輸出到外部存儲(chǔ)。

二MapReduce理論篇

2.1Writable序列化

序列化就是把內(nèi)存中的對象,轉(zhuǎn)換成字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)以便于存儲(chǔ)(持

久化)和網(wǎng)絡(luò)傳輸。

反序列化就是將收到字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)或者是硬盤的持久化數(shù)據(jù),轉(zhuǎn)換

成內(nèi)存中的對象。

Java的序列化是一個(gè)重量級(jí)序列化框架(Serializable),一個(gè)對象被序列化后,會(huì)附帶

很多額外的信息(各種校驗(yàn)信息,header,繼承體系等),不便于在網(wǎng)絡(luò)中高效傳輸。所以,

hadoop自己開發(fā)了一套序列化機(jī)制(Writable),精簡、高效。

2.1.1常用數(shù)據(jù)序列化類型

常用的數(shù)據(jù)類型對應(yīng)的hadoop數(shù)據(jù)序列化類型

Java類型HadoopWritable類型

booleanBooleanWritable

byteByteWritable

intIntWritable

floatFloatWritable

longLongWritable

doubleDoubleWritable

stringText

mapMapWritable

arrayArrayWritable

2.1.2自定義bean對象實(shí)現(xiàn)序列化接口

1)自定義bean對象要想序列化傳輸,必須實(shí)現(xiàn)序列化接口,需要注意以下7項(xiàng)。

(1)必須實(shí)現(xiàn)Writable接口

(2)反序列化時(shí),需要反射調(diào)用空參構(gòu)造函數(shù),所以必須有空參構(gòu)造

(3)重寫序列化方法

(4)重寫反序列化方法

(5)注意反序列化的順序和序列化的順序完全一致

(6)要想把結(jié)果顯示在文件中,需要重寫toString(),且用“\t“分開,方便后續(xù)用

(7)如果需要將自定義的bean放在key中傳輸,則還需要實(shí)現(xiàn)comparable接口,因?yàn)?/p>

mapreduce框中的shuffle過程一定會(huì)對key進(jìn)行排序

//1必須實(shí)現(xiàn)Writable接口

publicclassFlowBeanimplementsWritable{

privatelongupFlow;

privatelongdownFlow;

privatelongsumFlow;

111反序列化時(shí),需要反射調(diào)用空參構(gòu)造函數(shù),所以必須有

publicFlowBean(){

super();

*3重寫序列化方法

*

*@paramout

*@throwslOException

*/

@Override

publicvoidwrite(DataOutputout)throwslOException{

out.writeLong(upFlow);

out.writeLong(downFlow);

out.writeLong(sumFlow);

*4重寫反序列化方法

5注意反序列化的順序和序列化的順序完全一致

*

*@paramin

*@throwslOException

*/

@Override

publicvoidreadFields(DataInputin)throwslOException{

upFlow=in.readLong();

downFlow=in.readLongO;

sumFlow=in.readLong();

)

〃6要想把結(jié)果顯示在文件中,需要重寫loString。,且用分開,方便后續(xù)用

@Override

publicStringtoStringO{

returnupFlow+n\t"+downFlow+"\t"+sumFlow;

//7如果需要將自定義的bean放在key中傳輸,則還需要實(shí)現(xiàn)comparable接口,因

為mapreduce框中的shuffle過程一定會(huì)對key進(jìn)行排序

@Override

publicintcompareTo(FlowBeano){

//倒序排列,從大到小

returnthis.sumFlow>o.getSumFlow()?-1:1;

2)案例實(shí)操

詳見3.2.1統(tǒng)計(jì)每一個(gè)手機(jī)號(hào)耗費(fèi)的總上行流量、下行流量、總流量(序列化)。

2.2InputFormat數(shù)據(jù)切片機(jī)制

2.2.1FilelnputFormat切片機(jī)制

1)job提交流程源碼詳解

waitForCompletion()

submit();

//1建立連接

connect();

//I)創(chuàng)建提交job的代理

newCluster(getConfiguration());

//(1)判斷是本地yarn還是遠(yuǎn)程

initialize(jobTrackAddr,conf);

//2提交job

submitter.submitJobInternal(Job.this,cluster)

//I)創(chuàng)建給集群提交數(shù)據(jù)的Stag路徑

PathjobStagingArea=JobSubmissionFiles.ge04g山gZ)ir(cluster,conf);

//2)獲取jobid,并創(chuàng)建job路徑

JoblDjobld=submitClient.getNewJobID();

//3)拷貝jar包到集群

copyAndConfigureFiles(job,submitJobDir);

rUploader.uploadFiles(job,jobSubmitDir);

//4)計(jì)算切片,生成切片規(guī)劃文件

writeSplits(job,submitJobDir);

maps=writeNewSplits(job,jobSubmitDir);

input.getSplits(job);

//5)向Stag路徑寫xml配置文件

writeConf(conf,submitJobFile);

conf.writeXml(out);

//6)提交job,返回提交狀態(tài)

status=submitClient.submitJob(jobId,submitJobDir.toStringO,job.getCredentials());

2)FilelnputFormat源碼解析(input.getSplits(job))

(1)找到你數(shù)據(jù)存儲(chǔ)的目錄。

(2)開始遍歷處理(規(guī)劃切片)目錄下的每一個(gè)文件

(3)遍歷第一個(gè)文件ss.txt

a)獲取文件大小fs.sizeOf(ss.txt);

b)計(jì)算切片大小

computeSliteSize(Math.max(minSize,Math.max(maxSize,blocksize)))=blocksize=128M

c)默認(rèn)情況下,切片大小=blocksize

d)開始切,形成第1個(gè)切片:ss.txt—0:128M第2個(gè)切片ss.txt—128:256M第3

個(gè)切片ss.txt—256M:300M(每次切片時(shí),都要判斷切完剩下的部分是否大于塊的1.1倍,

不大于1.1倍就劃分一塊切片)

e)將切片信息寫到一個(gè)切片規(guī)劃文件中

f)整個(gè)切片的核心過程在getSplit()方法中完成。

g)數(shù)據(jù)切片只是在邏輯上對輸入數(shù)據(jù)進(jìn)行分片,并不會(huì)再磁盤上將其切分成分片

進(jìn)行存儲(chǔ)。InputSplit只記錄了分片的元數(shù)據(jù)信息,比如起始位置、長度以及所在的節(jié)

點(diǎn)列表等。

h)注意:block是HDFS上物理上存儲(chǔ)的存儲(chǔ)的數(shù)據(jù),切片是對數(shù)據(jù)邏輯上的劃分。

(4)提交切片規(guī)劃文件到y(tǒng)arn±,yarn上的MrAppMaster就可以根據(jù)切片規(guī)劃文件計(jì)

算開啟maptask個(gè)數(shù)。

3)FilelnputFormat中默認(rèn)的切片機(jī)制:

(1)簡單地按照文件的內(nèi)容長度進(jìn)行切片

(2)切片大小,默認(rèn)等于block大小

(3)切片時(shí)不考慮數(shù)據(jù)集整體,而是逐個(gè)針對每一個(gè)文件單獨(dú)切片

比如待處理數(shù)據(jù)有兩個(gè)文件:

filel.txt320M

file2,txt10M

經(jīng)過FilelnputFormat的切片機(jī)制運(yùn)算后,形成的切片信息如下:

filel.txt.splitl-0?128

filel.txt.split2-128?256

filel.txt.split3—256?320

file2.txt.splitl—(MOM

4)FilelnputFormat切片大小的參數(shù)配置

(1)通過分析源碼,在FilelnputFormat中,計(jì)算切片大小的邏輯:Math.max(minSize,

Math.min(maxSize,blockSize));

切片主要由這幾個(gè)值來運(yùn)算決定

mapreduce.input.fileinputformat.split.minsize=l默認(rèn)值為I

mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue默認(rèn)值Long.MAXValue

因此,默認(rèn)情況下,切片大小=1)1(?:1(^26。

maxsize(切片最大值):參數(shù)如果調(diào)得比blocksize小,則會(huì)讓切片變小,而且就等于

配置的這個(gè)參數(shù)的值。

minsize(切片最小值):參數(shù)調(diào)的比blockSize大,則可以讓切片變得比blocksize還

大。

5)獲取切片信息API

〃根據(jù)文件類型獲取切片信息

FileSplitinputSplit=(FileSplit)context.getInputSplit();

//獲取切片的文件名稱

Stringname=inputSplit.getPath().getName();

2.2.2CombineTextlnputFormat切片機(jī)制

關(guān)于大量小文件的優(yōu)化策略

1)默認(rèn)情況下Textinputformat對任務(wù)的切片機(jī)制是按文件規(guī)劃切片,不管文件多小,都會(huì)

是一個(gè)單獨(dú)的切片,都會(huì)交給一個(gè)maptask,這樣如果有大量小文件,就會(huì)產(chǎn)生大量的

maplask,處理效率極其低下。

2)優(yōu)化策略

(1)最好的辦法,在數(shù)據(jù)處理系統(tǒng)的最前端(預(yù)處理/采集),將小文件先合并成大文

件,再上傳到HDFS做后續(xù)分析。

(2)補(bǔ)救措施:如果已經(jīng)是大量小文件在HDFS中了,可以使用另一種InpulFormat

來做切片(CombineTextlnputFormat),它的切片邏輯跟TextFilelnputFormat不同:它可以

將多個(gè)小文件從邏輯上規(guī)劃到一個(gè)切片中,這樣,多個(gè)小文件就可以交給一個(gè)maptask。

(3)優(yōu)先滿足最小切片大小,不超過最大切片大小

CombineTextInputFormat.se/Mar/叩w/Sp/〃Size(job,4194304);//4m

CombineTextlnputFormat.2097152);//2m

舉例:0.5m+1m+0.3m+5m=2m+4.8m=2m+4m+0.8m

3)具體實(shí)現(xiàn)步驟

//9如果不設(shè)置InputFormat,它默認(rèn)用的是TextlnputFormat.class

job.setlnputFormatClass(CombineTextlnputFormat.class)

CombineTextlnpulFormaLsefMax/叩”rSp〃rSize(job,4194304);//4m

CombineTextlnputFormat.se/Mi”/叩"/Sp/"Size(job,2097152);//2m

4)案例實(shí)操

詳見3.1.4需求4:大量小文件的切片優(yōu)化(CombineTexlInpulFormal)。

2.2.3自定義InputFormat

1)概述

(1)自定義一個(gè)InputFormat

(2)改寫RecordReader,實(shí)現(xiàn)一次讀取一個(gè)完整文件封裝為KV

(3)在輸出時(shí)使用SequenceFileOutPutFormat輸出合并文件

2)案例實(shí)操

詳見3.5小文件處理(自定義InputFormat)。

2.3M叩Task工作機(jī)制

i)問題引出

maptask的并行度決定map階段的任務(wù)處理并發(fā)度,進(jìn)而影響到整個(gè)job的處理速度。

那么,mapTask并行任務(wù)是否越多越好呢?

2)MapTask并行度決定機(jī)制

一個(gè)job的map階段MapTask并行度(個(gè)數(shù)),由客戶端提交job時(shí)的切片個(gè)數(shù)決定。

3)MapTask工作機(jī)制

(1)Read階段:MapTask通過用戶編寫的RecordReader,從輸入InputSplit中解析出

一個(gè)個(gè)key/value。

(2)Map階段:該節(jié)點(diǎn)主要是將解析出的key/value交給用戶編寫map()函數(shù)處理,并

產(chǎn)生一系列新的key/value?

(3)Collect階段:在用戶編寫map()函數(shù)中,當(dāng)數(shù)據(jù)處理完成后,一般會(huì)調(diào)用

OutputCollector.collect。輸出結(jié)果。在該函數(shù)內(nèi)部,它會(huì)將生成的key/value分區(qū)(調(diào)用

Partitioner),并寫入一個(gè)環(huán)形內(nèi)存緩沖區(qū)中。

(4)Spill階段:即“溢寫”,當(dāng)環(huán)形緩沖區(qū)滿后,MapReduce會(huì)將數(shù)據(jù)寫到本地磁盤上,

生成一個(gè)臨時(shí)文件。需要注意的是,將數(shù)據(jù)寫入本地磁盤之前,先要對數(shù)據(jù)進(jìn)行一次本地排

序,并在必要時(shí)對數(shù)據(jù)進(jìn)行合并、壓縮等操作。

溢寫階段詳情:

步驟1:利用快速排序算法對緩存區(qū)內(nèi)的數(shù)據(jù)進(jìn)行排序,排序方式是,先按照分區(qū)編號(hào)

partition進(jìn)行排序,然后按照key進(jìn)行排序。這樣,經(jīng)過排序后,數(shù)據(jù)以分區(qū)為單位聚集在

一起,且同一分區(qū)內(nèi)所有數(shù)據(jù)按照key有序。

步驟2:按照分區(qū)編號(hào)由小到大依次將每個(gè)分區(qū)中的數(shù)據(jù)寫入任務(wù)工作目錄下的臨時(shí)文

件output/spillN.out(N表示當(dāng)前溢寫次數(shù))中。如果用戶設(shè)置了Combiner,則寫入文件之

前,對每個(gè)分區(qū)中的數(shù)據(jù)進(jìn)行一次聚集操作。

步驟3:將分區(qū)數(shù)據(jù)的元信息寫到內(nèi)存索引數(shù)據(jù)結(jié)構(gòu)SpiDRecord中,其中每個(gè)分區(qū)的元

信息包括在臨時(shí)文件中的偏移量、壓縮前數(shù)據(jù)大小和壓縮后數(shù)據(jù)大小。如果當(dāng)期內(nèi)存索引大

小超過1MB,則將內(nèi)存索引寫到文件output/spillN.out.index中。

(5)Combine階段:當(dāng)所有數(shù)據(jù)處理完成后,MapTask對所有臨時(shí)文件進(jìn)行一次合并,

以確保最終只會(huì)生成一個(gè)數(shù)據(jù)文件。

當(dāng)所有數(shù)據(jù)處理完后,MapTask會(huì)將所有臨時(shí)文件合并成一個(gè)大文件,并保存到文件

output/file.out中,同時(shí)生成相應(yīng)的索引文件output/file.out.index。

在進(jìn)行文件合并過程中,MapTask以分區(qū)為單位進(jìn)行合并。對于某個(gè)分區(qū),它將采用多

輪遞歸合并的方式。每輪合并io.sort.factor(默認(rèn)100)個(gè)文件,并將產(chǎn)生的文件重新加入

待合并列表中,對文件排序后,重復(fù)以上過程,直到最終得到一個(gè)大文件。

讓每個(gè)MapTask最終只生成一個(gè)數(shù)據(jù)文件,可避免同時(shí)打開大量文件和同時(shí)讀取大量

小文件產(chǎn)生的隨機(jī)讀取帶來的開銷。

2.4Shuffle機(jī)制

2.4.1Shuffle機(jī)制

M叩reduce確保每個(gè)reducer的輸入都是按鍵排序的。系統(tǒng)執(zhí)行排序的過程(即將map

輸出作為輸入傳給reducer)稱為shuffleo

Copy"Sort"Reduce

phasephasephase

maptask端嚅reducetask

spilltodisk幽m...........?>1卜”A..

bufferinmerge1------------1

memoryi_________________________)口▼4n

■-?!|AllII『II1/merge位

/_n__u

s■'▼蹦tk

mergeJr?-----------1output

partitiom?/y

/mixtunofin-memoryandon-diskdata

Othermaps**..............Otherreduces

2.4.2MapReduce工作流程

1)流程示意圖

?向硅谷

ss.txt0128MapReduce詳細(xì)工作流程(一)

5默認(rèn)TextlnputFormat,__Maptask

7環(huán)形緩沖區(qū)默認(rèn)100M

RecorderReaderInputFormat>K,v

......<a,lxc.

K,vMapper!___

readerO6邏輯運(yùn)

map(K,v)溢出到文件(分區(qū)且區(qū)內(nèi)有序)

spiller9

待處理文本Context.write(k,v)

1Merwe底并排序

/user/inputHashPartitioner分區(qū)<a,lxc,X><b,l><b,l>

o」xa,lxc4xe,l>

ss.txtoutputcollector

Key.compareT。排序

200m

Combiner介并<a,lxc.l>va,2xGlxe,l><b,2xd,lxf,l>

2客戶端submit。前,獲取partitionOpartitionl

8分區(qū)、排序11合并

待處理數(shù)據(jù)的信息,然后ss.txt128-200

根據(jù)參數(shù)配置,形成一個(gè)

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論