Storm入門教程第二章構(gòu)建Topology_第1頁(yè)
Storm入門教程第二章構(gòu)建Topology_第2頁(yè)
Storm入門教程第二章構(gòu)建Topology_第3頁(yè)
免費(fèi)預(yù)覽已結(jié)束,剩余1頁(yè)可下載查看

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

1、2.1Storm基本概念1.Topologies2.Streams3.Spouts4.Bolts5.Streamgroupings6.Reliability7.Tasks8.Workers9.Configuration1.Topologies2.Streams3.Spouts4.Bolts5.Streamgroupings6.Reliability7.Tasks8.Workers9.Configuration在運(yùn)行一個(gè)Storm任務(wù)之前,需要了解一些概念:Storm集群和Hadoop集群表面上看很類似。但是Hadoop上運(yùn)行的是MapReducejobs,而在Storm上運(yùn)行的是拓?fù)?topo

2、logy),這兩者之間是非常不一樣的。一個(gè)關(guān)鍵的區(qū)別是:一個(gè)MapReducejob最終會(huì)結(jié)束,而一個(gè)topology永遠(yuǎn)會(huì)運(yùn)行(除非你手動(dòng)kill掉)。在Storm的集群里面有兩種節(jié)點(diǎn):控制節(jié)點(diǎn)(masternode)和工作節(jié)點(diǎn)(workernode)??刂乒?jié)點(diǎn)上面運(yùn)行一個(gè)叫Nimbus后臺(tái)程序,它的作用類似Hadoop里面的JobTracker。Nimbus負(fù)責(zé)在集群里面分發(fā)代碼,分配計(jì)算任務(wù)給機(jī)器,并且監(jiān)控狀態(tài)。每一個(gè)工作節(jié)點(diǎn)上面運(yùn)行一個(gè)叫做Supervisor的節(jié)點(diǎn)。Supervisor會(huì)監(jiān)聽分配給它那臺(tái)機(jī)器的工作,根據(jù)需要啟動(dòng)/關(guān)閉工作進(jìn)程。每一個(gè)工作進(jìn)程執(zhí)行一個(gè)topology的一

3、個(gè)子集;一個(gè)運(yùn)行的topology由運(yùn)行在很多機(jī)器上的很多工作進(jìn)程組成。Nimbus和Supervisor之間的所有協(xié)調(diào)工作都是通過Zookeeper集群完成。另夕卜,Nimbus進(jìn)程和Supervisor進(jìn)程都是快速失敗(fail-fast)和無狀態(tài)的。所有的狀態(tài)要么在zookeeper里面,要么在本地磁盤上。這也就意味著你可以用kill-9來殺死Nimbus和Supervisor進(jìn)程,然后再重啟它們,就好像什么都沒有發(fā)生過。這個(gè)設(shè)計(jì)使得Storm異常的穩(wěn)定。2.1.1 Topologies一個(gè)topology是spouts和bolts組成的圖,通過streamgroupings將圖中的sp

4、outs和bolts連接起來,如下圖:一個(gè)topology會(huì)一直運(yùn)行直到你手動(dòng)kill掉,Storm自動(dòng)重新分配執(zhí)行失敗的任務(wù),并且Storm可以保證你不會(huì)有數(shù)據(jù)丟失(如果開啟了高可靠性的話)。如果一些機(jī)器意外停機(jī)它上面的所有任務(wù)會(huì)被轉(zhuǎn)移到其他機(jī)器上。運(yùn)行一個(gè)topology很簡(jiǎn)單。首先,把你所有的代碼以及所依賴的jar打進(jìn)一個(gè)jar包。然后運(yùn)行類似下面的這個(gè)命令:stormjarall-my-code.jararg1arg2這個(gè)命令會(huì)運(yùn)行主類:,參數(shù)是arg1,arg2。這個(gè)類的main函數(shù)定義這個(gè)topology并且把它提交給Nimbus。stormjar負(fù)責(zé)連接到Nimbus并且上傳ja

