storm入門教程資料_第1頁(yè)
storm入門教程資料_第2頁(yè)
storm入門教程資料_第3頁(yè)
storm入門教程資料_第4頁(yè)
storm入門教程資料_第5頁(yè)
已閱讀5頁(yè),還剩41頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

storm入門教程第一章storm概述

1.1實(shí)時(shí)流計(jì)算互聯(lián)網(wǎng)從誕生的第一時(shí)間起,對(duì)世界的最大的改變就是讓信息能夠?qū)崟r(shí)交互,從而大大加速了各個(gè)環(huán)節(jié)的效率。正因?yàn)榇蠹覍?duì)信息實(shí)時(shí)響應(yīng)、實(shí)時(shí)交互的需求,軟件行業(yè)除了個(gè)人操作系統(tǒng)之外,數(shù)據(jù)庫(kù)(更精確的說(shuō)是關(guān)系型數(shù)據(jù)庫(kù))應(yīng)該是軟件行業(yè)發(fā)展最快、收益最為豐厚的產(chǎn)品了。記得十年前,很多銀行別說(shuō)實(shí)時(shí)轉(zhuǎn)賬,連實(shí)時(shí)查詢都做不到,但是數(shù)據(jù)庫(kù)和高速網(wǎng)絡(luò)改變了這個(gè)情況。

隨著互聯(lián)網(wǎng)的更進(jìn)一步發(fā)展,從Portal信息瀏覽型到Search信息搜索型到SNS關(guān)系交互傳遞型,以及電子商務(wù)、互聯(lián)網(wǎng)旅游生活產(chǎn)品等將生活中的流通環(huán)節(jié)在線化。對(duì)效率的要求讓大家對(duì)于實(shí)時(shí)性的要求進(jìn)一步提升,而信息的交互和溝通正在從點(diǎn)對(duì)點(diǎn)往信息鏈甚至信息網(wǎng)的方向發(fā)展,這樣必然帶來(lái)數(shù)據(jù)在各個(gè)維度的交叉關(guān)聯(lián),數(shù)據(jù)爆炸已不可避免。因此流式處理加NoSQL產(chǎn)品應(yīng)運(yùn)而生,分別解決實(shí)時(shí)框架和數(shù)據(jù)大規(guī)模存儲(chǔ)計(jì)算的問(wèn)題。

早在7、8年前諸如UC伯克利、斯坦福等大學(xué)就開始了對(duì)流式數(shù)據(jù)處理的研究,但是由于更多的關(guān)注于金融行業(yè)的業(yè)務(wù)場(chǎng)景或者互聯(lián)網(wǎng)流量監(jiān)控的業(yè)務(wù)場(chǎng)景,以及當(dāng)時(shí)互聯(lián)網(wǎng)數(shù)據(jù)場(chǎng)景的限制,造成了研究多是基于對(duì)傳統(tǒng)數(shù)據(jù)庫(kù)處理的流式化,對(duì)流式框架本身的研究偏少。目前這樣的研究逐漸沒(méi)有了聲音,工業(yè)界更多的精力轉(zhuǎn)向了實(shí)時(shí)數(shù)據(jù)庫(kù)。

2010年Yahoo!對(duì)S4的開源,2011年twitter對(duì)Storm的開源,改變了這個(gè)情況。以前互聯(lián)網(wǎng)的開發(fā)人員在做一個(gè)實(shí)時(shí)應(yīng)用的時(shí)候,除了要關(guān)注應(yīng)用邏輯計(jì)算處理本身,還要為了數(shù)據(jù)的實(shí)時(shí)流轉(zhuǎn)、交互、分布大傷腦筋。但是現(xiàn)在情況卻大為不同,以Storm為例,開發(fā)人員可以快速的搭建一套健壯、易用的實(shí)時(shí)流處理框架,配合SQL產(chǎn)品或者NoSQL產(chǎn)品或者M(jìn)apReduce計(jì)算平臺(tái),就可以低成本的做出很多以前很難想象的實(shí)時(shí)產(chǎn)品:比如一淘數(shù)據(jù)部的量子恒道品牌旗下的多個(gè)產(chǎn)品就是構(gòu)建在實(shí)時(shí)流處理平臺(tái)上的。

本教程是一本對(duì)storm的基礎(chǔ)介紹手冊(cè),但是我們也希望它不僅僅是一本storm的使用手冊(cè),我們會(huì)在其中加入更多我們?cè)趯?shí)際數(shù)據(jù)生產(chǎn)過(guò)程的經(jīng)驗(yàn)和應(yīng)用的架構(gòu),最后的目的是幫助所有愿意使用實(shí)時(shí)流處理框架的技術(shù)同仁,同時(shí)也默默的改變這個(gè)世界。1.2Storm特點(diǎn)Storm是一個(gè)開源的分布式實(shí)時(shí)計(jì)算系統(tǒng),可以簡(jiǎn)單、可靠的處理大量的數(shù)據(jù)流。Storm有很多使用場(chǎng)景:如實(shí)時(shí)分析,在線機(jī)器學(xué)習(xí),持續(xù)計(jì)算,分布式RPC,ETL等等。Storm支持水平擴(kuò)展,具有高容錯(cuò)性,保證每個(gè)消息都會(huì)得到處理,而且處理速度很快(在一個(gè)小集群中,每個(gè)結(jié)點(diǎn)每秒可以處理數(shù)以百萬(wàn)計(jì)的消息)。Storm的部署和運(yùn)維都很便捷,而且更為重要的是可以使用任意編程語(yǔ)言來(lái)開發(fā)應(yīng)用。

Storm有如下特點(diǎn):

編程模型簡(jiǎn)單

在大數(shù)據(jù)處理方面相信大家對(duì)hadoop已經(jīng)耳熟能詳,基于GoogleMap/Reduce來(lái)實(shí)現(xiàn)的Hadoop為開發(fā)者提供了map、reduce原語(yǔ),使并行批處理程序變得非常地簡(jiǎn)單和優(yōu)美。同樣,Storm也為大數(shù)據(jù)的實(shí)時(shí)計(jì)算提供了一些簡(jiǎn)單優(yōu)美的原語(yǔ),這大大降低了開發(fā)并行實(shí)時(shí)處理的任務(wù)的復(fù)雜性,幫助你快速、高效的開發(fā)應(yīng)用。

可擴(kuò)展

在Storm集群中真正運(yùn)行topology的主要有三個(gè)實(shí)體:工作進(jìn)程、線程和任務(wù)。Storm集群中的每臺(tái)機(jī)器上都可以運(yùn)行多個(gè)工作進(jìn)程,每個(gè)工作進(jìn)程又可創(chuàng)建多個(gè)線程,每個(gè)線程可以執(zhí)行多個(gè)任務(wù),任務(wù)是真正進(jìn)行數(shù)據(jù)處理的實(shí)體,我們開發(fā)的spout、bolt就是作為一個(gè)或者多個(gè)任務(wù)的方式執(zhí)行的。

因此,計(jì)算任務(wù)在多個(gè)線程、進(jìn)程和服務(wù)器之間并行進(jìn)行,支持靈活的水平擴(kuò)展。

高可靠性

Storm可以保證spout發(fā)出的每條消息都能被“完全處理”,這也是直接區(qū)別于其他實(shí)時(shí)系統(tǒng)的地方,如S4。

請(qǐng)注意,spout發(fā)出的消息后續(xù)可能會(huì)觸發(fā)產(chǎn)生成千上萬(wàn)條消息,可以形象的理解為一棵消息樹,其中spout發(fā)出的消息為樹根,Storm會(huì)跟蹤這棵消息樹的處理情況,只有當(dāng)這棵消息樹中的所有消息都被處理了,Storm才會(huì)認(rèn)為spout發(fā)出的這個(gè)消息已經(jīng)被“完全處理”。如果這棵消息樹中的任何一個(gè)消息處理失敗了,或者整棵消息樹在限定的時(shí)間內(nèi)沒(méi)有“完全處理”,那么spout發(fā)出的消息就會(huì)重發(fā)。

考慮到盡可能減少對(duì)內(nèi)存的消耗,Storm并不會(huì)跟蹤消息樹中的每個(gè)消息,而是采用了一些特殊的策略,它把消息樹當(dāng)作一個(gè)整體來(lái)跟蹤,對(duì)消息樹中所有消息的唯一id進(jìn)行異或計(jì)算,通過(guò)是否為零來(lái)判定spout發(fā)出的消息是否被“完全處理”,這極大的節(jié)約了內(nèi)存和簡(jiǎn)化了判定邏輯,后面會(huì)對(duì)這種機(jī)制進(jìn)行詳細(xì)介紹。

這種模式,每發(fā)送一個(gè)消息,都會(huì)同步發(fā)送一個(gè)ack/fail,對(duì)于網(wǎng)絡(luò)的帶寬會(huì)有一定的消耗,如果對(duì)于可靠性要求不高,可通過(guò)使用不同的emit接口關(guān)閉該模式。

上面所說(shuō)的,Storm保證了每個(gè)消息至少被處理一次,但是對(duì)于有些計(jì)算場(chǎng)合,會(huì)嚴(yán)格要求每個(gè)消息只被處理一次,幸而Storm的0.7.0引入了事務(wù)性拓?fù)?,解決了這個(gè)問(wèn)題,后面會(huì)有詳述。

高容錯(cuò)性

如果在消息處理過(guò)程中出了一些異常,Storm會(huì)重新安排這個(gè)出問(wèn)題的處理單元。Storm保證一個(gè)處理單元永遠(yuǎn)運(yùn)行(除非你顯式殺掉這個(gè)處理單元)。

