大數(shù)據(jù)技術(shù)原理與應(yīng)用第七章MapReduce分析_第1頁
大數(shù)據(jù)技術(shù)原理與應(yīng)用第七章MapReduce分析_第2頁
大數(shù)據(jù)技術(shù)原理與應(yīng)用第七章MapReduce分析_第3頁
大數(shù)據(jù)技術(shù)原理與應(yīng)用第七章MapReduce分析_第4頁
大數(shù)據(jù)技術(shù)原理與應(yīng)用第七章MapReduce分析_第5頁
已閱讀5頁,還剩40頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

第七章MapReduce提要7.1概述7.2MapReduce體系構(gòu)造7.3MapReduce工作流程7.4實(shí)例分析:WordCount7.5MapReduce的詳細(xì)應(yīng)用7.6MapReduce編程實(shí)踐7.1 概述7.1.1 分布式并行編程7.1.2 MapReduce模型簡(jiǎn)介7.1.3 Map和Reduce函數(shù)7.1.1 分布式并行編程“摩爾定律”,CPU性能大約每隔18個(gè)月翻一番從2023年開始摩爾定律逐漸失效,需要處理的數(shù)據(jù)量迅速增長(zhǎng),人們開始借助于分布式并行編程來提升程序性能分布式程序運(yùn)營(yíng)在大規(guī)模計(jì)算機(jī)集群上,能夠并行執(zhí)行大規(guī)模數(shù)據(jù)處理任務(wù),從而取得海量的計(jì)算能力google企業(yè)最先提出了分布式并行編程模型MapReduce,HadoopMapReduce是它的開源實(shí)現(xiàn),后者比前者使用門檻低諸多7.1.1 分布式并行編程問題:在MapReduce出現(xiàn)之前,已經(jīng)有像MPI這么非常成熟的并行計(jì)算框架了,那么為何Google還需要MapReduce?MapReduce相較于老式的并行計(jì)算框架有什么優(yōu)勢(shì)?傳統(tǒng)并行計(jì)算框架MapReduce集群架構(gòu)/容錯(cuò)性共享式(共享內(nèi)存/共享存儲(chǔ)),容錯(cuò)性差非共享式,容錯(cuò)性好硬件/價(jià)格/擴(kuò)展性刀片服務(wù)器、高速網(wǎng)、SAN,價(jià)格貴,擴(kuò)展性差普通PC機(jī),便宜,擴(kuò)展性好編程/學(xué)習(xí)難度what-how,難what,簡(jiǎn)單適用場(chǎng)景實(shí)時(shí)、細(xì)粒度計(jì)算、計(jì)算密集型批處理、非實(shí)時(shí)、數(shù)據(jù)密集型7.1.2 MapReduce模型簡(jiǎn)介MapReduce將復(fù)雜的、運(yùn)營(yíng)于大規(guī)模集群上的并行計(jì)算過程高度地抽象到了兩個(gè)函數(shù):Map和Reduce編程輕易,不需要掌握分布式并行編程細(xì)節(jié),也能夠很輕易把自己的程序運(yùn)營(yíng)在分布式系統(tǒng)上,完畢海量數(shù)據(jù)的計(jì)算MapReduce采用“分而治之”策略,一種存儲(chǔ)在分布式文件系統(tǒng)中的大規(guī)模數(shù)據(jù)集,會(huì)被切提成許多獨(dú)立的分片(split),這些分片能夠被多種Map任務(wù)并行處理MapReduce設(shè)計(jì)的一種理念就是“計(jì)算向數(shù)據(jù)靠攏”,而不是“數(shù)據(jù)向計(jì)算靠攏”,因?yàn)椋苿?dòng)數(shù)據(jù)需要大量的網(wǎng)絡(luò)傳播開銷MapReduce框架采用了Master/Slave架構(gòu),涉及一種Master和若干個(gè)Slave。Master上運(yùn)營(yíng)JobTracker,Slave上運(yùn)營(yíng)TaskTrackerHadoop框架是用Java實(shí)現(xiàn)的,但是,MapReduce應(yīng)用程序則不一定要用Java來寫7.1.3 Map和Reduce函數(shù)函數(shù)輸入輸出說明Map<k1,v1>如:<行號(hào),”abc”>List(<k2,v2>)如:<“a”,1><“b”,1><“c”,1>1.將小數(shù)據(jù)集進(jìn)一步解析成一批<key,value>對(duì),輸入Map函數(shù)中進(jìn)行處理2.每一個(gè)輸入的<k1,v1>會(huì)輸出一批<k2,v2>。<k2,v2>是計(jì)算的中間結(jié)果Reduce<k2,List(v2)>如:<“a”,<1,1,1>><k3,v3><“a”,3>輸入的中間結(jié)果<k2,List(v2)>中的List(v2)表示是一批屬于同一個(gè)k2的value7.2MapReduce的體系構(gòu)造MapReduce體系構(gòu)造主要由四個(gè)部分構(gòu)成,分別是:Client、JobTracker、TaskTracker以及Task7.2MapReduce的體系構(gòu)造MapReduce主要有如下4個(gè)部分構(gòu)成:1)Client顧客編寫的MapReduce程序經(jīng)過Client提交到JobTracker端顧客可經(jīng)過Client提供的某些接口查看作業(yè)運(yùn)營(yíng)狀態(tài)2)JobTrackerJobTracker負(fù)責(zé)資源監(jiān)控和作業(yè)調(diào)度JobTracker監(jiān)控全部TaskTracker與Job的健康情況,一旦發(fā)覺失敗,就將相應(yīng)的任務(wù)轉(zhuǎn)移到其他節(jié)點(diǎn)JobTracker會(huì)跟蹤任務(wù)的執(zhí)行進(jìn)度、資源使用量等信息,并將這些信息告訴任務(wù)調(diào)度器(TaskScheduler),而調(diào)度器會(huì)在資源出現(xiàn)空閑時(shí),選擇合適的任務(wù)去使用這些資源7.2MapReduce的體系構(gòu)造3)TaskTrackerTaskTracker會(huì)周期性地經(jīng)過“心跳”將本節(jié)點(diǎn)上資源的使用情況和任務(wù)的運(yùn)營(yíng)進(jìn)度報(bào)告給JobTracker,同步接受JobTracker發(fā)送過來的命令并執(zhí)行相應(yīng)的操作(如開啟新任務(wù)、殺死任務(wù)等)TaskTracker使用“slot”等量劃分本節(jié)點(diǎn)上的資源量(CPU、內(nèi)存等)。一種Task獲取到一種slot后才有機(jī)會(huì)運(yùn)營(yíng),而Hadoop調(diào)度器的作用就是將各個(gè)TaskTracker上的空閑slot分配給Task使用。slot分為Mapslot和Reduceslot兩種,分別供MapTask和ReduceTask使用4)TaskTask分為MapTask和ReduceTask兩種,均由TaskTracker開啟7.3 MapReduce工作流程7.3.1 工作流程概述7.3.2 MapReduce各個(gè)執(zhí)行階段7.3.3 Shuffle過程詳解7.3.1 工作流程概述圖7-1MapReduce工作流程Shuffle7.3.1 工作流程概述不同的Map任務(wù)之間不會(huì)進(jìn)行通信不同的Reduce任務(wù)之間也不會(huì)發(fā)生任何信息互換顧客不能顯式地從一臺(tái)機(jī)器向另一臺(tái)機(jī)器發(fā)送消息全部的數(shù)據(jù)互換都是經(jīng)過MapReduce框架本身去實(shí)現(xiàn)的7.3.2 MapReduce各個(gè)執(zhí)行階段7.3.2 MapReduce各個(gè)執(zhí)行階段HDFS以固定大小的block為基本單位存儲(chǔ)數(shù)據(jù),而對(duì)于MapReduce而言,其處理單位是split。split是一種邏輯概念,它只涉及某些元數(shù)據(jù)信息,例如數(shù)據(jù)起始位置、數(shù)據(jù)長(zhǎng)度、數(shù)據(jù)所在節(jié)點(diǎn)等。它的劃分措施完全由顧客自己決定。有關(guān)Split(分片)7.3.2 MapReduce各個(gè)執(zhí)行階段Reduce任務(wù)的數(shù)量最優(yōu)的Reduce任務(wù)個(gè)數(shù)取決于集群中可用的reduce任務(wù)槽(slot)的數(shù)目一般設(shè)置比reduce任務(wù)槽數(shù)目稍微小某些的Reduce任務(wù)個(gè)數(shù)(這么能夠預(yù)留某些系統(tǒng)資源處理可能發(fā)生的錯(cuò)誤)Map任務(wù)的數(shù)量Hadoop為每個(gè)split創(chuàng)建一種Map任務(wù),split的多少?zèng)Q定了Map任務(wù)的數(shù)目。大多數(shù)情況下,理想的分片大小是一種HDFS塊7.3.3 Shuffle過程詳解圖7-3Shuffle過程