5、r包。Topology的定義是一個(gè)Thrift結(jié)構(gòu),并且Nimbus就是一個(gè)Thrift服務(wù),你可以提交由任何語(yǔ)言創(chuàng)建的topologyo上面的方面是用JVM-based語(yǔ)言提交的最簡(jiǎn)單的方法。2.1.2 Streams消息流stream是storm里的關(guān)鍵抽象。一個(gè)消息流是一個(gè)沒有邊界的tuple序列,而這些tuple序列會(huì)以一種分布式的方式并行地創(chuàng)建和處理。通過對(duì)stream中tuple序列中每個(gè)字段命名來定義stream。在默認(rèn)的情況下,tuple的字段類型可以是:integer,long,short,byte,string,double,float,boolean和bytearray。

6、你也可以自定義類型(只要實(shí)現(xiàn)相應(yīng)的序列化器)。每個(gè)消息流在定義的時(shí)候會(huì)被分配給一個(gè)id,因?yàn)閱蜗蛳⒘魇褂玫南喈?dāng)普遍,OutputFieldsDeclarer定義了一些方法讓你可以定義一個(gè)stream而不用指定這個(gè)id。在這種情況下這個(gè)stream會(huì)分配個(gè)值為defau默認(rèn)的id。Storm提供的最基本的處理stream的原語(yǔ)是spout和bolt。你可以實(shí)現(xiàn)spout和bolt提供的接口來處理你的業(yè)務(wù)邏輯。2.1.3 Spouts消息源spout是Storm里面一個(gè)topology里面的消息生產(chǎn)者。一般來說消息源會(huì)從一個(gè)外部源讀取數(shù)據(jù)并且向topology里面發(fā)出消息:tuple。Spout

7、可以是可靠的也可以是不可靠的。如果這個(gè)tuple沒有被storm成功處理,可靠的消息源spouts可以重新發(fā)射一個(gè)tuple,但是不可靠的消息源spouts一旦發(fā)出一個(gè)tuple就不能重發(fā)了。消息源可以發(fā)射多條消息流stream。使用OutputFieldsDeclarer.declareStream來定義多個(gè)stream,然后使用SpoutOutputCollector來發(fā)射指定的stream。Spout類里面最重要的方法是nextTuple。要么發(fā)射一個(gè)新的tuple到topology里面或者簡(jiǎn)單的返回如果已經(jīng)沒有新的tuple。要注意的是nextTuple方法不能阻塞,因?yàn)閟torm在同

8、一個(gè)線程上面調(diào)用所有消息源spout的方法。另外兩個(gè)比較重要的spout方法是ack和fail。storm在檢測(cè)到一個(gè)tuple被整個(gè)topology成功處理的時(shí)候調(diào)用ack,否則調(diào)用fail。storm只對(duì)可靠的spout調(diào)用ack和fail。2.1.4 Bolts所有的消息處理邏輯被封裝在bolts里面。Bolts可以做很多事情:過濾,聚合,查詢數(shù)據(jù)庫(kù)等等。Bolts可以簡(jiǎn)單的做消息流的傳遞。復(fù)雜的消息流處理往往需要很多步驟,從而也就需要經(jīng)過很多bolts。比如算岀一堆圖片里面被轉(zhuǎn)發(fā)最多的圖片就至少需要兩步:第一步算岀每個(gè)圖片的轉(zhuǎn)發(fā)數(shù)量。第二步找岀轉(zhuǎn)發(fā)最多的前10個(gè)圖片。(如果要把這個(gè)過程

9、做得更具有擴(kuò)展性那么可能需要更多的步驟)。Bolts可以發(fā)射多條消息流,使用OutputFieldsDeclarer.declareStream定義stream,使用OutputCollector.emit來選擇要發(fā)射的stream。Bolts的主要方法是execute,它以一個(gè)tuple作為輸入,bolts使用Outputcollector來發(fā)射tuple,bolts必須要為它處理的每一個(gè)tuple調(diào)用OutputCollector的ack方法,以通知Storm這個(gè)tuple被處理完成了,從而通知這個(gè)tuple的發(fā)射者spouts。一般的流程是:bolts處理一個(gè)輸入tuple,發(fā)射0個(gè)或者