當(dāng)然,如果處理單元中存儲(chǔ)了中間狀態(tài),那么當(dāng)處理單元重新被Storm啟動(dòng)的時(shí)候,需要應(yīng)用自己處理中間狀態(tài)的恢復(fù)。

支持多種編程語(yǔ)言

除了用java實(shí)現(xiàn)spout和bolt,你還可以使用任何你熟悉的編程語(yǔ)言來(lái)完成這項(xiàng)工作,這一切得益于Storm所謂的多語(yǔ)言協(xié)議。多語(yǔ)言協(xié)議是Storm內(nèi)部的一種特殊協(xié)議,允許spout或者bolt使用標(biāo)準(zhǔn)輸入和標(biāo)準(zhǔn)輸出來(lái)進(jìn)行消息傳遞,傳遞的消息為單行文本或者是json編碼的多行。

Storm支持多語(yǔ)言編程主要是通過(guò)ShellBolt,ShellSpout和ShellProcess這些類來(lái)實(shí)現(xiàn)的,這些類都實(shí)現(xiàn)了IBolt和ISpout接口,以及讓shell通過(guò)java的ProcessBuilder類來(lái)執(zhí)行腳本或者程序的協(xié)議。

可以看到,采用這種方式,每個(gè)tuple在處理的時(shí)候都需要進(jìn)行json的編解碼,因此在吞吐量上會(huì)有較大影響。

支持本地模式

Storm有一種“本地模式”,也就是在進(jìn)程中模擬一個(gè)Storm集群的所有功能,以本地模式運(yùn)行topology跟在集群上運(yùn)行topology類似,這對(duì)于我們開發(fā)和測(cè)試來(lái)說(shuō)非常有用。

高效

用ZeroMQ作為底層消息隊(duì)列,保證消息能快速被處理第二章Storm術(shù)語(yǔ)介紹及構(gòu)建Topology2.1Storm基本概念在運(yùn)行一個(gè)Storm任務(wù)之前,需要了解一些概念:TopologiesStreamsSpoutsBoltsStreamgroupingsReliabilityTasksWorkersConfigurationStorm集群和Hadoop集群表面上看很類似。但是Hadoop上運(yùn)行的是MapReducejobs,而在Storm上運(yùn)行的是拓?fù)?topology),這兩者之間是非常不一樣的。一個(gè)關(guān)鍵的區(qū)別是:一個(gè)MapReducejob最終會(huì)結(jié)束,而一個(gè)topology永遠(yuǎn)會(huì)運(yùn)行(除非你手動(dòng)kill掉)。在Storm的集群里面有兩種節(jié)點(diǎn):控制節(jié)點(diǎn)(masternode)和工作節(jié)點(diǎn)(workernode)??刂乒?jié)點(diǎn)上面運(yùn)行一個(gè)叫Nimbus后臺(tái)程序,它的作用類似Hadoop里面的JobTracker。Nimbus負(fù)責(zé)在集群里面分發(fā)代碼,分配計(jì)算任務(wù)給機(jī)器,并且監(jiān)控狀態(tài)。每一個(gè)工作節(jié)點(diǎn)上面運(yùn)行一個(gè)叫做Supervisor的節(jié)點(diǎn)。Supervisor會(huì)監(jiān)聽分配給它那臺(tái)機(jī)器的工作,根據(jù)需要啟動(dòng)/關(guān)閉工作進(jìn)程。每一個(gè)工作進(jìn)程執(zhí)行一個(gè)topology的一個(gè)子集;一個(gè)運(yùn)行的topology由運(yùn)行在很多機(jī)器上的很多工作進(jìn)程組成。Nimbus和Supervisor之間的所有協(xié)調(diào)工作都是通過(guò)Zookeeper集群完成。另外,Nimbus進(jìn)程和Supervisor進(jìn)程都是快速失敗(fail-fast)和無(wú)狀態(tài)的。所有的狀態(tài)要么在zookeeper里面,要么在本地磁盤上。這也就意味著你可以用kill-9來(lái)殺死Nimbus和Supervisor進(jìn)程,然后再重啟它們,就好像什么都沒(méi)有發(fā)生過(guò)。這個(gè)設(shè)計(jì)使得Storm異常的穩(wěn)定。一個(gè)topology是spouts和bolts組成的圖,通過(guò)streamgroupings將圖中的spouts和bolts連接起來(lái),如下圖:一個(gè)topology會(huì)一直運(yùn)行直到你手動(dòng)kill掉,Storm自動(dòng)重新分配執(zhí)行失敗的任務(wù),并且Storm可以保證你不會(huì)有數(shù)據(jù)丟失(如果開啟了高可靠性的話)。如果一些機(jī)器意外停機(jī)它上面的所有任務(wù)會(huì)被轉(zhuǎn)移到其他機(jī)器上。運(yùn)行一個(gè)topology很簡(jiǎn)單。首先,把你所有的代碼以及所依賴的jar打進(jìn)一個(gè)jar包。然后運(yùn)行類似下面的這個(gè)命令:stormjarall-my-code.jarbacktype.storm.MyTopologyarg1arg2復(fù)制代碼這個(gè)命令會(huì)運(yùn)行主類:backtype.strom.MyTopology,參數(shù)是arg1,arg2。這個(gè)類的main函數(shù)定義這個(gè)topology并且把它提交給Nimbus。stormjar負(fù)責(zé)連接到Nimbus并且上傳jar包。Topology的定義是一個(gè)Thrift結(jié)構(gòu),并且Nimbus就是一個(gè)Thrift服務(wù),你可以提交由任何語(yǔ)言創(chuàng)建的topology。上面的方面是用JVM-based語(yǔ)言提交的最簡(jiǎn)單的方法。消息流stream是storm里的關(guān)鍵抽象。一個(gè)消息流是一個(gè)沒(méi)有邊界的tuple序列,而這些tuple序列會(huì)以一種分布式的方式并行地創(chuàng)建和處理。通過(guò)對(duì)stream中tuple序列中每個(gè)字段命名來(lái)定義stream。在默認(rèn)的情況下,tuple的字段類型可以是:integer,long,short,byte,string,double,float,boolean和bytearray。你也可以自定義類型(只要實(shí)現(xiàn)相應(yīng)的序列化器)。每個(gè)消息流在定義的時(shí)候會(huì)被分配給一個(gè)id,因?yàn)閱蜗蛳⒘魇褂玫南喈?dāng)普遍,OutputFieldsDeclarer定義了一些方法讓你可以定義一個(gè)stream而不用指定這個(gè)id。在這種情況下這個(gè)stream會(huì)分配個(gè)值為‘default’默認(rèn)的id。Storm提供的最基本的處理stream的原語(yǔ)是spout和bolt。你可以實(shí)現(xiàn)spout和bolt提供的接口來(lái)處理你的業(yè)務(wù)邏輯。消息源spout是Storm里面一個(gè)topology里面的消息生產(chǎn)者。一般來(lái)說(shuō)消息源會(huì)從一個(gè)外部源讀取數(shù)據(jù)并且向topology里面發(fā)出消息:tuple。Spout可以是可靠的也可以是不可靠的。如果這個(gè)tuple沒(méi)有被storm成功處理,可靠的消息源spouts可以重新發(fā)射一個(gè)tuple,但是不可靠的消息源spouts一旦發(fā)出一個(gè)tuple就不能重發(fā)了。消息源可以發(fā)射多條消息流stream。使用OutputFieldsDeclarer.declareStream來(lái)定義多個(gè)stream,然后使用SpoutOutputCollector來(lái)發(fā)射指定的stream。Spout類里面最重要的方法是nextTuple。要么發(fā)射一個(gè)新的tuple到topology里面或者簡(jiǎn)單的返回如果已經(jīng)沒(méi)有新的tuple。要注意的是nextTuple方法不能阻塞,因?yàn)閟torm在同一個(gè)線程上面調(diào)用所有消息源spout的方法。另外兩個(gè)比較重要的spout方法是ack和fail。storm在檢測(cè)到一個(gè)tuple被整個(gè)topology成功處理的時(shí)候調(diào)用ack,否則調(diào)用fail。storm只對(duì)可靠的spout調(diào)用ack和fail。所有的消息處理邏輯被封裝在bolts里面。Bolts可以做很多事情:過(guò)濾,聚合,查詢數(shù)據(jù)庫(kù)等等。Bolts可以簡(jiǎn)單的做消息流的傳遞。復(fù)雜的消息流處理往往需要很多步驟,從而也就需要經(jīng)過(guò)很多bolts。比如算出一堆圖片里面被轉(zhuǎn)發(fā)最多的圖片就至少需要兩步:第一步算出每個(gè)圖片的轉(zhuǎn)發(fā)數(shù)量。第二步找出轉(zhuǎn)發(fā)最多的前10個(gè)圖片。(如果要把這個(gè)過(guò)程做得更具有擴(kuò)展性那么可能需要更多的步驟)。Bolts可以發(fā)射多條消息流,使用OutputFieldsDeclarer.declareStream定義stream,使用OutputCollector.emit來(lái)選擇要發(fā)射的stream。Bolts的主要方法是execute,它以一個(gè)tuple作為輸入,bolts使用OutputCollector來(lái)發(fā)射tuple,bolts必須要為它處理的每一個(gè)tuple調(diào)用OutputCollector的ack方法,以通知Storm這個(gè)tuple被處理完成了,從而通知這個(gè)tuple的發(fā)射者spouts。一般的流程是:bolts處理一個(gè)輸入tuple,發(fā)射0個(gè)或者多個(gè)tuple,然后調(diào)用ack通知storm自己已經(jīng)處理過(guò)這個(gè)tuple了。storm提供了一個(gè)IBasicBolt會(huì)自動(dòng)調(diào)用ack。定義一個(gè)topology的其中一步是定義每個(gè)bolt接收什么樣的流作為輸入。streamgrouping就是用來(lái)定義一個(gè)stream應(yīng)該如果分配數(shù)據(jù)給bolts上面的多個(gè)tasks。Storm里面有7種類型的streamgroupingShuffleGrouping:隨機(jī)分組,隨機(jī)派發(fā)stream里面的tuple,保證每個(gè)bolt接收到的tuple數(shù)目大致相同。FieldsGrouping:按字段分組,比如按userid來(lái)分組,具有同樣userid的tuple會(huì)被分到相同的Bolts里的一個(gè)task,而不同的userid則會(huì)被分配到不同的bolts里的task。AllGrouping:廣播發(fā)送,對(duì)于每一個(gè)tuple,所有的bolts都會(huì)收到。GlobalGrouping:全局分組,這個(gè)tuple被分配到storm中的一個(gè)bolt的其中一個(gè)task。再具體一點(diǎn)就是分配給id值最低的那個(gè)task。NonGrouping:不分組,這個(gè)分組的意思是說(shuō)stream不關(guān)心到底誰(shuí)會(huì)收到它的tuple。目前這種分組和Shufflegrouping是一樣的效果,有一點(diǎn)不同的是storm會(huì)把這個(gè)bolt放到這個(gè)bolt的訂閱者同一個(gè)線程里面去執(zhí)行。DirectGrouping:直接分組,這是一種比較特別的分組方法,用這種分組意味著消息的發(fā)送者指定由消息接收者的哪個(gè)task處理這個(gè)消息。只有被聲明為DirectStream的消息流可以聲明這種分組方法。而且這種消息tuple必須使用emitDirect方法來(lái)發(fā)射。消息處理者可以通過(guò)TopologyContext來(lái)獲取處理它的消息的task的id(OutputCollector.emit方法也會(huì)返回task的id)。Localorshufflegrouping:如果目標(biāo)bolt有一個(gè)或者多個(gè)task在同一個(gè)工作進(jìn)程中,tuple將會(huì)被隨機(jī)發(fā)生給這些tasks。否則,和普通的ShuffleGrouping行為一致。Storm保證每個(gè)tuple會(huì)被topology完整的執(zhí)行。Storm會(huì)追蹤由每個(gè)spouttuple所產(chǎn)生的tuple樹(一個(gè)bolt處理一個(gè)tuple之后可能會(huì)發(fā)射別的tuple從而形成樹狀結(jié)構(gòu)),并且跟蹤這棵tuple樹什么時(shí)候成功處理完。每個(gè)topology都有一個(gè)消息超時(shí)的設(shè)置,如果storm在這個(gè)超時(shí)的時(shí)間內(nèi)檢測(cè)不到某個(gè)tuple樹到底有沒(méi)有執(zhí)行成功,那么topology會(huì)把這個(gè)tuple標(biāo)記為執(zhí)行失敗,并且過(guò)一會(huì)兒重新發(fā)射這個(gè)tuple。為了利用Storm的可靠性特性,在你發(fā)出一個(gè)新的tuple以及你完成處理一個(gè)tuple的時(shí)候你必須要通知storm。這一切是由OutputCollector來(lái)完成的。通過(guò)emit方法來(lái)通知一個(gè)新的tuple產(chǎn)生了,通過(guò)ack方法通知一個(gè)tuple處理完成了。Storm的可靠性我們?cè)赟torm入門教程4會(huì)深入介紹。每一個(gè)spout和bolt會(huì)被當(dāng)作很多task在整個(gè)集群里執(zhí)行。每一個(gè)executor對(duì)應(yīng)到一個(gè)線程,在這個(gè)線程上運(yùn)行多個(gè)task,而streamgrouping則是定義怎么從一堆task發(fā)射tuple到另外一堆task。你可以調(diào)用TopologyBuilder類的setSpout和setBolt來(lái)設(shè)置并行度(也就是有多少個(gè)task)。一個(gè)topology可能會(huì)在一個(gè)或者多個(gè)worker(工作進(jìn)程)里面執(zhí)行,每個(gè)worker是一個(gè)物理JVM并且執(zhí)行整個(gè)topology的一部分。比如,對(duì)于并行度是300的topology來(lái)說(shuō),如果我們使用50個(gè)工作進(jìn)程來(lái)執(zhí)行,那么每個(gè)工作進(jìn)程會(huì)處理其中的6個(gè)tasks。Storm會(huì)盡量均勻的工作分配給所有的worker。Storm里面有一堆參數(shù)可以配置來(lái)調(diào)整Nimbus,Supervisor以及正在運(yùn)行的topology的行為,一些配置是系統(tǒng)級(jí)別的,一些配置是topology級(jí)別的。default.yaml里面有所有的默認(rèn)配置。你可以通過(guò)定義個(gè)storm.yaml在你的classpath里來(lái)覆蓋這些默認(rèn)配置。并且你也可以在代碼里面設(shè)置一些topology相關(guān)的配置信息(使用StormSubmitter)。2.2構(gòu)建Topology我們將設(shè)計(jì)一個(gè)topology,來(lái)實(shí)現(xiàn)對(duì)一個(gè)句子里面的單詞出現(xiàn)的頻率進(jìn)行統(tǒng)計(jì)。這是一個(gè)簡(jiǎn)單的例子,目的是讓大家對(duì)于topology快速上手,有一個(gè)初步的理解。在開始開發(fā)Storm項(xiàng)目的第一步,就是要設(shè)計(jì)topology。確定好你的數(shù)據(jù)處理邏輯,我們今天將的這個(gè)簡(jiǎn)單的例子,topology也非常簡(jiǎn)單。整個(gè)topology如下:

