尚硅谷大數(shù)據(jù)技術之_第1頁
尚硅谷大數(shù)據(jù)技術之_第2頁
尚硅谷大數(shù)據(jù)技術之_第3頁
尚硅谷大數(shù)據(jù)技術之_第4頁
尚硅谷大數(shù)據(jù)技術之_第5頁
已閱讀5頁,還剩97頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

1、尚硅谷大數(shù)據(jù)技術之 Hadoop(MapReduce)尚硅谷大數(shù)據(jù)技術之Hadoop(MapReduce)(作者:大海哥)官網(wǎng):版本:V1.1 MapReduce 概念Mapreduce 是一個分布式運算程序的編程框架,是用戶開發(fā)“基于 hadoop 的數(shù)據(jù)分析應用”的框架;Mapreduce功能是將用戶編寫的業(yè)務邏輯代碼和自帶默認組件整一個完整的分布式運算程序,并發(fā)運行在一個 hadoop 集群上。1.1 為什么要 MapReduce1)海量數(shù)據(jù)在單機上處理因為硬件限制,無法勝任2)而一旦將單機版程序擴展到集群來分布式運行,將極大增加程序的復雜度和開發(fā)難度3)引入 mapreduce 框架后

2、,開發(fā)可以將絕大部分工作集中在業(yè)務邏輯的開發(fā)上,而將分布式計算中的復雜由框架來處理。4)mapreduce 分布式方案考慮的(1)運算邏輯要不要先分后合?(2)程序如何分配運算任務(切片)?(3)兩階段的程序如何啟動?如何協(xié)調?(4)整個程序運行過程中的?容錯?重試?分布式方案需要考慮很多,但是我們可以將分布式的公共功能封裝成框架,讓開發(fā)將精力集中于業(yè)務邏輯上。而 mapreduce 就是這樣一個分布式程序的通用框架。Java、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官尚硅谷大數(shù)據(jù)技術之 Hadoop(MapReduce)1.2 MapReduce思想上

3、圖簡單的闡明了 map 和 reduce 的兩個過程或者作用,雖然不夠嚴謹,但是足以提供一個大概的認知,map 過程是一個蔬菜到制成前的準備工作,reduce 將準備好的材料合并進而制作出的過程MapReduceMap和Reduce階段編程思想需求:統(tǒng)計其中每一個單詞出現(xiàn)的總1) 分布式的運算程序往往需要分成至少2個階段2) 第一個階段的maptask并發(fā)實例,完全并行運行,互不相干3) 第二個階段的reduce task并發(fā)實例互不相干,但是他們的數(shù)據(jù)依賴于上一個階段的所有maptask并發(fā)實例的輸出4) MapReduce編程模型只能包含一個map階段和一個reduce階段,如果用戶的業(yè)務

4、邏輯非常復雜,那就只能多個mapreduce程序,串行運行次數(shù)(結果:a-p一個文件,q-z一個文件)輸出數(shù)據(jù)若干細節(jié)1)maptask如何進行任務分配2)Reduce task 如何進行任務分配3)Maptask和Reduce task之間如何銜接4)如果某maptask運行失敗,如何處理5)Maptask如果都要據(jù)的分區(qū),很麻煩負責輸出數(shù)輸入數(shù)據(jù)MrAppMaster負責整個程序的過程調度及狀態(tài)協(xié)調1)分布式的運算程序往往需要分成至少 2 個階段2)第一個階段的 maptask 并發(fā)實例,完全并行運行,互不相干3)第二個階段的 reduce task 并發(fā)實例互不相干,但是他們的數(shù)據(jù)依賴于

5、上一個階段的所有Java、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官輸出結果到文件maptaskHashmap(a-p) Hashmap(q-z)統(tǒng)計q-z開頭的單詞3reducetask輸出結果到文件統(tǒng)計a-p開頭的單詞reducetaskmaptaskHashmap(a-p) Hashmap(q-z)2Hadoop spark hive Hbase Hadoop sparkJava php Android Html5 Bigdata python1) 讀數(shù)據(jù)2) 按行處理maptask 3)按空格切分行內單詞4)Hashmap(單詞,value+1)

