用Hadoop進(jìn)行分布式并行編程-2_第1頁
用Hadoop進(jìn)行分布式并行編程-2_第2頁
用Hadoop進(jìn)行分布式并行編程-2_第3頁
用Hadoop進(jìn)行分布式并行編程-2_第4頁
用Hadoop進(jìn)行分布式并行編程-2_第5頁
已閱讀5頁,還剩6頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡介

1、用Hadoop進(jìn)行分布式并行編程(二)(注:本文檔來自hadoop in china)程序?qū)嵗c分析Hadoop 是一個(gè)實(shí)現(xiàn)了MapReduce 計(jì)算模型的開源分布式并行編程框架,借助于Hadoop, 程序員可以輕松地編寫分布式并行程序,將其運(yùn)行于計(jì)算機(jī)集群上,完成海量數(shù)據(jù)的計(jì)算。在本文中,詳細(xì)介紹了如何針對一個(gè)具體的并行計(jì)算任務(wù),基于Hadoop 編寫程序,如何使用 IBM MapReduce Tools 在 Eclipse 環(huán)境中編譯并運(yùn)行 Hadoop 程序。前言在上一篇文章:“用 Hadoop 進(jìn)行分布式并行編程 第一部分 基本概念與安裝部署”中,介紹了 MapReduce 計(jì)算模型,

2、分布式文件系統(tǒng) HDFS,分布式并行計(jì)算等的基本原理, 并且詳細(xì)介紹了如何安裝 Hadoop,如何運(yùn)行基于 Hadoop 的并行程序。在本文中,將針對一個(gè)具體的計(jì)算任務(wù),介紹如何基于 Hadoop 編寫并行程序,如何使用 IBM 開發(fā)的 Hadoop Eclipse plugin 在 Eclipse 環(huán)境中編譯并運(yùn)行程序。分析 WordCount 程序我們先來看看 Hadoop 自帶的示例程序 WordCount, 這個(gè)程序用于統(tǒng)計(jì)一批文本文件中單詞出現(xiàn)的頻率,完整的代碼可在下載的 Hadoop 安裝包中得到(在 src/examples 目錄中)。1.實(shí)現(xiàn)Map類見代碼清單1。這個(gè)類實(shí)現(xiàn) M

3、apper 接口中的 map 方法,輸入?yún)?shù)中的 value 是文本文件中的一行,利用 StringTokenizer 將這個(gè)字符串拆成單詞,然后將輸出結(jié)果 <單詞,1> 寫入到 org.apache.hadoop.mapred.OutputCollector 中。OutputCollector 由 Hadoop 框架提供, 負(fù)責(zé)收集 Mapper 和 Reducer 的輸出數(shù)據(jù),實(shí)現(xiàn) map 函數(shù)和 reduce 函數(shù)時(shí),只需要簡單地將其輸出的 <key,value> 對往 OutputCollector 中一丟即可,剩余的事框架自會幫你處理好。代碼中 LongWri

4、table, IntWritable, Text 均是 Hadoop 中實(shí)現(xiàn)的用于封裝 Java 數(shù)據(jù)類型的類,這些類都能夠被串行化從而便于在分布式環(huán)境中進(jìn)行數(shù)據(jù)交換,你可以將它們分別視為 long, int, String 的替代品。Reporter 則可用于報(bào)告整個(gè)應(yīng)用的運(yùn)行進(jìn)度,本例中未使用。代碼清單1public static class MapClass extends MapReduceBase        implements Mapper<LongWritable, Text, Text,

5、IntWritable>        private final static IntWritable one = new IntWritable(1);        private Text word = new Text();                 pub