整個(gè)topology分為三個(gè)部分:KestrelSpout:數(shù)據(jù)源,負(fù)責(zé)發(fā)送sentenceSplitsentence:負(fù)責(zé)將sentence切分Wordcount:負(fù)責(zé)對(duì)單詞的頻率進(jìn)行累加這個(gè)topology從kestrelqueue讀取句子,并把句子劃分成單詞,然后匯總每個(gè)單詞出現(xiàn)的次數(shù),一個(gè)tuple負(fù)責(zé)讀取句子,每一個(gè)tuple分別對(duì)應(yīng)計(jì)算每一個(gè)單詞出現(xiàn)的次數(shù),大概樣子如下所示:

1)構(gòu)建maven環(huán)境:為了開發(fā)stormtopology,你需要把storm相關(guān)的jar包添加到classpath里面去:要么手動(dòng)添加所有相關(guān)的jar包,要么使用maven來(lái)管理所有的依賴。storm的jar包發(fā)布在Clojars(一個(gè)maven庫(kù)),如果你使用maven的話,把下面的配置添加在你項(xiàng)目的pom.xml里面。<repository><id></id><url>/repo</url></repository><dependency><groupId>storm</groupId><artifactId>storm</artifactId><version>0.5.3</version><scope>test</scope></dependency>2)定義topology:TopologyBuilderbuilder=newTopologyBuilder();builder.setSpout(1,newKestrelSpout(“”,22133,”sentence_queue”,newStringScheme()));builder.setBolt(2,newSplitSentence(),10).shuffleGrouping(1);builder.setBolt(3,newWordCount(),20).fieldsGrouping(2,newFields(“word”));這種topology的spout從句子隊(duì)列中讀取句子,在位于一個(gè)Kestrel的服務(wù)器端口22133。Spout用setSpout方法插入一個(gè)獨(dú)特的id到topology。Topology中的每個(gè)節(jié)點(diǎn)必須給予一個(gè)id,id是由其他bolts用于訂閱該節(jié)點(diǎn)的輸出流。KestrelSpout在topology中id為1。setBolt是用于在Topology中插入bolts。在topology中定義的第一個(gè)bolts是切割句子的bolts。這個(gè)bolts將句子流轉(zhuǎn)成成單詞流。讓我們看看SplitSentence實(shí)施:publicclassSplitSentenceimplementsIBasicBolt{publicvoidprepare(Mapconf,TopologyContextcontext){}publicvoidexecute(Tupletuple,BasicOutputCollectorcollector){Stringsentence=tuple.getString(0);for(Stringword:sentence.split(“”)){collector.emit(newValues(word));}}publicvoidcleanup(){}publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){declarer.declare(newFields(“word”));}關(guān)鍵的方法是execute方法。正如你可以看到,它將句子拆分成單詞,并發(fā)出每個(gè)單詞作為一個(gè)新的元組。另一個(gè)重要的方法是declareOutputFields,其中宣布bolts輸出元組的架構(gòu)。在這里宣布,它發(fā)出一個(gè)域?yàn)閣ord的元組setBolt的最后一個(gè)參數(shù)是你想為bolts的并行量。SplitSentencebolts是10個(gè)并發(fā),這將導(dǎo)致在storm集群中有十個(gè)線程并行執(zhí)行。你所要做的的是增加bolts的并行量在遇到topology的瓶頸時(shí)。setBolt方法返回一個(gè)對(duì)象,用來(lái)定義bolts的輸入。例如,SplitSentence螺栓訂閱組件“1”使用隨機(jī)分組的輸出流?!?”是指已經(jīng)定義KestrelSpout。我將解釋在某一時(shí)刻的隨機(jī)分組的一部分。到目前為止,最要緊的是,SplitSentencebolts會(huì)消耗KestrelSpout發(fā)出的每一個(gè)元組。下面在讓我們看看wordcount的實(shí)現(xiàn):publicclassWordCountimplementsIBasicBolt{privateMap<String,Integer>_counts=newHashMap<String,Integer>();publicvoidprepare(Mapconf,TopologyContextcontext){}publicvoidexecute(Tupletuple,BasicOutputCollectorcollector){Stringword=tuple.getString(0);intcount;if(_counts.containsKey(word)){count=_counts.get(word);}else{count=0;}count++;_counts.put(word,count);collector.emit(newValues(word,count));}publicvoidcleanup(){}publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){declarer.declare(newFields(“word”,“count”));}}SplitSentence對(duì)于句子里面的每個(gè)單詞發(fā)射一個(gè)新的tuple,WordCount在內(nèi)存里面維護(hù)一個(gè)單詞->次數(shù)的mapping,WordCount每收到一個(gè)單詞,它就更新內(nèi)存里面的統(tǒng)計(jì)狀態(tài)。storm的運(yùn)行有兩種模式:本地模式和分布式模式.1)本地模式:storm用一個(gè)進(jìn)程里面的線程來(lái)模擬所有的spout和bolt.本地模式對(duì)開發(fā)和測(cè)試來(lái)說(shuō)比較有用。你運(yùn)行storm-starter里面的topology的時(shí)候它們就是以本地模式運(yùn)行的,你可以看到topology里面的每一個(gè)組件在發(fā)射什么消息。2)分布式模式:storm由一堆機(jī)器組成。當(dāng)你提交topology給master的時(shí)候,你同時(shí)也把topology的代碼提交了。master負(fù)責(zé)分發(fā)你的代碼并且負(fù)責(zé)給你的topolgoy分配工作進(jìn)程。如果一個(gè)工作進(jìn)程掛掉了,master節(jié)點(diǎn)會(huì)把認(rèn)為重新分配到其它節(jié)點(diǎn)。3)下面是以本地模式運(yùn)行的代碼:Configconf=newConfig();conf.setDebug(true);conf.setNumWorkers(2);LocalClustercluster=newLocalCluster();cluster.submitTopology(“test”,conf,builder.createTopology());Utils.sleep(10000);cluster.killTopology(“test”);cluster.shutdown();首先,這個(gè)代碼定義通過(guò)定義一個(gè)LocalCluster對(duì)象來(lái)定義一個(gè)進(jìn)程內(nèi)的集群。提交topology給這個(gè)虛擬的集群和提交topology給分布式集群是一樣的。通過(guò)調(diào)用submitTopology方法來(lái)提交topology,它接受三個(gè)參數(shù):要運(yùn)行的topology的名字,一個(gè)配置對(duì)象以及要運(yùn)行的topology本身。topology的名字是用來(lái)唯一區(qū)別一個(gè)topology的,這樣你然后可以用這個(gè)名字來(lái)殺死這個(gè)topology的。前面已經(jīng)說(shuō)過(guò)了,你必須顯式的殺掉一個(gè)topology,否則它會(huì)一直運(yùn)行。Conf對(duì)象可以配置很多東西,下面兩個(gè)是最常見(jiàn)的:TOPOLOGY_WORKERS(setNumWorkers)定義你希望集群分配多少個(gè)工作進(jìn)程給你來(lái)執(zhí)行這個(gè)topology.topology里面的每個(gè)組件會(huì)被需要線程來(lái)執(zhí)行。每個(gè)組件到底用多少個(gè)線程是通過(guò)setBolt和setSpout來(lái)指定的。這些線程都運(yùn)行在工作進(jìn)程里面.每一個(gè)工作進(jìn)程包含一些節(jié)點(diǎn)的一些工作線程。比如,如果你指定300個(gè)線程,60個(gè)進(jìn)程,那么每個(gè)工作進(jìn)程里面要執(zhí)行6個(gè)線程,而這6個(gè)線程可能屬于不同的組件(Spout,Bolt)。你可以通過(guò)調(diào)整每個(gè)組件的并行度以及這些線程所在的進(jìn)程數(shù)量來(lái)調(diào)整topology的性能。TOPOLOGY_DEBUG(setDebug),當(dāng)它被設(shè)置成true的話,storm會(huì)記錄下每個(gè)組件所發(fā)射的每條消息。這在本地環(huán)境調(diào)試topology很有用,但是在線上這么做的話會(huì)影響性能的。結(jié)論:本章從storm的基本對(duì)象的定義,到廣泛的介紹了storm的開發(fā)環(huán)境,從一個(gè)簡(jiǎn)單的例子講解了topology的構(gòu)建和定義。希望大家可以從本章的內(nèi)容對(duì)storm有一個(gè)基本的理解和概念,并且已經(jīng)可以構(gòu)建一個(gè)簡(jiǎn)單的topology?。〉谌耂torm安裝部署步驟本文以TwitterStorm官方Wiki為基礎(chǔ),詳細(xì)描述如何快速搭建一個(gè)Storm集群,其中,項(xiàng)目實(shí)踐中遇到的問(wèn)題及經(jīng)驗(yàn)總結(jié),在相應(yīng)章節(jié)以“注意事項(xiàng)”的形式給出。3.1Storm集群組件Storm集群中包含兩類節(jié)點(diǎn):主控節(jié)點(diǎn)(MasterNode)和工作節(jié)點(diǎn)(WorkNode)。其分別對(duì)應(yīng)的角色如下:1.主控節(jié)點(diǎn)(MasterNode)上運(yùn)行一個(gè)被稱為Nimbus的后臺(tái)程序,它負(fù)責(zé)在Storm集群內(nèi)分發(fā)代碼,分配任務(wù)給工作機(jī)器,并且負(fù)責(zé)監(jiān)控集群運(yùn)行狀態(tài)。Nimbus的作用類似于Hadoop中JobTracker的角色。2.每個(gè)工作節(jié)點(diǎn)(WorkNode)上運(yùn)行一個(gè)被稱為Supervisor的后臺(tái)程序。Supervisor負(fù)責(zé)監(jiān)聽從Nimbus分配給它執(zhí)行的任務(wù),據(jù)此啟動(dòng)或停止執(zhí)行任務(wù)的工作進(jìn)程。每一個(gè)工作進(jìn)程執(zhí)行一個(gè)Topology的子集;一個(gè)運(yùn)行中的Topology由分布在不同工作節(jié)點(diǎn)上的多個(gè)工作進(jìn)程組成。

