《Hadoop大數(shù)據(jù)原理與應(yīng)用實驗教程》實驗指導書-實驗3MapReduce編程_第1頁
《Hadoop大數(shù)據(jù)原理與應(yīng)用實驗教程》實驗指導書-實驗3MapReduce編程_第2頁
《Hadoop大數(shù)據(jù)原理與應(yīng)用實驗教程》實驗指導書-實驗3MapReduce編程_第3頁
《Hadoop大數(shù)據(jù)原理與應(yīng)用實驗教程》實驗指導書-實驗3MapReduce編程_第4頁
《Hadoop大數(shù)據(jù)原理與應(yīng)用實驗教程》實驗指導書-實驗3MapReduce編程_第5頁
已閱讀5頁,還剩35頁未讀 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

《Hadoop大數(shù)據(jù)原理與應(yīng)用實驗教程》配套實驗指導書實驗3MapReduce編程編寫者:國信藍橋-顏群實驗3MapReduce編程本實驗的知識地圖如圖3-1所示(表示重點表示難點)。圖3-1實驗3MapReduce編程知識地圖一、實驗目的1.理解MapReduce編程思想。2.理解MapReduce作業(yè)執(zhí)行流程。3.理解MR-App編寫步驟,掌握使用MapReduceJavaAPI進行MapReduce基本編程,熟練掌握如何在Hadoop集群上運行MR-App并查看運行結(jié)果。4.熟練掌握MapReduceWeb界面的使用。5.掌握MapReduceShell常用命令的使用。二、實驗環(huán)境本實驗所需的軟件環(huán)境包括全分布模式Hadoop集群、Eclipse。三、實驗內(nèi)容1.啟動全分布模式Hadoop集群,守護進程包括NameNode、DataNode、SecondaryNameNode、ResourceManager、NodeManager和JobHistoryServer。2.在Hadoop集群主節(jié)點上搭建MapReduce開發(fā)環(huán)境Eclipse。3.查看Hadoop自帶的MR-App單詞計數(shù)源代碼WordCount.java,在Eclipse項目MapReduceExample下建立新包com.xijing.mapreduce,模仿內(nèi)置的WordCount示例,自己編寫一個WordCount程序,最后打包成JAR形式并在Hadoop集群上運行該MR-App,查看運行結(jié)果。4分別在自編MapReduce程序WordCount運行過程中和運行結(jié)束后查看MapReduceWeb界面。5.分別在自編MapReduce程序WordCount運行過程中和運行結(jié)束后練習MapReduceShell常用命令。6.關(guān)閉Hadoop集群。四、實驗原理(一)MapReduce編程思想MapReduce是Hadoop生態(tài)中的一款分布式計算框架,它可以讓不熟悉分布式計算的人員也能編寫出優(yōu)秀的分布式系統(tǒng),因此可以讓開發(fā)人員將精力專注到業(yè)務(wù)邏輯本身。MapReduce采用“分而治之”的核心思想,可以先將一個大型任務(wù)拆分成若干個簡單的子任務(wù),然后將每個子任務(wù)交給一個獨立的節(jié)點去處理。當所有節(jié)點的子任務(wù)都處理完畢后,再匯總所有子任務(wù)的處理結(jié)果,從而形成最終的結(jié)果。以“單詞統(tǒng)計”為例,如果要統(tǒng)計一個擁有海量單詞的詞庫,就可以先將整個詞庫拆分成若干個小詞庫,然后將各個小詞庫發(fā)送給不同的節(jié)點去計算,當所有節(jié)點將分配給自己的小詞庫中的單詞統(tǒng)計完畢后,再將各個節(jié)點的統(tǒng)計結(jié)果進行匯總,形成最終的統(tǒng)計結(jié)果。以上,“拆分”任務(wù)的過程稱為Map階段,“匯總”任務(wù)的過程稱為Reduce階段,如圖3-2所示。節(jié)點節(jié)點3海量詞庫小詞庫小詞庫小詞庫統(tǒng)計部分單詞統(tǒng)計全部單詞Map階段Reduce階段節(jié)點1節(jié)點2統(tǒng)計部分單詞節(jié)點4節(jié)點5圖3-2MapReduce執(zhí)行流程MapReduce在發(fā)展史上經(jīng)過一次重大改變,舊版MapReduce(MapReduce1.0)采用的是典型的Master/Slave結(jié)構(gòu),Master表現(xiàn)為JobTracker進程,而Slave表現(xiàn)為TaskTracker,MapReduce1.0體系架構(gòu)如圖3-3所示。但是這種架構(gòu)過于簡單,例如Master的任務(wù)過于集中,并且存在單點故障等問題。因此,MapReduce進行了一次重要的升級,舍棄JobTracker和TaskTracker,而改用了ResourceManager進程負責處理資源,并且使用ApplicationMaster進程管理各個具體的應(yīng)用,用NodeManager進程對各個節(jié)點的工作情況進行監(jiān)聽。升級后的MapReduce稱為MapReduce2.0,MapReduce2.0體系架構(gòu)如圖3-4所示。JobTrackerJobTrackerTaskTrackerClientClientTaskSchedulerMapTaskMapTaskReduceTaskTaskTrackerMapTaskMapTaskReduceTaskTaskTrackerMapTaskMapTaskReduceTask圖3-3MapReduce1.0體系架構(gòu)ResourceManagerResourceManagerNameNodeNodeManagerApplicationMasterDataNodeNodeManagerApplicationMasterDataNodeNodeManagerContainerDataNodeContainerNodeManagerContainerDataNodeNodeManagerContainerDataNodeNodeManagerContainerDataNodeClientClient圖3-4MapReduce2.0執(zhí)行作業(yè)時體系架構(gòu)(二)MapReduce作業(yè)執(zhí)行流程MapReduce作業(yè)的執(zhí)行流程主要包括InputFormat、Map、Shuffle、Reduce、OutputFormat五個階段,MapReduce作業(yè)執(zhí)行流程如圖3-5所示。最終結(jié)果<key,value>中間結(jié)果<key,List(value)>中間結(jié)果<key,value>最終結(jié)果<key,value>中間結(jié)果<key,List(value)>中間結(jié)果<key,value>輸入<key,value>加載文件最終結(jié)果<key,value>中間結(jié)果<key,List(value)>中間結(jié)果<key,value>輸入<key,value>寫入文件分布式文件系統(tǒng)(如HDFS)InputFormatSplitSplitSplitRRRRRRMapMapMapShuffleReduceOutputFormat節(jié)點1加載文件寫入文件InputFormatSplitSplitSplitRRRRRRMapMapMapShuffleReduceOutputFormat節(jié)點2分布式文件系統(tǒng)(如HDFS)關(guān)于MapReduce作業(yè)各個執(zhí)行階段的詳細說明,具體如下所示。(1)InputFormatInputFormat模塊首先對輸入數(shù)據(jù)做預處理,比如驗證輸入格式是否符合輸入定義;然后將輸入文件切分為邏輯上的多個InputSplit,InputSplit是MapReduce對文件進行處理和運算的輸入單位,并沒有對文件進行實際切割;由于InputSplit是邏輯切分而非物理切分,所以還需要通過RecordReader(圖4-4中的RR)根據(jù)InputSplit中的信息來處理InputSplit中的具體記錄,加載數(shù)據(jù)并轉(zhuǎn)換為適合Map任務(wù)讀取的鍵值對<key,valule>,輸入給Map任務(wù)。(2)MapMap模塊會根據(jù)用戶自定義的映射規(guī)則,輸出一系列的<key,value>作為中間結(jié)果。(3)Shuffle為了讓Reduce可以并行處理Map的結(jié)果,需要對Map的輸出進行一定的排序、分區(qū)、合并、歸并等操作,得到<key,List(value)>形式的中間結(jié)果,再交給對應(yīng)的Reduce進行處理,這個過程叫做Shuffle。(4)ReduceReduce以一系列的<key,List(value)>中間結(jié)果作為輸入,執(zhí)行用戶定義的邏輯,輸出<key,valule>形式的結(jié)果給OutputFormat。(5)OutputFormatOutputFormat模塊會驗證輸出目錄是否已經(jīng)存在以及輸出結(jié)果類型是否符合配置文件中的配置類型,如果都滿足,就輸出Reduce的結(jié)果到分布式文件系統(tǒng)。(三)MapReduceWebUIMapReduceWebUI接口面向管理員。可以在頁面上看到已經(jīng)完成的所有MR-App執(zhí)行過程中的統(tǒng)計信息,該頁面只支持讀,不支持寫。MapReduceWebUI的默認地址為http://JobHistoryServerIP:19888,可以查看MapReduce的歷史運行情況,如圖3-6所示。圖3-6MapReduce歷史情況(四)MapReduceShellMapReduceShell接口面向MapReduce程序員。程序員通過Shell接口能夠向YARN集群提交MR-App,查看正在運行的MR-App,甚至可以終止正在運行的MR-App。MapReduceShell命令統(tǒng)一入口為:mapred,語法格式如下:mapred[--configconfdir][--loglevelloglevel]COMMAND讀者需要注意的是,若$HADOOP_HOME/bin未加入到系統(tǒng)環(huán)境變量PATH中,則需要切換到Hadoop安裝目錄下,輸入“bin/mapred”。讀者可以使用“mapred-help”查看其幫助,命令“mapred”的具體用法和參數(shù)說明如圖3-7所示。圖3-7命令“mapred”用法MapReduceShell命令分為用戶命令和管理員命令。本章僅介紹部分命令,關(guān)于MapReduceShell命令的完整說明,讀者請參考官方網(wǎng)站/docs/r2.9.2/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapredCommands.html。1.用戶命令MapReduceShell用戶命令如表3-1所示。表3-1MapReduceShell用戶命令命令選項功能描述archive創(chuàng)建一個Hadoop檔案文件archive-logs將聚合日志合并到Hadoop檔案文件中classpath打印運行MapReduce子命令所需的包路徑distcp遞歸拷貝文件或目錄job管理MapReduce作業(yè)pipes運行Pipes任務(wù),此功能允許用戶使用C++語言編寫MapReduce程序queue查看JobQueue信息2.管理員命令MapReduceShell管理員命令如表3-2所示。表3-2MapReduceShell用戶命令命令選項功能描述historyserver啟動JobHistoryServer服務(wù)hsadminJobHistoryServer管理命令接口其中,命令“mapredhistoryserver”與啟動MapReduce的命令“mr-jobhistory-daemon.shstarthistoryserver”效果相同。讀者請注意,一般不建議使用命令start-all.sh啟動HDFS和YARN,而是建議使用start-dfs.sh和start-yarn.sh命令來分別啟動。另外,對于一般計算機而言,在執(zhí)行start-dfs.sh和start-yarn.sh命令之后最好等待一會兒再操作各種MapReduce命令,防止因為線程未加載完畢而導致的各種初始化問題。在MapReduce程序運行一段時間后,可能由于各種故障造成HDFS的數(shù)據(jù)在各個DataNode中的分布不均勻的情況,此時也只需要通過以下shell命令即可重新分布HDFS集群上的各個DataNode。$HADOOP_HOME/bin/start-balancer.sh此外,在啟動時可以通過日志看到“Namenodeinsafemode”提示,這表示系統(tǒng)正在處于安全模式,此時只需要等待一會即可(通常是十幾秒)。如果硬件資源較差,也可以通過執(zhí)行以下命令直接退出安全模式。$HADOOP_HOME/bin/hadoopdfsadmin-safemodeleave(五)MapReduceJavaAPIMapReduceJavaAPI接口面向Java開發(fā)工程師。程序員可以通過該接口編寫MR-App用戶層代碼MRApplicationBusinessLogic。基于YARN編寫的MR-App和基于MapReduce1.0編寫的MR-App編程步驟相同。MR-App稱為MapReduce應(yīng)用程序,標準YARN-App包含3部分:MRv2框架中的MRAppMaster、MRClient,加上用戶編寫的MRApplicationBusinessLogic(Mapper類和Reduce類),合稱為MR-App。MR-App編寫步驟如下所示:(1)編寫MRApplicationBusinessLogic。自行編寫。(2)編寫MRApplicationMaster。無需編寫,Hadoop開發(fā)人員已編寫好MRAppMaster.java。(3)編寫MRApplicationClient。無需編寫,Hadoop開發(fā)人員已編寫好YARNRunner.java。其中,MRApplicationBusinessLogic編寫步驟如下:(1)確定<key,value>對。(2)定制輸入格式。(3)Mapper階段,其業(yè)務(wù)代碼需要繼承自org.apache.hadoop.mapreduce.Mapper類。(4)Reducer階段,其業(yè)務(wù)代碼需要繼承自org.apache.hadoop.mapreduce.Reducer類。(5)定制輸出格式。編寫類后,在main方法里,按下述過程依次指向各類即可:(1)實例化配置文件類。(2)實例化Job類。(3)指向InputFormat類。(4)指向Mapper類。(5)指向Partitioner類。(6)指向Reducer類。(7)指向OutputFormat類。(8)提交任務(wù)。實際開發(fā)中,MapReduceJavaAPI最常用的類是org.apache.hadoop.mapreduce.Mapper和org.apache.hadoop.mapreduce.Reducer。常用的MapReduceJava類如表3-3所示。表3-3MapReduceJavaAPI常用類類名說明org.apache.hadoop.mapreduce.JobMapReduce作業(yè)類org.apache.hadoop.mapreduce.MapperMapper類,泛型類,帶有4個參數(shù),分別表示Map階段輸入數(shù)據(jù)的key類型、輸入數(shù)據(jù)的value類型、輸出數(shù)據(jù)的key類型、輸出數(shù)據(jù)的value類型。其中,輸入的key為Object(默認是行),輸入的值為Text(Hadoop中的String類型),輸出的key為Text(關(guān)鍵字),輸出的值為IntWritable(Hadoop中的int類型)org.apache.hadoop.mapreduce.ReducerReducer類,泛型類,帶有4個參數(shù),分別表示Reduce階段輸入數(shù)據(jù)的key類型、value類型,輸出數(shù)據(jù)的key類型、value類型org.apache.hadoop.mapreduce.InputFormatMapReduce接收輸入數(shù)據(jù)的頂級類org.apache.hadoop.mapreduce.OutputFormatMapReduce接收輸出數(shù)據(jù)的頂級類關(guān)于MapReduceAPI的完整說明,讀者請參考官方網(wǎng)站/docs/r2.9.2/api/index.html。五、實驗步驟(一)啟動Hadoop集群在主節(jié)點上依次執(zhí)行以下3條命令啟動全分布模式Hadoop集群。start-dfs.shstart-yarn.shmr-jobhistory-daemon.shstarthistoryserverstart-dfs.sh命令會在主節(jié)點上啟動NameNode和SecondaryNameNode服務(wù),會在從節(jié)點上啟動DataNode服務(wù);start-yarn.sh命令會在主節(jié)點上啟動ResourceManager服務(wù),會在從節(jié)點上啟動NodeManager服務(wù);mr-jobhistory-daemon.sh命令會在主節(jié)點上啟動JobHistoryServer服務(wù)。(二)搭建MapReduce開發(fā)環(huán)境Eclipse在Hadoop集群主節(jié)點上搭建MapReduce開發(fā)環(huán)境Eclipse,具體過程請讀者參考實驗項目2,此處不再贅述。(三)編寫并運行MapReduce程序WordCount查看Hadoop自帶的MR-App單詞計數(shù)源代碼WordCount.java,在Eclipse項目MapReduceExample下建立新包com.xijing.mapreduce,模仿內(nèi)置的WordCount示例,自己編寫一個WordCount程序,最后打包成JAR形式并在Hadoop集群上運行該MR-App,查看運行結(jié)果。具體過程如下所示。1.查看示例WordCount從$HADOOP_HOME/share/hadoop/mapreduce/sources/hadoop-mapreduce-examples-2.9.2-sources.jar中找到單詞計數(shù)源代碼文件WordCount.java,打開并查看源代碼,完整的源代碼如下所示。packageorg.apache.hadoop.examples;importjava.io.IOException;importjava.util.StringTokenizer;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;packageorg.apache.hadoop.examples;importjava.io.IOException;importjava.util.StringTokenizer;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importorg.apache.hadoop.util.GenericOptionsParser;publicclassWordCount{ publicstaticclassTokenizerMapperextendsMapper<Object,Text,Text,IntWritable>{ privatefinalstaticIntWritableone=newIntWritable(1); privateTextword=newText(); publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedException{ StringTokenizeritr=newStringTokenizer(value.toString()); while(itr.hasMoreTokens()){ word.set(itr.nextToken()); context.write(word,one); } } } publicstaticclassIntSumReducerextendsReducer<Text,IntWritable,Text,IntWritable>{ privateIntWritableresult=newIntWritable(); publicvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedException{ intsum=0; for(IntWritableval:values){ sum+=val.get(); } result.set(sum); context.write(key,result); } } publicstaticvoidmain(String[]args)throwsException{ Configurationconf=newConfiguration(); String[]otherArgs=newGenericOptionsParser(conf,args).getRemainingArgs(); if(otherArgs.length<2){ System.err.println("Usage:wordcount<in>[<in>...]<out>"); System.exit(2); } Jobjob=Job.getInstance(conf,"wordcount"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for(inti=0;i<otherArgs.length-1;++i){ FileInputFormat.addInputPath(job,newPath(otherArgs[i])); } FileOutputFormat.setOutputPath(job,newPath(otherArgs[otherArgs.length-1])); System.exit(job.waitForCompletion(true)?0:1); }}2.在Eclipse中創(chuàng)建Java項目進入/usr/local/eclipse中通過可視化桌面打開EclipseIDE,默認的工作空間為“/home/xuluhui/eclipse-workspace”。選擇菜單『File』→『New』→『JavaProject』,創(chuàng)建Java項目“MapReduceExample”,如圖3-8所示。本書中關(guān)于MapReduce編程實例均存放在此項目下。圖3-8創(chuàng)建Java項目“MapReduceExample”3.在項目中導入所需JAR包為了編寫關(guān)于MapReduce應(yīng)用程序,需要向Java工程中添加MapReduce核心包hadoop-mapreduce-client-core-2.9.2.jar,該包中包含了可以訪問MapReduce的JavaAPI,位于$HADOOP_HOME/share/hadoop/mapreduce下。另外,由于還需要對HDFS文件進行操作,所以還需要導入JAR包hadoop-common-2.9.2.jar,該包位于$HADOOP_HOME/share/hadoop/common下。若不導入這兩個JAR包,代碼將會出現(xiàn)錯誤。讀者可以按以下步驟添加該應(yīng)用程序編寫時所需的JAR包。(1)右鍵單擊Java項目“MapReduceExample”,從彈出的菜單中選擇『BuildPath』→『ConfigureBuildPath…』,如圖3-9所示。圖3-9進入“MapReduceExample”項目“JavaBuildPath”(2)進入窗口【PropertiesforMapReduceExample】,可以看到添加JAR包的主界面,如圖3-10所示。圖3-10添加JAR包主界面(3)單擊圖中的按鈕AddExternalJARS,依次添加jar文件$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.9.2.jar和$HADOOP_HOME/share/hadoop/common/hadoop-common-2.9.2.jar。其中添加JAR包hadoop-mapreduce-client-core-2.9.2.jar的過程如圖3-11所示,找到此JAR包后選中并單擊右上角的OK按鈕,這樣就成功把mapreduce-client-core-2.9.2.jar增加到了當前Java項目中。添加hadoop-common-2.9.2.jar的過程同此,不再贅述。圖3-11添加hadoop-mapreduce-client-core-2.9.2.jar到Java項目中(4)完成JAR包添加后的界面如圖3-12所示,單擊按鈕ApplyandClose。圖3-12完成JAR包添加后的界面(5)自動返回到Eclipse界面,如圖3-13所示,從圖中可以看到,項目“MapReduceExample”目錄樹下多了“ReferencedLibraries”,內(nèi)部有以上步驟添加進來的兩個JAR包。圖3-13添加JAR包后“MapReduceExample”項目目錄樹變化4.在項目中新建包右鍵單擊項目“MapReduceExample”,從彈出的快捷菜單中選擇『New』→『Package』,創(chuàng)建包“com.xijing.mapreduce”,如圖3-14所示。圖3-14創(chuàng)建包“com.xijing.mapreduce”5.自編MapReduce程序WordCount下面模仿示例WordCount自編一個WordCount應(yīng)用程序,借助MapReduceAPI,實現(xiàn)對輸入文件單詞頻次的統(tǒng)計。1)編寫Mapper類(1)右鍵單擊Java項目“MapReduceExample”中目錄“src”下的包“com.xijing.mapreduce”,從彈出的菜單中選擇『New』→『Class』,如圖3-15所示。圖3-15進入“com.xijing.mapreduce”包的新建類窗口(2)進入窗口【NewJavaClass】。可以看出,由于上步在包“com.xijing.mapreduce”下新建類,故此處不需要選擇該類所屬包;輸入新建類的名字,例如“WordCountMapper”,之所以這樣命名,是本類要實現(xiàn)Map階段業(yè)務(wù)邏輯,建議讀者命名時也要做到見名知意;讀者還可以選擇是否創(chuàng)建main函數(shù)。本實驗中新建類“WordCountMapper”的具體輸入和選擇如圖3-16所示。完成后單擊Finish按鈕。圖3-16新建類“WordCountMapper”(3)編寫Mapper類,自編WordCount程序Mapper類的源碼如下所示。packagecom.xijing.mapreduce;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;importjava.io.IOException;publicclassWordCountMapperextendsMapper<LongWritable,Text,Text,IntWritable>{ //自定義map方法 @Override protectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{ Stringline=value.toString(); String[]words=line.split(""); for(Stringword:words){ //context.write()將數(shù)據(jù)交給下一階段處理shuffle context.write(newText(word),newIntWritable(1)); } }}2)編寫Reducer類在包“com.xijing.mapreduce”下新建類“WordCountReducer”,方法同上文“WordCountMapper”類。自編WordCount程序Reducer類的源碼如下所示。packagecom.xijing.mapreduce;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;importjava.io.IOException;publicclassWordCountReducerextendsReducer<Text,IntWritable,Text,IntWritable>{ //自定義reduce方法 @Override protectedvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedException{ intsum=0; for(IntWritablevalue:values) sum+=value.get(); context.write(key,newIntWritable(sum)); }}3)編寫入口Driver類Mapper類和Reducer類編寫完畢后,再通過Driver類將本次Job進行設(shè)置。在包“com.xijing.mapreduce”下新建類“WordCountDriver”,方法同上文“WordCountMapper”,入口Driver類的源碼如下所示。packagecom.xijing.mapreduce;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importpress.BZip2Codec;importpress.CompressionCodec;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importjava.io.IOException;publicclassWordCountDriver{ //args:輸入文件路徑和輸出文件路徑 publicstaticvoidmain(String[]args)throwsIOException,ClassNotFoundException,InterruptedException{ Configurationconf=newConfiguration(); //開啟map階段的壓縮 conf.setBoolean("press",true); //指定壓縮類型 conf.setClass("press.codec",BZip2Codec.class,CompressionCodec.class); Jobjob=Job.getInstance(conf,"wordcountdiy"); job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class); //使用了自定義Combine job.setCombinerClass(WordCountReducer.class); job.setReducerClass(WordCountReducer.class); //指定map輸出數(shù)據(jù)的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //指定reduce輸出數(shù)據(jù)的類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //設(shè)置輸入文件路徑 FileInputFormat.setInputPaths(job,newPath(args[0])); //設(shè)置輸出文件路徑 FileOutputFormat.setOutputPath(job,newPath(args[1])); //開啟reduce階段的解壓縮 FileOutputFormat.setCompressOutput(job,true); //指定解壓縮類型(需要與壓縮類型保持一致) FileOutputFormat.setOutputCompressorClass(job,BZip2Codec.class); booleanresult=job.waitForCompletion(true); System.exit(result?0:1); }}6.將MapReduce程序打包成JAR包為了運行寫好的MapReduce程序,需要首先將程序打包成JAR包??梢允褂肕aven或者Eclipse打JAR包,下面以Eclipse為例進行介紹。(1)右鍵單擊項目“MapReduceExample”,從彈出的快捷菜單中選擇『Export...』,如圖3-17所示。圖3-17進入“MapReduceExample”項目Export窗口(2)進入窗口【Export】,選擇Java→JARfile,單擊按鈕Next>,如圖3-18所示。圖3-18在窗口【Export】中選擇Java→JARfile(3)進入窗口【JARExport】,單擊按鈕Browse…選擇JAR包的導出位置和文件名,此處編者將其保存在/home/xuluhui/eclipse-workspace/MapReduceExample下,命名為WordCountDIY.jar,效果如圖3-19所示。圖3-19選擇打包文件及JAR包存放路徑及名字7.提交JAR包到Hadoop中運行與運行hadoop-mapreduce-examples-2.9.2.jar中的wordcount程序一樣,只需要執(zhí)行以下命令,就能在Hadoop集群中成功運行自己編寫的MapReduce程序了,命令如下所示。hadoopjar/home/xuluhui/eclipse-workspace/MapReduceExample/WordCountDIY.jarcom.xijing.mapreduce.WordCountDriver/InputDataTest/OutputDataTest4上述命令中,/InputDataTest表示輸入目錄,/OutputDataTest4表示輸出目錄。執(zhí)行該命令前,假設(shè)HDFS的目錄/InputDataTest下已存在待分析詞頻的3個文件,而輸出目錄/OutputDataTest4不存在,在執(zhí)行過程中會自動創(chuàng)建。部分執(zhí)行過程如圖3-20所示。圖3-20向Hadoop集群提交并運行自編WordCount的執(zhí)行過程(部分)8.查看運行結(jié)果如圖3-21所示,上述程序執(zhí)行完畢后,會將結(jié)果輸出到/OutputDataTest4目錄中,可以使用命令“hdfsdfs-ls/OutputDataTest4”來查看。圖3-20中/OutputDataTest4目錄下有2個文件,其中/OutputDataTest4/_SUCCESS表示Hadoop程序已執(zhí)行成功,這個文件大小為0,文件名就告知了Hadoop程序的執(zhí)行狀態(tài);第二個文件/OutputDataTest4/part-r-00000.bz2才是Hadoop程序的運行結(jié)果。由于輸出結(jié)果進行了壓縮,所以無法使用命令“hdfsdfs-cat/OutputDataTest4/part-r-00000.bz2”直接查看Hadoop程序的運行結(jié)果,查看效果如圖3-21所示。圖3-21無法使用-cat選項直接查看輸出文件為.bz2的結(jié)果若想查看輸出文件擴展名為.bz2的文件,讀者可以首先使用命令“hdfsdfs-get”將HDFS上的文件/OutputDataTest4/part-r-00000.bz2下載到本地操作系統(tǒng),然后使用命令“bzcat”查看.bz2文件的結(jié)果,使用命令及運行結(jié)果如圖3-22所示。圖3-22下載.bz2文件到本地并使用bzcat查看運行結(jié)果(四)練習使用MapReduceShell命令分別在自編MapReduce程序WordCount運行過程中和運行結(jié)束后練習MapReduceShell常用命令。例如,使用如下命令查看MapReduce作業(yè)的狀態(tài)信息。mapredjob-status<job-id>如圖3-23所示,當前MapReduce作業(yè)“job_1568702465801_0002”正處于運行(RUNNING)狀態(tài)。圖3-23通過命令“mapredjob-status”查看該MapReduce作業(yè)狀態(tài)(五)練習使用MapReduceWeb界面分別在自編MapReduce程序WordCount運行過程中和運行結(jié)束后查看MapReduceWeb界面。例如,如圖3-24所示,當前MapReduce作業(yè)“job_1568702465801_0002”已運行結(jié)束,其State為成功(SUCCEEDED)狀態(tài)。圖3-24通過MapReduceWeb查看該MapReduce作業(yè)信息(六)關(guān)閉Hadoop集群關(guān)閉全分布模式Hadoop集群的命令與啟動命令次序相反,只需在主節(jié)點master上依次執(zhí)行以下3條命令即可關(guān)閉Hadoop。mr-jobhistory-daemon.shstophistoryserverstop-yarn.shstop-dfs.sh執(zhí)行mr-jobhistory-daemon.shstophistoryserver時,其*historyserver.pid文件消失;執(zhí)行stop-yarn.sh時,*resourcemanager.pid和*nodemanager.pid文件依次消失;stop-dfs.sh,*namenode.pid、*datanode.pid、*secondarynamenode.pid文件依次消失。六、實驗報告要求實驗報告以電子版和打印版雙重形式提交。實驗報告主要內(nèi)容包括實驗名稱、實驗類型、實驗地點、學時、實驗環(huán)境、實驗原理、實驗步驟、實驗結(jié)果、總結(jié)與思考等。實驗報告格式如表1-9所示。七、拓展訓練(一)在Windows平臺上開發(fā)MapReduce程序在學習階段,我們也可以直接在Windows平臺上開發(fā)并運行MapReduce程序。【案例3-1】在Windows平臺上開發(fā)并運行MapReduce程序。具體實現(xiàn)過程如下所示。(1)將編譯后的Windows版本的Hadoop解壓到本地,并將解壓后的路徑設(shè)置為環(huán)境變量,如圖3-25所示。圖3-25配置HADOOP_HOME系統(tǒng)變量(2)將Hadoop中可執(zhí)行命令的目錄\bin和\sbin添加到環(huán)境變量PATH中,如圖3-26所示。圖3-26配置Hadoop環(huán)境變量(3)將剛剛解壓后的MapReduce中的相關(guān)jar文件引入工程,或者使用Maven引入需要的JAR包,pom.xml如下所示。<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="/POM/4.0.0"xmlns:xsi="/2001/XMLSchema-instance"xsi:schemaLocation="/POM/4.0.0/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>test</groupId><artifactId>mp</artifactId><version>1.0-SNAPSHOT</version><!--統(tǒng)一Hadoop版本號--><properties><hadoop.version>2.9.2</hadoop.version></properties><dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-jobclient</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>commons-cli</groupId><artifactId>commons-cli</artifactId><version>1.3.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency></dependencies></project>(4)為了在運行時可以在Eclipse控制臺觀察到MapReduce的運行時日志,可以在項目中引入Log4j,并將perties存放在項目的CLASSPATH下,perties的內(nèi)容如下所示。log4j.rootLogger=DEBUG,stdoutlog4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.layout=org.apache.log4j.PatternLayoutlog4j.appender.stdout.layout.ConversionPattern=%5p[%t]-%m%n(5)在運行時,由于權(quán)限限制,還需要通過運行參數(shù)設(shè)置訪問Hadoop的用戶是master。具體方法是首先在Eclipse中單擊右鍵,從彈出快捷菜單中選擇『RunAs』→『RunConfigurations...』,如圖3-27所示。圖3-27進入配置運行參數(shù)窗口然后,在虛擬機參數(shù)中,通過語句“-DHADOOP_USER_NAME=master”指定執(zhí)行的用戶是master,如圖3-28所示。圖3-28設(shè)置VM參數(shù)(6)此時,便可以在本地通過main()方法直接運行MapReduce程序了。(二)MapReduce編程實踐:使用MapReduce統(tǒng)計對象中的某些屬性之前使用MapReduce統(tǒng)計的是單詞數(shù)量,而單詞本身屬于字面值,是比較容易計算的。本案例將會講解如何使用MapReduce統(tǒng)計對象中的某些屬性?!景咐?-2】以下是某個超市的結(jié)算記錄,從左往右各字段的含義依次是會員編號、結(jié)算時間、消費金額和用戶身份,請計算會員和非會員的平均消費金額。242315 2019-10-15.18:20:10 32 會員984518 2019-10-15.18:21:02 167 會員226335 2019-10-15.18:21:54 233 非會員341665 2019-10-15.18:22:11 5 非會員273367 2019-10-15.18:23:07 361 非會員296223 2019-10-15.18:25:12 19 會員193363 2019-10-15.18:25:55 268 會員671512 2019-10-15.18:26:04 76 非會員596233 2019-10-15.18:27:42 82 非會員323444 2019-10-15.18:28:02 219 會員345672 2019-10-15.18:28:48 482 會員...本案例的實現(xiàn)思路是:先計算會員和非會員的總消費金額,然后除以會員或非會員的數(shù)量。具體實現(xiàn)過程如下所示。(1)編寫實體類編寫封裝每個消費者記錄的實體類,每個消費者至少包含了編號、消費金額和是否為會員等屬性,源代碼如下所示。packagecom.xijing.mapreduce;importorg.apache.hadoop.io.Writable;importjava.io.DataInput;importjava.io.DataOutput;importjava.io.IOException;publicclassCustomerimplementsWritable{ //會員編號 privateStringid; //消費金額 privateintmoney; //0:非會員1:會員 privateintvip; publicCustomer(){ } publicCustomer(Stringid,intmoney,intvip){ this.id=id; this.money=money; this.vip=vip; } publicintgetMoney(){ returnmoney; } publicvoidsetMoney(intmoney){ this.money=money; } publicStringgetId(){ returnid; } publicvoidsetId(Stringid){ this.id=id; } publicintgetVip(){ returnvip; } publicvoidsetVip(intvip){ this.vip=vip; } //序列化 publicvoidwrite(DataOutputdataOutput)throwsIOException{ dataOutput.writeUTF(id); dataOutput.writeInt(money); dataOutput.writeInt(vip); } //反序列化(注意:各屬性的順序要和序列化保持一致) publicvoidreadFields(DataInputdataInput)throwsIOException{ this.id=dataInput.readUTF(); this.money=dataInput.readInt(); this.vip=dataInput.readInt(); } @Override publicStringtoString(){ returnthis.id+"\t"+this.money+"\t"+this.vip; }}由于本次統(tǒng)計的Customer對象需要在Hadoop集群中的多個節(jié)點之間傳遞數(shù)據(jù),因此需要將Customer對象通過write(DataOutputdataOutput)方法進行序列化操作,并通過readFields(DataInputdataInput)進行反序列化操作。(2)編寫Mapper類在Map階段讀取文本中的消費者記錄信息,并將消費者的各個屬性字段拆分讀取,然后根據(jù)會員情況,將消費者的消費金額輸出到MapReduce的下一個處理階段(即Shuffle),源代碼如下所示。packagecom.xijing.mapreduce;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;importjava.io.IOException;publicclassCustomerMapperextendsMapper<LongWritable,Text,Text,IntWritable>{ @Override protectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{ //將一行內(nèi)容轉(zhuǎn)成string Stringline=value.toString(); //獲取各個顧客的消費數(shù)據(jù) String[]fields=line.split("\t"); //獲取消費金額 intmoney=Integer.parseInt(fields[2]); //獲取會員情況 Stringvip=fields[3]; /* 輸出 Key:會員情況,value:消費金額 例如: 會員32 會員167 非會員233 非會員5 */ context.write(newText(vip),newIntWritable(money)); }}(3)編寫Reducer類Map階段的輸出數(shù)據(jù)在經(jīng)過shuffle階段混洗以后,就會傳遞給Reduce階段。Reduce拿到的數(shù)據(jù)形式是“會員(或非會員),[消費金額1,消費金額2,消費金額3,...]”。因此,與WordCount類似,只需要在Reduce階段累加會員或非會員的總消費金額就能完成本次任務(wù),源代碼如下所示。packagecom.xijing.mapreduce;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;importjava.io.IOException;publicclassCustomerReducerextendsReducer<Text,IntWritable,Text,LongWritable>{ @Override protectedvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedException{ //統(tǒng)計會員(或非會員)的個數(shù) intvipCount=0; //總消費金額 longsumMoney=0; for(IntWritablemoney:values){ vipCount++; sumMoney+=money.get(); } //會員(或非會員)的平均消費金額 longavgMoney=sumMoney/vipCount; context.write(key,newLongWritable(avgMoney)); }}(4)編寫MapReduce程序的驅(qū)動類在編寫Ma

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論