10、多個(gè)tuple,然后調(diào)用ack通知storm自己已經(jīng)處理過這個(gè)tuple了。storm提供了一個(gè)IBasicBolt會(huì)自動(dòng)調(diào)用ack。2.1.5 Streamgroupings定義一個(gè)topology的其中一步是定義每個(gè)bolt接收什么樣的流作為輸入。streamgrouping就是用來定義一個(gè)stream應(yīng)該如果分配數(shù)據(jù)給bolts上面的多個(gè)tasks。Storm里面有7種類型的streamgrouping1. ShuffleGrouping:隨機(jī)分組,隨機(jī)派發(fā)stream里面的tuple,保證每個(gè)bolt接收到的tuple數(shù)目大致相同。2. FieldsGrouping:按字段分組,比如按

11、userid來分組,具有同樣userid的tuple會(huì)被分到相同的Bolts里的一個(gè)task,而不同的userid則會(huì)被分配到不同的bolts里的task。3. AllGrouping:廣播發(fā)送,對(duì)于每一個(gè)tuple,所有的bolts都會(huì)收到。4. GlobalGrouping:全局分組,這個(gè)tuple被分配到storm中的一個(gè)bolt的其中一個(gè)task。再具體一點(diǎn)就是分配給id值最低的那個(gè)task。5. NonGrouping:不分組,這個(gè)分組的意思是說stream不關(guān)心到底誰會(huì)收到它的tuple。目前這種分組和Shufflegrouping是一樣的效果,有一點(diǎn)不同的是storm會(huì)把這個(gè)bo

12、lt放到這個(gè)bolt的訂閱者同一個(gè)線程里面去執(zhí)行。6. DirectGrouping:直接分組,這是一種比較特別的分組方法,用這種分組意味著消息的發(fā)送者指定由消息接收者的哪個(gè)task處理這個(gè)消息。只有被聲明為DirectStream的消息流可以聲明這種分組方法。而且這種消息tuple必須使用emitDirect方法來發(fā)射。消息處理者可以通過TopologyContext來獲取處理它的消息的task的id(OutputCollector.emit方法也會(huì)返回task的id)。7. Localorshufflegrouping:如果目標(biāo)bolt有一個(gè)或者多個(gè)task在同一個(gè)工作進(jìn)程中,tuple將

13、會(huì)被隨機(jī)發(fā)生給這些tasks。否則,和普通的ShuffleGrouping行為一致。2.1.6 ReliabilityStorm保證每個(gè)tuple會(huì)被topology完整的執(zhí)行。Storm會(huì)追蹤由每個(gè)spouttuple所產(chǎn)生的tuple樹(一個(gè)bolt處理一個(gè)tuple之后可能會(huì)發(fā)射別的tuple從而形成樹狀結(jié)構(gòu)),并且跟蹤這棵tuple樹什么時(shí)候成功處理完。每個(gè)topology都有一個(gè)消息超時(shí)的設(shè)置,如果storm在這個(gè)超時(shí)的時(shí)間內(nèi)檢測(cè)不到某個(gè)tuple樹到底有沒有執(zhí)行成功,那么topology會(huì)把這個(gè)tuple標(biāo)記為執(zhí)行失敗,并且過一會(huì)兒重新發(fā)射這個(gè)tuple。為了利用Storm的可靠性

14、特性,在你發(fā)出一個(gè)新的tuple以及你完成處理一個(gè)tuple的時(shí)候你必須要通知storm。這一切是由Outputcollector來完成的。通過emit方法來通知一個(gè)新的tuple產(chǎn)生了,通過ack方法通知一個(gè)tuple處理完成了。Storm的可靠性我們?cè)诘谒恼聲?huì)深入介紹。2.1.7 Tasks每一個(gè)spout和bolt會(huì)被當(dāng)作很多task在整個(gè)集群里執(zhí)行。每一個(gè)executor對(duì)應(yīng)到一個(gè)線程,在這個(gè)線程上運(yùn)行多個(gè)task,而streamgrouping則是定義怎么從一堆task發(fā)射tuple到另外一堆task。你可以調(diào)用TopologyBuilder類的setSpout和setBolt來設(shè)置

