版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡介
大數(shù)據(jù)技術(shù)之Hadoop(MapReduce)
詳細(xì)介紹
目錄
一MapReduce概念..............................................................4
1.1為什么要MapReduce...........................................................................................................4
1.2MapReduce核心思想.....................................................5
1.3MapReduce進(jìn)程.........................................................6
1.4MapReduce編程規(guī)范(八股文)...........................................6
1.5MapReduce程序運(yùn)行流程分析.............................................7
二MapReduce理論篇............................................................8
2.1Writable序歹ij化..........................................................8
2.1.1常用數(shù)據(jù)序列化類型................................................8
2.1.2自定義bean對象實(shí)現(xiàn)序列化接口.....................................8
2.2InputFormat數(shù)據(jù)切片機(jī)制...............................................10
2.2.1FilelnputFormat切片機(jī)制...........................................10
2.2.2CombineTextlnputFormat切片機(jī)制...................................13
2.2.3自定義InputFormat.................................................................................................13
2.3MapTask工作機(jī)制.......................................................14
2.4Shuffle機(jī)制.............................................................15
2.4.1Shuffle機(jī)制.......................................................15
2.4.2MapReduce工作流程..............................................15
2.4.3partition分區(qū)......................................................17
2.4.4排序..............................................................18
2.4.5Groupingcomparator分組...........................................20
2.4.6Combiner合并.....................................................20
2.4.7數(shù)據(jù)傾斜&Distributedcache....................................................................................21
2.5ReduceTask工作機(jī)制.....................................................21
2.6自定義OutputFormat.........................................................................................................22
2.7MapReduce數(shù)據(jù)壓縮....................................................23
2.7.1概述..............................................................23
2.7.2MR支持的壓縮編碼...............................................23
2.7.3采用壓縮的位置...................................................24
2.7.4壓縮配置參數(shù).....................................................25
2.8計(jì)數(shù)器應(yīng)用.............................................................26
2.9數(shù)據(jù)清洗...............................................................26
2.10MapReduce與Yarn...........................................................................................................26
2.10.1Yarn概述........................................................26
2.10.2Yarn的重要概念..................................................26
2.10.3Yarn工作機(jī)制....................................................27
2.11作業(yè)提交全過程........................................................28
2.12MapReduce開發(fā)總結(jié)...................................................30
2.13MapReduce參數(shù)優(yōu)化...................................................31
2.13.1資源相關(guān)參數(shù)....................................................31
2.13.2容錯(cuò)相關(guān)參數(shù)....................................................32
三MapReduce實(shí)戰(zhàn)篇............................................................33
3.1WordCount案例.........................................................33
3.1.1需求1:統(tǒng)計(jì)一堆文件中單詞出現(xiàn)的個(gè)數(shù)(WordCount案例)..........33
3.1.2需求2:把單詞按照ASCII碼奇偶分區(qū)(Partitioner)....................................37
3.1.3需求3:對每一個(gè)maptask的輸出局部匯總(Combiner)............................38
3.1.4需求4:大量小文件的切片優(yōu)化(CombineTextlnputFormat)......................40
3.2流量匯總程序案例.......................................................41
3.2.1需求1:統(tǒng)計(jì)手機(jī)號(hào)耗費(fèi)的總上行流量、下行流量、總流量(序列化)....41
3.2.2需求2:將統(tǒng)計(jì)結(jié)果按照手機(jī)歸屬地不同省份輸出到不同文件中(Partitioner)
46
3.2.3需求3:將統(tǒng)計(jì)結(jié)果按照總流量倒序排序(排序)....................49
3.3求每個(gè)訂單中最貴的商品(Groupingcomparator)....................................................54
3.4MapReduce中多表合并案例..............................................59
3.4.1需求1:reduce端表合并(數(shù)據(jù)傾斜)...............................60
3.4.2需求2:map端表合并(Distributedcache).....................................................66
3.5小文件處理(自定義InputFormat)...............................................................................69
3.6過濾日志及自定義日志輸出路徑(自定義OutputFormat)......................................74
3.7日志清洗(數(shù)據(jù)清洗)...................................................78
3.7.1簡單解析版.......................................................78
3.7.2復(fù)雜解析版.......................................................81
3.8倒排索引實(shí)戰(zhàn)(多job串聯(lián)).............................................86
3.9找微信共同好友分析實(shí)戰(zhàn)................................................91
3.10壓縮/解壓縮...........................................................98
3.10.1對數(shù)據(jù)流的壓縮和解壓縮..........................................98
3.10.2在M叩輸出端采用壓縮..........................................100
3.10.3在Reduce輸出端采用壓縮........................................102
四、常見錯(cuò)誤...................................................................104
一MapReduce概念
Mapreduce是一個(gè)分布式運(yùn)算程序的編程框架,是用戶開發(fā)“基于hado叩的數(shù)據(jù)分析
應(yīng)用”的核心框架;
Mapreduce核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認(rèn)組件整合成一個(gè)完整的
分布式運(yùn)算程序,并發(fā)運(yùn)行在一個(gè)hadoop集群上。
1.1為什么要M叩Reduce
1)海量數(shù)據(jù)在單機(jī)上處理因?yàn)橛布Y源限制,無法勝任
2)而一旦將單機(jī)版程序擴(kuò)展到集群來分布式運(yùn)行,將極大增加程序的復(fù)雜度和開發(fā)難
度
3)引入mapreduce框架后,開發(fā)人員可以將絕大部分工作集中在業(yè)務(wù)邏輯的開發(fā)上,
而將分布式計(jì)算中的復(fù)雜性交由框架來處理。
4)nuipreduce分布式方案考慮的問題
(1)運(yùn)算邏輯要不要先分后合?
(2)程序如何分配運(yùn)算任務(wù)(切片)?
(3)兩階段的程序如何啟動(dòng)?如何協(xié)調(diào)?
(4)整個(gè)程序運(yùn)行過程中的監(jiān)控?容錯(cuò)?重試?
分布式方案需要考慮很多問題,但是我們可以將分布式程序中的公共功能封裝成框架,
讓開發(fā)人員將精力集中于業(yè)務(wù)邏輯上。而m叩reduce就是這樣一個(gè)分布式程序的通用框架。
1.2MapReduce核心思想
上圖簡單的闡明了map和reduce的兩個(gè)過程或者作用,雖然不夠嚴(yán)謹(jǐn),但是足以提供
一個(gè)大概的認(rèn)知,map過程是?個(gè)蔬菜到制成食物前的準(zhǔn)備工作,reduce將準(zhǔn)備好的材料合
并進(jìn)而制作出食物的過程
向硅谷
ujuiuj.atguigu.com
MapReduce核心編程思想
需求:統(tǒng)計(jì)其中每Map和Reduce階段
一個(gè)單詞出現(xiàn)的總1)分布式的運(yùn)算程序往往需要分成至少2個(gè)階段
1)讀數(shù)據(jù)
次數(shù)(查詢結(jié)果:a-2)第?個(gè)階段的maptask并發(fā)實(shí)例,完全并行運(yùn)行,
)按行處理
P一個(gè)文件,q-z-2互不相干
個(gè)文件)maptask"心
4)Hashmap<單向.value+1)3)第二個(gè)階段的reducetask并發(fā)實(shí)例互不相干,但是
他們的數(shù)據(jù)依穎于個(gè)階段的所仃并發(fā)實(shí)
Hadoopspark等到分給白己的數(shù)甥片全曲:讀完之后I.maptask
格按照首個(gè)字母范例的輸出
hive5)hashmap
、用分成2個(gè)hashmap
Hbase4)MapReduce編程模型只能包含一個(gè)map階段和一個(gè)
Hadoopreduce階段,如果用戶的業(yè)務(wù)邏輯非常復(fù)雜,那就只
能多個(gè)程序,中行運(yùn)行
sparkmapreduce
統(tǒng)計(jì)a-p開頭的單詞
maptask
輸出數(shù)據(jù)
輸出結(jié)果若干問題細(xì)節(jié)
Oreducetask;>
Hashmap(a-p)一一一到文件1)maptask如何進(jìn)行任務(wù)分配
Hashmap(q-z)2)Reducetask如何進(jìn)行任務(wù)分配
3)MaptaskfllReducetask之間如何銜接
統(tǒng)計(jì)中z開頭的單詞4)如果某maptask運(yùn)行失敗,如何處理
maptask
5)Maptask如果都要自己負(fù)費(fèi)輸出數(shù)
輸出結(jié)果據(jù)的分區(qū),很麻煩
reducetask
輸入數(shù)據(jù)到文件
Hashmab(q-z).
1MrAppMaster負(fù)責(zé)整個(gè)程序的過程調(diào)度邈燙媽
1)分布式的運(yùn)算程序往往需要分成至少2個(gè)階段
2)第一個(gè)階段的maptask并發(fā)實(shí)例,完全并行運(yùn)行,互不相干
3)第二個(gè)階段的reducetask并發(fā)實(shí)例互不相干,但是他們的數(shù)據(jù)依賴于上一個(gè)階段的所有
maptask并發(fā)實(shí)例的輸出
4)MapReduce編程模型只能包含一個(gè)map階段和一個(gè)reduce階段,如果用戶的業(yè)務(wù)邏輯非
常復(fù)雜,那就只能多個(gè)mapreduce程序,串行運(yùn)行
1.3MapReduce進(jìn)程
一個(gè)完整的mapreduce程序在分布式運(yùn)行時(shí)有三類實(shí)例進(jìn)程:
1)MrAppMaster:負(fù)責(zé)整個(gè)程序的過程調(diào)度及狀態(tài)協(xié)調(diào)
2)MapTask:負(fù)責(zé)map階段的整個(gè)數(shù)據(jù)處理流程
3)ReduceTask:負(fù)責(zé)reduce階段的整個(gè)數(shù)據(jù)處理流程
1.4MapReduce編程規(guī)范(八股文)
用戶編寫的程序分成三個(gè)部分:Nipper,Reducer,Driver(提交運(yùn)行mr程序的客戶端)
1)Mapper階段
(1)用戶自定義的Mapper要繼承自己的父類
(2)Mapper的輸入數(shù)據(jù)是KV對的形式(KV的類型可自定義)
(3)Mapper中的業(yè)務(wù)邏輯寫在map()方法中
(4)Mapper的輸出數(shù)據(jù)是KV對的形式(KV的類型可自定義)
(5)map()方法(maptask進(jìn)程)對每一個(gè)<1<、>調(diào)用一次
2)Reducer階段
(1)用戶自定義的Reducer要繼承自己的父類
(2)Reducer的輸入數(shù)據(jù)類型對應(yīng)Mapper的輸出數(shù)據(jù)類型,也是KV
(3)Reducer的業(yè)務(wù)邏輯寫在reduce。方法中
(4)Reducetask進(jìn)程對每一組相同k的<k,v>組調(diào)用一次reduce。方法
3)Driver階段
整個(gè)程序需要一個(gè)Drvier來進(jìn)行提交,提交的是一個(gè)描述了各種必要信息的job對象
4)案例實(shí)操
詳見3.1.1統(tǒng)ii?堆文件中單詞出現(xiàn)的個(gè)數(shù)(WordCount案例)。
1.5MapReduce程序運(yùn)行流程分析
他尚理看
M叩Reduce程序運(yùn)行流程分析
1待處理文本s讀取數(shù)據(jù)「Maptaskss.tx,0-128一7收柒kv到媛存
/user/atguigu/inputInputFormat------?K,voutputcollector
8按照k分區(qū)排
atguiguss.txt序后寫入磁盤
bigdata200mwordcount
Mapper二理a-g
邏輯其克
Javaphp6reducetask11輸出結(jié)果到文件
map(K,v)oteui[u,l><Hbsse,l><$pak.l>
Android<btgdita,l><hivt1>...
Context.write(k,v)wordcountReduce{OutputFormat
Html5
reduce(k,v)
Bigdatabb.txt
Context.wrlte(k,v);
python100mPart-r-00000
Maptaskss.txt128-200}
2客戶斯submit。前,獲取reducetask
行處理數(shù)據(jù)的信息,然后
OutputFormat、
根據(jù)參數(shù)配置,形成一個(gè)wordcountReducef
他務(wù)分配的規(guī)劃.reduce(k,v)
Maptaskbb.txt0-100
10reducet>sk^Context.write{k,v);
ss.txt0-128Part-r-00001
取數(shù)據(jù),月運(yùn)算}
ss.txt128-2003提交切片信息
bb.txtJob.split
wc.jar4計(jì)算出maptask數(shù)星reducetask
Job.xml
9所有maptask任務(wù)完wordcountReducefOutputFormat
Mrappmaster成后,啟動(dòng)相應(yīng)數(shù)量的reduce(k,v)
reducetask,并告知Contextwrite(k,v);
Part-r-00002
NodeManagerreducetask處理數(shù)抿范)
圍(數(shù)據(jù)分區(qū))
1)在MapReduce程序讀取文件的輸入目錄上存放相應(yīng)的文件。
2)客戶端程序在submit。方法執(zhí)行前,獲取待處理的數(shù)據(jù)信息,然后根據(jù)集群中參數(shù)的配
置形成一個(gè)任務(wù)分配規(guī)劃。
3)客戶端提交job.split、jar包、job.xml等文件給yarn,yarn中的resourcemanager啟動(dòng)
MRAppMaster?
4)MRAppMaster啟動(dòng)后根據(jù)本次job的描述信息,計(jì)算出需要的maplask實(shí)例數(shù)量,然后
向集群申請機(jī)器啟動(dòng)相應(yīng)數(shù)量的maptask進(jìn)程。
5)maptask利用客戶指定的inputformat來讀取數(shù)據(jù),形成輸入KV對。
6)maptask將輸入KV對傳遞給客戶定義的map()方法,做邏輯運(yùn)算
7)map()運(yùn)算完畢后將KV對收集到maptask緩存。
8)maptask緩存中的KV對按照K分區(qū)排序后不斷寫到磁盤文件
9)MRAppMaster監(jiān)控到所有maptask進(jìn)程任務(wù)完成之后,會(huì)根據(jù)客戶指定的參數(shù)啟動(dòng)相應(yīng)
數(shù)量的reducetask進(jìn)程,并告知reducetask進(jìn)程要處理的數(shù)據(jù)分區(qū)。
10)Reducetask進(jìn)程啟動(dòng)之后,根據(jù)MRAppMaster告知的待處理數(shù)據(jù)所在位置,從若干臺(tái)
maptask運(yùn)行所在機(jī)器上獲取到若干個(gè)maptask輸出結(jié)果文件,并在本地進(jìn)行重新歸并排序,
然后按照相同key的KV為一個(gè)組,調(diào)用客戶定義的reduce。方法進(jìn)行邏輯運(yùn)算。
11)Reducetask運(yùn)算完畢后,調(diào)用客戶指定的outputformat將結(jié)果數(shù)據(jù)輸出到外部存儲(chǔ)。
二MapReduce理論篇
2.1Writable序列化
序列化就是把內(nèi)存中的對象,轉(zhuǎn)換成字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)以便于存儲(chǔ)(持
久化)和網(wǎng)絡(luò)傳輸。
反序列化就是將收到字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)或者是硬盤的持久化數(shù)據(jù),轉(zhuǎn)換
成內(nèi)存中的對象。
Java的序列化是一個(gè)重量級(jí)序列化框架(Serializable),一個(gè)對象被序列化后,會(huì)附帶
很多額外的信息(各種校驗(yàn)信息,header,繼承體系等),不便于在網(wǎng)絡(luò)中高效傳輸。所以,
hadoop自己開發(fā)了一套序列化機(jī)制(Writable),精簡、高效。
2.1.1常用數(shù)據(jù)序列化類型
常用的數(shù)據(jù)類型對應(yīng)的hadoop數(shù)據(jù)序列化類型
Java類型HadoopWritable類型
booleanBooleanWritable
byteByteWritable
intIntWritable
floatFloatWritable
longLongWritable
doubleDoubleWritable
stringText
mapMapWritable
arrayArrayWritable
2.1.2自定義bean對象實(shí)現(xiàn)序列化接口
1)自定義bean對象要想序列化傳輸,必須實(shí)現(xiàn)序列化接口,需要注意以下7項(xiàng)。
(1)必須實(shí)現(xiàn)Writable接口
(2)反序列化時(shí),需要反射調(diào)用空參構(gòu)造函數(shù),所以必須有空參構(gòu)造
(3)重寫序列化方法
(4)重寫反序列化方法
(5)注意反序列化的順序和序列化的順序完全一致
(6)要想把結(jié)果顯示在文件中,需要重寫toString(),且用“\t“分開,方便后續(xù)用
(7)如果需要將自定義的bean放在key中傳輸,則還需要實(shí)現(xiàn)comparable接口,因?yàn)?/p>
mapreduce框中的shuffle過程一定會(huì)對key進(jìn)行排序
//1必須實(shí)現(xiàn)Writable接口
publicclassFlowBeanimplementsWritable{
privatelongupFlow;
privatelongdownFlow;
privatelongsumFlow;
111反序列化時(shí),需要反射調(diào)用空參構(gòu)造函數(shù),所以必須有
publicFlowBean(){
super();
*3重寫序列化方法
*
*@paramout
*@throwslOException
*/
@Override
publicvoidwrite(DataOutputout)throwslOException{
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
*4重寫反序列化方法
5注意反序列化的順序和序列化的順序完全一致
*
*@paramin
*@throwslOException
*/
@Override
publicvoidreadFields(DataInputin)throwslOException{
upFlow=in.readLong();
downFlow=in.readLongO;
sumFlow=in.readLong();
)
〃6要想把結(jié)果顯示在文件中,需要重寫loString。,且用分開,方便后續(xù)用
@Override
publicStringtoStringO{
returnupFlow+n\t"+downFlow+"\t"+sumFlow;
//7如果需要將自定義的bean放在key中傳輸,則還需要實(shí)現(xiàn)comparable接口,因
為mapreduce框中的shuffle過程一定會(huì)對key進(jìn)行排序
@Override
publicintcompareTo(FlowBeano){
//倒序排列,從大到小
returnthis.sumFlow>o.getSumFlow()?-1:1;
2)案例實(shí)操
詳見3.2.1統(tǒng)計(jì)每一個(gè)手機(jī)號(hào)耗費(fèi)的總上行流量、下行流量、總流量(序列化)。
2.2InputFormat數(shù)據(jù)切片機(jī)制
2.2.1FilelnputFormat切片機(jī)制
1)job提交流程源碼詳解
waitForCompletion()
submit();
//1建立連接
connect();
//I)創(chuàng)建提交job的代理
newCluster(getConfiguration());
//(1)判斷是本地yarn還是遠(yuǎn)程
initialize(jobTrackAddr,conf);
//2提交job
submitter.submitJobInternal(Job.this,cluster)
//I)創(chuàng)建給集群提交數(shù)據(jù)的Stag路徑
PathjobStagingArea=JobSubmissionFiles.ge04g山gZ)ir(cluster,conf);
//2)獲取jobid,并創(chuàng)建job路徑
JoblDjobld=submitClient.getNewJobID();
//3)拷貝jar包到集群
copyAndConfigureFiles(job,submitJobDir);
rUploader.uploadFiles(job,jobSubmitDir);
//4)計(jì)算切片,生成切片規(guī)劃文件
writeSplits(job,submitJobDir);
maps=writeNewSplits(job,jobSubmitDir);
input.getSplits(job);
//5)向Stag路徑寫xml配置文件
writeConf(conf,submitJobFile);
conf.writeXml(out);
//6)提交job,返回提交狀態(tài)
status=submitClient.submitJob(jobId,submitJobDir.toStringO,job.getCredentials());
2)FilelnputFormat源碼解析(input.getSplits(job))
(1)找到你數(shù)據(jù)存儲(chǔ)的目錄。
(2)開始遍歷處理(規(guī)劃切片)目錄下的每一個(gè)文件
(3)遍歷第一個(gè)文件ss.txt
a)獲取文件大小fs.sizeOf(ss.txt);
b)計(jì)算切片大小
computeSliteSize(Math.max(minSize,Math.max(maxSize,blocksize)))=blocksize=128M
c)默認(rèn)情況下,切片大小=blocksize
d)開始切,形成第1個(gè)切片:ss.txt—0:128M第2個(gè)切片ss.txt—128:256M第3
個(gè)切片ss.txt—256M:300M(每次切片時(shí),都要判斷切完剩下的部分是否大于塊的1.1倍,
不大于1.1倍就劃分一塊切片)
e)將切片信息寫到一個(gè)切片規(guī)劃文件中
f)整個(gè)切片的核心過程在getSplit()方法中完成。
g)數(shù)據(jù)切片只是在邏輯上對輸入數(shù)據(jù)進(jìn)行分片,并不會(huì)再磁盤上將其切分成分片
進(jìn)行存儲(chǔ)。InputSplit只記錄了分片的元數(shù)據(jù)信息,比如起始位置、長度以及所在的節(jié)
點(diǎn)列表等。
h)注意:block是HDFS上物理上存儲(chǔ)的存儲(chǔ)的數(shù)據(jù),切片是對數(shù)據(jù)邏輯上的劃分。
(4)提交切片規(guī)劃文件到y(tǒng)arn±,yarn上的MrAppMaster就可以根據(jù)切片規(guī)劃文件計(jì)
算開啟maptask個(gè)數(shù)。
3)FilelnputFormat中默認(rèn)的切片機(jī)制:
(1)簡單地按照文件的內(nèi)容長度進(jìn)行切片
(2)切片大小,默認(rèn)等于block大小
(3)切片時(shí)不考慮數(shù)據(jù)集整體,而是逐個(gè)針對每一個(gè)文件單獨(dú)切片
比如待處理數(shù)據(jù)有兩個(gè)文件:
filel.txt320M
file2,txt10M
經(jīng)過FilelnputFormat的切片機(jī)制運(yùn)算后,形成的切片信息如下:
filel.txt.splitl-0?128
filel.txt.split2-128?256
filel.txt.split3—256?320
file2.txt.splitl—(MOM
4)FilelnputFormat切片大小的參數(shù)配置
(1)通過分析源碼,在FilelnputFormat中,計(jì)算切片大小的邏輯:Math.max(minSize,
Math.min(maxSize,blockSize));
切片主要由這幾個(gè)值來運(yùn)算決定
mapreduce.input.fileinputformat.split.minsize=l默認(rèn)值為I
mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue默認(rèn)值Long.MAXValue
因此,默認(rèn)情況下,切片大小=1)1(?:1(^26。
maxsize(切片最大值):參數(shù)如果調(diào)得比blocksize小,則會(huì)讓切片變小,而且就等于
配置的這個(gè)參數(shù)的值。
minsize(切片最小值):參數(shù)調(diào)的比blockSize大,則可以讓切片變得比blocksize還
大。
5)獲取切片信息API
〃根據(jù)文件類型獲取切片信息
FileSplitinputSplit=(FileSplit)context.getInputSplit();
//獲取切片的文件名稱
Stringname=inputSplit.getPath().getName();
2.2.2CombineTextlnputFormat切片機(jī)制
關(guān)于大量小文件的優(yōu)化策略
1)默認(rèn)情況下Textinputformat對任務(wù)的切片機(jī)制是按文件規(guī)劃切片,不管文件多小,都會(huì)
是一個(gè)單獨(dú)的切片,都會(huì)交給一個(gè)maptask,這樣如果有大量小文件,就會(huì)產(chǎn)生大量的
maplask,處理效率極其低下。
2)優(yōu)化策略
(1)最好的辦法,在數(shù)據(jù)處理系統(tǒng)的最前端(預(yù)處理/采集),將小文件先合并成大文
件,再上傳到HDFS做后續(xù)分析。
(2)補(bǔ)救措施:如果已經(jīng)是大量小文件在HDFS中了,可以使用另一種InpulFormat
來做切片(CombineTextlnputFormat),它的切片邏輯跟TextFilelnputFormat不同:它可以
將多個(gè)小文件從邏輯上規(guī)劃到一個(gè)切片中,這樣,多個(gè)小文件就可以交給一個(gè)maptask。
(3)優(yōu)先滿足最小切片大小,不超過最大切片大小
CombineTextInputFormat.se/Mar/叩w/Sp/〃Size(job,4194304);//4m
CombineTextlnputFormat.2097152);//2m
舉例:0.5m+1m+0.3m+5m=2m+4.8m=2m+4m+0.8m
3)具體實(shí)現(xiàn)步驟
//9如果不設(shè)置InputFormat,它默認(rèn)用的是TextlnputFormat.class
job.setlnputFormatClass(CombineTextlnputFormat.class)
CombineTextlnpulFormaLsefMax/叩”rSp〃rSize(job,4194304);//4m
CombineTextlnputFormat.se/Mi”/叩"/Sp/"Size(job,2097152);//2m
4)案例實(shí)操
詳見3.1.4需求4:大量小文件的切片優(yōu)化(CombineTexlInpulFormal)。
2.2.3自定義InputFormat
1)概述
(1)自定義一個(gè)InputFormat
(2)改寫RecordReader,實(shí)現(xiàn)一次讀取一個(gè)完整文件封裝為KV
(3)在輸出時(shí)使用SequenceFileOutPutFormat輸出合并文件
2)案例實(shí)操
詳見3.5小文件處理(自定義InputFormat)。
2.3M叩Task工作機(jī)制
i)問題引出
maptask的并行度決定map階段的任務(wù)處理并發(fā)度,進(jìn)而影響到整個(gè)job的處理速度。
那么,mapTask并行任務(wù)是否越多越好呢?
2)MapTask并行度決定機(jī)制
一個(gè)job的map階段MapTask并行度(個(gè)數(shù)),由客戶端提交job時(shí)的切片個(gè)數(shù)決定。
3)MapTask工作機(jī)制
(1)Read階段:MapTask通過用戶編寫的RecordReader,從輸入InputSplit中解析出
一個(gè)個(gè)key/value。
(2)Map階段:該節(jié)點(diǎn)主要是將解析出的key/value交給用戶編寫map()函數(shù)處理,并
產(chǎn)生一系列新的key/value?
(3)Collect階段:在用戶編寫map()函數(shù)中,當(dāng)數(shù)據(jù)處理完成后,一般會(huì)調(diào)用
OutputCollector.collect。輸出結(jié)果。在該函數(shù)內(nèi)部,它會(huì)將生成的key/value分區(qū)(調(diào)用
Partitioner),并寫入一個(gè)環(huán)形內(nèi)存緩沖區(qū)中。
(4)Spill階段:即“溢寫”,當(dāng)環(huán)形緩沖區(qū)滿后,MapReduce會(huì)將數(shù)據(jù)寫到本地磁盤上,
生成一個(gè)臨時(shí)文件。需要注意的是,將數(shù)據(jù)寫入本地磁盤之前,先要對數(shù)據(jù)進(jìn)行一次本地排
序,并在必要時(shí)對數(shù)據(jù)進(jìn)行合并、壓縮等操作。
溢寫階段詳情:
步驟1:利用快速排序算法對緩存區(qū)內(nèi)的數(shù)據(jù)進(jìn)行排序,排序方式是,先按照分區(qū)編號(hào)
partition進(jìn)行排序,然后按照key進(jìn)行排序。這樣,經(jīng)過排序后,數(shù)據(jù)以分區(qū)為單位聚集在
一起,且同一分區(qū)內(nèi)所有數(shù)據(jù)按照key有序。
步驟2:按照分區(qū)編號(hào)由小到大依次將每個(gè)分區(qū)中的數(shù)據(jù)寫入任務(wù)工作目錄下的臨時(shí)文
件output/spillN.out(N表示當(dāng)前溢寫次數(shù))中。如果用戶設(shè)置了Combiner,則寫入文件之
前,對每個(gè)分區(qū)中的數(shù)據(jù)進(jìn)行一次聚集操作。
步驟3:將分區(qū)數(shù)據(jù)的元信息寫到內(nèi)存索引數(shù)據(jù)結(jié)構(gòu)SpiDRecord中,其中每個(gè)分區(qū)的元
信息包括在臨時(shí)文件中的偏移量、壓縮前數(shù)據(jù)大小和壓縮后數(shù)據(jù)大小。如果當(dāng)期內(nèi)存索引大
小超過1MB,則將內(nèi)存索引寫到文件output/spillN.out.index中。
(5)Combine階段:當(dāng)所有數(shù)據(jù)處理完成后,MapTask對所有臨時(shí)文件進(jìn)行一次合并,
以確保最終只會(huì)生成一個(gè)數(shù)據(jù)文件。
當(dāng)所有數(shù)據(jù)處理完后,MapTask會(huì)將所有臨時(shí)文件合并成一個(gè)大文件,并保存到文件
output/file.out中,同時(shí)生成相應(yīng)的索引文件output/file.out.index。
在進(jìn)行文件合并過程中,MapTask以分區(qū)為單位進(jìn)行合并。對于某個(gè)分區(qū),它將采用多
輪遞歸合并的方式。每輪合并io.sort.factor(默認(rèn)100)個(gè)文件,并將產(chǎn)生的文件重新加入
待合并列表中,對文件排序后,重復(fù)以上過程,直到最終得到一個(gè)大文件。
讓每個(gè)MapTask最終只生成一個(gè)數(shù)據(jù)文件,可避免同時(shí)打開大量文件和同時(shí)讀取大量
小文件產(chǎn)生的隨機(jī)讀取帶來的開銷。
2.4Shuffle機(jī)制
2.4.1Shuffle機(jī)制
M叩reduce確保每個(gè)reducer的輸入都是按鍵排序的。系統(tǒng)執(zhí)行排序的過程(即將map
輸出作為輸入傳給reducer)稱為shuffleo
Copy"Sort"Reduce
phasephasephase
maptask端嚅reducetask
spilltodisk幽m...........?>1卜”A..
bufferinmerge1------------1
memoryi_________________________)口▼4n
■-?!|AllII『II1/merge位
/_n__u
s■'▼蹦tk
mergeJr?-----------1output
partitiom?/y
/mixtunofin-memoryandon-diskdata
Othermaps**..............Otherreduces
2.4.2MapReduce工作流程
1)流程示意圖
?向硅谷
ss.txt0128MapReduce詳細(xì)工作流程(一)
5默認(rèn)TextlnputFormat,__Maptask
7環(huán)形緩沖區(qū)默認(rèn)100M
RecorderReaderInputFormat>K,v
......<a,lxc.
K,vMapper!___
readerO6邏輯運(yùn)
map(K,v)溢出到文件(分區(qū)且區(qū)內(nèi)有序)
spiller9
待處理文本Context.write(k,v)
1Merwe底并排序
/user/inputHashPartitioner分區(qū)<a,lxc,X><b,l><b,l>
o」xa,lxc4xe,l>
ss.txtoutputcollector
Key.compareT。排序
200m
Combiner介并<a,lxc.l>va,2xGlxe,l><b,2xd,lxf,l>
2客戶端submit。前,獲取partitionOpartitionl
8分區(qū)、排序11合并
待處理數(shù)據(jù)的信息,然后ss.txt128-200
根據(jù)參數(shù)配置,形成一個(gè)
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025-2030年中國隔熱涂料行業(yè)發(fā)展動(dòng)態(tài)及前景趨勢分析報(bào)告
- 2025-2030年中國鋁冶煉行業(yè)現(xiàn)狀調(diào)研及投資發(fā)展?jié)摿Ψ治鰣?bào)告
- 2025-2030年中國鐵路用鋼行業(yè)發(fā)展現(xiàn)狀及前景趨勢分析報(bào)告
- 2025-2030年中國輻射加工產(chǎn)業(yè)發(fā)展現(xiàn)狀規(guī)劃研究報(bào)告
- 2024年紡織品出口加工與貿(mào)易合同
- 2025年度梅州房產(chǎn)買賣合同(含室內(nèi)空氣質(zhì)量保證)3篇
- 2024水路貨物運(yùn)輸合同(示范文本GF)-水路貨物運(yùn)輸風(fēng)險(xiǎn)管理合作協(xié)議3篇
- 2025年度環(huán)保技術(shù)三方共同投資股份合同書3篇
- 2024食品公司食品安全風(fēng)險(xiǎn)評(píng)估合同范本3篇
- 2024版合同范本房屋委托出租合同范本
- 無人機(jī)航拍技術(shù)教案(完整版)
- 人教PEP版(2024)三年級(jí)上冊英語Unit 4《Plants around us》單元作業(yè)設(shè)計(jì)
- 《保密法》培訓(xùn)課件
- 醫(yī)院項(xiàng)目竣工驗(yàn)收和工程收尾階段的管理措施專項(xiàng)方案
- 2024年涉密人員考試試題庫保密基本知識(shí)試題附答案(考試直接用)
- 2024年桂林中考物理試卷
- DL∕T 5362-2018 水工瀝青混凝土試驗(yàn)規(guī)程
- (正式版)JC∕T 60023-2024 石膏條板應(yīng)用技術(shù)規(guī)程
- DL-T5054-2016火力發(fā)電廠汽水管道設(shè)計(jì)規(guī)范
- (權(quán)變)領(lǐng)導(dǎo)行為理論
- 家用電器可靠性與壽命預(yù)測研究
評(píng)論
0/150
提交評(píng)論