Storm集群組件Nimbus和Supervisor節(jié)點(diǎn)之間所有的協(xié)調(diào)工作是通過(guò)Zookeeper集群來(lái)實(shí)現(xiàn)的。此外,Nimbus和Supervisor進(jìn)程都是快速失敗(fail-fast)和無(wú)狀態(tài)(stateless)的;Storm集群所有的狀態(tài)要么在Zookeeper集群中,要么存儲(chǔ)在本地磁盤上。這意味著你可以用kill-9來(lái)殺死Nimbus和Supervisor進(jìn)程,它們?cè)谥貑⒑罂梢岳^續(xù)工作。這個(gè)設(shè)計(jì)使得Storm集群擁有不可思議的穩(wěn)定性。3.2安裝Storm集群這一章節(jié)將詳細(xì)描述如何搭建一個(gè)Storm集群。下面是接下來(lái)需要依次完成的安裝步驟:1.搭建Zookeeper集群;2.安裝Storm依賴庫(kù);3.下載并解壓Storm發(fā)布版本;4.修改storm.yaml配置文件;5.啟動(dòng)Storm各個(gè)后臺(tái)進(jìn)程。3.2.1搭建Zookeeper集群Storm使用Zookeeper協(xié)調(diào)集群,由于Zookeeper并不用于消息傳遞,所以Storm給Zookeeper帶來(lái)的壓力相當(dāng)?shù)?。大多?shù)情況下,單個(gè)節(jié)點(diǎn)的Zookeeper集群足夠勝任,不過(guò)為了確保故障恢復(fù)或者部署大規(guī)模Storm集群,可能需要更大規(guī)模節(jié)點(diǎn)的Zookeeper集群(對(duì)于Zookeeper集群的話,官方推薦的最小節(jié)點(diǎn)數(shù)為3個(gè))。在Zookeeper集群的每臺(tái)機(jī)器上完成以下安裝部署步驟:1.下載安裝JavaJDK,官方下載鏈接為/javase/downloads/index.jsp,JDK版本為JDK6或以上。2.根據(jù)Zookeeper集群的負(fù)載情況,合理設(shè)置Java堆大小,盡可能避免發(fā)生swap,導(dǎo)致Zookeeper性能下降。保守起見(jiàn),4GB內(nèi)存的機(jī)器可以為Zookeeper分配3GB最大堆空間。3.下載后解壓安裝Zookeeper包,官方下載鏈接為/zookeeper/releases.html。4.根據(jù)Zookeeper集群節(jié)點(diǎn)情況,在conf目錄下創(chuàng)建Zookeeper配置文件zoo.cfg:tickTime=2000

dataDir=/var/zookeeper/

clientPort=2181

initLimit=5

syncLimit=2

server.1=zoo1:2888:3888

server.2=zoo2:2888:3888

