版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認(rèn)領(lǐng)
文檔簡介
1、 大數(shù)據(jù)實時處理系統(tǒng)技術(shù)方案基于Flume+Kafka+Storm+Redis構(gòu)建大數(shù)據(jù)實時處理系統(tǒng)目 錄 TOC o 1-3 h z u HYPERLINK l _Toc519406511 一、大數(shù)據(jù)處理的常用方法 PAGEREF _Toc519406511 h 3 HYPERLINK l _Toc519406512 二、實時處理系統(tǒng)架構(gòu) PAGEREF _Toc519406512 h 4 HYPERLINK l _Toc519406513 三、Flume+Kafka整合 PAGEREF _Toc519406513 h 6 HYPERLINK l _Toc519406514 四、Kafka+
2、Storm整合 PAGEREF _Toc519406514 h 17 HYPERLINK l _Toc519406515 五、Storm+Redis整合 PAGEREF _Toc519406515 h 30 HYPERLINK l _Toc519406516 六、數(shù)據(jù)可視化處理 PAGEREF _Toc519406516 h 52 HYPERLINK l _Toc519406517 七、總結(jié) PAGEREF _Toc519406517 h 58大數(shù)據(jù)處理的常用方法之前在 HYPERLINK /s?_biz=MzI4NTA1MDEwNg=&mid=2650768689&idx=1&sn=b4fc
3、ac0c738bbf5191c6eb096483945f&chksm=f3f930a4c48eb9b2ed3e7297b24b4da8e4ea03956c737a8fa57ce4758f224df5493269fc1edb&scene=21 l wechat_redirect t _blank 采集清洗處理:基于MapReduce的離線數(shù)據(jù)分析中已經(jīng)有提及到,這里依然給出下面的圖示:前面給出的那篇文章是基于MapReduce的離線數(shù)據(jù)分析案例,其通過對網(wǎng)站產(chǎn)生的用戶訪問日志進行處理并分析出該網(wǎng)站在某天的PV、UV等數(shù)據(jù)。對應(yīng)上面的圖示,其走的就是離線處理的數(shù)據(jù)處理方式,而這里即將要介紹的是另外
4、一條路線的數(shù)據(jù)處理方式,即基于Storm的在線處理。在下面給出的完整案例中,我們將會完成下面的幾項工作:如何一步步構(gòu)建我們的實時處理系統(tǒng)(Flume+Kafka+Storm+Redis)實時處理網(wǎng)站的用戶訪問日志,并統(tǒng)計出該網(wǎng)站的PV、UV將實時分析出的PV、UV動態(tài)地展示在我們的前面頁面上如果你對上面提及的大數(shù)據(jù)組件已經(jīng)有所認(rèn)識,或者對如何構(gòu)建大數(shù)據(jù)實時處理系統(tǒng)感興趣,那么就可以盡情閱讀下面的內(nèi)容了。需要注意的是,核心在于如何構(gòu)建實時處理系統(tǒng),而這里給出的案例是實時統(tǒng)計某個網(wǎng)站的PV、UV,在實際中,基于每個人的工作環(huán)境不同,業(yè)務(wù)不同,因此業(yè)務(wù)系統(tǒng)的復(fù)雜度也不盡相同,相對來說,這里統(tǒng)計PV、
5、UV的業(yè)務(wù)是比較簡單的,但也足夠讓我們對大數(shù)據(jù)實時處理系統(tǒng)有一個基本的、清晰的了解與認(rèn)識,是的,它不再那么神秘了。實時處理系統(tǒng)架構(gòu)我們的實時處理系統(tǒng)整體架構(gòu)如下:即從上面的架構(gòu)中我們可以看出,其由下面的幾部分構(gòu)成:Flume集群Kafka集群Storm集群從構(gòu)建實時處理系統(tǒng)的角度出發(fā),我們需要做的是讓數(shù)據(jù)在各個不同的集群系統(tǒng)之間打通(從上面的圖示中也能很好地說明這一點),即需要做各個系統(tǒng)之前的整合,包括Flume與Kafka的整合,Kafka與Storm的整合。當(dāng)然,各個環(huán)境是否使用集群,依個人的實際需要而定,在我們的環(huán)境中,F(xiàn)lume、Kafka、Storm都使用集群。Flume+Kafka
6、整合1整合思路對于Flume而言,關(guān)鍵在于如何采集數(shù)據(jù),并且將其發(fā)送到Kafka上,并且由于我們這里了使用Flume集群的方式,F(xiàn)lume集群的配置也是十分關(guān)鍵的。而對于Kafka,關(guān)鍵就是如何接收來自Flume的數(shù)據(jù)。從整體上講,邏輯應(yīng)該是比較簡單的,在Kafka中創(chuàng)建一個用于我們實時處理系統(tǒng)的topic,然后Flume將其采集到的數(shù)據(jù)發(fā)送到該topic上即可。2整合過程整合過程:Flume集群配置與Kafka Topic創(chuàng)建。Flume集群配置在我們的場景中,兩個Flume Agent分別部署在兩臺Web服務(wù)器上,用來采集Web服務(wù)器上的日志數(shù)據(jù),然后其數(shù)據(jù)的下沉方式都為發(fā)送到另外一個Fl
7、ume Agent上,所以這里我們需要配置三個Flume Agent。Flume Agent01該Flume Agent部署在一臺Web服務(wù)器上,用來采集產(chǎn)生的Web日志,然后發(fā)送到Flume Consolidation Agent上,創(chuàng)建一個新的配置文件flume-sink-avro.conf,其配置內(nèi)容如下:#主要作用是監(jiān)聽文件中的新增數(shù)據(jù),采集到數(shù)據(jù)之后,輸出到avro# 注意:Flume agent的運行,主要就是配置source channel sink# 下面的a1就是agent的代號,source叫r1 channel叫c1 sink叫k1#a1.sources = r1a1.s
8、inks = k1a1.channels = c1#對于source的配置描述 監(jiān)聽文件中的新增數(shù)據(jù) execa1.sources.r1.type = execmand = tail -F /home/uplooking/data/data-clean/data-access.log#對于sink的配置描述 使用avro日志做數(shù)據(jù)的消費a1.sinks.k1.type = avroa1.sinks.k1.hostname = uplooking03a1.sinks.k1.port = 44444#對于channel的配置描述 使用文件做數(shù)據(jù)的臨時緩存 這種的安全性要高a1.channels.c
9、1.type = filea1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpointa1.channels.c1.dataDirs = /home/uplooking/data/flume/data#通過channel c1將source r1和sink k1關(guān)聯(lián)起來a1.sources.r1.channels = c1a1.sinks.k1.channel = c1配置完成后, 啟動Flume Agent,即可對日志文件進行監(jiān)聽:$ flume-ng agent -conf conf -n a1 -f app/fl
10、ume/conf/flume-sink-avro.conf /dev/null 2&1 &Flume Agent02該Flume Agent部署在一臺Web服務(wù)器上,用來采集產(chǎn)生的Web日志,然后發(fā)送到Flume Consolidation Agent上,創(chuàng)建一個新的配置文件flume-sink-avro.conf,其配置內(nèi)容如下:#主要作用是監(jiān)聽文件中的新增數(shù)據(jù),采集到數(shù)據(jù)之后,輸出到avro# 注意:Flume agent的運行,主要就是配置source channel sink# 下面的a1就是agent的代號,source叫r1 channel叫c1 sink叫k1#a1.source
11、s = r1a1.sinks = k1a1.channels = c1#對于source的配置描述 監(jiān)聽文件中的新增數(shù)據(jù) execa1.sources.r1.type = execmand = tail -F /home/uplooking/data/data-clean/data-access.log#對于sink的配置描述 使用avro日志做數(shù)據(jù)的消費a1.sinks.k1.type = avroa1.sinks.k1.hostname = uplooking03a1.sinks.k1.port = 44444#對于channel的配置描述 使用文件做數(shù)據(jù)的臨時緩存 這種的安全性要高a1.
12、channels.c1.type = filea1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpointa1.channels.c1.dataDirs = /home/uplooking/data/flume/data#通過channel c1將source r1和sink k1關(guān)聯(lián)起來a1.sources.r1.channels = c1a1.sinks.k1.channel = c1配置完成后, 啟動Flume Agent,即可對日志文件進行監(jiān)聽:$ flume-ng agent -conf conf -n a1
13、 -f app/flume/conf/flume-sink-avro.conf /dev/null 2&1 &Flume Consolidation Agent該Flume Agent用于接收其它兩個Agent發(fā)送過來的數(shù)據(jù),然后將其發(fā)送到Kafka上,創(chuàng)建一個新的配置文件flume-source_avro-sink_kafka.conf,配置內(nèi)容如下:#主要作用是監(jiān)聽目錄中的新增文件,采集到數(shù)據(jù)之后,輸出到kafka# 注意:Flume agent的運行,主要就是配置source channel sink# 下面的a1就是agent的代號,source叫r1 channel叫c1 sink叫
14、k1#a1.sources = r1a1.sinks = k1a1.channels = c1#對于source的配置描述 監(jiān)聽avroa1.sources.r1.type = avroa1.sources.r1.bind = a1.sources.r1.port = 44444#對于sink的配置描述 使用kafka做數(shù)據(jù)的消費a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.topic = f-k-sa1.sinks.k1.brokerList = uplooking01:9092,uplooking02
15、:9092,uplooking03:9092a1.sinks.k1.requiredAcks = 1a1.sinks.k1.batchSize = 20#對于channel的配置描述 使用內(nèi)存緩沖區(qū)域做數(shù)據(jù)的臨時緩存a1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100#通過channel c1將source r1和sink k1關(guān)聯(lián)起來a1.sources.r1.channels = c1a1.sinks.k1.channel = c1 配置完成后,
16、啟動Flume Agent,即可對avro的數(shù)據(jù)進行監(jiān)聽:$ flume-ng agent -conf conf -n a1 -f app/flume/conf/flume-source_avro-sink_kafka.conf /dev/null 2&1 &Kafka配置在我們的Kafka中,先創(chuàng)建一個topic,用于后面接收Flume采集過來的數(shù)據(jù):kafka-topics.sh -create -topic f-k-s -zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 -partitions 3 -replicat
17、ion-factor 33整合驗證啟動Kafka的消費腳本:$ kafka-console-consumer.sh -topic f-k-s -zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181如果在Web服務(wù)器上有新增的日志數(shù)據(jù),就會被我們的Flume程序監(jiān)聽到,并且最終會傳輸?shù)降終afka的f-k-stopic中,這里作為驗證,我們上面啟動的是一個Kafka終端消費的腳本,這時會在終端中看到數(shù)據(jù)的輸出:這樣的話,我們的整合就沒有問題,當(dāng)然Kafka中的數(shù)據(jù)應(yīng)該是由我們的Storm來進行消費的,這里只是作為整合的一個測試,
18、下面就會來做Kafka+Storm的整合。Kafka+Storm整合Kafka和Storm的整合其實在Storm的官網(wǎng)上也有非常詳細(xì)清晰的文檔:/releases/1.0.6/storm-kafka.html想對其有更多了解的同學(xué)可以參考一下。1整合思路在這次的大數(shù)據(jù)實時處理系統(tǒng)的構(gòu)建中,Kafka相當(dāng)于是作為消息隊列(或者說是消息中間件)的角色,其產(chǎn)生的消息需要有消費者去消費,所以Kafka與Storm的整合,關(guān)鍵在于我們的Storm如何去消費Kafka消息topic中的消息(Kafka消息topic中的消息正是由Flume采集而來,現(xiàn)在我們需要在Storm中對其進行消費)。在Storm中,
19、topology是非常關(guān)鍵的概念。對比MapReduce,在MapReduce中,我們提交的作業(yè)稱為一個Job,在一個Job中,又包含若干個Mapper和Reducer,正是在Mapper和Reducer中有我們對數(shù)據(jù)的處理邏輯:在Storm中,我們提交的一個作業(yè)稱為topology,其又包含了spout和bolt,在Storm中,對數(shù)據(jù)的處理邏輯正是在spout和bolt中體現(xiàn):即在spout中,正是我們數(shù)據(jù)的來源,又因為其實時的特性,所以可以把它比作一個“水龍頭”,表示其源源不斷地產(chǎn)生數(shù)據(jù):所以,問題的關(guān)鍵是spout如何去獲取來自Kafka的數(shù)據(jù)?好在,Storm-Kafka的整合庫中提
20、供了這樣的API來供我們進行操作。2整合過程整合過程應(yīng)用了KafkaSpout。在代碼的邏輯中只需要創(chuàng)建一個由Storm-KafkaAPI提供的KafkaSpout對象即可:SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);return new KafkaSpout(spoutConf);下面給出完整的整合代碼:package cn.xpleaf.bigdata.storm.statics;import kafka.api.OffsetRequest;import org.apache.storm.Config;
21、import org.apache.storm.LocalCluster;import org.apache.storm.StormSubmitter;import org.apache.storm.generated.StormTopology;import org.apache.storm.kafka.BrokerHosts;import org.apache.storm.kafka.KafkaSpout;import org.apache.storm.kafka.SpoutConfig;import org.apache.storm.kafka.ZkHosts;import org.ap
22、ache.storm.topology.BasicOutputCollector;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.topology.base.BaseBasicBolt;import org.apache.storm.tuple.Tuple;/* Kafka和storm的整合,用于統(tǒng)計實時流量對應(yīng)的pv和uv*/public class KafkaStormTopology
23、/ static class MyKafkaBolt extends BaseRichBolt static class MyKafkaBolt extends BaseBasicBolt /* kafkaSpout發(fā)送的字段名為bytes*/Overridepublic void execute(Tuple input, BasicOutputCollector collector) byte binary = input.getBinary(0); / 跨jvm傳輸數(shù)據(jù),接收到的是字節(jié)數(shù)據(jù)/ byte bytes = input.getBinaryByField(bytes); / 這種方
24、式也行String line = new String(binary);System.out.println(line);Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) public static void main(String args) throws Exception TopologyBuilder builder = new TopologyBuilder();/* 設(shè)置spout和bolt的dag(有向無環(huán)圖)*/KafkaSpout kafkaSpout = createKafkaSpo
25、ut();builder.setSpout(id_kafka_spout, kafkaSpout);builder.setBolt(id_kafka_bolt, new MyKafkaBolt().shuffleGrouping(id_kafka_spout); / 通過不同的數(shù)據(jù)流轉(zhuǎn)方式,來指定數(shù)據(jù)的上游組件/ 使用builder構(gòu)建topologyStormTopology topology = builder.createTopology();String topologyName = KafkaStormTopology.class.getSimpleName(); / 拓?fù)涞拿QCo
26、nfig config = new Config(); / Config()對象繼承自HashMap,但本身封裝了一些基本的配置/ 啟動topology,本地啟動使用LocalCluster,集群啟動使用StormSubmitterif (args = null | args.length 1) / 沒有參數(shù)時使用本地模式,有參數(shù)時使用集群模式LocalCluster localCluster = new LocalCluster(); / 本地開發(fā)模式,創(chuàng)建的對象為LocalClusterlocalCluster.submitTopology(topologyName, config, to
27、pology); else StormSubmitter.submitTopology(topologyName, config, topology);/* BrokerHosts hosts kafka集群列表* String topic 要消費的topic主題* String zkRoot kafka在zk中的目錄(會在該節(jié)點目錄下記錄讀取kafka消息的偏移量)* String id 當(dāng)前操作的標(biāo)識id*/private static KafkaSpout createKafkaSpout() String brokerZkStr = uplooking01:2181,uplooking
28、02:2181,uplooking03:2181;BrokerHosts hosts = new ZkHosts(brokerZkStr); / 通過zookeeper中的/brokers即可找到kafka的地址String topic = f-k-s;String zkRoot = / + topic;String id = consumer-id;SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);/ 本地環(huán)境設(shè)置之后,也可以在zk中建立/f-k-s節(jié)點,在集群環(huán)境中,不用配置也可以在zk中建立/f-k-s節(jié)點
29、/spoutConf.zkServers = Arrays.asList(new Stringuplooking01, uplooking02, uplooking03);/spoutConf.zkPort = 2181;spoutConf.startOffsetTime = OffsetRequest.LatestTime(); / 設(shè)置之后,剛啟動時就不會把之前的消費也進行讀取,會從最新的偏移量開始讀取return new KafkaSpout(spoutConf);其實代碼的邏輯非常簡單,我們只創(chuàng)建了 一個由Storm-Kafka提供的KafkaSpout對象和一個包含我們處理邏輯的My
30、KafkaBolt對象,MyKafkaBolt的邏輯也很簡單,就是把Kafka的消息打印到控制臺上。需要注意的是,后面我們分析網(wǎng)站PV、UV的工作,正是在上面這部分簡單的代碼中完成的,所以其是非常重要的基礎(chǔ)。3整合驗證上面的整合代碼,可以在本地環(huán)境中運行,也可以將其打包成jar包上傳到我們的Storm集群中并提交業(yè)務(wù)來運行。如果Web服務(wù)器能夠產(chǎn)生日志,并且前面Flume+Kafka的整合也沒有問題的話,將會有下面的效果。如果是在本地環(huán)境中運行上面的代碼,那么可以在控制臺中看到日志數(shù)據(jù)的輸出:如果是在Storm集群中提交的作業(yè)運行,那么也可以在Storm的日志中看到Web服務(wù)器產(chǎn)生的日志數(shù)據(jù):
31、這樣的話就完成了Kafka+Storm的整合。Storm+Redis整合1整合思路其實所謂Storm和Redis的整合,指的是在我們的實時處理系統(tǒng)中的數(shù)據(jù)的落地方式,即在Storm中包含了我們處理數(shù)據(jù)的邏輯,而數(shù)據(jù)處理完畢后,產(chǎn)生的數(shù)據(jù)處理結(jié)果該保存到什么地方呢?顯然就有很多種方式了,關(guān)系型數(shù)據(jù)庫、NoSQL、HDFS、HBase等,這應(yīng)該取決于具體的業(yè)務(wù)和數(shù)據(jù)量,在這里,我們使用Redis來進行最后分析數(shù)據(jù)的存儲。所以實際上做這一步的整合,其實就是開始寫我們的業(yè)務(wù)處理代碼了,因為通過前面Flume-Kafka-Storm的整合,已經(jīng)打通了整個數(shù)據(jù)的流通路徑,接下來關(guān)鍵要做的是,在Storm中
32、,如何處理我們的數(shù)據(jù)并保存到Redis中。而在Storm中,spout已經(jīng)不需要我們來寫了(由Storm-Kafka的API提供了KafkaSpout對象),所以問題就變成,如何根據(jù)業(yè)務(wù)編寫分析處理數(shù)據(jù)的bolt。2整合過程整合過程:編寫Storm業(yè)務(wù)處理Bolt。日志分析我們實時獲取的日志格式如下:其中需要說明的是第二個字段和第三個字段,因為它對我們統(tǒng)計PV和UV非常有幫助,它們分別是ip字段和mid字段,說明如下:ip:用戶的IP地址mid:唯一的id,此id第一次會種在瀏覽器的cookie里。如果存在則不再種。作為瀏覽器唯一標(biāo)示。移動端或者pad直接取機器碼。因此,根據(jù)IP地址,我們可以
33、通過查詢得到其所在的省份,并且創(chuàng)建一個屬于該省份的變量,用于記錄pv數(shù),每來一條屬于該省份的日志記錄,則該省份的PV就加1,以此來完成pv的統(tǒng)計。而對于mid,我們則可以創(chuàng)建屬于該省的一個set集合,每來一條屬于該省份的日志記錄,則可以將該mid添加到set集合中,因為set集合存放的是不重復(fù)的數(shù)據(jù),這樣就可以幫我們自動過濾掉重復(fù)的mid,根據(jù)set集合的大小,就可以統(tǒng)計出UV。在我們storm的業(yè)務(wù)處理代碼中,我們需要編寫兩個bolt:第一個bolt用來對數(shù)據(jù)進行預(yù)處理,也就是提取我們需要的ip和mid,并且根據(jù)IP查詢得到省份信息;第二個bolt用來統(tǒng)計PV、UV,并定時將PV、UV數(shù)據(jù)寫
34、入到Redis中。當(dāng)然上面只是說明了整體的思路,實際上還有很多需要注意的細(xì)節(jié)問題和技巧問題,這都在我們的代碼中進行體現(xiàn),我在后面寫的代碼中都加了非常詳細(xì)的注釋進行說明。編寫第一個Bolt:ConvertIPBolt根據(jù)上面的分析,編寫用于數(shù)據(jù)預(yù)處理的bolt,代碼如下:package cn.xpleaf.bigdata.storm.statistic;import cn.xpleaf.bigdata.storm.utils.JedisUtil;import org.apache.storm.topology.BasicOutputCollector;import org.apache.stor
35、m.topology.OutputFieldsDeclarer;import org.apache.storm.topology.base.BaseBasicBolt;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import org.apache.storm.tuple.Values;import redis.clients.jedis.Jedis;/* 日志數(shù)據(jù)預(yù)處理Bolt,實現(xiàn)功能:* 1.提取實現(xiàn)業(yè)務(wù)需求所需要的信息:ip地址、客戶端唯一標(biāo)識mid* 2.查詢IP地址所屬地,并發(fā)送到下
36、一個Bolt*/public class ConvertIPBolt extends BaseBasicBolt Overridepublic void execute(Tuple input, BasicOutputCollector collector) byte binary = input.getBinary(0);String line = new String(binary);String fields = line.split(t);if(fields = null | fields.length 10) return;/ 獲取ip和midString ip = fields1;
37、String mid = fields2;/ 根據(jù)ip獲取其所屬地(省份)String province = null;if (ip != null) Jedis jedis = JedisUtil.getJedis();province = jedis.hget(ip_info_en, ip);/ 需要釋放jedis的資源,否則會報can not get resource from the poolJedisUtil.returnJedis(jedis);/ 發(fā)送數(shù)據(jù)到下一個bolt,只發(fā)送實現(xiàn)業(yè)務(wù)功能需要的province和midcollector.emit(new Values(prov
38、ince, mid);/* 定義了發(fā)送到下一個bolt的數(shù)據(jù)包含兩個域:province和mid*/Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) declarer.declare(new Fields(province, mid);編寫第二個Bolt:StatisticBolt這個bolt包含我們統(tǒng)計網(wǎng)站PV、UV的代碼邏輯,因此非常重要,其代碼如下:package cn.xpleaf.bigdata.storm.statistic;import cn.xpleaf.bigdata.storm.ut
39、ils.JedisUtil;import org.apache.storm.Config;import org.apache.storm.Constants;import org.apache.storm.topology.BasicOutputCollector;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.base.BaseBasicBolt;import org.apache.storm.tuple.Tuple;import redis.clients.jedi
40、s.Jedis;import java.text.SimpleDateFormat;import java.util.*;/* 日志數(shù)據(jù)統(tǒng)計Bolt,實現(xiàn)功能:* 1.統(tǒng)計各省份的PV、UV* 2.以天為單位,將省份對應(yīng)的PV、UV信息寫入Redis*/public class StatisticBolt extends BaseBasicBolt Map pvMap = new HashMap();MapString, HashSet midsMap = null;SimpleDateFormat sdf = new SimpleDateFormat(yyyyMMdd);Overridepu
41、blic void execute(Tuple input, BasicOutputCollector collector) if (!input.getSourceComponent().equalsIgnoreCase(Constants.SYSTEM_COMPONENT_ID) / 如果收到非系統(tǒng)級別的tuple,統(tǒng)計信息到局部變量midsString province = input.getStringByField(province);String mid = input.getStringByField(mid);pvMap.put(province, pvMap.get(prov
42、ince) + 1); / pv+1if(mid != null) midsMap.get(province).add(mid); / 將mid添加到該省份所對應(yīng)的set中 else / 如果收到系統(tǒng)級別的tuple,則將數(shù)據(jù)更新到Redis中,釋放JVM堆內(nèi)存空間/* 以 廣東 為例,其在Redis中保存的數(shù)據(jù)格式如下:* guangdong_pv(Redis數(shù)據(jù)結(jié)構(gòu)為hash)* -20180415* -pv數(shù)* -20180416* -pv數(shù)* guangdong_mids_20180415(Redis數(shù)據(jù)結(jié)構(gòu)為set)* -mid* -mid* -mid* * guangdong_mi
43、ds_20180415(Redis數(shù)據(jù)結(jié)構(gòu)為set)* -mid* -mid* -mid* */Jedis jedis = JedisUtil.getJedis();String dateStr = sdf.format(new Date();/ 更新pvMap數(shù)據(jù)到Redis中String pvKey = null;for(String province : pvMap.keySet() int currentPv = pvMap.get(province);if(currentPv 0) / 當(dāng)前map中的pv大于0才更新,否則沒有意義pvKey = province + _pv;Stri
44、ng oldPvStr = jedis.hget(pvKey, dateStr);if(oldPvStr = null) oldPvStr = 0;Long oldPv = Long.valueOf(oldPvStr);jedis.hset(pvKey, dateStr, oldPv + currentPv + );pvMap.replace(province, 0); / 將該省的pv重新設(shè)置為0/ 更新midsMap到Redis中String midsKey = null;HashSet midsSet = null;for(String province: midsMap.keySet(
45、) midsSet = midsMap.get(province);if(midsSet.size() 0) / 當(dāng)前省份的set的大小大于0才更新到,否則沒有意義midsKey = province + _mids_ + dateStr;jedis.sadd(midsKey, midsSet.toArray(new StringmidsSet.size();midsSet.clear();/ 釋放jedis資源JedisUtil.returnJedis(jedis);System.out.println(System.currentTimeMillis() + 寫入數(shù)據(jù)到Redis);Ove
46、rridepublic void declareOutputFields(OutputFieldsDeclarer declarer) /* 設(shè)置定時任務(wù),只對當(dāng)前bolt有效,系統(tǒng)會定時向StatisticBolt發(fā)送一個系統(tǒng)級別的tuple*/Overridepublic Map getComponentConfiguration() Map config = new HashMap();config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);return config;/* 初始化各個省份的pv和mids信息(用來臨時存儲統(tǒng)計pv和uv需
47、要的數(shù)據(jù))*/public StatisticBolt() pvMap = new HashMap();midsMap = new HashMapString, HashSet();String provinceArray = shanxi, jilin, hunan, hainan, xinjiang, hubei, zhejiang, tianjin, shanghai,anhui, guizhou, fujian, jiangsu, heilongjiang, aomen, beijing, shaanxi, chongqing,jiangxi, guangxi, gansu, guan
48、gdong, yunnan, sicuan, qinghai, xianggang, taiwan,neimenggu, henan, shandong, shanghai, hebei, liaoning, xizang;for(String province : provinceArray) pvMap.put(province, 0);midsMap.put(province, new HashSet();(上下滑動可查看完整代碼)編寫Topology我們需要編寫一個topology用來組織前面編寫的Bolt,代碼如下:package cn.xpleaf.bigdata.storm.st
49、atistic;import kafka.api.OffsetRequest;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.StormSubmitter;import org.apache.storm.generated.StormTopology;import org.apache.storm.kafka.BrokerHosts;import org.apache.storm.kafka.KafkaSpout;import org.apache.storm
50、.kafka.SpoutConfig;import org.apache.storm.kafka.ZkHosts;import org.apache.storm.topology.TopologyBuilder;/* 構(gòu)建topology*/public class StatisticTopology public static void main(String args) throws Exception TopologyBuilder builder = new TopologyBuilder();/* 設(shè)置spout和bolt的dag(有向無環(huán)圖)*/KafkaSpout kafkaSp
51、out = createKafkaSpout();builder.setSpout(id_kafka_spout, kafkaSpout);builder.setBolt(id_convertIp_bolt, new ConvertIPBolt().shuffleGrouping(id_kafka_spout); / 通過不同的數(shù)據(jù)流轉(zhuǎn)方式,來指定數(shù)據(jù)的上游組件builder.setBolt(id_statistic_bolt, new StatisticBolt().shuffleGrouping(id_convertIp_bolt); / 通過不同的數(shù)據(jù)流轉(zhuǎn)方式,來指定數(shù)據(jù)的上游組件/ 使
52、用builder構(gòu)建topologyStormTopology topology = builder.createTopology();String topologyName = KafkaStormTopology.class.getSimpleName(); / 拓?fù)涞拿QConfig config = new Config(); / Config()對象繼承自HashMap,但本身封裝了一些基本的配置/ 啟動topology,本地啟動使用LocalCluster,集群啟動使用StormSubmitterif (args = null | args.length 1) / 沒有參數(shù)時使用本
53、地模式,有參數(shù)時使用集群模式LocalCluster localCluster = new LocalCluster(); / 本地開發(fā)模式,創(chuàng)建的對象為LocalClusterlocalCluster.submitTopology(topologyName, config, topology); else StormSubmitter.submitTopology(topologyName, config, topology);/* BrokerHosts hosts kafka集群列表* String topic 要消費的topic主題* String zkRoot kafka在zk中的目
54、錄(會在該節(jié)點目錄下記錄讀取kafka消息的偏移量)* String id 當(dāng)前操作的標(biāo)識id*/private static KafkaSpout createKafkaSpout() String brokerZkStr = uplooking01:2181,uplooking02:2181,uplooking03:2181;BrokerHosts hosts = new ZkHosts(brokerZkStr); / 通過zookeeper中的/brokers即可找到kafka的地址String topic = f-k-s;String zkRoot = / + topic;String id = consumer-id;SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);/ 本地環(huán)境設(shè)置之后,也可以在zk中建立/f-k-s節(jié)點,在集群環(huán)境中,不用配置也可以在zk中建立/f-k-s節(jié)點/spoutConf.zkServers = Arrays.asList(new Stringuplooking01, uplooking02, uplooking03);/spoutConf.zkPort = 2181;spoutConf.star
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025年安徽蚌埠南棠實業(yè)有限公司招聘筆試參考題庫含答案解析
- 2025年貴州金海開發(fā)投資有限公司招聘筆試參考題庫含答案解析
- 2025年湘師大新版選擇性必修2語文下冊階段測試試卷含答案
- 2025年北師大新版高三地理上冊階段測試試卷
- 2025年度汽車租賃車輛安全性能檢測合同標(biāo)準(zhǔn)2篇
- 2024年度青海省公共營養(yǎng)師之四級營養(yǎng)師考前沖刺試卷B卷含答案
- 2024-2025學(xué)年高中歷史第四單元內(nèi)憂外患與中華民族的奮起第16課五四愛國運動學(xué)案含解析岳麓版必修1
- 2024-2025學(xué)年高中歷史專題五現(xiàn)代中國的對外關(guān)系5.2外交關(guān)系的突破課時作業(yè)含解析人民版必修1
- 2024年度黑龍江省公共營養(yǎng)師之三級營養(yǎng)師強化訓(xùn)練試卷B卷附答案
- 2025年度新型智能家電租賃合同范本3篇
- 高考滿分作文常見結(jié)構(gòu)完全解讀
- 理光投影機pj k360功能介紹
- 六年級數(shù)學(xué)上冊100道口算題(全冊完整版)
- 八年級數(shù)學(xué)下冊《第十九章 一次函數(shù)》單元檢測卷帶答案-人教版
- 帕薩特B5維修手冊及帕薩特B5全車電路圖
- 系統(tǒng)解剖學(xué)考試重點筆記
- 小學(xué)五年級解方程應(yīng)用題6
- 云南省地圖含市縣地圖矢量分層地圖行政區(qū)劃市縣概況ppt模板
- 年月江西省南昌市某綜合樓工程造價指標(biāo)及
- 作物栽培學(xué)課件棉花
評論
0/150
提交評論