15、并行度(也就是有多少個(gè)task)。2.1.8 Workers一個(gè)topology可能會(huì)在一個(gè)或者多個(gè)worker(工作進(jìn)程)里面執(zhí)行,每個(gè)worker是一個(gè)物理JVM并且執(zhí)行整個(gè)topology的一部分。比如,對(duì)于并行度是300的topology來說,如果我們使用50個(gè)工作進(jìn)程來執(zhí)行,那么每個(gè)工作進(jìn)程會(huì)處理其中的6個(gè)tasks。Storm會(huì)盡量均勻的工作分配給所有的worker。2.1.9 ConfigurationStorm里面有一堆參數(shù)可以配置來調(diào)整Nimbus,Supervisor以及正在運(yùn)行的topology的行為,一些配置是系統(tǒng)級(jí)別的,一些配置是topology級(jí)別的。default

16、.yaml里面有所有的默認(rèn)配置。你可以通過定義個(gè)storm.yaml在你的classpath里來覆蓋這些默認(rèn)配置。并且你也可以在代碼里面設(shè)置一些topology相關(guān)的配置信息(使用StormSubmitter)。2.2構(gòu)建Topology實(shí)現(xiàn)的目標(biāo):我們將設(shè)計(jì)一個(gè)topology,來實(shí)現(xiàn)對(duì)一個(gè)句子里面的單詞出現(xiàn)的頻率進(jìn)行統(tǒng)計(jì)。這是一個(gè)簡(jiǎn)單的例子,目的是讓大家對(duì)于topology快速上手,有一個(gè)初步的理解。1. 設(shè)計(jì)Topology結(jié)構(gòu):在開始開發(fā)Storm項(xiàng)目的第一步,就是要設(shè)計(jì)topology。確定好你的數(shù)據(jù)處理邏輯,我們今天將的這個(gè)簡(jiǎn)單的例子,topology也非常簡(jiǎn)單。整個(gè)topolog

17、y如下:整個(gè)topology分為三個(gè)部分:KestrelSpout:數(shù)據(jù)源,負(fù)責(zé)發(fā)送sentenceSplitsentence:負(fù)責(zé)將sentence切分Wordcount:負(fù)責(zé)對(duì)單詞的頻率進(jìn)行累加設(shè)計(jì)數(shù)據(jù)流這個(gè)topology從kestrelqueue讀取句子,并把句子劃分成單詞,然后匯總每個(gè)單詞出現(xiàn)的次數(shù),一個(gè)tuple負(fù)責(zé)讀取句子,每一個(gè)tuple分別對(duì)應(yīng)計(jì)算每一個(gè)單詞出現(xiàn)的次數(shù),大概樣子如下所示:2. 代碼實(shí)現(xiàn):1) 構(gòu)建maven環(huán)境:為了開發(fā)stormtopology,你需要把storm相關(guān)的jar包添加到classpath里面去:要么手動(dòng)添加所有相關(guān)的jar包,要么使用maven

18、來管理所有的依賴。storm的jar包發(fā)布在Clojars(個(gè)maven庫(kù)),如果你使用maven的話,把下面的配置添加在你項(xiàng)目的pom.xml里面。<repository><id>v/id><url>v/url></repository><dependency>vgroupld>storm</groupld>vartifactld>storm</artifactld><version><scope>test</scope><