server.3=zoo3:2888:3888復(fù)制代碼其中,dataDir指定Zookeeper的數(shù)據(jù)文件目錄;其中server.id=host:port:port,id是為每個(gè)Zookeeper節(jié)點(diǎn)的編號(hào),保存在dataDir目錄下的myid文件中,zoo1~zoo3表示各個(gè)Zookeeper節(jié)點(diǎn)的hostname,第一個(gè)port是用于連接leader的端口,第二個(gè)port是用于leader選舉的端口。5.在dataDir目錄下創(chuàng)建myid文件,文件中只包含一行,且內(nèi)容為該節(jié)點(diǎn)對(duì)應(yīng)的server.id中的id編號(hào)。6.啟動(dòng)Zookeeper服務(wù):java-cpzookeeper.jar:lib/log4j-1.2.15.jar:conf\org.apache.zookeeper.server.quorum.QuorumPeerMainzoo.cfg復(fù)制代碼或者bin/zkServer.shstart復(fù)制代碼7.通過(guò)Zookeeper客戶端測(cè)試服務(wù)是否可用:java-cpzookeeper.jar:src/java/lib/log4j-1.2.15.jar:conf:src/java/lib/jline-0.9.94.jar\org.apache.zookeeper.ZooKeeperMain-server:2181復(fù)制代碼或者bin/zkCli.sh-server:2181復(fù)制代碼

注意事項(xiàng):由于Zookeeper是快速失敗(fail-fast)的,且遇到任何錯(cuò)誤情況,進(jìn)程均會(huì)退出,因此,最好能通過(guò)監(jiān)控程序?qū)ookeeper管理起來(lái),保證Zookeeper退出后能被自動(dòng)重啟。詳情參考這里。Zookeeper運(yùn)行過(guò)程中會(huì)在dataDir目錄下生成很多日志和快照文件,而Zookeeper運(yùn)行進(jìn)程并不負(fù)責(zé)定期清理合并這些文件,導(dǎo)致占用大量磁盤空間,因此,需要通過(guò)cron等方式定期清除沒(méi)用的日志和快照文件。詳情參考這里。具體命令格式如下:java-cpzookeeper.jar:log4j.jar:conforg.apache.zookeeper.server.PurgeTxnLog<dataDir><snapDir>-n<count>3.2.2安裝Storm依賴庫(kù)接下來(lái),需要在Nimbus和Supervisor機(jī)器上安裝Storm的依賴庫(kù),具體如下:1.ZeroMQ2.1.7

–請(qǐng)勿使用2.1.10版本,因?yàn)樵摪姹镜囊恍﹪?yán)重bug會(huì)導(dǎo)致Storm集群運(yùn)行時(shí)出現(xiàn)奇怪的問(wèn)題。少數(shù)用戶在2.1.7版本會(huì)遇到”IllegalArgumentException”的異常,此時(shí)降為2.1.4版本可修復(fù)這一問(wèn)題。2.

JZMQ3.Java64.Python2.6.65.unzip以上依賴庫(kù)的版本是經(jīng)過(guò)Storm測(cè)試的,Storm并不能保證在其他版本的Java或Python庫(kù)下可運(yùn)行。安裝ZMQ2.1.7下載后編譯安裝ZMQ:wget/zeromq-2.1.7.tar.gztar-xzfzeromq-2.1.7.tar.gzcdzeromq-2.1.7./configuremakesudomakeinstall復(fù)制代碼

注意事項(xiàng):如果安裝過(guò)程報(bào)錯(cuò)uuid找不到,則通過(guò)如下的包安裝uuid庫(kù):如果安裝過(guò)程報(bào)錯(cuò)uuid找不到,則通過(guò)如下的包安裝uuid庫(kù):sudoyuminstalle2fsprogslsudoyuminstalle2fsprogs-devel復(fù)制代碼安裝JZMQ下載后編譯安裝JZMQ:gitclone/nathanmarz/jzmq.gitcdjzmq./autogen.sh./configuremakesudomakeinstall復(fù)制代碼

為了保證JZMQ正常工作,可能需要完成以下配置:正確設(shè)置JAVA_HOME環(huán)境變量安裝Java開發(fā)包升級(jí)autoconf如果你是MacOSX,參考這里注意事項(xiàng):如果運(yùn)行./configure命令出現(xiàn)問(wèn)題,參考這里。安裝Java61.下載并安裝JDK6,參考這里;2.配置JAVA_HOME環(huán)境變量;3.運(yùn)行java、javac命令,測(cè)試java正常安裝。安裝Python2.6.61.下載Python2.6.6:wget[url]/ftp/python/2.6.6/Python-2.6.6.tar.bz2[/url]復(fù)制代碼2.編譯安裝Python2.6.6:tar–jxvfPython-2.6.6.tar.bz2cdPython-2.6.6./configuremakemakeinstall復(fù)制代碼

3.測(cè)試Python2.6.6:python-VPython2.6.6復(fù)制代碼

安裝unzip

1.如果使用RedHat系列Linux系統(tǒng),執(zhí)行以下命令安裝unzip:yuminstallunzip復(fù)制代碼

2.如果使用Debian系列Linux系統(tǒng),執(zhí)行以下命令安裝unzip:apt-getinstallunzip復(fù)制代碼3.2.3下載并解壓Storm發(fā)布版本下一步,需要在Nimbus和Supervisor機(jī)器上安裝Storm發(fā)行版本。1.下載Storm發(fā)行版本,推薦使用Storm0.8.1:wget/downloads/nathanmarz/storm/storm-0.8.1.zip復(fù)制代碼2.解壓到安裝目錄下:unzipstorm-0.8.1.zip復(fù)制代碼3.2.4修改storm.yaml配置文件

Storm發(fā)行版本解壓目錄下有一個(gè)conf/storm.yaml文件,用于配置Storm。默認(rèn)配置在這里可以查看。conf/storm.yaml中的配置選項(xiàng)將覆蓋defaults.yaml中的默認(rèn)配置。以下配置選項(xiàng)是必須在conf/storm.yaml中進(jìn)行配置的:1)storm.zookeeper.servers:

Storm集群使用的Zookeeper集群地址,其格式如下:storm.zookeeper.servers:-“111.222.333.444″-“555.666.777.888″復(fù)制代碼如果Zookeeper集群使用的不是默認(rèn)端口,那么還需要storm.zookeeper.port選項(xiàng)。2)

storm.local.dir:Nimbus和Supervisor進(jìn)程用于存儲(chǔ)少量狀態(tài),如jars、confs等的本地磁盤目錄,需要提前創(chuàng)建該目錄并給以足夠的訪問(wèn)權(quán)限。然后在storm.yaml中配置該目錄,如:storm.local.dir:"/home/admin/storm/workdir"復(fù)制代碼

3)

java.library.path:Storm使用的本地庫(kù)(ZMQ和JZMQ)加載路徑,默認(rèn)為”/usr/local/lib:/opt/local/lib:/usr/lib”,一般來(lái)說(shuō)ZMQ和JZMQ默認(rèn)安裝在/usr/local/lib下,因此不需要配置即可。4)

nimbus.host:Storm集群Nimbus機(jī)器地址,各個(gè)Supervisor工作節(jié)點(diǎn)需要知道哪個(gè)機(jī)器是Nimbus,以便下載Topologies的jars、confs等文件,如:nimbus.host:"111.222.333.444"復(fù)制代碼

5)

supervisor.slots.ports:對(duì)于每個(gè)Supervisor工作節(jié)點(diǎn),需要配置該工作節(jié)點(diǎn)可以運(yùn)行的worker數(shù)量。每個(gè)worker占用一個(gè)單獨(dú)的端口用于接收消息,該配置選項(xiàng)即用于定義哪些端口是可被worker使用的。默認(rèn)情況下,每個(gè)節(jié)點(diǎn)上可運(yùn)行4個(gè)workers,分別在6700、6701、6702和6703端口,如:supervisor.slots.ports:

-6700

-6701

-6702

-6703復(fù)制代碼3.2.5啟動(dòng)Storm各個(gè)后臺(tái)進(jìn)程最后一步,啟動(dòng)Storm的所有后臺(tái)進(jìn)程。和Zookeeper一樣,Storm也是快速失敗(fail-fast)的系統(tǒng),這樣Storm才能在任意時(shí)刻被停止,并且當(dāng)進(jìn)程重啟后被正確地恢復(fù)執(zhí)行。這也是為什么Storm不在進(jìn)程內(nèi)保存狀態(tài)的原因,即使Nimbus或Supervisors被重啟,運(yùn)行中的Topologies不會(huì)受到影響。以下是啟動(dòng)Storm各個(gè)后臺(tái)進(jìn)程的方式:Nimbus:在Storm主控節(jié)點(diǎn)上運(yùn)行”bin/stormnimbus>/dev/null2>&1&”啟動(dòng)Nimbus后臺(tái)程序,并放到后臺(tái)執(zhí)行;Supervisor:在Storm各個(gè)工作節(jié)點(diǎn)上運(yùn)行”bin/stormsupervisor>/dev/null2>&1&”啟動(dòng)Supervisor后臺(tái)程序,并放到后臺(tái)執(zhí)行;UI:在Storm主控節(jié)點(diǎn)上運(yùn)行”bin/stormui>/dev/null2>&1&”啟動(dòng)UI后臺(tái)程序,并放到后臺(tái)執(zhí)行,啟動(dòng)后可以通過(guò)http://{nimbushost}:8080觀察集群的worker資源使用情況、Topologies的運(yùn)行狀態(tài)等信息。注意事項(xiàng):?jiǎn)?dòng)Storm后臺(tái)進(jìn)程時(shí),需要對(duì)conf/storm.yaml配置文件中設(shè)置的storm.local.dir目錄具有寫權(quán)限。Storm后臺(tái)進(jìn)程被啟動(dòng)后,將在Storm安裝部署目錄下的logs/子目錄下生成各個(gè)進(jìn)程的日志文件。經(jīng)測(cè)試,StormUI必須和StormNimbus部署在同一臺(tái)機(jī)器上,否則UI無(wú)法正常工作,因?yàn)閁I進(jìn)程會(huì)檢查本機(jī)是否存在Nimbus鏈接。為了方便使用,可以將bin/storm加入到系統(tǒng)環(huán)境變量中。至此,Storm集群已經(jīng)部署、配置完畢,可以向集群提交拓?fù)溥\(yùn)行了。3.3向集群提交任務(wù)1.啟動(dòng)StormTopology:stormjarallmycode.jarorg.me.MyTopologyarg1arg2arg3復(fù)制代碼其中,allmycode.jar是包含Topology實(shí)現(xiàn)代碼的jar包,org.me.MyTopology的main方法是Topology的入口,arg1、arg2和arg3為org.me.MyTopology執(zhí)行時(shí)需要傳入的參數(shù)。2.停止StormTopology:stormkill{toponame}其中,{toponame}為Topology提交到Storm集群時(shí)指定的Topology任務(wù)名稱。3.4參考資料1.