1.Shuffle過程簡(jiǎn)介7.3.3 Shuffle過程詳解2.Map端的Shuffle過程每個(gè)Map任務(wù)分配一種緩存MapReduce默認(rèn)100MB緩存設(shè)置溢寫百分比0.8分區(qū)默認(rèn)采用哈希函數(shù)排序是默認(rèn)的操作排序后能夠合并(Combine)合并不能變化最終止果在Map任務(wù)全部結(jié)束之邁進(jìn)行歸并歸并得到一種大的文件,放在本地磁盤文件歸并時(shí),假如溢寫文件數(shù)量不小于預(yù)定值(默認(rèn)是3)則能夠再次開啟Combiner,少于3不需要JobTracker會(huì)一直監(jiān)測(cè)Map任務(wù)的執(zhí)行,并告知Reduce任務(wù)來領(lǐng)取數(shù)據(jù)合并(Combine)和歸并(Merge)的區(qū)別:兩個(gè)鍵值對(duì)<“a”,1>和<“a”,1>,假如合并,會(huì)得到<“a”,2>,假如歸并,會(huì)得到<“a”,<1,1>>7.3.3 Shuffle過程詳解3.Reduce端的Shuffle過程Reduce任務(wù)經(jīng)過RPC向JobTracker問詢Map任務(wù)是否已經(jīng)完畢,若完畢,則領(lǐng)取數(shù)據(jù)Reduce領(lǐng)取數(shù)據(jù)先放入緩存,來自不同Map機(jī)器,先歸并,再合并,寫入磁盤多種溢寫文件歸并成一種或多種大文件,文件中的鍵值對(duì)是排序的當(dāng)數(shù)據(jù)極少時(shí),不需要溢寫到磁盤,直接在緩存中歸并,然后輸出給Reduce7.3.3 Shuffle過程詳解3.Reduce端的Shuffle過程圖7-5Reduce端的Shuffle過程7.3.4 MapReduce應(yīng)用程序執(zhí)行過程7.4 實(shí)例分析:WordCount7.4.1 WordCount程序任務(wù)7.4.2 WordCount設(shè)計(jì)思緒7.4.3 一種WordCount執(zhí)行過程的實(shí)例7.4.1 WordCount程序任務(wù)表7-2WordCount程序任務(wù)程序WordCount輸入一個(gè)包含大量單詞的文本文件輸出文件中每個(gè)單詞及其出現(xiàn)次數(shù)(頻數(shù)),并按照單詞字母順序排序,每個(gè)單詞和其頻數(shù)占一行,單詞和頻數(shù)之間有間隔表7-3一種WordCount的輸入和輸出實(shí)例輸入輸出HelloWorldHelloHadoopHelloMapReduceHadoop1Hello3MapReduce1World17.4.2 WordCount設(shè)計(jì)思緒首先,需要檢驗(yàn)WordCount程序任務(wù)是否能夠采用MapReduce來實(shí)現(xiàn)其次,擬定MapReduce程序的設(shè)計(jì)思緒最終,擬定MapReduce程序的執(zhí)行過程7.4.3 一種WordCount執(zhí)行過程的實(shí)例圖7-7Map過程示意圖7.4.3 一種WordCount執(zhí)行過程的實(shí)例圖7-8顧客沒有定義Combiner時(shí)的Reduce過程示意圖7.4.3 一種WordCount執(zhí)行過程的實(shí)例圖7-9顧客有定義Combiner時(shí)的Reduce過程示意圖7.5MapReduce的詳細(xì)應(yīng)用MapReduce能夠很好地應(yīng)用于多種計(jì)算問題關(guān)系代數(shù)運(yùn)算(選擇、投影、并、交、差、連接)分組與聚合運(yùn)算矩陣-向量乘法矩陣乘法7.5MapReduce的詳細(xì)應(yīng)用用MapReduce實(shí)現(xiàn)關(guān)系的自然連接7.5MapReduce的詳細(xì)應(yīng)用用MapReduce實(shí)現(xiàn)關(guān)系的自然連接假設(shè)有關(guān)系R(A,B)和S(B,C),對(duì)兩者進(jìn)行自然連接操作使用Map過程,把來自R的每個(gè)元組<a,b>轉(zhuǎn)換成一種鍵值對(duì)<b,<R,a>>,其中的鍵就是屬性B的值。把關(guān)系R涉及到值中,這么做使得我們能夠在Reduce階段,只把那些來自R的元組和來自S的元組進(jìn)行匹配。類似地,使用Map過程,把來自S的每個(gè)元組<b,c>,轉(zhuǎn)換成一種鍵值對(duì)<b,<S,c>>全部具有相同B值的元組被發(fā)送到同一種Reduce進(jìn)程中,Reduce進(jìn)程的任務(wù)是,把來自關(guān)系R和S的、具有相同屬性B值的元組進(jìn)行合并Reduce進(jìn)程的輸出則是連接后的元組<a,b,c>,輸出被寫到一種單獨(dú)的輸出文件中7.5MapReduce的詳細(xì)應(yīng)用用MapReduce實(shí)現(xiàn)關(guān)系的自然連接7.6 MapReduce編程實(shí)踐7.6.1 任務(wù)要求7.6.2 編寫Map處理邏輯7.6.3 編寫Reduce處理邏輯7.6.4 編寫main措施7.6.5 編譯打包代碼以及運(yùn)營(yíng)程序7.6.6Hadoop中執(zhí)行MapReduce任務(wù)的幾種方式7.6.1任務(wù)要求文件A的內(nèi)容如下:ChinaismymotherlandIloveChina文件B的內(nèi)容如下:IamfromChina期望成果如右側(cè)所示:I2is1China3my1love1am1from1motherland17.6.2編寫Map處理邏輯Map輸入類型為<key,value>期望的Map輸出類型為<單詞,出現(xiàn)次數(shù)>Map輸入類型最終擬定為<Object,Text>Map輸出類型最終擬定為<Text,IntWritable>publicstaticclassMyMapperextendsMapper<Object,Text,Text,IntWritable>{privatefinalstaticIntWritableone=newIntWritable(1);privateTextword=newText();publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedException{ StringTokenizeritr=newStringTokenizer(value.toString()); while(itr.hasMoreTokens()){ word.set(itr.nextToken()); context.write(word,one); }}}7.6.3編寫Reduce處理邏輯在Reduce處理數(shù)據(jù)之前,Map的成果首先經(jīng)過Shuffle階段進(jìn)行整頓Reduce階段的任務(wù):對(duì)輸入數(shù)字序列進(jìn)行求和Reduce的輸入數(shù)據(jù)為<key,Iterable容器>Reduce任務(wù)的輸入數(shù)據(jù):<”I”,<1,1>><”is”,1>……<”from”,1><”China”,<1,1,1>>publicstaticclassMyReducerextendsReducer<Text,IntWritable,Text,IntWritable>{privateIntWritableresult=newIntWritable();publicvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedException{ intsum=0; for(IntWritableval:values){ sum+=val.get(); } result.set(sum);context.write(key,result);}}7.6.3編寫Reduce處理邏輯7.6.4編寫main措施publicstaticvoidmain(String[]args)throwsException{Configurationconf=newConfiguration();//程序運(yùn)營(yíng)時(shí)參數(shù)String[]otherArgs=newGenericOptionsParser(conf,args).getRemainingArgs();if(otherArgs.length!=2){System.err.println("Usage:wordcount<in><out>");System.exit(2);}Jobjob=newJob(conf,"wordcount");//設(shè)置環(huán)境參數(shù)job.setJarByClass(WordCount.class);//設(shè)置整個(gè)程序的類名job.setMapperClass(MyMapper.class);//添加MyMapper類job.setReducerClass(MyReducer.class);//添加MyReducer類job.setOutputKeyClass(Text.class);//設(shè)置輸出類型job.setOutputValueClass(IntWritable.class);//設(shè)置輸出類型FileInputFormat.addInputPath(job,newPath(otherArgs[0]));//設(shè)置輸入文件FileOutputFormat.setOutputPath(job,newPath(otherArgs[1]));//設(shè)置輸出文件System.exit(job.waitForCompletion(true)?0:1);}importjava.io.IOException;importjava.util.StringTokenizer;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importorg.apache.hadoop.util.GenericOptionsParser;publicclassWordCount{//WordCount類的詳細(xì)代碼見下一頁}完整代碼publicclassWordCount{publicstaticclassMyMapperextendsMapper<Object,Text,Text,IntWritable>{privatefinalstaticIntWritableone=newIntWritable(1);privateTextword=newText();publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedException{StringTokenizeritr=newStringTokenizer(value.toString()); while(itr.hasMoreTokens()){ word.set(itr.nextToken()); context.write(word,one); }}} publicstaticclassMyReducerextendsReducer<Text,IntWritable,Text,IntWritable>{privateIntWritableresult=newIntWritable();publicvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedException{ intsum=0; for(IntWritableval:values){ sum+=val.get(); } result.set(sum); context.write(key,result);}} publicstaticvoidmain(String[]args)throwsException{ Configurationconf=newConfiguration(); String[]otherArgs=newGenericOptionsParser(conf,args).getRemainingArgs(); if(otherArgs.length!=2){ System.err.println("Usage:wordcount<in><out>"); System.exit(2); } Jobjob=newJob(conf,"wordcount"); job.setJarByClass(WordCount.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job,newPath(otherArgs[0])); FileOutputFormat.setOutputPath(job,newPath(otherArgs[1]));System.exit(job.waitForCompletion(true)?0:1);}}7.6.5編譯打包代碼以及運(yùn)營(yíng)程序試驗(yàn)環(huán)節(jié):使用java編譯程序,生成.class文件將.class文件打包為jar包運(yùn)營(yíng)jar包(需要開啟Hadoop)查看成果7.6.5編譯打包代碼以及運(yùn)營(yíng)程序Hadoop2.x版本中的依賴jarHadoop2.x版本中jar不再集中在一種hadoop-core*.jar中,而是提成多種jar,如使用Hadoop

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論