6、等到分給的數(shù)據(jù)片全部讀完之后5)將hashmap按照首個字母范圍分成2個hashmap1尚硅谷大數(shù)據(jù)技術之 Hadoop(MapReduce)maptask 并發(fā)實例的輸出4)MapReduce 編程模型只能包含一個 map 階段和一個 reduce 階段,如果用戶的業(yè)務邏輯非常復雜,那就只能多個 mapreduce 程序,串行運行1.3 MapReduce 進程一個完整的 mapreduce 程序在分布式運行時有三類實例進程:1)MrAppMaster:負責整個程序的過程調度及狀態(tài)協(xié)調2)MapTask:負責 map 階段的整個數(shù)據(jù)處理流程3)ReduceTask:負責 reduce 階段的

7、整個數(shù)據(jù)處理流程1.4 MapReduce 編程規(guī)范(八股文)成三個部分:Mapper,Reducer,Driver(提交運行 mr 程序的客戶端)用戶編寫的1)Mapper 階段(1)用戶自定義的 Mapper 要繼承的父類(2)Mapper 的輸入數(shù)據(jù)是 KV 對的形式(KV 的類型可自定義)(3)Mapper 中的業(yè)務邏輯寫在 map()中(4)Mapper 的輸出數(shù)據(jù)是 KV 對的形式(KV 的類型可自定義)(5)map()(maptask 進程)對每一個調用一次2)Reducer 階段(1)用戶自定義的 Reducer 要繼承的父類(2)Reducer 的輸入數(shù)據(jù)類型對應 Mappe

8、r 的輸出數(shù)據(jù)類型,也是 KV(3)Reducer 的業(yè)務邏輯寫在 reduce()中(4)Reducetask 進程對每一組相同 k 的組調用一次 reduce()3)Driver 階段整個程序需要一個 Drvier 來進行提交,提交的是一個描述了各種必要信息的 job 對象4)案例詳見 3.1.1 統(tǒng)計一堆文件中單詞出現(xiàn)的個數(shù)(WordCount 案例)。Java、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官尚硅谷大數(shù)據(jù)技術之 Hadoop(MapReduce)1.5 MapReduce 程序運行流程分析MapReduce程序運行流程分析數(shù)據(jù)5Map

9、taskss.txt 0-1287 收集kv到緩存1 待處理文本/user/atguigu/inputK,vInputFormatoutputCollector8 按照k分區(qū)排序后寫入磁盤txtwordcountm處理a-gMapper6 邏輯運算11 輸出結果到文件 map(K,v)Context.write(k,v) tbb.txt100m128-200處理h-r2 客戶端submit()前,獲取待處理數(shù)據(jù)的信息,然后根據(jù)參數(shù)配置,形成一個任務分配的 。ss.txt 0-128ss.txt 128-200 3 提交切片信息reduce taskOutputFormatwordcountRe

10、duce reduce(k,v) Context.write(k,v);0-10010 reduce task獲取數(shù)據(jù),并運算Part-r-00001bb.txt處理s-z4 計算出maptask數(shù)量9 所有maptask任務完成后,啟動相應數(shù)量的reducetask,并告知reducetask處理數(shù)據(jù)范圍(數(shù)據(jù)分區(qū))客戶端YarnRM1)在 MapReduce 程序文件的輸入目錄上存放相應的文件。2)客戶端程序在 submit()執(zhí)行前,獲取待處理的數(shù)據(jù)信息,然后根據(jù)集群中參數(shù)的配置形成一個任務分配。3)客戶端提交 job.split、jar 包、job.xml 等文件給 yarn,yarn