/nathanmarz/storm/wiki/Tutorial/nathanmarz/st...-up-a-Storm-cluster3./?p=1892第四章torm消息的可靠處理4.1簡(jiǎn)介storm可以確保spout發(fā)送出來(lái)的每個(gè)消息都會(huì)被完整的處理。本章將會(huì)描述storm體系是如何達(dá)到這個(gè)目標(biāo)的,并將會(huì)詳述開發(fā)者應(yīng)該如何使用storm的這些機(jī)制來(lái)實(shí)現(xiàn)數(shù)據(jù)的可靠處理。4.2理解消息被完整處理一個(gè)消息(tuple)從spout發(fā)送出來(lái),可能會(huì)導(dǎo)致成百上千的消息基于此消息被創(chuàng)建。我們來(lái)思考一下流式的“單詞統(tǒng)計(jì)”的例子:storm任務(wù)從數(shù)據(jù)源(Kestrelqueue)每次讀取一個(gè)完整的英文句子;將這個(gè)句子分解為獨(dú)立的單詞,最后,實(shí)時(shí)的輸出每個(gè)單詞以及它出現(xiàn)過(guò)的次數(shù)。本例中,每個(gè)從spout發(fā)送出來(lái)的消息(每個(gè)英文句子)都會(huì)觸發(fā)很多的消息被創(chuàng)建,那些從句子中分隔出來(lái)的單詞就是被創(chuàng)建出來(lái)的新消息。這些消息構(gòu)成一個(gè)樹狀結(jié)構(gòu),我們稱之為“tupletree”,看起來(lái)如圖1所示:圖1示例tupletree在什么條件下,Storm才會(huì)認(rèn)為一個(gè)從spout發(fā)送出來(lái)的消息被完整處理呢?答案就是下面的條件同時(shí)被滿足:tupletree不再生長(zhǎng)樹中的任何消息被標(biāo)識(shí)為“已處理”如果在指定的時(shí)間內(nèi),一個(gè)消息衍生出來(lái)的tupletree未被完全處理成功,則認(rèn)為此消息未被完整處理。這個(gè)超時(shí)值可以通過(guò)任務(wù)級(jí)參數(shù)Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS

進(jìn)行配置,默認(rèn)超時(shí)值為30秒。4.3消息的生命周期如果消息被完整處理或者未被完整處理,Storm會(huì)如何進(jìn)行接下來(lái)的操作呢?為了弄清這個(gè)問(wèn)題,我們來(lái)研究一下從spout發(fā)出來(lái)的消息的生命周期。這里列出了spout應(yīng)該實(shí)現(xiàn)的接口:

首先,Storm使用spout實(shí)例的nextTuple()方法從spout請(qǐng)求一個(gè)消息(tuple)。收到請(qǐng)求以后,spout使用open方法中提供的SpoutOutputCollector向它的輸出流發(fā)送一個(gè)或多個(gè)消息。每發(fā)送一個(gè)消息,Spout會(huì)給這個(gè)消息提供一個(gè)messageID,它將會(huì)被用來(lái)標(biāo)識(shí)這個(gè)消息。假設(shè)我們從kestrel隊(duì)列中讀取消息,Spout會(huì)將kestrel隊(duì)列為這個(gè)消息設(shè)置的ID作為此消息的messageID。向SpoutOutputCollector中發(fā)送消息格式如下:

接來(lái)下,這些消息會(huì)被發(fā)送到后續(xù)業(yè)務(wù)處理的bolts,并且Storm會(huì)跟蹤由此消息產(chǎn)生出來(lái)的新消息。當(dāng)檢測(cè)到一個(gè)消息衍生出來(lái)的tupletree被完整處理后,Storm會(huì)調(diào)用Spout中的ack方法,并將此消息的messageID作為參數(shù)傳入。同理,如果某消息處理超時(shí),則此消息對(duì)應(yīng)的Spout的fail方法會(huì)被調(diào)用,調(diào)用時(shí)此消息的messageID會(huì)被作為參數(shù)傳入。

注意:一個(gè)消息只會(huì)由發(fā)送它的那個(gè)spout任務(wù)來(lái)調(diào)用ack或fail。如果系統(tǒng)中某個(gè)spout由多個(gè)任務(wù)運(yùn)行,消息也只會(huì)由創(chuàng)建它的spout任務(wù)來(lái)應(yīng)答(ack或fail),決不會(huì)由其他的spout任務(wù)來(lái)應(yīng)答。

我們繼續(xù)使用從kestrel隊(duì)列中讀取消息的例子來(lái)闡述高可靠性下spout需要做些什么(假設(shè)這個(gè)spout的名字是KestrelSpout)。

我們先簡(jiǎn)述一下kestrel消息隊(duì)列:當(dāng)KestrelSpout從kestrel隊(duì)列中讀取一個(gè)消息,表示它“打開”了隊(duì)列中某個(gè)消息。這意味著,此消息并未從隊(duì)列中真正的刪除,而是將此消息設(shè)置為“pending”狀態(tài),它等待來(lái)自客戶端的應(yīng)答,被應(yīng)答以后,此消息才會(huì)被真正的從隊(duì)列中刪除。處于“pending”狀態(tài)的消息不會(huì)被其他的客戶端看到。另外,如果一個(gè)客戶端意外的斷開連接,則由此客戶端“打開”的所有消息都會(huì)被重新加入到隊(duì)列中。當(dāng)消息被“打開”的時(shí)候,kestrel隊(duì)列同時(shí)會(huì)為這個(gè)消息提供一個(gè)唯一的標(biāo)識(shí)。KestrelSpout就是使用這個(gè)唯一的標(biāo)識(shí)作為這個(gè)tuple的messageID的。稍后當(dāng)ack或fail被調(diào)用的時(shí)候,KestrelSpout會(huì)把a(bǔ)ck或者fail連同messageID一起發(fā)送給kestrel隊(duì)列,kestrel會(huì)將消息從隊(duì)列中真正刪除或者將它重新放回隊(duì)列中。4.4靠相關(guān)的API為了使用Storm提供的可靠處理特性,我們需要做兩件事情:

無(wú)論何時(shí)在tupletree中創(chuàng)建了一個(gè)新的節(jié)點(diǎn),我們需要明確的通知Storm;當(dāng)處理完一個(gè)單獨(dú)的消息時(shí),我們需要告訴Storm這棵tupletree的變化狀態(tài)。通過(guò)上面的兩步,storm就可以檢測(cè)到一個(gè)tupletree何時(shí)被完全處理了,并且會(huì)調(diào)用相關(guān)的ack或fail方法。Storm提供了簡(jiǎn)單明了的方法來(lái)完成上述兩步。為tupletree中指定的節(jié)點(diǎn)增加一個(gè)新的節(jié)點(diǎn),我們稱之為錨定(anchoring)。錨定是在我們發(fā)送消息的同時(shí)進(jìn)行的。為了更容易說(shuō)明問(wèn)題,我們使用下面代碼作為例子。本示例的bolt將包含整句話的消息分解為一系列的子消息,每個(gè)子消息包含一個(gè)單詞。

每個(gè)消息都通過(guò)這種方式被錨定:把輸入消息作為emit方法的第一個(gè)參數(shù)。因?yàn)閣ord消息被錨定在了輸入消息上,這個(gè)輸入消息是spout發(fā)送過(guò)來(lái)的tupletree的根節(jié)點(diǎn),如果任意一個(gè)word消息處理失敗,派生這個(gè)tupletree那個(gè)spout消息將會(huì)被重新發(fā)送。與此相反,我們來(lái)看看使用下面的方式emit消息時(shí),Storm會(huì)如何處理:

如果以這種方式發(fā)送消息,將會(huì)導(dǎo)致這個(gè)消息不會(huì)被錨定。如果此tupletree中的消息處理失敗,派生此tupletree的根消息不會(huì)被重新發(fā)送。根據(jù)任務(wù)的容錯(cuò)級(jí)別,有時(shí)候很適合發(fā)送一個(gè)非錨定的消息。一個(gè)輸出消息可以被錨定在一個(gè)或者多個(gè)輸入消息上,這在做join或聚合的時(shí)候是很有用的。一個(gè)被多重錨定的消息處理失敗,會(huì)導(dǎo)致與之關(guān)聯(lián)的多個(gè)spout消息被重新發(fā)送。多重錨定通過(guò)在emit方法中指定多個(gè)輸入消息來(lái)實(shí)現(xiàn):多重錨定會(huì)將被錨定的消息加到多棵tupletree上。注意:多重綁定可能會(huì)破壞傳統(tǒng)的樹形結(jié)構(gòu),從而構(gòu)成一個(gè)DAGs(有向無(wú)環(huán)圖),如圖2所示:

