




版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
1、.Hadoop Streaming 編程源地址: ://mapreduce/hadoop-streaming-programming/1、概述Hadoop Streaming是Hadoop提供的一個(gè)編程工具,它允許用戶使用任何可執(zhí)行文件或者腳本文件作為Mapper和Reducer,例如:采用shell腳本語(yǔ)言中的一些命令作為mapper和reducercat作為mapper,wc作為reducer$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar -input myInputDir
2、s -output myOutputDir -mapper cat -reducer wc本文安排如下,第二節(jié)介紹Hadoop Streaming的原理,第三節(jié)介紹Hadoop Streaming的使用方法,第四節(jié)介紹Hadoop Streaming的程序編寫(xiě)方法,在這一節(jié)中,用C+、C、shell腳本 和python實(shí)現(xiàn)了WordCount作業(yè),第五節(jié)總結(jié)了常見(jiàn)的問(wèn)題。文章最后給出了程序下載地址。本文內(nèi)容基于Hadoop-0.20.2版本注:假設(shè)你采用的語(yǔ)言為C或者C+,也可以使用Hadoop Pipes,詳細(xì)可參考這篇文章:Hadoop Pipes編程。關(guān)于Hadoop Streaming
3、高級(jí)編程方法,可參考這篇文章:Hadoop Streaming高級(jí)編程。2、Hadoop Streaming原理mapper和reducer會(huì)從標(biāo)準(zhǔn)輸入中讀取用戶數(shù)據(jù),一行一行處理后發(fā)送給標(biāo)準(zhǔn)輸出。Streaming工具會(huì)創(chuàng)立MapReduce作業(yè),發(fā)送給各個(gè)tasktracker,同時(shí)監(jiān)控整個(gè)作業(yè)的執(zhí)行過(guò)程。假設(shè)一個(gè)文件可執(zhí)行或者腳本作為mapper,mapper初始化時(shí),每一個(gè)mapper任務(wù)會(huì)把該文件作為一個(gè)單獨(dú)進(jìn)程啟動(dòng),mapper任務(wù)運(yùn)行時(shí),它把輸入切分成行并把每一行提供給可執(zhí)行文件進(jìn)程的標(biāo)準(zhǔn)輸入。 同時(shí),mapper搜集可執(zhí)行文件進(jìn)程標(biāo)準(zhǔn)輸出的內(nèi)容,并把收到的每一行內(nèi)容轉(zhuǎn)化成key
4、/value對(duì),作為mapper的輸出。 默認(rèn)情況下,一行中第一個(gè)tab之前的部分作為key,之后的不包括tab作為value。假設(shè)沒(méi)有tab,整行作為key值,value值為null。對(duì)于reducer,類似。以上是Map/Reduce框架和streaming mapper/reducer之間的根本通信協(xié)議。3、Hadoop Streaming用法Usage: $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar optionsoptions:1-input:輸入文件途徑2-output:輸出文件途徑3-mapper:用戶
5、自己寫(xiě)的mapper程序,可以是可執(zhí)行文件或者腳本4-reducer:用戶自己寫(xiě)的reducer程序,可以是可執(zhí)行文件或者腳本5-file:打包文件到提交的作業(yè)中,可以是mapper或者reducer要用的輸入文件,如配置文件,字典等。6-partitioner:用戶自定義的partitioner程序7-combiner:用戶自定義的combiner程序必須用java實(shí)現(xiàn)8-D:作業(yè)的一些屬性以前用的是-jonconf,詳細(xì)有: 1mapred.map
6、.tasks:map task數(shù)目 2mapred.reduce.tasks:reduce task數(shù)目 3stream.map.input.field.separator/stream.map.output.field.separator: map task輸入/輸出數(shù)據(jù)的分隔符,默認(rèn)均為t。
7、160; 4stream.num.map.output.key.fields:指定map task輸出記錄中key所占的域數(shù)目 5stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task輸入/輸出數(shù)據(jù)的分隔符,默認(rèn)均為t。&
8、#160; 6stream.num.reduce.output.key.fields:指定reduce task輸出記錄中key所占的域數(shù)目另外,Hadoop本身還自帶一些好用的Mapper和Reducer:1 Hadoop聚集功能Aggregate提供一個(gè)特殊的reducer類和一個(gè)特殊的combiner類,并且有一系列的“聚合器例如“sum,“max,“min等用于聚合一組value的序列。用戶可以使用Aggregate定義
9、一個(gè)mapper插件類,這個(gè)類用于為mapper輸入的每個(gè)key/value對(duì)產(chǎn)生“可聚合項(xiàng)。Combiner/reducer利用適當(dāng)?shù)木酆掀骶酆线@些可聚合項(xiàng)。要使用Aggregate,只需指定“-reducer aggregate。2字段的選取類似于Unix中的cutHadoop的工具類org.apache.hadoop.mapred.lib.FieldSelectionMapReduc幫助用戶高效處理文本數(shù)據(jù),就像unix中的“cut工具。工具類中的map函數(shù)把輸入的key/value對(duì)看作字段的列表。 用戶可以指定字段的分隔符默認(rèn)是tab,可以選擇字段列表中任意一段由列表中一個(gè)或多個(gè)字段
10、組成作為map輸出的key或者value。 同樣,工具類中的reduce函數(shù)也把輸入的key/value對(duì)看作字段的列表,用戶可以選取任意一段作為reduce輸出的key或value。4、Mapper和Reducer實(shí)現(xiàn)本節(jié)試圖用盡可能多的語(yǔ)言編寫(xiě)Mapper和Reducer,包括Java,C,C+,Shell腳本,python等。由于Hadoop會(huì)自動(dòng)解析數(shù)據(jù)文件到Mapper或者Reducer的標(biāo)準(zhǔn)輸入中,以供它們讀取使用,所有應(yīng)先理解各個(gè)語(yǔ)言獲取標(biāo)準(zhǔn)輸入的方法。程序見(jiàn)Hadoop streaming.rar5、常見(jiàn)問(wèn)題1作業(yè)總是運(yùn)行失?。盒枰裮apper文件和reducer文件放到各個(gè)
11、tasktracker上,保證各個(gè)節(jié)點(diǎn)均有一份。也可在提交作業(yè)時(shí),采用-file選項(xiàng)指定這些文件。2用腳本編寫(xiě)時(shí),第一行需注明腳本解釋器,默認(rèn)是shell6、參考資料【1】C+&Python實(shí)現(xiàn)Hadoop Streaming的paritioner和模塊化【2】如何在Hadoop中使用Streaming編寫(xiě)MapReduce【3】Hadoop如何與C+結(jié)合【4】Hadoop Streaming和pipes理解 下面是一個(gè)用C+寫(xiě)的WordCount例如: :/bbs.hadoopor /thread-420-1-2.html1. #include "hadoop/Pipes.
12、hh"2. #include "hadoop/TemplateFactory.hh"3. #include "hadoop/StringUtils.hh"4.5. class WordCountMap: public HadoopPipes:Mapper 6. public:7. WordCountMapHadoopPipes:TaskContext& context8. void mapHadoopPipes:MapContext& context 9.
13、; std:vector<std:string> words =10. HadoopUtils:splitStringcontext.getInputValue, " "11. forunsigned int i=0; i < words.size; +i 12. context.emitwordsi, "1"13. 14. 15. ;16.17. cla
14、ss WordCountReduce: public HadoopPipes:Reducer 18. public:19. WordCountReduceHadoopPipes:TaskContext& context20. void reduceHadoopPipes:ReduceContext& context 21. int sum = 0;22. while context.nextValue 23. sum +=
15、HadoopUtils:toIntcontext.getInputValue;24. 25. context.emitcontext.getInputKey, HadoopUtils:toStringsum;26. 27. ;28.29. int mainint argc, char *argv 30. return HadoopPipes:runTaskHadoopPipes:TemplateFactory<WordCountMap,31. &
16、#160; WordCountReduce>32. 復(fù)制代碼編譯,生成可執(zhí)行文件:# ant -Dcompile.c+=yes examples將可執(zhí)行文件編譯上傳到HDFS:# bin/hadoop fs -put build/c+-examples/Linux-i386-32/bin /examples/bin創(chuàng)立配置文件:1. # vi src/examples/pipes/co
17、nf/word.xml2.3. <?xml version="1.0"?>4. <configuration>5. <property>6. / Set the binary path on DFS7. <name>hadoop.pipes.executable</name>8. <value>/examples/bin/wordcount</value>9.
18、</property>10. <property>11. <name>hadoop.pipes.java.recordreader</name>12. <value>true</value>13. </property>14. <property>15. <name>hadoop.pipes.java.recordwriter<
19、/name>16. <value>true</value>17. </property>18. </configuration>復(fù)制代碼運(yùn)行例如:# bin/hadoop pipes -conf src/examples/pipes/conf/word.xml -input in-dir -output out-dir如何在Hadoop中使用Streaming編寫(xiě)MapReduce :/bbs.hadoopor /thread-256-1-1.html 馬士華 發(fā)表于:2020-03-05
20、 12:51 最后更新于:2020-03-25 11:18版權(quán)聲明:可以任意轉(zhuǎn)載,轉(zhuǎn)載時(shí)請(qǐng)務(wù)必以超鏈接形式標(biāo)明文章原始出處和作者信息。 :/ Michael G. Noll在他的Blog中提到如何在Hadoop中用Python編寫(xiě)MapReduce程序,韓國(guó)的gogamza在其Bolg中也提到如何用C編寫(xiě)MapReduce程序我略微修改了一下原程序,因?yàn)樗腗ap對(duì)單詞切分使用tab鍵。我合并他們兩人的文章,也讓國(guó)內(nèi)的Hadoop用戶可以使用別的語(yǔ)言來(lái)編寫(xiě)MapReduce程序。首先您得配好您的Hadoop集群,這方面的介紹網(wǎng)上比較多,這兒給個(gè)鏈接Hadoop學(xué)習(xí)筆記二 安裝部署。Hadoop
21、 Streaming幫助我們用非Java的編程語(yǔ)言使用MapReduce,Streaming用STDIN 標(biāo)準(zhǔn)輸入和STDOUT 標(biāo)準(zhǔn)輸出來(lái)和我們編寫(xiě)的Map和Reduce進(jìn)展數(shù)據(jù)的交換數(shù)據(jù)。任何可以使用STDIN和STDOUT都可以用來(lái)編寫(xiě)MapReduce程序,比方我們用Python的sys.stdin和sys.stdout,或者是C中的stdin和stdout。我們還是使用Hadoop的例子WordCount來(lái)做示范如何編寫(xiě)MapReduce,在WordCount的例子中我們要解決計(jì)算在一批文檔中每一個(gè)單詞的出現(xiàn)頻率。首先我們?cè)贛ap程序中會(huì)承受到這批文檔每一行的數(shù)據(jù),然后我們編寫(xiě)的Ma
22、p程序把這一行按空格切開(kāi)成一個(gè)數(shù)組。并對(duì)這個(gè)數(shù)組遍歷按"<word> 1"用標(biāo)準(zhǔn)的輸出輸出來(lái),代表這個(gè)單詞出現(xiàn)了一次。在Reduce中我們來(lái)統(tǒng)計(jì)單詞的出現(xiàn)頻率。Python CodeMap: mapper.py#!/usr/bin/env pythonimport sys# maps words to their countsword2count = # input comes from STDIN standard inputfor line in sys.stdin: # remove leading and trailing whitespace line
23、 = line.strip # split the line into words while removing any empty strings words = filterlambda word: word, line.split # increase counters for word in words: # write the results to STDOUT standard output; # what we output here will be the input for the # Reduce step, i.e. the input for reducer.py #
24、# tab-delimited; the trivial word count is 1 print '%st%s' % word, 1復(fù)制代碼Reduce: reducer.py#!/usr/bin/env pythonfrom operator import itemgetterimport sys# maps words to their countsword2count = # input comes from STDINfor line in sys.stdin: # remove leading and trailing whitespace line = line
25、.strip # parse the input we got from mapper.py word, count = line.split # convert count currently a string to int try: count = intcount word2countword = word2count.getword, 0 + count except ValueError: # count was not a number, so silently # ignore/discard this line pass# sort the words lexigraphica
26、lly;# this step is NOT required, we just do it so that our# final output will look more like the official Hadoop# word count examplessorted_word2count = sortedword2count.items, key=itemgetter0# write the results to STDOUT standard outputfor word, count in sorted_word2count: print '%st%s'% wo
27、rd, count復(fù)制代碼C CodeMap: Mapper.c#include <stdio.h>#include <string.h>#include <stdio.h>#include <stdlib.h>#define BUF_SIZE 2048#define DELIM "n"int mainint argc, char *argv char bufferBUF_SIZE; whilefgetsbuffer, BUF_SIZE - 1, stdin int len = strlenbuffer; ifbufferle
28、n-1 = 'n' bufferlen-1 = 0; char *querys = indexbuffer, ' ' char *query = NULL; ifquerys = NULL continue; querys += 1; /* not to include 't' */ query = strtokbuffer, " " whilequery printf"%st1n", query; query = strtokNULL, " " return 0;復(fù)制代碼Reduce:
29、 Reducer.c#include <stdio.h>#include <string.h>#include <stdio.h>#include <stdlib.h>#define BUFFER_SIZE 1024#define DELIM "t"int mainint argc, char *argv char strLastKeyBUFFER_SIZE; char strLineBUFFER_SIZE; int count = 0; *strLastKey = '0' *strLine = '0&
30、#39; while fgetsstrLine, BUFFER_SIZE - 1, stdin char *strCurrKey = NULL; char *strCurrNum = NULL; strCurrKey = strtokstrLine, DELIM; strCurrNum = strtokNULL, DELIM; /* necessary to check error but. */ if strLastKey0 = '0' strcpystrLastKey, strCurrKey; ifstrcmpstrCurrKey, strLastKey printf&qu
31、ot;%st%dn", strLastKey, count; count = atoistrCurrNum; else count += atoistrCurrNum; strcpystrLastKey, strCurrKey; printf"%st%dn", strLastKey, count; /* flush the count */ return 0;復(fù)制代碼首先我們調(diào)試一下源碼:chmod +x mapper.py chmod +x reducer.py echo "foo foo quux labs foo bar quux" |
32、./mapper.py | ./reducer.py bar 1 foo 3 labs 1 quux 2 g+ Mapper.c -o Mapper g+ Reducer.c -o Reducer chmod +x Mapper chmod +x Reducer echo "foo foo quux labs foo bar quux" | ./Mapper | ./Reducer bar 1 foo 2 labs 1 quux 1 foo 1 quux 1復(fù)制代碼你可能看到C的輸出和Python的不一樣,因?yàn)镻ython是把他放在詞典里了.我們?cè)贖adoop時(shí),會(huì)對(duì)這進(jìn)展
33、排序,然后一樣的單詞會(huì)連續(xù)在標(biāo)準(zhǔn)輸出中輸出.在Hadoop中運(yùn)行程序首先我們要下載我們的測(cè)試文檔wget :/ /dirs/etext04/7ldvc10.txt.我們把文檔存放在/tmp/doc這個(gè)目錄下.拷貝測(cè)試文檔到HDFS中.bin/hadoop dfs -copyFromLocal /tmp/doc doc復(fù)制代碼運(yùn)行 bin/hadoop dfs -ls doc 看看拷貝是否成功.接下來(lái)我們運(yùn)行我們的MapReduce的Job.bin/hadoop jar /home/hadoop/contrib/hadoop-0.15.1-streaming.jar -
34、mapper /home/hadoop/Mapper-reducer /home/hadoop/Reducer -input doc/* -output c-output -jobconf mapred.reduce.tasks=1additionalConfSpec_:nullnull=userJobConfProps_.getstream.shipped.hadoopstreamingpackageJobJar: /home/msh/hadoop-0.15.1/contrib/hadoop-0.15.1-streaming.jar /tmp/streamjob60816.jar tmpDi
35、r=null08/03/04 19:03:13 INFO mapred.FileInputFormat: Total input paths to process : 108/03/04 19:03:13 INFO streaming.StreamJob: getLocalDirs: /home/msh/data/filesystem/mapred/local08/03/04 19:03:13 INFO streaming.StreamJob: Running job: job_202003031752_000308/03/04 19:03:13 INFO streaming.StreamJo
36、b: To kill this job, run:08/03/04 19:03:13 INFO streaming.StreamJob: /home/msh/hadoop/bin/./bin/hadoop job -Dmapred.job.tracker=2:9001 -kill job_202003031752_000308/03/04 19:03:13 INFO streaming.StreamJob: Tracking URL: :/hadoop-master:50030/jobdetails.jsp?jobid=job_202003031752_000308/03
37、/04 19:03:14 INFO streaming.StreamJob: map 0% reduce 0%08/03/04 19:03:15 INFO streaming.StreamJob: map 33% reduce 0%08/03/04 19:03:16 INFO streaming.StreamJob: map 100% reduce 0%08/03/04 19:03:19 INFO streaming.StreamJob: map 100% reduce 100%08/03/04 19:03:19 INFO streaming.StreamJob: Job complete:
38、job_202003031752_000308/03/04 19:03:19 INFO streaming.StreamJob: Output: c-outputbin/hadoop jar /home/hadoop/contrib/hadoop-0.15.1-streaming.jar -mapper /home/hadoop/mapper.py-reducer /home/hadoop/reducer.py -input doc/* -output python-output -jobconf mapred.reduce.tasks=1additionalConfSpec_:nullnul
39、l=userJobConfProps_.getstream.shipped.hadoopstreamingpackageJobJar: /home/hadoop/hadoop-0.15.1/contrib/hadoop-0.15.1-streaming.jar /tmp/streamjob26099.jar tmpDir=null08/03/04 19:05:40 INFO mapred.FileInputFormat: Total input paths to process : 108/03/04 19:05:41 INFO streaming.StreamJob: getLocalDir
40、s: /home/msh/data/filesystem/mapred/local08/03/04 19:05:41 INFO streaming.StreamJob: Running job: job_202003031752_000408/03/04 19:05:41 INFO streaming.StreamJob: To kill this job, run:08/03/04 19:05:41 INFO streaming.StreamJob: /home/msh/hadoop/bin/./bin/hadoop job -Dmapred.job.tracker=2
41、:9001 -kill job_202003031752_000408/03/04 19:05:41 INFO streaming.StreamJob: Tracking URL: :/hadoop-master:50030/jobdetails.jsp?jobid=job_202003031752_000408/03/04 19:05:42 INFO streaming.StreamJob: map 0% reduce 0%08/03/04 19:05:48 INFO streaming.StreamJob: map 33% reduce 0%08/03/04 19:05:49 INFO s
42、treaming.StreamJob: map 100% reduce 0%08/03/04 19:05:52 INFO streaming.StreamJob: map 100% reduce 100%08/03/04 19:05:52 INFO streaming.StreamJob: Job complete: job_202003031752_000408/03/04 19:05:52 INFO streaming.StreamJob: Output: python-output復(fù)制代碼當(dāng)Job提交后我們還可以在web的界面 :/localhost:50030/看到每一個(gè)工作的運(yùn)行情況
43、。 當(dāng)Job工作完成后我們可以在c-output和python-output看到一些文件bin/hadoop dfs -ls c-output復(fù)制代碼輸入下面的命令我們可以看到我們運(yùn)行完MapReduce的結(jié)果bin/hadoop dfs -cat c-output/part-00000復(fù)制代碼用Hadoop Streaming運(yùn)行MapReduce會(huì)比較用Java的代碼要慢,因?yàn)橛袃煞矫娴脑?使用 Java API >> C Streaming >> Perl Streaming 這樣的一個(gè)流程運(yùn)行會(huì)阻塞IO.不像Java在運(yùn)行Map后輸出結(jié)果有一定數(shù)量的結(jié)果集就啟
44、動(dòng)Reduce的程序,用Streaming要等到所有的Map都運(yùn)行完畢后才啟動(dòng)Reduce假設(shè)用Python編寫(xiě)MapReduce的話,另一個(gè)可選的是使用Jython來(lái)轉(zhuǎn)編譯Pyhton為Java的原生碼另外對(duì)于C程序員更好的選擇是使用Hadoop新的C+ MapReduce API Pipes來(lái)編寫(xiě)不管怎樣,畢竟Hadoop提供了一種不使用Java來(lái)進(jìn)展分布式運(yùn)算的方法下面是從 :/ lunchpauze /2007/10/writing-hadoop-mapreduce-program-in-php.html頁(yè)面中摘下的用php編寫(xiě)的MapReduce程序,供php程序員參考:Map: m
45、apper.php#!/usr/bin/php<?$word2count = array;/ input comes from STDIN standard inputwhile $line = fgetsSTDIN != false / remove leading and trailing whitespace and lowercase $line = strtolowertrim$line; / split the line into words while removing any empty string $words = preg_split'/W/', $
46、line, 0, PREG_SPLIT_NO_EMPTY; / increase counters foreach $words as $word $word2count$word += 1; / write the results to STDOUT standard output/ what we output here will be the input for the/ Reduce step, i.e. the input for reducer.pyforeach $word2count as $word => $count / tab-delimited echo $wor
47、d, chr9, $count, PHP_EOL;?>復(fù)制代碼Reduce: mapper.php#!/usr/bin/php<?$word2count = array;/ input comes from STDINwhile $line = fgetsSTDIN != false / remove leading and trailing whitespace $line = trim$line; / parse the input we got from mapper.php list$word, $count = explodechr9, $line; / convert
48、count currently a string to int $count = intval$count; / sum counts if $count > 0 $word2count$word += $count;/ sort the words lexigraphically/ this set is NOT required, we just do it so that our/ final output will look more like the official Hadoop/ word count examplesksort$word2count;/ write the
49、 results to STDOUT standard outputforeach $word2count as $word => $count echo $word, chr9, $count, PHP_EOL;?>復(fù)制代碼c+ && python 實(shí)現(xiàn)Hadoop Streaming 的partitioner和模塊化 :/ 這些東西是我自己的理解,假設(shè)有錯(cuò)誤的地方,或者有哪些地方走了彎路,請(qǐng)幫我指出我的錯(cuò)誤,謝謝Hadoop Streaming 是一個(gè)工具, 代替編寫(xiě)Java的實(shí)現(xiàn)類,而利用可執(zhí)行程序來(lái)完成map-reduce過(guò)程工作流程:InputFile -
50、> mappers -> Partitioner -> reducers -> outputFiles理解 :1 輸入文件,可以是指定遠(yuǎn)程文件系統(tǒng)內(nèi)的文件夾下的 *2 通過(guò)集群自己分解到各個(gè)PC上,每個(gè)mapper是一個(gè)可執(zhí)行文件,相應(yīng)的啟動(dòng)一個(gè)進(jìn)程,來(lái)實(shí)現(xiàn)你的邏輯3 mapper的輸入為標(biāo)準(zhǔn)輸入,所以,任何可以支持標(biāo)準(zhǔn)輸入的可執(zhí)行的東西,c,c+編譯出來(lái)的可執(zhí)行文件,python,.都可以作 為mapper 和 reducermapper的輸出為標(biāo)準(zhǔn)輸出,假設(shè)有Partitioner,就給它,假設(shè)沒(méi)有,它的輸出將作為reducer的輸入4 Partitioner 為可
51、選的項(xiàng),二次排序,可以對(duì)結(jié)果進(jìn)展分類打到結(jié)果文件里面,它的輸入是mapper的標(biāo)準(zhǔn)輸出,它的輸出,將作為reducer的標(biāo)準(zhǔn)輸入5 reducer 同 mapper6 輸出文件夾,在遠(yuǎn)端文件不能重名Hadoop Streaming1 : hadoop-streaming.jar 的位置 : $HADOOP_HOME/contrib/streaming 內(nèi)官方上面關(guān)于hadoop-streaming 的介紹已經(jīng)很詳細(xì)了,而且也有了關(guān)于python的例子,我就不說(shuō)了,這里總結(jié)下自己的經(jīng)歷1 指定 mapper or reducer 的 task 官方上說(shuō)要用 -jobconf但是這個(gè)參數(shù)已經(jīng)過(guò)時(shí),
52、不可以用了,官方說(shuō)要用 -D, 注意這個(gè)-D是要作為最開(kāi)場(chǎng)的配置出現(xiàn)的,因?yàn)槭窃趍aper 和 reducer執(zhí)行之前,就需要硬性指定好的,所以要出如今參數(shù)的最前面 ./bin/hadoop jar hadoop-0.19.2-streaming.jar -D .-input .類似這樣,這樣,即使你程序最后只指定了一個(gè)輸出管道,但是還是會(huì)有你指定的task數(shù)量的結(jié)果文件,只不過(guò)多余的就是空的實(shí)驗(yàn)以下 就知道了2 關(guān)于二次排序,由于是用的streaming 所以,在可執(zhí)行文件內(nèi),只可以處理邏輯,還有就是輸出,當(dāng)然我們也可以指定二次排序,但是由于是全部參數(shù)化,不是很靈敏。比方:0
53、 3 1 renren 1 baidu 0 1這樣一個(gè)很規(guī)整的輸入文件,需求是要把記錄獨(dú)立的ip和url的count但是輸出文件要分分割出來(lái)。官方網(wǎng)站的例子,是指定 key然后對(duì)key 指定 主-key和 key用來(lái)排序,而 主-key 用來(lái)二次排序,這樣會(huì)輸出你想要的東西,但是對(duì)于上面最簡(jiǎn)單的需求,對(duì)于傳遞參數(shù),我們?nèi)绾巫瞿?其實(shí)我們還是可以利用這一點(diǎn),在我們mapper里面,還是按照/t來(lái)分割key value但是我
54、們要給key指定一個(gè)主-key用來(lái)給Partitioner 來(lái)實(shí)現(xiàn)二次排序,所以我們可以略微處理下這個(gè)KEY,我們可以簡(jiǎn)單的判斷出來(lái)ip和 url的區(qū)別,這樣,我們就人為的加上一個(gè)主-key我們?cè)趍apper里面,給每個(gè)key人為的加上一個(gè)"標(biāo)簽",用來(lái)給partitioner做 二次排序用,比方我們的mapper的輸出是這樣D&0 1D&3 1W& renren 1W& baidu 1D&10.2.
55、3.40 1 然后通過(guò)傳遞命令參數(shù)-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner /指定要求二次排序-jobconf map.output.key.field.separator='&'/這里假設(shè)不加兩個(gè)單引號(hào)的話我的命令會(huì)死掉-jobconf num.key.fields.for.partition=1/這里指第一個(gè) &符號(hào)來(lái)分割,保證不會(huì)出錯(cuò)這樣我們就可以通過(guò) partitioner來(lái)實(shí)現(xiàn)二次排序了在reducer里面,我們?cè)侔?/p>
56、"標(biāo)簽"摘掉不費(fèi)吹灰之力就可以做到悄無(wú)聲息的完成二次排序了。3:關(guān)于模塊化強(qiáng)調(diào):沒(méi)有在集群上測(cè)試,只在單機(jī)上做測(cè)試程序員最悲劇的就是不能代碼復(fù)用,做這個(gè)也一樣,用hadoop-streaming也一樣,要做到代碼重用,是我第一個(gè)考慮的問(wèn)題當(dāng)我看到 -file詳細(xì)可以看官方網(wǎng)站上的講解的時(shí)候,我就想到利用這個(gè)東西,果然,我的在本機(jī)上建立了一個(gè)py模塊,簡(jiǎn)單的一個(gè)函數(shù)然后在我的mapper里面import 它,本地測(cè)試通過(guò)后,利用-file把模塊所在的問(wèn)價(jià)夾用 -file moudle/*這個(gè)參數(shù),傳入streaming執(zhí)行的結(jié)果毫無(wú)錯(cuò)誤,這樣,我們就可以抽象出來(lái)一些模塊的東西
57、,來(lái)實(shí)現(xiàn)我們模塊化的需求注 : 不要忘記 chmod +x *.py 將py變成可執(zhí)行的,不然不可以運(yùn)行代碼 :1: 模塊代碼 mg.py 用來(lái)給 mapper貼標(biāo)簽def mgFunctionline: ifline0 >= '0' and line0 <= '9': &
58、#160; return "D&" + line return "W&" + line2: mapper.py #!/usr/bin/env pythonimport syssys.path.append'/home/liuguoqing/Desktop/hadoop-0.19.2/moudle'
59、;import mgfor line in sys.stdin: line = mg.mgFunctionline line = line.strip# print line words = line.split print '%st%s' % words0, words13: reducer.py#!/usr/bin/env&
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 礦石買賣協(xié)議合同范本
- 聚合物薄膜加工中結(jié)晶與流動(dòng)失穩(wěn)的數(shù)值模擬研究
- 施工機(jī)械揚(yáng)塵排放治理措施及改進(jìn)方案
- 珠寶行業(yè)供貨方案與市場(chǎng)分析
- 鋼增強(qiáng)塑料復(fù)合管項(xiàng)目籌資方案
- 氣管切開(kāi)護(hù)理流程的培訓(xùn)方案
- 銷售協(xié)議樣本書(shū)樣格
- 外墻涂料施工安全管理方案范文
- 七年級(jí)下學(xué)期體育健康教育計(jì)劃
- 工廠消防安全檢查流程與標(biāo)準(zhǔn)
- 家校共育之道
- DeepSeek入門(mén)寶典培訓(xùn)課件
- 西安2025年陜西西安音樂(lè)學(xué)院專職輔導(dǎo)員招聘2人筆試歷年參考題庫(kù)附帶答案詳解
- 《作文中間技巧》課件
- 廣東省2025年中考物理仿真模擬卷(深圳)附答案
- 2025屆八省聯(lián)考 新高考適應(yīng)性聯(lián)考英語(yǔ)試題(原卷版)
- 新蘇教版一年級(jí)下冊(cè)數(shù)學(xué)第1單元第3課時(shí)《8、7加幾》作業(yè)
- 2024年山東電力高等??茖W(xué)校高職單招職業(yè)技能測(cè)驗(yàn)歷年參考題庫(kù)(頻考版)含答案解析
- 《平面廣告賞析》課件
- 人教鄂教版六年級(jí)下冊(cè)科學(xué)全冊(cè)知識(shí)點(diǎn)
- (正式版)HGT 22820-2024 化工安全儀表系統(tǒng)工程設(shè)計(jì)規(guī)范
評(píng)論
0/150
提交評(píng)論