11、 中的 resourcemanager 啟動MRAppMaster。4)MRAppMaster 啟動后根據(jù)本次 job 的描述信息,計算出需要的 maptask 實例數(shù)量,然后向集群申請啟動相應數(shù)量的 maptask 進程。5)maptask 利用客戶指定的 inputformat 來數(shù)據(jù),形成輸入 KV 對。6)maptask 將輸入 KV 對傳遞給客戶定義的 map(),做邏輯運算7)map()運算完畢后將 KV 對收集到 maptask 緩存。8)maptask 緩存中的 KV 對按照 K 分區(qū)排序后不斷寫到磁盤文件9)MRAppMaster到所有 maptask 進程任務完成之后,會根

12、據(jù)客戶指定的參數(shù)啟動相應數(shù)量的 reducetask 進程,并告知 reducetask 進程要處理的數(shù)據(jù)分區(qū)。10)Reducetask 進程啟動之后,根據(jù) MRAppMaster 告知的待處理數(shù)據(jù)所在位置,從若干臺maptask 運行所在上獲取到若干個 maptask 輸出結果文件,并在本地進行重新歸并排序,然后按照相同 key 的 KV 為一個組,調用客戶定義的 reduce()進行邏輯運算。11)Reducetask 運算完畢后,調用客戶指定的 outputformat 將結果數(shù)據(jù)輸出到外部。Java、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官P

13、art-r-00002Mr appmasterNodeManagerreduce taskwordcountReduce reduce(k,v) Context.write(k,v);OutputFormatJob.split wc.jar Job.xmlMap task bb.txtMap task ss.txtPart-r-00000reduce taskwordcountReduce reduce(k,v) Context.write(k,v);OutputFormatatguigubigdatass.200HbahivspaJava An Hphp droid ml5Bigdatapy

14、thon尚硅谷大數(shù)據(jù)技術之 Hadoop(MapReduce)二 MapReduce 理論篇2.1 Writable 序列化序列化就是把內存中的對象,轉換成字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)以便于(持久化)和傳輸。反序列化就是將收到字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)或者是硬盤的持久化數(shù)據(jù),轉換成內存中的對象。Java 的序列化是一個重量級序列化框架(Serializable),一個對象被序列化后,會附帶很多額外的信息(各種校驗信息,header,繼承體系等),不便于在中高效傳輸。所以,hadoop開發(fā)了一套序列化機制(Writable),精簡、高效。2.1.1 常用數(shù)據(jù)序列化類型常用的數(shù)據(jù)類型對應的

15、hadoop 數(shù)據(jù)序列化類型2.1.2 自定義 bean 對象實現(xiàn)序列化接口1)自定義 bean 對象要想序列化傳輸,必須實現(xiàn)序列化接口,需要注意以下 7 項。(1)必須實現(xiàn) Writable 接口(2)反序列化時,需要反射調用空參構造函數(shù),所以必須有空參構造(3)重寫序列化(4)重寫反序列化(5)注意反序列化的順序和序列化的順序完全一致(6)要想把結果顯示在文件中,需要重寫 toString(),且用”t”,方便后續(xù)用(7)如果需要將自定義的 bean 放在key 中傳輸,則還需要實現(xiàn) comparable 接口,因為Java、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),

16、可尚硅谷(中國)官Java 類型Hadoop Writable 類型booleanBooleanWritablebyteByteWritableintIntWritablefloatFloatWritablelongLongWritabledoubleDoubleWritablestringTextmapMapWritablearrayArrayWritable尚硅谷大數(shù)據(jù)技術之 Hadoop(MapReduce)mapreduce 框中的 shuffle 過程一定會對 key 進行排序Java、HTML5、Android、python、大數(shù)據(jù)區(qū)】【網(wǎng)資料,可尚硅谷(中國)官/ 1 必須實現(xiàn)

17、Writable 接口public class FlowBean implements Writable private long upFlow; private long downFlow; private long sumFlow;/2 反序列化時,需要反射調用空參構造函數(shù),所以必須有public FlowBean() super();/* 3 重寫序列化* param out* throws IOException*/ Overridepublic void write(DataOutput out) throws IOException out.writeLong(upFlow);ou