圖2多重錨定構(gòu)成的鉆石型結(jié)構(gòu)Storm的實(shí)現(xiàn)可以像處理樹那樣來(lái)處理DAGs。錨定表明了如何將一個(gè)消息加入到指定的tupletree中,高可靠處理API的接下來(lái)部分將向您描述當(dāng)處理完tupletree中一個(gè)單獨(dú)的消息時(shí)我們?cè)撟鲂┦裁础_@些是通過(guò)OutputCollector的ack和fail方法來(lái)實(shí)現(xiàn)的。回頭看一下例子SplitSentence,可以發(fā)現(xiàn)當(dāng)所有的word消息被發(fā)送完成后,輸入的表示句子的消息會(huì)被應(yīng)答(acked)。每個(gè)被處理的消息必須表明成功或失敗(acked或者failed)。Storm是使用內(nèi)存來(lái)跟蹤每個(gè)消息的處理情況的,如果被處理的消息沒(méi)有應(yīng)答的話,遲早內(nèi)存會(huì)被耗盡!很多bolt遵循特定的處理流程:讀取一個(gè)消息、發(fā)送它派生出來(lái)的子消息、在execute結(jié)尾處應(yīng)答此消息。一般的過(guò)濾器(filter)或者是簡(jiǎn)單的處理功能都是這類的應(yīng)用。Storm有一個(gè)BasicBolt接口封裝了上述的流程。示例SplitSentence可以使用BasicBolt來(lái)重寫:

使用這種方式,代碼比之前稍微簡(jiǎn)單了一些,但是實(shí)現(xiàn)的功能是一樣的。發(fā)送到BasicOutputCollector的消息會(huì)被自動(dòng)的錨定到輸入消息,并且,當(dāng)execute執(zhí)行完畢的時(shí)候,會(huì)自動(dòng)的應(yīng)答輸入消息。很多情況下,一個(gè)消息需要延遲應(yīng)答,例如聚合或者是join。只有根據(jù)一組輸入消息得到一個(gè)結(jié)果之后,才會(huì)應(yīng)答之前所有的輸入消息。并且聚合和join大部分時(shí)候?qū)敵鱿⒍际嵌嘀劐^定。然而,這些特性不是IBasicBolt所能處理的。4.5高效的實(shí)現(xiàn)tupletreeStorm系統(tǒng)中有一組叫做“acker”的特殊的任務(wù),它們負(fù)責(zé)跟蹤DAG(有向無(wú)環(huán)圖)中的每個(gè)消息。每當(dāng)發(fā)現(xiàn)一個(gè)DAG被完全處理,它就向創(chuàng)建這個(gè)根消息的spout任務(wù)發(fā)送一個(gè)信號(hào)。拓?fù)渲衋cker任務(wù)的并行度可以通過(guò)配置參數(shù)Config.TOPOLOGY_ACKERS來(lái)設(shè)置。默認(rèn)的acker任務(wù)并行度為1,當(dāng)系統(tǒng)中有大量的消息時(shí),應(yīng)該適當(dāng)提高acker任務(wù)的并發(fā)度。為了理解Storm可靠性處理機(jī)制,我們從研究一個(gè)消息的生命周期和tupletree的管理入手。當(dāng)一個(gè)消息被創(chuàng)建的時(shí)候(無(wú)論是在spout還是bolt中),系統(tǒng)都為該消息分配一個(gè)64bit的隨機(jī)值作為id。這些隨機(jī)的id是acker用來(lái)跟蹤由spout消息派生出來(lái)的tupletree的。每個(gè)消息都知道它所在的tupletree對(duì)應(yīng)的根消息的id。每當(dāng)bolt新生成一個(gè)消息,對(duì)應(yīng)tupletree中的根消息的messageId就拷貝到這個(gè)消息中。當(dāng)這個(gè)消息被應(yīng)答的時(shí)候,它就把關(guān)于tupletree變化的信息發(fā)送給跟蹤這棵樹的acker。例如,他會(huì)告訴acker:本消息已經(jīng)處理完畢,但是我派生出了一些新的消息,幫忙跟蹤一下吧。舉個(gè)例子,假設(shè)消息D和E是由消息C派生出來(lái)的,這里演示了消息C被應(yīng)答時(shí),tupletree是如何變化的。

因?yàn)樵贑被從樹中移除的同時(shí)D和E會(huì)被加入到tupletree中,因此tupletree不會(huì)被過(guò)早的認(rèn)為已完全處理。關(guān)于Storm如何跟蹤tupletree,我們?cè)偕钊氲奶接懸幌?。前面說(shuō)過(guò)系統(tǒng)中可以有任意個(gè)數(shù)的acker,那么,每當(dāng)一個(gè)消息被創(chuàng)建或應(yīng)答的時(shí)候,它怎么知道應(yīng)該通知哪個(gè)acker呢?系統(tǒng)使用一種哈希算法來(lái)根據(jù)spout消息的messageId確定由哪個(gè)acker跟蹤此消息派生出來(lái)的tupletree。因?yàn)槊總€(gè)消息都知道與之對(duì)應(yīng)的根消息的messageId,因此它知道應(yīng)該與哪個(gè)acker通信。當(dāng)spout發(fā)送一個(gè)消息的時(shí)候,它就通知對(duì)應(yīng)的acker一個(gè)新的根消息產(chǎn)生了,這時(shí)acker就會(huì)創(chuàng)建一個(gè)新的tupletree。當(dāng)acker發(fā)現(xiàn)這棵樹被完全處理之后,他就會(huì)通知對(duì)應(yīng)的spout任務(wù)。tuple是如何被跟蹤的呢?系統(tǒng)中有成千上萬(wàn)的消息,如果為每個(gè)spout發(fā)送的消息都構(gòu)建一棵樹的話,很快內(nèi)存就會(huì)耗盡。所以,必須采用不同的策略來(lái)跟蹤每個(gè)消息。由于使用了新的跟蹤算法,Storm只需要固定的內(nèi)存(大約20字節(jié))就可以跟蹤一棵樹。這個(gè)算法是storm正確運(yùn)行的核心,也是storm最大的突破。acker任務(wù)保存了spout消息id到一對(duì)值的映射。第一個(gè)值就是spout的任務(wù)id,通過(guò)這個(gè)id,acker就知道消息處理完成時(shí)該通知哪個(gè)spout任務(wù)。第二個(gè)值是一個(gè)64bit的數(shù)字,我們稱之為“ackval”,它是樹中所有消息的隨機(jī)id的異或結(jié)果。ackval表示了整棵樹的的狀態(tài),無(wú)論這棵樹多大,只需要這個(gè)固定大小的數(shù)字就可以跟蹤整棵樹。當(dāng)消息被創(chuàng)建和被應(yīng)答的時(shí)候都會(huì)有相同的消息id發(fā)送過(guò)來(lái)做異或。每當(dāng)acker發(fā)現(xiàn)一棵樹的ackval值為0的時(shí)候,它就知道這棵樹已經(jīng)被完全處理了。因?yàn)橄⒌碾S機(jī)ID是一個(gè)64bit的值,因此ackval在樹處理完之前被置為0的概率非常小。假設(shè)你每秒鐘發(fā)送一萬(wàn)個(gè)消息,從概率上說(shuō),至少需要50,000,000年才會(huì)有機(jī)會(huì)發(fā)生一次錯(cuò)誤。即使如此,也只有在這個(gè)消息確實(shí)處理失敗的情況下才會(huì)有數(shù)據(jù)的丟失!4.6選擇合適的可靠性級(jí)別Acker任務(wù)是輕量級(jí)的,所以在拓?fù)渲胁⒉恍枰嗟腶cker存在??梢酝ㄟ^(guò)StormUI來(lái)觀察acker任務(wù)的吞吐量,如果看上去吞吐量不夠的話,說(shuō)明需要添加額外的acker。如果你并不要求每個(gè)消息必須被處理(你允許在處理過(guò)程中丟失一些信息),那么可以關(guān)閉消息的可靠處理機(jī)制,從而可以獲取較好的性能。關(guān)閉消息的可靠處理機(jī)制意味著系統(tǒng)中的消息數(shù)會(huì)減半(每個(gè)消息不需要應(yīng)答了)。另外,關(guān)閉消息的可靠處理可以減少消息的大?。ú恍枰總€(gè)tuple記錄它的根id了),從而節(jié)省帶寬。有三種方法可以關(guān)系消息的可靠處理機(jī)制:將參數(shù)Config.TOPOLOGY_ACKERS設(shè)置為0,通過(guò)此方法,當(dāng)Spout發(fā)送一個(gè)消息的時(shí)候,它的ack方法將立刻被調(diào)用;第二個(gè)方法是Spout發(fā)送一個(gè)消息時(shí),不指定此消息的messageID。當(dāng)需要關(guān)閉特定消息可靠性的時(shí)候,可以使用此方法;最后,如果你不在意某個(gè)消息派生出來(lái)的子孫消息的可靠性,則此消息派生出來(lái)的子消息在發(fā)送時(shí)不要做錨定,即在emit方法中不指定輸入消息。因?yàn)檫@些子孫消息沒(méi)有被錨定在任何tupletree中,因此他們的失敗不會(huì)引起任何spout重新發(fā)送消息。4.7集群的各級(jí)容錯(cuò)到現(xiàn)在為止,大家已經(jīng)理解了Storm的可靠性機(jī)制,并且知道了如何選擇不同的可靠性級(jí)別來(lái)滿足需求。接下來(lái)我們研究一下Storm如何保證在各種情況下確保數(shù)據(jù)不丟失。4.7.1任務(wù)級(jí)失敗因?yàn)閎olt任務(wù)crash引起的消息未被應(yīng)答。此時(shí),acker中所有與此bolt任務(wù)關(guān)聯(lián)的消息都會(huì)因?yàn)槌瑫r(shí)而失敗,對(duì)應(yīng)spout的fail方法將被調(diào)用。acker任務(wù)失敗。如果acker任務(wù)本身失敗了,它在失敗之前持有的所有消息都將會(huì)因?yàn)槌瑫r(shí)而失敗。Spout的fail方法將被調(diào)用。Spout任務(wù)失敗。這種情況下,Spout任務(wù)對(duì)接的外部設(shè)備(如MQ)負(fù)責(zé)消息的完整性。例如當(dāng)客戶端異常的情況下,kestrel隊(duì)列會(huì)將處于pending狀態(tài)的所有的消息重新放回到隊(duì)列中。4.7.2