6、lic void map(LongWritable key, Text value,                OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException         String line = value.toStrin

7、g();        StringTokenizer itr = new StringTokenizer(line);        while (itr.hasMoreTokens()      word.set(itr.nextToken(); output.collect(word, one); 2.實(shí)現(xiàn)Reduce類見代碼清單2。這個(gè)類實(shí)現(xiàn) Reducer 接口中的

8、reduce 方法, 輸入?yún)?shù)中的 key, values 是由 Map 任務(wù)輸出的中間結(jié)果,values 是一個(gè) Iterator, 遍歷這個(gè) Iterator, 就可以得到屬于同一個(gè) key 的所有 value. 此處,key 是一個(gè)單詞,value 是詞頻。只需要將所有的 value 相加,就可以得到這個(gè)單詞的總的出現(xiàn)次數(shù)。代碼清單 2public static class Reduce extends MapReduceBase        implements Reducer<Text, IntW

9、ritable, Text, IntWritable>          public void reduce(Text key, Iterator<IntWritable> values,                OutputCollector<Text, IntWritable> output, Report

10、er reporter) throws IOException         int sum = 0; while (values.hasNext()         output.collect(key, new IntWritable(sum);        sum += values.next().get(); 3.運(yùn)行Job在 Hado

11、op 中一次計(jì)算任務(wù)稱之為一個(gè) job, 可以通過一個(gè) JobConf 對象設(shè)置如何運(yùn)行這個(gè) job。此處定義了輸出的 key 的類型是 Text, value 的類型是 IntWritable, 指定使用代碼清單1中實(shí)現(xiàn)的 MapClass 作為 Mapper 類,使用代碼清單2中實(shí)現(xiàn)的 Reduce 作為 Reducer 類和 Combiner 類, 任務(wù)的輸入路徑和輸出路徑由命令行參數(shù)指定,這樣 job 運(yùn)行時(shí)會處理輸入路徑下的所有文件,并將計(jì)算結(jié)果寫到輸出路徑下。然后將 JobConf 對象作為參數(shù),調(diào)用 JobClient 的 runJob, 開始執(zhí)行這個(gè)計(jì)算任務(wù)。至于 main 方

12、法中使用的 ToolRunner 是一個(gè)運(yùn)行 MapReduce 任務(wù)的輔助工具類,依樣畫葫蘆用之即可。代碼清單 3public int run(String args) throws Exception  JobConf conf = new JobConf(getConf(), WordCount.class);  conf.setJobName("wordcount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.c

13、lass); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputPath(new Path(args0); conf.setOutputPath(new Path(args1); JobClient.runJob(conf); return 0;public static void main(String args) throws Excep

14、tion if(args.length != 2)           int res = ToolRunner.run(new Configuration(), new WordCount(), args);           System.exit(res);System.err.println("Usage: WordCount <input path> <output

15、path>");  System.exit(-1);以上就是 WordCount 程序的全部細(xì)節(jié),簡單到讓人吃驚,您都不敢相信就這么幾行代碼就可以分布式運(yùn)行于大規(guī)模集群上,并行處理海量數(shù)據(jù)集。4. 通過JobConf 定制計(jì)算任務(wù)通過上文所述的 JobConf 對象,程序員可以設(shè)定各種參數(shù),定制如何完成一個(gè)計(jì)算任務(wù)。這些參數(shù)很多情況下就是一個(gè) java 接口,通過注入這些接口的特定實(shí)現(xiàn),可以定義一個(gè)計(jì)算任務(wù)( job )的全部細(xì)節(jié)。了解這些參數(shù)及其缺省設(shè)置,您才能在編寫自己的并行計(jì)算程序時(shí)做到輕車熟路,游刃有余,明白哪些類是需要自己實(shí)現(xiàn)的,哪些類用 Hado

16、op 的缺省實(shí)現(xiàn)即可。表一是對 JobConf 對象中可以設(shè)置的一些重要參數(shù)的總結(jié)和說明,表中第一列中的參數(shù)在 JobConf 中均會有相應(yīng)的 get/set 方法,對程序員來說,只有在表中第三列中的缺省值無法滿足您的需求時(shí),才需要調(diào)用這些 set 方法,設(shè)定合適的參數(shù)值,實(shí)現(xiàn)自己的計(jì)算目的。針對表格中第一列中的接口,除了第三列的缺省實(shí)現(xiàn)之外,Hadoop 通常還會有一些其它的實(shí)現(xiàn),我在表格第四列中列出了部分,您可以查閱 Hadoop 的 API 文檔或源代碼獲得更詳細(xì)的信息,在很多的情況下,您都不用實(shí)現(xiàn)自己的 Mapper 和 Reducer, 直接使用 Hadoop 自帶的一些實(shí)現(xiàn)即可。表

17、一 JobConf 常用可定制參數(shù)參數(shù)作用缺省值其它實(shí)現(xiàn)inputFormat將輸入的數(shù)據(jù)集切割成小數(shù)據(jù)集inputSplits, 每一個(gè)InputSplit將 由一個(gè)Mapper負(fù)責(zé)處理。此外inputFormat中還提供一個(gè)RecordReader的 實(shí)現(xiàn), 將一個(gè)InputSplit解 析成key,value 對提供給 map 函數(shù)。TextInputFormat(針 對文本文件,按行將文本文件切割成InputSplits, 并用LineRecordReader將InputSplit解 析成 key,value 對,key是行在文件中的位置,value是文件中的一行)SequenceFi

18、leInputFormatOutputFormat提供一個(gè) RecordWriter 的實(shí)現(xiàn),負(fù)責(zé)輸出最終結(jié)果TextOutputFormat(用 LineRecordWriter 將最終結(jié)果寫成純文件文件,每個(gè)key,value對一行,key 和 value 之間用 tab 分隔)SequenceFileOutputFormatOutputKeyClass輸出的最終結(jié)果中 key 的類型LongWritable OutputValueClass輸出的最終結(jié)果中 value 的類型Text MapperClassMapper 類,實(shí)現(xiàn) map 函數(shù),完成輸入的 key,va

19、lue 到中間結(jié)果的映射IdentityMapper(將 輸入的 key,value 原封不動的輸出為中間結(jié)果)LongSumReducer,LogRegexMapper,InverseMapperCombinerClass實(shí)現(xiàn) combine 函數(shù),將中間結(jié)果中的重復(fù) key 做合并null(不對中間結(jié)果中的重復(fù) key 做合并) ReducerClassReducer 類,實(shí)現(xiàn) reduce 函數(shù),對中間結(jié)果做合并,形成最終結(jié)果IdentityReducer(將 中間結(jié)果直接輸出為最終結(jié)果)AccumulatingReducer,LongSumReducerInputPath設(shè)定

20、 job 的輸入目錄, job 運(yùn)行時(shí)會處理輸入目錄下的所有文件null OutputPath設(shè)定 job 的輸出目錄,job 的最終結(jié)果會寫入輸出目錄下null MapOutputKeyClass設(shè)定 map 函數(shù)輸出的中間結(jié)果中 key 的類型如果用戶沒有設(shè)定的話,使用OutputKeyClass MapOutputValueClass設(shè)定 map 函數(shù)輸出的中間結(jié)果中value 的類型 如果用戶沒有設(shè)定的話,使用 OutputValuesClass OutputKeyComparator對結(jié)果中的 key 進(jìn)行排序時(shí)的使用的比較器WritableC

21、omparable PartitionerClass對中間結(jié)果的 key 排序后,用此 Partition 函數(shù)將其劃分為R份,每份由一個(gè) Reducer 負(fù)責(zé)處理。HashPartitioner(使 用 Hash 函數(shù)做 partition)KeyFieldBasedPartitionerPipesPartitioner改進(jìn)的 WordCount 程序現(xiàn)在你對 Hadoop 并行程序的細(xì)節(jié)已經(jīng)有了比較深入的了解,我們來把 WordCount 程序改進(jìn)一下,目標(biāo): (1)原 WordCount 程序僅按空格切分單詞,導(dǎo)致各類標(biāo)點(diǎn)符號與單詞混雜在一起,改進(jìn)后的程序應(yīng)該能夠正確的切出單詞

22、,并且單詞不要區(qū)分大小寫。(2)在最終結(jié)果中,按單詞出 現(xiàn)頻率的降序進(jìn)行排序。1.修改 Mapper 類,實(shí)現(xiàn)目標(biāo)(1)實(shí)現(xiàn)很簡單,見代碼清單4中 的注釋。代碼清單 4public static class MapClass extends MapReduceBase          implements Mapper<LongWritable, Text, Text, IntWritable>         

23、         private final static IntWritable one = new IntWritable(1);                 private Text word = new Text();         &

24、#160;       private String pattern="w" /正則表達(dá)式,代表不是0-9, a-z, A-Z的所有其它字符                public void map(LongWritable key, Text value,OutputCollector<Text, IntWritable> out

25、put, Reporter reporter)        throws IOException                 String line = value.toString().toLowerCase(); /全部轉(zhuǎn)為小寫字母          

26、;       line = line.replaceAll(pattern, " "); /將非0-9, a-z, A-Z的字符替換為空格                StringTokenizer itr = new StringTokenizer(line);       

27、;  while (itr.hasMoreTokens()                         word.set(itr.nextToken(); output.collect(word, one);          

28、0;              2.實(shí)現(xiàn)目標(biāo)(2)用一個(gè)并行計(jì)算任務(wù)顯然是無法同時(shí)完成單詞詞頻統(tǒng)計(jì)和排序的,這時(shí)我們可以利用 Hadoop 的任務(wù)管道能力,用上一個(gè)任務(wù)(詞頻統(tǒng)計(jì))的輸出做為下一個(gè)任務(wù)(排序)的輸入,順序執(zhí)行兩個(gè)并行計(jì)算任務(wù)。主要工作是修改代碼清單3中的 run 函數(shù),在其中定義一個(gè)排序任務(wù)并運(yùn)行之。在 Hadoop 中要實(shí)現(xiàn)排序是很簡單的,因?yàn)樵?MapReduce 的過程中,會把中間結(jié)果根據(jù) key 排序并按 key 切成 R 份交給

29、R 個(gè) Reduce 函數(shù),而 Reduce 函數(shù)在處理中間結(jié)果之前也會有一個(gè)按 key 進(jìn)行排序的過程,故 MapReduce 輸出的最終結(jié)果實(shí)際上已經(jīng)按 key 排好序。詞頻統(tǒng)計(jì)任務(wù)輸出的 key 是單詞,value 是詞頻,為了實(shí)現(xiàn)按詞頻排序,我們指定使用 InverseMapper 類作為排序任務(wù)的 Mapper 類( sortJob.setMapperClass(InverseMapper.class );),這個(gè)類的 map 函數(shù)簡單地將輸入的 key 和 value 互換后作為中間結(jié)果輸出,在本例中即是將詞頻作為 key,單詞作為 value 輸出, 這樣自然就能得到按詞頻排好序

30、的最終結(jié)果。我們無需指定 Reduce 類,Hadoop 會使用缺省的 IdentityReducer 類,將中間結(jié)果原樣輸出。還有一個(gè)問題需要解決: 排序任務(wù)中的 Key 的類型是 IntWritable, (sortJob.setOutputKeyClass(IntWritable.class), Hadoop 默認(rèn)對 IntWritable 按升序排序,而我們需要的是按降序排列。因此我們實(shí)現(xiàn)了一個(gè) IntWritableDecreasingComparator 類,并指定使用這個(gè)自定義的 Comparator 類對輸出結(jié)果中的 key (詞頻)進(jìn)行排序:sortJob.setOutput

31、KeyComparatorClass(IntWritableDecreasingComparator.class)詳見代碼清單 5 及其中的注釋。代碼清單 5public int run(String args) throws Exception         Path tempDir = new Path("wordcount-temp-" + Integer.toString(        new Rando

32、m().nextInt(Integer.MAX_VALUE); /定義一個(gè)臨時(shí)目錄         JobConf conf = new JobConf(getConf(), WordCount.class);try         conf.setJobName("wordcount");        conf.setOutputKe

33、yClass(Text.class);conf.setOutputValueClass(IntWritable.class);conf.setMapperClass(MapClass.class);conf.setCombinerClass(Reduce.class);conf.setReducerClass(Reduce.class);conf.setInputPath(new Path(args0);conf.setOutputPath(tempDir); /先將詞頻統(tǒng)計(jì)任務(wù)的輸出結(jié)果寫到臨時(shí)目/錄中, 下一個(gè)排序任務(wù)以臨時(shí)目錄為輸入目錄。conf.setOutputFormat(Sequ

34、enceFileOutputFormat.class);JobClient.runJob(conf);JobConf sortJob = new JobConf(getConf(), WordCount.class);sortJob.setJobName("sort"); sortJob.setInputPath(tempDir);sortJob.setInputFormat(SequenceFileInputFormat.class);sortJob.setMapperClass(InverseMapper.class);sortJob.setNumReduceTasks

35、(1); /將 Reducer 的個(gè)數(shù)限定為1, 最終輸出的結(jié)果/文件就是一個(gè)。 sortJob.setOutputPath(new Path(args1);sortJob.setOutputKeyClass(IntWritable.class);sortJob.setOutputValueClass(Text.class);sortJob.setOutputKeyComparatorClass(IntWritableDecreasingComparator.class);JobClient.runJob(sortJob);  finally    &#

36、160;            FileSystem.get(conf).delete(tempDir); /刪除臨時(shí)目錄                        return 0;  private static clas

37、s IntWritableDecreasingComparator extends IntWritable.Comparator         public int compare(WritableComparable a, WritableComparable b) return -pare(a, b);         public int compare(byte b1, int s1, int l1, byte b2, int

38、 s2, int l2)                    return -pare(b1, s1, l1, b2, s2, l2);          在Eclipse 環(huán)境下進(jìn)行開發(fā)和調(diào)試在 Eclipse 環(huán)境下可以方便地進(jìn)行 Hadoop 并行程序的開發(fā)和調(diào)試。推薦使用 IBM MapReduce Tools for Eclipse, 使用這個(gè) Eclipse plugin 可以簡化開發(fā)和部署 Hadoop 并行程序的過程?;谶@個(gè) plugin, 可以在 Eclipse 中創(chuàng)建一個(gè) Hadoop MapReduce 應(yīng)用程序,并且提供了一些基于 MapReduce 框架的類開發(fā)的向?qū)?,可以打包?JAR 文件,部署一個(gè) Hadoop MapReduce 應(yīng)用程序到一個(gè) Hadoop 服務(wù)器(本地和遠(yuǎn)程均可),可以通過一個(gè)專門的視圖 ( perspective ) 查看 Hadoop 服務(wù)器、Hadoop 分布式文件系統(tǒng)( DFS )和當(dāng)前運(yùn)行的任務(wù)的狀態(tài)??稍?IBM alphaWorks 網(wǎng)站下

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

最新文檔

評論

0/150

提交評論