18、t.writeLong(downFlow); out.writeLong(sumFlow);/* 4 重寫反序列化5 注意反序列化的順序和序列化的順序完全一致* param in* throws IOException*/ Overridepublic void readFields(DataInput in) throws IOException upFlow = in.readLong();downFlow = in.readLong(); sumFlow = in.readLong();/ 6 要想把結果顯示在文件中,需要重寫 toString(),且用”t”,方便后續(xù)用Override

19、public String toString() 尚硅谷大數(shù)據(jù)技術之 Hadoop(MapReduce)2)案例詳見 3.2.1 統(tǒng)計每一個號耗費的總上行流量、下行流量、總流量(序列化)。2.2 InputFormat 數(shù)據(jù)切片機制2.2.1 FileInputFormat 切片機制1)job 提交流程源碼詳解waitForCompletion()submit();/ 1 建立連接connect();/ 1)創(chuàng)建提交 job 的new Cluster(getConfiguration();/ (1)是本地 yarn 還是initialize(jobTrackAddr, conf);/ 2 提交

20、jobsubmitter.submitJobInternal(Job.this, cluster)/ 1)創(chuàng)建給集群提交數(shù)據(jù)的 Stag 路徑Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);/ 2)獲取 jobid ,并創(chuàng)建 job 路徑JobID jobId = submitClient.getNewJobID();/ 3)拷貝 jar 包到集群copyAndConfigureFiles(job, submitJobDir);rUploader.uploadFiles(job, jobSubmitDi

21、r);Java、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官return upFlow + t + downFlow + t + sumFlow;/7 如果需要將自定義的 bean 放在 key 中傳輸,則還需要實現(xiàn) comparable 接口,因為 mapreduce 框中的 shuffle 過程一定會對 key 進行排序Overridepublic int compareTo(FlowBean o) / 倒序排列,從大到小return this.sumFlow o.getSumFlow() ? -1 : 1;尚硅谷大數(shù)據(jù)技術之 Hadoop(MapRe

22、duce)/ 4)計算切片,生成切片文件writeSplits(job, submitJobDir);maps = writeNewSplits(job, jobSubmitDir);input.getSplits(job);/ 5)向 Stag 路徑寫 xml 配置文件writeConf(conf, submitJobFile);conf.writeXml(out);/ 6)提交 job,返回提交狀態(tài)status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials();FileInputFor

23、mat源碼stagingDirJobSubmiterYarnRunnerLocalJobRunneryarn2)FileInputFormat 源碼(input.getSplits(job)(1)找到你數(shù)據(jù)的目錄。(2)開始遍歷處理(切片)目錄下的每一個文件(3)遍歷第一個文件 ss.txta)獲取文件大小 fs.sizeOf(ss.txt);b)計算切片大小computeSliteSize(Math.max(minSize,Math.max(maxSize,blocksize)=blocksize=128Mc)默認情況下,切片大小=blocksized)開始切,形成第 1 個切片:ss.tx

24、t0:128M 第 2 個切片 ss.txt128:256M第 3個切片 ss.txt256M:300M(每次切片時,都要切完剩下的部分是否大于塊的 1.1倍,Java、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官File:/./.staging/ .jar hfds:/.staging/job.jarmr程序運行在本地模擬器獲取job的jar包jarFile:/.staging/job.xmlhfds:/.staging/job.xml將job相關參數(shù)寫到文件Job.xmlCluster成員proxyFile:/.staging/job.splithfd

25、s:/.staging/job.split調用FileInputFormat.get Splits()獲取切片規(guī)劃List, 并序列化成文件Job.splitJob.submit();jobidFile:/.staging/jobidhfds:/.staging/jobidFile:/.staginghfds:/.stagingConfiguration conf=new Configuration(); Job=job.getInstance(conf); Job.waitForCompletion(true)尚硅谷大數(shù)據(jù)技術之 Hadoop(MapReduce)不大于 1.1 倍就劃分一塊

26、切片)e)將切片信息寫到一個切片文件中f)整個切片的過程在 getSplit()中完成。g)數(shù)據(jù)切片只是在邏輯上對輸入數(shù)據(jù)進行分片,并再磁盤上將其切分成分片進行。InputSplit 只了分片的元數(shù)據(jù)信息,比如起始位置、長度以及所在的節(jié)點列表等。h)注意:block 是 HDFS 上物理上的的數(shù)據(jù),切片是對數(shù)據(jù)邏輯上的劃分。(4)提交切片文件到 yarn 上,yarn 上的 MrAppMaster 就可以根據(jù)切片文件計算開啟 maptask 個數(shù)。3)FileInputFormat 中默認的切片機制:(1)簡單地按照文件的內容長度進行切片(2)切片大小,默認等于 block 大小(3)切片時不