任務(wù)槽(slot)故障worker失敗。每個(gè)worker中包含數(shù)個(gè)bolt(或spout)任務(wù)。supervisor負(fù)責(zé)監(jiān)控這些任務(wù),當(dāng)worker失敗后,supervisor會(huì)嘗試在本機(jī)重啟它。supervisor失敗。supervisor是無(wú)狀態(tài)的,因此supervisor的失敗不會(huì)影響當(dāng)前正在運(yùn)行的任務(wù),只要及時(shí)的將它重新啟動(dòng)即可。supervisor不是自舉的,需要外部監(jiān)控來(lái)及時(shí)重啟。nimbus失敗。nimbus是無(wú)狀態(tài)的,因此nimbus的失敗不會(huì)影響當(dāng)前正在運(yùn)行的任務(wù)(nimbus失敗時(shí),無(wú)法提交新的任務(wù)),只要及時(shí)的將它重新啟動(dòng)即可。nimbus不是自舉的,需要外部監(jiān)控來(lái)及時(shí)重啟。4.7.3.

集群節(jié)點(diǎn)(機(jī)器)故障storm集群中的節(jié)點(diǎn)故障。此時(shí)nimbus會(huì)將此機(jī)器上所有正在運(yùn)行的任務(wù)轉(zhuǎn)移到其他可用的機(jī)器上運(yùn)行。zookeeper集群中的節(jié)點(diǎn)故障。zookeeper保證少于半數(shù)的機(jī)器宕機(jī)仍可正常運(yùn)行,及時(shí)修復(fù)故障機(jī)器即可。4.8小結(jié)本章介紹了storm集群如何實(shí)現(xiàn)數(shù)據(jù)的可靠處理。借助于創(chuàng)新性的tupletree跟蹤技術(shù),storm高效的通過(guò)數(shù)據(jù)的應(yīng)答機(jī)制來(lái)保證數(shù)據(jù)不丟失。storm集群中除nimbus外,沒(méi)有單點(diǎn)存在,任何節(jié)點(diǎn)都可以出故障而保證數(shù)據(jù)不會(huì)丟失。nimbus被設(shè)計(jì)為無(wú)狀態(tài)的,只要可以及時(shí)重啟,就不會(huì)影響正在運(yùn)行的任務(wù)。第五章一致性事務(wù)Storm是一個(gè)分布式的流處理系統(tǒng),利用anchor和ack機(jī)制保證所有tuple都被成功處理。如果tuple出錯(cuò),則可以被重傳,但是如何保證出錯(cuò)的tuple只被處理一次呢?Storm提供了一套事務(wù)性組件TransactionTopology,用來(lái)解決這個(gè)問(wèn)題。TransactionalTopology目前已經(jīng)不再維護(hù),由Trident來(lái)實(shí)現(xiàn)事務(wù)性topology,但是原理相同。5.1一致性事務(wù)的設(shè)計(jì)Storm如何實(shí)現(xiàn)即對(duì)tuple并行處理,又保證事務(wù)性。本節(jié)從簡(jiǎn)單的事務(wù)性實(shí)現(xiàn)方法入手,逐步引出TransactionalTopology的原理。保證tuple只被處理一次,最簡(jiǎn)單的方法就是將tuple流變成強(qiáng)順序的,并且每次只處理一個(gè)tuple。從1開始,給每個(gè)tuple都順序加上一個(gè)id。在處理tuple的時(shí)候,將處理成功的tupleid和計(jì)算結(jié)果存在數(shù)據(jù)庫(kù)中。下一個(gè)tuple到來(lái)的時(shí)候,將其id與數(shù)據(jù)庫(kù)中的id做比較。如果相同,則說(shuō)明這個(gè)tuple已經(jīng)被成功處理過(guò)了,忽略它;如果不同,根據(jù)強(qiáng)順序性,說(shuō)明這個(gè)tuple沒(méi)有被處理過(guò),將它的id及計(jì)算結(jié)果更新到數(shù)據(jù)庫(kù)中。以統(tǒng)計(jì)消息總數(shù)為例。每來(lái)一個(gè)tuple,如果數(shù)據(jù)庫(kù)中存儲(chǔ)的id與當(dāng)前tupleid不同,則數(shù)據(jù)庫(kù)中的消息總數(shù)加1,同時(shí)更新數(shù)據(jù)庫(kù)中的當(dāng)前tupleid值。如圖:

但是這種機(jī)制使得系統(tǒng)一次只能處理一個(gè)tuple,無(wú)法實(shí)現(xiàn)分布式計(jì)算。為了實(shí)現(xiàn)分布式,我們可以每次處理一批tuple,稱為一個(gè)batch。一個(gè)batch中的tuple可以被并行處理。我們要保證一個(gè)batch只被處理一次,機(jī)制和上一節(jié)類似。只不過(guò)數(shù)據(jù)庫(kù)中存儲(chǔ)的是batchid。batch的中間計(jì)算結(jié)果先存在局部變量中,當(dāng)一個(gè)batch中的所有tuple都被處理完之后,判斷batchid,如果跟數(shù)據(jù)庫(kù)中的id不同,則將中間計(jì)算結(jié)果更新到數(shù)據(jù)庫(kù)中。如何確保一個(gè)batch里面的所有tuple都被處理完了呢?可以利用Storm提供的CoordinateBolt。如圖:

但是強(qiáng)順序batch流也有局限,每次只能處理一個(gè)batch,batch之間無(wú)法并行。要想實(shí)現(xiàn)真正的分布式事務(wù)處理,可以使用storm提供的TransactionalTopology。在此之前,我們先詳細(xì)介紹一下CoordinateBolt的原理。CoordinateBolt具體原理如下:真正執(zhí)行計(jì)算的bolt外面封裝了一個(gè)CoordinateBolt。真正執(zhí)行任務(wù)的bolt我們稱為realbolt。每個(gè)CoordinateBolt記錄兩個(gè)值:有哪些task給我發(fā)送了tuple(根據(jù)topology的grouping信息);我要給哪些tuple發(fā)送信息(同樣根據(jù)groping信息)Realbolt發(fā)出一個(gè)tuple后,其外層的CoordinateBolt會(huì)記錄下這個(gè)tuple發(fā)送給哪個(gè)task了。等所有的tuple都發(fā)送完了之后,CoordinateBolt通過(guò)另外一個(gè)特殊的stream以emitDirect的方式告訴所有它發(fā)送過(guò)tuple的task,它發(fā)送了多少tuple給這個(gè)task。下游task會(huì)將這個(gè)數(shù)字和自己已經(jīng)接收到的tuple數(shù)量做對(duì)比,如果相等,則說(shuō)明處理完了所有的tuple。下游CoordinateBolt會(huì)重復(fù)上面的步驟,通知其下游。整個(gè)過(guò)程如圖所示:

CoordinateBolt主要用于兩個(gè)場(chǎng)景:DRPCTransactionalTopologyCoordinatedBolt對(duì)于業(yè)務(wù)是有侵入的,要使用CoordinatedBolt提供的功能,你必須要保證你的每個(gè)bolt發(fā)送的每個(gè)tuple的第一個(gè)field是request-id。所謂的“我已經(jīng)處理完我的上游”的意思是說(shuō)當(dāng)前這個(gè)bolt對(duì)于當(dāng)前這個(gè)request-id所需要做的工作做完了。這個(gè)request-id在DRPC里面代表一個(gè)DRPC請(qǐng)求;在TransactionalTopology里面代表一個(gè)batch。Storm提供的TransactionalTopology將batch計(jì)算分為process和commit兩個(gè)階段。Process階段可以同時(shí)處理多個(gè)batch,不用保證順序性;commit階段保證batch的強(qiáng)順序性,并且一次只能處理一個(gè)batch,第1個(gè)batch成功提交之前,第2個(gè)batch不能被提交。還是以統(tǒng)計(jì)消息總數(shù)為例,以下代碼來(lái)自storm-starter里面的TransactionalGlobalCount。MemoryTransactionalSpoutspout=newMemoryTransactionalSpout(DATA,newFields(“word“),PARTITION_TAKE_PER_BATCH);TransactionalTopologyBuilderbuilder=newTransactionalTopologyBuilder(“global-count“,“spout“,spout,3);builder.setBolt(“partial-count“,newBatchCount(),5).noneGrouping(“spout“);builder.setBolt(“sum“,newUpdateGlobalCount()).globalGrouping(“partial-count“);TransactionalTopologyBuilder共接收四個(gè)參數(shù)。這個(gè)TransactionalTopology的id。Id用來(lái)在Zookeeper中保存當(dāng)前topology的進(jìn)度,如果這個(gè)topology重啟,可以繼續(xù)之前的進(jìn)度執(zhí)行。Spout在這個(gè)topology中的id一個(gè)TransactionalSpout

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論