19、;/dependency>2) 定義topology:TopologyBuilderbuilder=newTopologyBuilder();builder.setSpout(1,newKestrelSpout(22133,sentence_queue,newStringScheme();builder.setBolt(2,newSplitSentence(),10).shuffleGrouping(l);builder.setBolt(3,newWordCount(),20).fieldsGrouping(2,newFields(“word”這種topology的spout從句子隊(duì)列中

20、讀取句子,在Spout用setSpout方法插入一個(gè)獨(dú)特的id到topology。Topology中的每個(gè)節(jié)點(diǎn)必須給予一個(gè)id,id是由其他bolts用于訂閱該節(jié)點(diǎn)的輸出流。KestrelSpout在topology中id為1。setBolt是用于在Topology中插入bolts。在topology中定義的第一個(gè)bolts是切割句子的bolts。這個(gè)bolts將句子流轉(zhuǎn)成成單詞流。讓我們看看SplitSentenee實(shí)施:publicclassSplitSentenceimplementsIBasicBoltpublicvoidprepare(Mapconf,TopologyContextc

21、ontext)publicvoidexecute(Tupletuple,BasicOutputCollectorcollector)Stringsentence=tuple.getString(O);for(Stringword:sentence.split(”)“collector.emit(newValues(word);publicvoidcleanup()publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer)declarer.declare(newFields(“word”);關(guān)鍵的方法是execute方法。正如你可以看到

22、,它將句子拆分成單詞,并發(fā)出每個(gè)單詞作為一個(gè)新的元組。另一個(gè)重要的方法是declareOutputFields,其中宣布bolts輸出元組的架構(gòu)。在這里宣布,它發(fā)出一個(gè)域?yàn)閣ord的元組setBolt的最后一個(gè)參數(shù)是你想為bolts的并行量。SplitSentencebolts是10個(gè)并發(fā),這將導(dǎo)致在storm集群中有十個(gè)線程并行執(zhí)行。你所要做的的是增加bolts的并行量在遇到topology的瓶頸時(shí)。setBolt方法返回一個(gè)對(duì)象,用來定義bolts的輸入。例如,SplitSentence螺栓訂閱組件“使用隨機(jī)分組的輸出流?!?是指已經(jīng)定義KestrelSpout。我將解釋在某一時(shí)刻的隨機(jī)分

23、組的一部分。到目前為止,最要緊的是,SplitSentencebolts會(huì)消耗KestrelSpout發(fā)出的每一個(gè)元組。下面在讓我們看看wordcount的實(shí)現(xiàn):publicclassWordCountimplementsIBasicBoltprivateMapvString,Integer_counts=newHashMapvString,lnteger>();publicvoidprepare(Mapconf,TopologyContextcontext)publicvoidexecute(Tupletuple,BasicOutputCollectorcollector)String

24、word=tuple.getString(O);intcount;if(_counts.containsKey(word)count=_counts.get(word);elsecount=0;count+;_counts.put(word,count);collector.emit(newValues(word,count);publicvoidcleanup()publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer)declarer.declare(newFields(“word”,“count”);SplitSentence對(duì)

25、于句子里面的每個(gè)單詞發(fā)射一個(gè)新的tuple,WordCount在內(nèi)存里面維護(hù)一個(gè)單詞->次數(shù)的mapping,WordCount每收到一個(gè)單詞,它就更新內(nèi)存里面的統(tǒng)計(jì)狀態(tài)。3. 運(yùn)行Topologystorm的運(yùn)行有兩種模式:本地模式和分布式模式.1) 本地模式:storm用一個(gè)進(jìn)程里面的線程來模擬所有的spout和bolt.本地模式對(duì)開發(fā)和測(cè)試來說比較有用。你運(yùn)行storm-starter里面的topology的時(shí)候它們就是以本地模式運(yùn)行的,你可以看到topology里面的每一個(gè)組件在發(fā)射什么消息。2) 分布式模式:storm由一堆機(jī)器組成。當(dāng)你提交topology給master的時(shí)候,你同時(shí)也把topology的代碼提交了。master負(fù)責(zé)分發(fā)你的代碼并且負(fù)責(zé)給你的topolgoy分配工作進(jìn)程。如果一個(gè)工作進(jìn)程掛掉了,master節(jié)點(diǎn)會(huì)把認(rèn)為重新分配到其它節(jié)點(diǎn)。3) 下面是以本地模式運(yùn)行的代碼:Configconf=newConfig();conf.setDebug(true);conf.setNumWorkers(2);LocalClustercluster=newLocalCluster();cluster.submitTopology(“test”

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論