27、考慮數(shù)據(jù)集整體,而是逐個每一個文件單獨切片比如待處理數(shù)據(jù)有兩個文件:經(jīng)過 FileInputFormat 的切片機制運算后,形成的切片信息如下:4)FileInputFormat 切片大小的參數(shù)配置(1)通過分析源碼,在 FileInputFormat 中,計算切片大小的邏輯:Math.max(minSize,Math.min(maxSize, blockSize);切片主要由這幾個值來運算決定mapreduce.input.fileinputformat.split.minsize=1 默認值為 1mapreduce.input.fileinputformat.split.maxsize=

28、Long.MAXValue 默認值 Long.MAXValue因此,默認情況下,切片大小=blocksize。maxsize(切片最大值):參數(shù)如果調得比 blocksize 小,則會讓切片變小,而且就等于配置的這個參數(shù)的值。minsize (切片最小值):參數(shù)調的比 blockSize 大,則可以讓切片變得比 blocksize 還Java、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官file1.txt.split1-0128 file1.txt.split2-128256 file1.txt.split3-256320 file2.txt.split1

29、-010Mfile1.txt320Mfile2.txt10M尚硅谷大數(shù)據(jù)技術之 Hadoop(MapReduce)大。5)獲取切片信息 API2.2.2 CombineTextInputFormat 切片機制關于大量件的優(yōu)化策略1)默認情況下 TextInputformat 對任務的切片機制是按文件切片,不管文件多小,都會是一個單獨的切片,都會交給一個 maptask,這樣如果有大量件,就會產(chǎn)生大量的maptask,處理效率極其低下。2)優(yōu)化策略(1)最好的辦法,在數(shù)據(jù)處理系統(tǒng)的最前端(預處理/),將件先合并成大文件,再上傳到 HDFS 做后續(xù)分析。(2)補救措施:如果已經(jīng)是大量件在 HDFS

30、 中了,可以使用另一種 InputFormat來做切片(CombineTextInputFormat),它的切片邏輯跟 TextFileInputFormat 不同:它可以將多個件從邏輯上到一個切片中,這樣,多個件就可以交給一個 maptask。(3)優(yōu)先滿足最小切片大小,不超過最大切片大小CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);/ 4m CombineTextInputFormat.setMinInputSplitSize(job, 2097152);/ 2m舉例:0.5m+1m+0.3m+5m=2m + 4.8m=

31、2m + 4m + 0.8m3)具體實現(xiàn)步驟4)案例詳見 3.1.4 需求 4:大量件的切片優(yōu)化(CombineTextInputFormat)。2.2.3 自定義 InputFormat1)概述(1)自定義一個 InputFormatJava、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官/ 9 如果不設置 InputFormat,它默認用的是 TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class) CombineTextInputFormat.setMaxIn

32、putSplitSize(job, 4194304);/ 4m CombineTextInputFormat.setMinInputSplitSize(job, 2097152);/ 2m/ 根據(jù)文件類型獲取切片信息FileSplit inputSplit = (FileSplit) context.getInputSplit();/ 獲取切片的文件名稱String name = inputSplit.getPath().getName();尚硅谷大數(shù)據(jù)技術之 Hadoop(MapReduce)(2)改寫 RecordReader,實現(xiàn)一次一個完整文件封裝為 KV(3)在輸出時使用 Seque

