




版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)
文檔簡介
1、尚硅谷大數(shù)據(jù)技術(shù)之Storm(作者:大海哥)官網(wǎng): 版本:V1.0 Storm 概述1.1 離線計算是什么?離線計算:批量獲取數(shù)據(jù)、批量傳輸數(shù)據(jù)、周期性批量計算數(shù)據(jù)、數(shù)據(jù)展示代表技術(shù):Sqoop 批量導(dǎo)入數(shù)據(jù)、HDFS 批量數(shù)據(jù)、MapReduce 批量計算數(shù)據(jù)、Hive批量計算數(shù)據(jù)1.2 流式計算是什么流式計算:數(shù)據(jù)實時產(chǎn)生、數(shù)據(jù)實時傳輸、數(shù)據(jù)實時計算、實時展示代表技術(shù):Flume 實時獲取數(shù)據(jù)、Kafka 實時數(shù)據(jù)、Storm/JStorm 實時數(shù)據(jù)計算、Redis 實時結(jié)果緩存、持久化(mysql)。離線計算與實時計算最大的區(qū)別:實時收集、實時計算、實時展示1.3 Storm 是什么?S
2、torm 是一個分布式計算框架,主要使用 Clojure 與Java 語言編寫,最初是由Nathan Marz帶領(lǐng) Backtype 公司團隊創(chuàng)建,在 Backtype 公司被公司收購后進行開源。最初的版本是在 2011 年 9 月 17 日,版本號 0.5.0。2013 年9 月,Apache開始接管并孵化Storm 項目。Apache Storm 是在Eclipse PublicLicense 下進行開發(fā)的,它提供給大多數(shù)企業(yè)使用。經(jīng)過 1 年多時間,2014 年 9 月,Storm項目成為 Apache 的頂級項目。目前,Storm 的最新版本 1.1.0。Storm 是一個開源的分布式
3、實時計算系統(tǒng)。Storm 能輕松可靠地處理的數(shù)據(jù)流,就像 Hadoop 對數(shù)據(jù)進行批處理;1.4 Storm 與 Hadoop 的區(qū)別1)Storm 用于實時計算,Hadoop 用于離線計算。2)Storm 處理的數(shù)據(jù)保存在內(nèi)存中,源源不斷;Hadoop 處理的數(shù)據(jù)保存在文件系統(tǒng)中,一批一批處理。3)Storm 的數(shù)據(jù)通過網(wǎng)絡(luò)傳輸進來;Hadoop 的數(shù)據(jù)保存在磁盤中。4)Storm 與Hadoop 的編程模型相似(1)hadoop 相關(guān)名稱Job:任務(wù)名稱JobTracker:項目經(jīng)理(JobTracker 對應(yīng)于 NameNode;JobTracker 是一個 master 服務(wù),軟件啟動
4、之后 JobTracker 接收 Job,負責(zé)調(diào)度 Job 的每一個子任務(wù) task 運行于 TaskTracker上,并它們,如果發(fā)現(xiàn)有失敗的 task 就重新運行它。)TaskTracker:開發(fā)組長(TaskTracker 對應(yīng)于 DataNode;TaskTracker 是運行在多個節(jié)點上的 slaver 服務(wù)。TaskTracker 主動與JobTracker 通收作業(yè),并負責(zé)直接執(zhí)行每一個任務(wù)。)Child:負責(zé)開發(fā)的Mapper/Reduce:開發(fā)中的兩種,一種是服務(wù)器開發(fā)、一種是客戶端開發(fā)(2)storm 相關(guān)名稱Topology:任務(wù)名稱Nimbus:項目經(jīng)理Supervis
5、or:開組長Worker:開Spout/Bolt:開中的兩種,一種是服務(wù)器開發(fā)、一種是客戶端開發(fā)1.5 Storm 應(yīng)用場景及行業(yè)案例Storm 用來實時計算源源不斷產(chǎn)生的數(shù)據(jù),如同流水線生產(chǎn)。1.5.1 運用場景Storm 能用到很多場景中,包括:實時分析、學(xué)習(xí)、連續(xù)計算等。StormhadoopNimbusJobTrackerSupervisorTaskTrackerWorkerChild應(yīng)用名稱TopologyJob編程接口Spout/BoltMapper/Reducer1)推薦系統(tǒng):實時推薦,根據(jù)下單或加入購物車推薦相關(guān)商品2)金融系統(tǒng):實時分析股票信息數(shù)據(jù)3)系統(tǒng):根據(jù)實時數(shù)據(jù),判斷
6、是否到了閾值。4)統(tǒng)計:實時銷量、流量統(tǒng)計,如淘寶雙 11 效果圖1.5.2典型案列1)京東-實時分析系統(tǒng):實時分析用戶的屬性,并反饋給搜索引擎最初,用戶屬性分析是通過每天在云上定時運行的 MR job 來完成的。為了滿足實時性的要求,希望能夠?qū)崟r分析用戶的行為日志,將最新的用戶屬性反饋給搜索引擎,能夠為用戶展現(xiàn)最貼近其當(dāng)前需求的結(jié)果。2)攜程-性能:實時分析系統(tǒng)攜程網(wǎng)的性能利用 HTML5 提供的 performance 標準獲得可用的指標,并日志。Storm 集群實時分析日志和入庫。使用DRPC 聚報表,通過歷史數(shù)據(jù)對比等判斷規(guī)則,觸發(fā)。3)淘寶雙十一:實時統(tǒng)計銷售總額1.6 Storm
7、特點1)適用場景廣泛:Storm 可以適用實時處理消息、更新數(shù)據(jù)庫、持續(xù)計算等場景。2)可伸縮性高:Storm 的可伸縮性可以讓 Storm 每秒處理的消息量達到很高。擴展一個實時計算任務(wù),你所需要做的就是加并且提高這個計算任務(wù)的并行度。Storm 使用Zookeeper 來協(xié)調(diào)內(nèi)的各種配置使得 Storm 的集群可以很容易的擴展。3)保證無數(shù)據(jù)丟失:Storm 保證所有的數(shù)據(jù)都被處理。4)異常健壯:Storm 集群非常容易管理,輪流重啟節(jié)點不影響應(yīng)用。5)容錯性好:在消息處理過程中出現(xiàn)異常,Storm 會進行重試。二 Storm 基礎(chǔ)知識2.1 Storm 編程模型Storm編程模型6)To
8、pology:Storm中運行的一個實時應(yīng)用程序的名稱BolthdfsBolt5)Stream是指數(shù)據(jù)流DataSourceSpoutBoltHbase1)文件、數(shù)據(jù)庫、緩沖隊列kafka等4)tuple是一次消息傳遞的基本單元.MongodbDataSourceSpoutBoltRedis3)bolt接收數(shù)據(jù)后,根據(jù)用戶需求執(zhí)行相應(yīng)的操作2)spout從外部數(shù)據(jù)源中數(shù)據(jù),然后轉(zhuǎn)換為topology內(nèi)部的源數(shù)據(jù)2.1.1 元組(Tuple)元組(Tuple),是消息傳遞的基本單元,是一個命名的值列表,元組中的字段可以是任何類型的對象。Storm 使用元組作為其數(shù)據(jù)模型,元組支持所有的基本類型、
9、字符串和字節(jié)數(shù)組作為字段值,只要實現(xiàn)類型的序列化接口就可以使用該類型的對象。元組本來應(yīng)該是一個 key-value 的 Map,但是由于各個組件間傳遞的元組的字段名稱已經(jīng)事先定義好,所以只要按序把元組填入各個 value 即可,所以元組是一個 value 的 List。2.1.2 流(Stream)流是 Storm 的抽象,是一個的元組系列。源源不斷傳遞的元組就組成了流,在分布式環(huán)境中并行地進行創(chuàng)建和處理。2.1.3 水龍頭(Spout)Spout 是拓撲的流的來源,是一個拓撲中產(chǎn)生源數(shù)據(jù)流的組件。通常情況下,Spout 會從外部數(shù)據(jù)源中數(shù)據(jù),然后轉(zhuǎn)換為拓撲內(nèi)部的源數(shù)據(jù)。Spout 可以是可靠
10、的,也可以是不可靠的。如果 Storm 處理元組失敗,可靠的 Spout 能夠重新發(fā)射,而不可靠的 Spout 就盡快忘記發(fā)出的元組。Spout 可以發(fā)出超過一個流。Spout 的主要方法是 nextTuple()。NextTuple()會發(fā)出一個新的 Tuple 到拓撲,如果沒有新的元組發(fā)出,則簡單返回。Spout 的其他方法是ack()和fail()。當(dāng)Storm 檢測到一個元組從Spout 發(fā)出時,ack()和fail()會被調(diào)用,要么成功完成通過拓撲,要么未能完成。Ack()和 fail()僅被可靠的 Spout 調(diào)用。IRichSpout 是 Spout 必須實現(xiàn)的接口。2.1.4
11、轉(zhuǎn)接頭(Bolt)在拓撲中所有處理都在 Bolt 中完成,Bolt 是流的處理節(jié)點,從一個拓撲接收數(shù)據(jù),然后執(zhí)行進行處理的組件。Bolt 可以完成過濾、業(yè)務(wù)處理、連接運算、連接與數(shù)據(jù)庫等任何操作。Bolt 是一個的,七接口中有一個 execute()方法,在接收到消息后會調(diào)用此方法,用戶可以在其中執(zhí)行自己希望的操作。Bolt 可以完成簡單的流的轉(zhuǎn)換,而完成復(fù)雜的流的轉(zhuǎn)換通常需要多個步驟,因此需要多個 Bolt。Bolt 可以發(fā)出超過一個的流。2.1.5 拓撲(Topology)拓撲(Topology)是 Storm 中運行的一個實時應(yīng)用程序,因為各個組件間的消息而形成邏輯上的拓撲結(jié)構(gòu)。把實時應(yīng)
12、用程序的運行邏輯打成 jar 包后提交到 Storm 的拓撲(Topology)。Storm 的拓撲類似于 MapReduce 的作業(yè)(Job)。其主要的區(qū)別是,MapReduce 的作業(yè)最終會完成,而一個拓撲永遠都在運行直到它被殺死。一個拓撲是一個圖的 Spout 和Bolt 的連接流分組。2.2 Storm組件Storm組件supervisor1) Nimbus:主控節(jié)點,用于接收提交任務(wù)、分配集群任務(wù)、集群 等Zookeepersupervisor4)Worker:運行具體處理組件邏輯的進程ZookeepersupervisorZookeeper0)提交Topologysuperviso
13、r2) Zookeeper:集群中數(shù)據(jù)的( 如心跳信client息、集群狀態(tài)和配置信息) 、 Nimbus 分配給supervisor的任務(wù)等3 ) Supervisor : 負責(zé)接收Nimbus分配的任務(wù),管理屬于自己的Worker進程。nimbus 是整個集群的控管,負責(zé) topology 的提交、運行狀態(tài)、任務(wù)重新分配等工作。zk 就是一個管理者,者??傮w描述:nimbus 下命令(分配任務(wù)),zk 監(jiān)督執(zhí)行(心跳,worker、supurvisor的心跳都歸它管),supervisor 領(lǐng)旨(代碼),招募人馬(創(chuàng)建 worker 和線程等),worker、executor 就給活!ta
14、sk 就是具體要干的活。2.2.1 主控節(jié)點與工作節(jié)點Storm 集群中有兩類節(jié)點:主控節(jié)點(Master Node)和工作節(jié)點(Worker Node)。其中,主控節(jié)點只有一個,而工作節(jié)點可以有多個。workerexecutorSpouttasktaskexecutorbolt tasktaskworkerNimbusworkerworker2.2.2 Nimbus 進程與 Supervisor 進程主控節(jié)點運行一個稱為 Nimbus 的守護進程類似于 Hadoop 的 JobTracker。Nimbus 負責(zé)在集群中分發(fā)代碼,對節(jié)點分配任務(wù),并監(jiān)視主機故障。每個工作節(jié)點運行一個稱為 Sup
15、ervisor 的守護進程。Supervisor其主機上已經(jīng)分配的主機的作業(yè),啟動和停止 Nimbus 已經(jīng)分配的工作進程。2.2.3 流分組(Stream grouping)流分組,是拓撲定義中的一部分,為每個 Bolt 指定應(yīng)該接收哪個流作為輸入。流分組定義了流/元組如何在 Bolt 的任務(wù)之間進行分發(fā)。Storm 內(nèi)置了 8 種流分組方式。2.2.4 工作進程(Worker)Worker 是 Spout/Bolt 中運行具體處理邏輯的進程。一個 worker 就是一個進程,進程里面包含一個或多個線程。2.2.5 執(zhí)行器(Executor)一個線程就是一個 executor,一個線程會處理
16、一個或多個任務(wù)。2.2.6 任務(wù)(Task)一個任務(wù)就是一個 task。2.3 實時流計算常見架構(gòu)圖1)Flume 獲取數(shù)據(jù)。2)Kafka 臨時保存數(shù)據(jù)。3)Strom 計算數(shù)據(jù)。4)Redis 是個內(nèi)存數(shù)據(jù)庫,用來保存數(shù)據(jù)。Redis集群Storm集群Kafka集群Flume集群系統(tǒng)三 Storm 集群搭建3.1 環(huán)境準備3.1.1 集群規(guī)劃hadoop102 zkstorm3.1.2 jar 包hadoop103 zkstormhadoop104 zkstorm(1):(2)安裝集群步驟:3.1.3 虛擬機準備1)準備 3 臺虛擬機2)配置 ip 地址尚硅谷大數(shù)據(jù)技術(shù)之修改為靜態(tài)ip.d
17、o3)配置主機名稱尚硅谷大數(shù)據(jù)技術(shù)之修改主機名.doc4)3 臺主機分別關(guān)閉roothadoop102 atguigu# chkconfig iptables offroothadoop103 atguigu# chkconfig iptables offroothadoop104 atguigu# chkconfig iptables off3.1.4 安裝 jdk尚硅谷大數(shù)據(jù)技術(shù)之安裝jdk.doc3.1.5 安裝 Zookeeper0)集群規(guī)劃在 hadoop102、hadoop103 和 hadoop104 三個節(jié)點上部署 Zookeeper。1)解壓安裝(1)解壓 zookeeper
18、 安裝包到/opt/module/目錄下atguiguhadoop102 software$ tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/(2)在/opt/module/zookeeper-3.4.10/這個目錄下創(chuàng)建 zkDatamkdir -p zkData(3)重命名/opt/module/zookeeper-3.4.10/conf 這個目錄下的 zoo_sample.cfg 為zoo.cfgmv zoo_sample.cfg zoo.cfg2)配置 zoo.cfg 文件(1)具體配置dataDir=/opt/module/zooke
19、eper-3.4.10/zkData增加如下配置#cluster#server.2=hadoop102:2888:3888server.3=hadoop103:2888:3888server.4=hadoop104:2888:3888(2)配置參數(shù)解讀Server.A=B:C:D。A 是一個數(shù)字,表示這個是第幾號服務(wù)器;B 是這個服務(wù)器的 ip 地址;C 是這個服務(wù)器與集群中的 Leader 服務(wù)器交換信息的端口;D 是萬一集群中的 Leader 服務(wù)器掛了,需要一個端口來重新進行,選出一個新的Leader,而這個端口就是用來執(zhí)行時服務(wù)器相互通信的端口。集群模式下配置一個文件 myid,這個文
20、件在 dataDir 目錄下,這個文件里面有一個數(shù)據(jù)就是 A 的值,Zookeeper 啟動時此文件,拿到里面的數(shù)據(jù)與 zoo.cfg 里面的配置信息比較從而判斷到底是哪個 server。3)集群操作(1)在/opt/module/zookeeper-3.4.10/zkData 目錄下創(chuàng)建一個 myid 的文件touch myid添加 myid 文件,注意一定要在 linux 里面創(chuàng)建,在 notepad+里面很可能亂碼(2)編輯 myid 文件vi myid在文件中添加與 server 對應(yīng)的編號:如 2(3)拷貝配置好的 zookeeper 到其他上scp -r zookeeper-3.4
21、.10/ roothad:/opt/app/scp -r zookeeper-3.4.10/ roothad:/opt/app/并分別修改 myid 文件中內(nèi)容為 3、4(4)分別啟動 zookeeperroothadoop102 zookeeper-3.4.10# bin/zkServer.sh startroothadoop103 zookeeper-3.4.10# bin/zkServer.sh startroothadoop104 zookeeper-3.4.10# bin/zkServer.sh start(5)查看狀態(tài)roothadoop102 zookeeper-3.4.10#
22、bin/zkServer.sh statusJMX enabled by defaultUsing config: /opt/module/zookeeper-3.4.10/bin/./conf/zoo.cfgMode: followerroothadoop103 zookeeper-3.4.10# bin/zkServer.sh statusJMX enabled by defaultUsing config: /opt/module/zookeeper-3.4.10/bin/./conf/zoo.cfgMode: leaderroothadoop104 zookeeper-3.4.5# b
23、in/zkServer.sh statusJMX enabled by defaultUsing config: /opt/module/zookeeper-3.4.10/bin/./conf/zoo.cfgMode: follower3.2 Storm 集群部署3.2.1 配置集群1)拷貝 jar 包到 hadoop102 的/opt/software 目錄下2)解壓 jar 包到/opt/module 目錄下atguiguhadoop102 software$ tar -zxvf apache-storm-1.1.0.tar.gz -C /opt/module/3)修改解壓后的 apach
24、e-storm-1.1.0.tar.gz 文件名稱為 stormatguiguhadoop102 module$ mv apache-storm-1.1.0/ storm4)在/opt/module/storm/目錄下創(chuàng)建 data 文件夾atguiguhadoop102 storm$ mkdir data5)修改配置文件atguiguhadoop102 conf$ pwd/opt/module/storm/confatguiguhadoop102 conf$ vi storm.yaml# 設(shè)置 Zookeeper 的主機名稱storm.zookeeper.servers:- "ha
25、doop102"6)配置環(huán)境變量roothadoop102 storm# vi /etc/profileroothadoop102 storm# source /etc/profile7)分發(fā)配置好的 Storm 安裝包atguiguhadoop102 storm$ xsync storm/8)啟動集群(1)啟動 nimbusatguiguhadoop102 storm$ bin/storm nimbus &atguiguhadoop103 storm$ bin/storm nimbus &atguiguhadoop104 storm$ bin/storm nimbu
26、s &(2)啟動 supervisoratguiguhadoop102 storm$ bin/storm supervisor &atguiguhadoop102 storm$ bin/storm supervisor &atguiguhadoop102 storm$ bin/storm supervisor &(3)啟動 Storm uiatguiguhadoop102 storm$ bin/storm ui9)通過瀏覽器查看集群狀態(tài)#STORM_HOMEexport STORM_HOME=/opt/module/storm export PATH=$PATH
27、:$STORM_HOME/bin- "hadoop103"- "hadoop104"# 設(shè)置主節(jié)點的主機名稱nimbus.seeds: "hadoop102"# 設(shè)置 Storm 的數(shù)據(jù)路徑storm.local.dir: "/opt/module/storm/data"# 設(shè)置 Worker 的端supervisor.slots.ports:- 6700- 6701- 6702- 67033.2.2 Storm 日志信息查看1)查看 nimbus 的日志信息在 nimbus 的服務(wù)器上cd /opt/module
28、/storm/logstail -100f /opt/module/storm/logs/nimbus.log2)查看 ui 運行日志信息在 ui 的服務(wù)器上,一般和 nimbus 一個服務(wù)器cd /opt/module/storm/logstail -100f /opt/module/storm/logs/ui.log3)查看 supervisor 運行日志信息在 supervisor 服務(wù)上cd /opt/module/storm/logstail -100f /opt/module/storm/logs/supervisor.log4)查看 supervisor 上 worker 運行日
29、志信息在 supervisor 服務(wù)上cd /opt/module/storm/logstail -100f /opt/module/storm/logs/worker-6702.log5)logviewer,可以在 web 頁面點擊相應(yīng)的端即可查看日志分別在 supervisor 節(jié)點上執(zhí)行:atguiguhadoop102 storm$ bin/storm logviewer &atguiguhadoop103 storm$ bin/storm logviewer &atguiguhadoop104 storm$ bin/storm logviewer &3.2.3
30、 Storm 命令行操作1)nimbus:啟動 nimbus 守護進程storm nimbus2)supervisor:啟動 supervisor 守護進程storm supervisor3)ui:啟動 UI 守護進程。storm ui4)list:列出正在運行的拓撲及其狀態(tài)storm list5)logviewer:Logviewer 提供一個 web 接口查看 Storm 日志文件。storm logviewer6)jar:storm jar 【jar 路徑】 【拓撲包名.拓撲類名】 【拓撲名稱】7)kill:殺死名為 Topology-name 的拓撲storm kill topolog
31、y-name -w wait-time-secs-w:等待多久后殺死拓撲8)active:激活指定的拓撲 spout。storm activate topology-name9)deactivate:禁用指定的拓撲 Spout。storm deactivate topology-name10)help:打印一條幫助消息或者可用命令的列表。storm helpstorm help <command>四 常用 API4.1 API 簡介4.1.1 Component 組件1)基本接口(1)IComponent 接口(2)ISpout 接口(3)IRichSpout 接口(4)IStat
32、eSpout 接口(5)IRichStateSpout 接口(6)IBolt 接口(7)IRichBolt 接口(8)IBasicBolt 接口2)基本抽象類(1)ponent 抽象類(2)BaseRichSpout 抽象類(3)BaseRichBolt 抽象類(4)BaseTransactionalBolt 抽象類(5)BaseBasicBolt 抽象類4.1.2 spout 水龍頭Spout 的最頂層抽象是 ISpout 接口(1)Open()是初始化方法(2)close()在該 spout 關(guān)閉前執(zhí)行,但是并不能得到保證其一定被執(zhí)行,kill -9 時不執(zhí)行,Storm killtopo
33、Name 時執(zhí)行(3)activate()當(dāng)Spout 已經(jīng)從失效模式中激活時被調(diào)用。該 Spout 的nextTuple()方法很快就會被調(diào)用。(4)deactivate ()當(dāng) Spout 已經(jīng)失效時被調(diào)用。在 Spout 失效期間,nextTuple被調(diào)用。Spout 將來可能會也可能被重新激活。(5)nextTuple()當(dāng)調(diào)用 nextTuple()方法時,Storm 要求Spout 發(fā)射元組到輸出收集器(OutputCollecctor)。NextTuple 方法應(yīng)該是非阻塞的,所以,如果 Spout 沒有元組可以發(fā)射,該方法應(yīng)該返回。nextTuple()、ack()和 fail
34、()方法都在 Spout 任務(wù)的單一線程內(nèi)緊密循環(huán)被調(diào)用。當(dāng)沒有元組可以發(fā)射時,可以讓 nextTuple 去 sleep 很短的時間,例如 1 毫秒,這樣就浪費太多的 CPU。(6)ack()成功處理 tuple 回調(diào)方法(7)fail()處理失敗 tuple 回調(diào)方法原則:通常情況下(Shell 和事務(wù)型的除外),實現(xiàn)一個 Spout,可以直接實現(xiàn)接口IRichSpout,如果不想寫多余的代碼,可以直接繼承 BaseRichSpout。4.1.3 bolt 轉(zhuǎn)接頭bolt 的最頂層抽象是 IBolt 接口(1)prepare()prepare ()方法在集群的工作進程內(nèi)被初始化時被調(diào)用,提
35、供了 Bolt 執(zhí)行所需要的環(huán)境。(2)execute()接受一個 tuple 進行處理,也可 emit 數(shù)據(jù)到下一級組件。(3)cleanup()Cleanup 方法當(dāng)一個 IBolt 即將關(guān)閉時被調(diào)用。不能保證 cleanup()方法一定會被調(diào)用,因為 Supervisor 可以對集群的工作進程使用 kill -9 命令強制殺死進程命令。如果在本地模式下運行 Storm,當(dāng)拓撲被殺死的時候,可以保證 cleanup()方法一定會被調(diào)用。實現(xiàn)一個 Bolt,可以實現(xiàn) IRichBolt 接口或繼承 BaseRichBolt,如果不想自己處理結(jié)果反饋,可以實現(xiàn) IBasicBolt 接口或繼承
36、 BaseBasicBolt,它實際上相當(dāng)于自動做了 prepare 方法和 collector.emit.ack(inputTuple)。4.1.4 spout 的 tail 特性Storm 可以實時監(jiān)測文件數(shù)據(jù),當(dāng)文件數(shù)據(jù)變化時,Storm 自動。4.2日志處理案例4.2.1 實操環(huán)境準備1)打開 eclipse,創(chuàng)建一個 java 工程2)在工程目錄中創(chuàng)建 lib 文件夾3)解壓 apache-storm-1.1.0,并把解壓后 lib 包下的文件到 java 工程的 lib 文件夾中。然后執(zhí)行 build path。4.2.2 需求 1:將接收到日志的會話 id 打印在控制臺1)需求:
37、(1)模擬的日志信息,包括:名稱、會話 id、時間等(2)將接收到日志的會話 id 打印到控制臺2)分析(1)創(chuàng)建日志工具類(2)在 spout 中日志文件,并一行一行發(fā)射出去(3)在 bolt 中將獲取到的一行一行數(shù)據(jù)的會話 id 獲取到,并打印到控制臺。(4)main 方法負責(zé)拼接 spout 和bolt 的拓撲。XXYH6YCGFJYERTT834R52FD3)案例實操(1)創(chuàng)建日志package com.atguigu.storm.weblog; import java.io.File;import java.io.FileOutputStream; import java.io.IO
38、Exception; import java.util.Random;/ 生成數(shù)據(jù)public class GenerateData public static void main(String args) File logFile = new File("e:/website.log"); Random random = new Random();/ 1名稱String hosts = "" ;/ 2 會話idStringsession_id="ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCG
39、FJYERTT834R52FDXV9U34","BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" ;/ 3時間String time = "2017-08-07 08:40:50", "2017-08-07 08:40:51", "2017-08-07(2)創(chuàng)建 spoutpackage com.atguigu.storm.weblog; im
40、port java.io.BufferedReader; import java.io.FileInputStream; import java.io.InputStreamReader; import java.util.Map;import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichSpout;import org.apache.storm.topology.OutputFie
41、ldsDeclarer; import org.apache.storm.tuple.Fields;08:40:52", "2017-08-07 08:40:53","2017-08-07 09:40:49", "2017-08-07 10:40:49", "2017-08-07 11:40:49","2017-08-07 12:40:49" ;/ 4 拼接日志StringBuffer sbBuffer = new StringBuffer(); for (int i = 0; i &
42、lt; 40; i+) sbBuffer.append(hosts0 + "t" + session_idrandom.nextInt(5) + "t" + timerandom.nextInt(8) + "n");/ 5 判斷l(xiāng)og 日志是否存在,不存在要創(chuàng)建if (!logFile.exists() try logFile.createNewFile(); catch (IOException e) System.out.println("Create logFile fail !");byte b = (sb
43、Buffer.toString().getBytes();/ 6 將拼接的日志信息寫到日志文件中FileOutputStream fs; try fs = new FileOutputStream(logFile); fs.write(b);fs.close();System.out.println("generate data over"); catch (Exception e) e.printStackTrace();import org.apache.storm.tuple.Values;public class WebLogSpout implements IRi
44、chSpout private static final long serialVersionUID = 1L; private BufferedReader br;private SpoutOutputCollector collector = null; private String str = null;Overridepublic void nextTuple() / 循環(huán)調(diào)用的方法try while (str = this.br.readLine() != null) / 發(fā)射出去collector.emit(new Values(str);/Thread.sleep(3000);
45、catch (Exception e) SuppressWarnings("rawtypes") Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) / 打開輸入的文件try this.collector = collector;this.br=newBufferedReader(newInputStreamReader(new FileInputStream("e:/website.log"), "UTF-8&q
46、uot;); catch (Exception e) e.printStackTrace();Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) /輸出字段類型declarer.declare(new Fields("log");Override(3)創(chuàng)建 boltpackage com.atguigu.storm.weblog; import java.util.Map;import org.apache.storm.task.OutputCollector; import org.
47、apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt;import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;public class WebLogBolt implements IRichBolt private static final long serialVersionUID = 1L;
48、private OutputCollector collector = null; private int num = 0;private String valueString = null;Overridepublic void execute(Tuple input) try public void ack(Object arg0) Overridepublic void activate() Overridepublic void close() Overridepublic void deactivate() Overridepublic void fail(Object arg0)
49、Overridepublic Map<String, Object>ponentConfiguration() return null;/ 1 獲取傳遞過來的數(shù)據(jù)valueString = input.getStringByField("log");/ 2 如果輸入的數(shù)據(jù)不為空,行數(shù)+ if (valueString != null) num+;System.err.println(Thread.currentThread().getName() + "lines:" + num+ "session_id:" + valu
50、eString.split("t")1);/ 3 應(yīng)答 Spout 接收成功collector.ack(input);Thread.sleep(2000); catch (Exception e) / 4 應(yīng)答 Spout 接收失敗collector.fail(input);e.printStackTrace();SuppressWarnings("rawtypes") Overridepublic void prepare(Map conf, TopologyContext context, OutputCollector collector) thi
51、s.collector = collector;Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) /輸出字段類型declarer.declare(new Fields("");Overridepublic void cleanup() Overridepublic Map<String, Object>ponentConfiguration() return null;(4)創(chuàng)建 main4.2.3 需求 2:動態(tài)增加日志,查看控制臺打印信息(tail 特性)1)在需求
52、1 基礎(chǔ)上,運行程序。2)打開 website.log 日志文件,增加日志調(diào)試并保存。3)觀察控制臺打印的信息。package com.atguigu.storm.weblog; import org.apache.storm.Config; import org.apache.storm.LocalCluster;import org.apache.storm.StormSubmitter;import org.apache.storm.topology.TopologyBuilder; public class WebLogMain public static void main(Stri
53、ng args) / 1 創(chuàng)建拓撲對象TopologyBuilder builder = new TopologyBuilder();/ 2 設(shè)置 Spout 和boltbuilder.setSpout("weblogspout", new WebLogSpout(), 1); builder.setBolt("weblogbolt",newWebLogBolt(),1).shuffleGrouping("weblogspout");/ 3 配置 Worker 開啟個數(shù)Config conf =new Config(); conf.s
54、etNumWorkers(4);if (args.length > 0) try / 4 分布式提交StormSubmitter.submitTopology(args0, conf, builder.createTopology(); catch (Exception e) e.printStackTrace();else / 5 本地模式提交LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("weblogtopology",conf,builder.createT
55、opology();結(jié)論:Storm 可以動態(tài)實時監(jiān)測文件的增加信息,并把信息到再處理。五 分組策略和并發(fā)度5.1文件案例思考1)spout 數(shù)據(jù)源:數(shù)據(jù)庫、文件、MQ(比如:Kafka)2)數(shù)據(jù)源是數(shù)據(jù)庫:只適合數(shù)據(jù)庫的配置文件3)數(shù)據(jù)源是文件:只適合測試、講課用(因為集群是分布式集群)4)企業(yè)產(chǎn)生的 log 文件處理步驟:(1)讀出內(nèi)容寫入 MQ(2)Storm 再處理5.2 分組策略(Stream Grouping)stream grouping 用來定義一個 stream 應(yīng)該如何分配給 Bolts 上面的多個 Executors(多線程、多并發(fā))。Storm 里面有 7 種類型的 stream grouping1)Shuffle Grouping: 隨機分組,輪詢,平均分配。隨機派發(fā) stream 里面的 tuple,保證每個bolt 接收到的 tuple 數(shù)目大致相同。2)Fields Grouping:按字段分組,比如按 userid 來分
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 浙江中醫(yī)藥大學(xué)濱江學(xué)院《醫(yī)患溝通與技巧》2023-2024學(xué)年第二學(xué)期期末試卷
- 圖木舒克職業(yè)技術(shù)學(xué)院《學(xué)前教育史》2023-2024學(xué)年第二學(xué)期期末試卷
- 濰坊環(huán)境工程職業(yè)學(xué)院《科研方法論》2023-2024學(xué)年第二學(xué)期期末試卷
- 厚、薄膜混合集成電路及消費類電路項目效益評估報告
- 浙江警官職業(yè)學(xué)院《地域史研究方法與實踐》2023-2024學(xué)年第二學(xué)期期末試卷
- 河池廣西河池市環(huán)江縣招聘教師29人筆試歷年參考題庫附帶答案詳解
- 演藝導(dǎo)演合同范本
- 山西農(nóng)業(yè)大學(xué)《工程力學(xué)A1》2023-2024學(xué)年第二學(xué)期期末試卷
- 福州英華職業(yè)學(xué)院《簡筆畫與繪本》2023-2024學(xué)年第二學(xué)期期末試卷
- 蘇州工藝美術(shù)職業(yè)技術(shù)學(xué)院《JAVA企業(yè)級開發(fā)》2023-2024學(xué)年第二學(xué)期期末試卷
- 外研版三年級起點四年級(下冊)英語集體備課教(學(xué))案
- 中華民族的形成發(fā)展
- 《如何做美篇》課件
- “一帶一路”視域下印度尼西亞中資企業(yè)所得稅返還案例解析
- 咨詢服務(wù)協(xié)議書范本:教育咨詢和培訓(xùn)
- 潔凈空調(diào)負荷計算表格
- 瀘州食品安全承諾書
- 《機械基礎(chǔ)》課程標準
- 大理市房地產(chǎn)市場調(diào)研報告
- 倉庫固定資產(chǎn)管理規(guī)范
- 企業(yè)關(guān)停方案
評論
0/150
提交評論