33、nceFileOutPutFormat 輸出合并文件2)案例詳見 3.5件處理(自定義 InputFormat)。2.3 MapTask 工作機制1)引出maptask 的并行度決定 map 階段的任務處理并發(fā)度,進而影響到整個 job 的處理速度。那么,mapTask 并行任務是否越多越好呢?2)MapTask 并行度決定機制一個 job 的 map 階段MapTask 并行度(個數(shù)),由客戶端提交 job 時的切片個數(shù)決定。3)MapTask 工作機制(1)Read 階段:Map Task 通過用戶編寫的 RecordReader,從輸入 InputSplit 中出一個個 key/valu

34、e。(2)Map 階段:該節(jié)點主要是將出的 key/value 交給用戶編寫 map()函數(shù)處理,并產(chǎn)生一系列新的 key/value。( 3 ) Collect 階段:在用戶編寫 map() 函數(shù)中, 當數(shù)據(jù)處理完成后, 一般會調用OutputCollector.collect()輸出結果。在該函數(shù)內部,它會將生成的 key/value 分區(qū)(調用Partitioner),并寫入一個環(huán)形內存緩沖區(qū)中。(4)Spill 階段:即“溢寫”,當環(huán)形緩沖區(qū)滿后,MapReduce 會將數(shù)據(jù)寫到本地磁盤上,個臨時文件。需要注意的是,將數(shù)據(jù)寫入本地磁盤之前,先要對數(shù)據(jù)進行一次本地排序,并在必要時對數(shù)據(jù)進

35、行合并、壓縮等操作。溢寫階段詳情:步驟 1:利用快速排序算法對緩存區(qū)內的數(shù)據(jù)進行排序,排序方式是,先按照分區(qū)編號partition 進行排序,然后按照 key 進行排序。這樣,經(jīng)過排序后,數(shù)據(jù)以分區(qū)為在一起,且同一分區(qū)內所有數(shù)據(jù)按照 key 有序。步驟 2:按照分區(qū)編號由小到大依次將每個分區(qū)中的數(shù)據(jù)寫入任務工作目錄下的臨時文件 output/spillN.out(N 表示當前溢寫次數(shù))中。如果用戶設置了 Combiner,則寫入文件之前,對每個分區(qū)中的數(shù)據(jù)進行一次操作。步驟 3:將分區(qū)數(shù)據(jù)的元信息寫到內存索引數(shù)據(jù)結構 SpillRecord 中,其中每個分區(qū)的元Java、HTML5、Andro

36、id、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官尚硅谷大數(shù)據(jù)技術之 Hadoop(MapReduce)信息在臨時文件中的偏移量、壓縮前數(shù)據(jù)大小和壓縮后數(shù)據(jù)大小。如果當期內存索引大小超過 1MB,則將內存索引寫到文件 output/spillN.out.index 中。(5)Combine 階段:當所有數(shù)據(jù)處理完成后,MapTask 對所有臨時文件進行一次合并,以確保最終只會個數(shù)據(jù)文件。當所有數(shù)據(jù)處理完后,MapTask 會將所有臨時文件合并成一個大文件,并保存到文件output/file.out 中,同時生成相應的索引文件 output/file.out.index。在進行文件合并

37、過程中,MapTask 以分區(qū)為進行合并。對于某個分區(qū),它將采用多輪遞歸合并的方式。每輪合并 io.sort.factor(默認 100)個文件,并將產(chǎn)生的文件重新加入待合并列表中,對文件排序后,重復以上過程,直到最終得到一個大文件。讓每個 MapTask 最終只個數(shù)據(jù)文件,可避免同時打開大量文件和同時大量件產(chǎn)生的隨機帶來的開銷。2.4 Shuffle 機制2.4.1 Shuffle 機制Mapreduce 確保每個 reducer 的輸入都是按鍵排序的。系統(tǒng)執(zhí)行排序的過程(即將 map輸出作為輸入傳給 reducer)稱為 shuffle。2.4.2 MapReduce 工作流程1)流程示意

38、圖Java、HTML5、Android、python、大數(shù)據(jù)區(qū)】【網(wǎng)資料,可尚硅谷(中國)官尚硅谷大數(shù)據(jù)技術之 Hadoop(MapReduce)MapReduce詳細工作流程(一)ss.txt 0-1285 默認TextInputFormat7 環(huán)形緩沖區(qū)默認100MMap taskK,vInputFormatK,vreader()Mapper80%6 邏輯運算map(K,v)Context.write(k,v)9 溢出到文件(分區(qū)且區(qū)內有序)1 待處理文本/user/input ss.txt200m10 Merge 歸并排序outputCollector2 客戶端submit()前,獲取待

39、處理數(shù)據(jù)的信息,然后根據(jù)參數(shù)配置,形成一個任務分配的 。ss.txt 0-128ss.txt 128-200 3 提交切片信息partition0partition18 分區(qū)、排序11 合并ss.txt 128-200Merge 歸并排序 partition0partition1bb.txt4 計算出maptask數(shù)量客戶端YarnRMMapReduce詳細工作流程(二)Reducetask1reducer到reducetask本地磁盤1314 一次一組13 合并文件歸并排序Reduce(k,v)Context.write(kv)16 默認TextOutputFormatOutPutForma

40、tPart-r-00000015 分組Write(k,v)默認TextOutputFormatPart-r-00000112 所有maptask任務完成后,啟動相應數(shù)量的reducetask,并告知reducetask處理數(shù)據(jù)范圍(數(shù)據(jù)分區(qū))Write(k,v)2)流程詳解上面的流程是整個 mapreduce 最全工作流程,但是 shuffle 過程只是從第 7 步開始到第16 步結束,具體 shuffle 過程詳解,如下:1)maptask 收集我們的 map()輸出的 kv 對,放到內存緩沖區(qū)中2)從內存緩沖區(qū)不斷溢出本地磁盤文件,可能會溢出多個文件3)多個溢出文件會被合并成大的溢出文件4

41、)在溢出過程中,及合并的過程中,都要調用 partitoner 進行分組和key 進行排序5)reducetask 根據(jù)的分區(qū)號,去各個 maptask上取相應的結果分區(qū)數(shù)據(jù)6)reducetask 會取到同一個分區(qū)的來自不同 maptask 的結果文件,reducetask 會將這些Java、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官Mr appmasterRecordWritera 2b 1c 1d 1Reducetask2reducerReduce(k,v) Context.wr te(kv)OutPutFormatmaptask210 Merge

42、 歸并排序partition0partition1RecordWritera 2b 1c 1d 1GroupingComparator(k,knext)maptask110 Merge 歸并排序partition0partition1Mr appmasterNodeManagerJob.split wc.jar Job.xmlMap taskabcabspillerKpareTo排序Combiner合并HashPartitioner分區(qū) RecorderReader尚硅谷大數(shù)據(jù)技術之 Hadoop(MapReduce)文件再進行合并(歸并排序)7)合并成大文件后,shuffle 的過程也就結束

43、了,后面進入 reducetask 的邏輯運算過程(從文件中取出一個一個的鍵值對 group,調用用戶自定義的 reduce())3)注意Shuffle 中的緩沖區(qū)大小會影響到 mapreduce 程序的執(zhí)行效率,原則上說,緩沖區(qū)越大,磁盤 io 的次數(shù)越少,執(zhí)行速度就越快。緩沖區(qū)的大小可以通過參數(shù)調整,參數(shù):io.sort.mb默認 100M2.4.3 partition 分區(qū)0)引出:要求將統(tǒng)計結果按照條件輸出到不同文件中(分區(qū))。比如:將統(tǒng)計結果按照歸屬地不同省份輸出到不同文件中(分區(qū))1)默認 partition 分區(qū)默認分區(qū)是根據(jù) key 的 hashCode 對 reduceTas

44、ks 個數(shù)取模得到的。用戶沒法哪個key到哪個分區(qū)2)自定義 Partitioner 步驟(1)自定義類繼承 Partitioner,重新getPartition()Java、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官public class ProvincePartitioner extends Partitioner Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) / 1 獲取號碼的前三位String preNum = key.toString(

45、).substring(0, 3);int partition = 4;/ 2是哪個省if (136.equals(preNum) partition = 0;else if (137.equals(preNum) partition = 1;else if (138.equals(preNum) public class HashPartitioner extends Partitioner /* Use link Object#hashCode() to partition. */public int getPartition(K key, V value, int numReduceTa

46、sks) return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;尚硅谷大數(shù)據(jù)技術之 Hadoop(MapReduce)(2)在 job 驅動中,設置自定義 partitioner:(3)自定義partition 后,要根據(jù)自定義 partitioner 的邏輯設置相應數(shù)量的 reduce task3)注意:如果 reduceTask 的數(shù)量 getPartition 的結果數(shù), 則會多產(chǎn)生幾個空的輸出文件part-r-000xx;如果 1reduceTask 的數(shù)量getPartition 的結果數(shù),則有一部分分區(qū)數(shù)據(jù)無處安放

47、,會Exception;如果 reduceTask 的數(shù)量=1,則不管 mapTask 端輸出多少個分區(qū)文件,最終結果都交給這一個 reduceTask,最終也就只會產(chǎn)生一個結果文件 part-r-00000;例如:假設自定義分區(qū)數(shù)為 5,則(1)job.setNumReduceTasks(1);會正常運行,只不過會產(chǎn)生一個輸出文件(2)job.setNumReduceTasks(2);會報錯(3)job.setNumReduceTasks(6);大于 5,正常運行,會產(chǎn)生空文件4)案例詳見 3.2.2 需求 2:將統(tǒng)計結果按照歸屬地不同省份輸出到不同文件中(Partitioner)詳見 3.

48、1.2 需求 2:把單詞按照 ASCII 碼奇偶分區(qū)(Partitioner)2.4.4 排序排序是 MapReduce 框架中最重要的操作之一。Map Task 和 Reduce Task 均會對數(shù)據(jù)(按照 key)進行排序。該操作屬于 Hadoop 的默認行為。任何應用的數(shù)據(jù)均會被排序,而不管邏輯上是否需要。對于 Map Task,它會將處理的結果暫時放到一個緩沖區(qū)中,當緩沖區(qū)使用率達到一定閾值后,再對緩沖區(qū)中的數(shù)據(jù)進行一次排序,并將這些有序數(shù)據(jù)寫到磁盤上,而當數(shù)據(jù)處理Java、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官job.setNumReduc

49、eTasks(5);job.setPartitionerClass(CustomPartitioner.class)partition = 2;else if (139.equals(preNum) partition = 3;return partition;尚硅谷大數(shù)據(jù)技術之 Hadoop(MapReduce)完畢后,它會對磁盤上所有文件進行一次合并,以將這些文件合并成一個大的有序文件。對于 Reduce Task,它從每個 Map Task 上拷貝相應的數(shù)據(jù)文件,如果文件大小超過一定閾值,則放到磁盤上,否則放到內存中。如果磁盤上文件數(shù)目達到一定閾值,則進行一次合并以個更大文件;如果內存中

50、文件大小或者數(shù)目超過一定閾值,則進行一次合并后將數(shù)據(jù)寫到磁盤上。當所有數(shù)據(jù)拷貝完畢后,Reduce Task 統(tǒng)一對內存和磁盤上的所有數(shù)據(jù)進行一次合并。每個階段的默認排序1)排序的分類:(1)部分排序:MapReduce 根據(jù)輸入的鍵對數(shù)據(jù)集排序。保證輸出的每個文件內部排序。(2)全排序:如何用 Hadoop 產(chǎn)生一個全局排序的文件?最簡單的是使用一個分區(qū)。但該在處理大型文件時效率極低, 因為一臺必須處理所有輸出文件, 從而完全喪失了MapReduce 所提供的并行架構。替代方案:首先創(chuàng)建一系列排好序的文件;其次,串聯(lián)這些文件;最后,個全局排序的文件。主要思路是使用一個分區(qū)來描述輸出的全局排序。例如:可以為上述

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論