




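Chapter 1 Real-Time Processing Module

1.1 Module setup
Add Scala framework support to the module.

1.2 Code approach
1) Consume the data in Kafka;
2) Use Redis to filter out devices that have already been counted as active today;
3) Save each batch's newly active devices for the day to HBase or ES;
4) Query the data from ES and publish it as a data interface for the visualization project to call.

1.3 Code development 1 - Consuming Kafka

1.3.1 Configuration

1) config.properties
    # Kafka configuration
    kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092
    # Redis configuration
    redis.host=hadoop102
    redis.port=6379

2) pom.xml
    <dependencies>
        <dependency>
            <groupId>com.atguigu.gmall2019.dw</groupId>
            <artifactId>dw-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.3</version>
        </dependency>
        <dependency>
            <groupId>net.java.dev.jna</groupId>
            <artifactId>jna</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
            <version>2.7.8</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- This plugin compiles Scala code into class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <!-- Bound to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

1.3.2 Utility classes

1) MyKafkaUtil
    package com.atguigu.utils

    import java.util.Properties
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.KafkaUtils

    object MyKafkaUtil {
      def getKafkaStream(ssc: StreamingContext, topics: Set[String]): InputDStream[(String, String)] = {
        val properties: Properties = PropertiesUtil.load("config.properties")
        val kafkaPara = Map(
          "bootstrap.servers" -> properties.getProperty("kafka.broker.list"),
          "group.id" -> "bigdata0408"
        )
        // Consume Kafka data using the Direct approach
        val kafkaDStream: InputDStream[(String, String)] =
          KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaPara, topics)
        // Return the stream
        kafkaDStream
      }
    }

2) PropertiesUtil
    import java.io.InputStreamReader
    import java.util.Properties

    object PropertiesUtil {
      def load(propertieName: String): Properties = {
        val prop = new Properties()
        // Read the file from the classpath as UTF-8
        prop.load(new InputStreamReader(
          Thread.currentThread().getContextClassLoader.getResourceAsStream(propertieName), "UTF-8"))
        prop
      }
    }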
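The loading step in PropertiesUtil can be tried in isolation. The following is a minimal Java sketch using only the standard library; the class name PropertiesLoadSketch and its two methods are hypothetical, and the properties text is inlined (an assumption made so that it runs without a classpath resource) rather than read through the context class loader as the utility above does.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical illustration class, not part of the original project.
final class PropertiesLoadSketch {

    // Parse properties text from any Reader, mirroring prop.load(new InputStreamReader(...)).
    static Properties load(Reader reader) throws IOException {
        Properties prop = new Properties();
        prop.load(reader);
        return prop;
    }

    // Inlined copy of the keys from the configuration section, so no file is needed.
    static Properties sample() {
        String text = "# Kafka configuration\n"
                + "kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092\n"
                + "# Redis configuration\n"
                + "redis.host=hadoop102\n"
                + "redis.port=6379\n";
        try {
            return load(new StringReader(text));
        } catch (IOException e) {
            // A StringReader does not fail on read; rethrow to keep the signature simple.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Properties p = sample();
        // getProperty returns null for an absent key, so a misspelled key
        // only surfaces later, when the value is parsed.
        int port = Integer.parseInt(p.getProperty("redis.port"));
        System.out.println(p.getProperty("redis.host") + ":" + port);
    }
}
```

Because getProperty returns null for a key that is not present, the key names in the properties file have to match the names the code reads exactly; otherwise the failure appears only where the port is converted to an integer.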
3) RedisUtil
    import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

    object RedisUtil {
      var jedisPool: JedisPool = null

      def getJedisClient: Jedis = {
        if (jedisPool == null) {
          println("Creating a connection pool")
          val config = PropertiesUtil.load("config.properties")
          val host = config.getProperty("redis.host")
          val port = config.getProperty("redis.port")

          val jedisPoolConfig = new JedisPoolConfig()
          jedisPoolConfig.setMaxTotal(100)             // maximum number of connections
          jedisPoolConfig.setMaxIdle(20)               // maximum idle connections
          jedisPoolConfig.setMinIdle(20)               // minimum idle connections
          jedisPoolConfig.setBlockWhenExhausted(true)  // whether to wait when the pool is busy
          jedisPoolConfig.setMaxWaitMillis(500)        // how long to wait when busy, in milliseconds
          jedisPoolConfig.setTestOnBorrow(true)        // test each connection when it is borrowed

          jedisPool = new JedisPool(jedisPoolConfig, host, port.toInt)
        }
        // println(s"jedisPool.getNumActive = ${jedisPool.getNumActive}")
        // println("Got a connection")
        jedisPool.getResource
      }
    }

1.3.3 Case class StartupLog
    case class StartupLog(mid: String,
                          uid: String,
                          appid: String,
                          area: String,
                          os: String,
                          ch: String,
                          logType: String,
                          vs: String,
                          var logDate: String,
                          var logHour: String,
                          var ts: Long)

1.3.4 Business class: consuming Kafka
    import com.alibaba.fastjson.JSON
    import com.atguigu.gmall.constant.GmallConstants
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.phoenix.spark._

    object RealtimeStartupApp {
      def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("gmall2019")
        val sc = new SparkContext(sparkConf)
        val ssc = new StreamingContext(sc, Seconds(10))

        // With the 0-8 direct API used by MyKafkaUtil, each record is a (key, value) tuple
        val startupStream: InputDStream[(String, String)] =
          MyKafkaUtil.getKafkaStream(ssc, Set(GmallConstants.KAFKA_TOPIC_STARTUP))

        // startupStream.map(_._2).foreachRDD { rdd =>
        //   println(rdd.collect().mkString("\n"))
        // }

        val startupLogDstream: DStream[StartupLog] = startupStream.map(_._2).map { log =>
          // println(s"log = $log")
          val startupLog: StartupLog = JSON.parseObject(log, classOf[StartupLog])
          startupLog
        }
      }
    }

1.4 Code development 2 - Deduplication

1.4.1 Flow diagram
[Figure: deduplication flow diagram]

1.4.2 Design of the Redis key and value
    key               value
    dau:2019-01-22    device id

1.4.3 Business code
    import java.util
    import java.text.SimpleDateFormat
    import java.util.Date

    import com.alibaba.fastjson.JSON
    import com.atguigu.gmall.constant.GmallConstants
    import com.atguigu.gmall2019.realtime.bean.StartupLog
    import com.atguigu.gmall2019.realtime.util.{MyKafkaUtil, RedisUtil}
    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import redis.clients.jedis.Jedis
    import org.apache.phoenix.spark._

    object DauApp {
      def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("dau_app")
        val ssc = new StreamingContext(sparkConf, Seconds(5))

        // 1 Consume Kafka (the 0-8 direct API yields (key, value) tuples)
        val inputDstream: InputDStream[(String, String)] =
          MyKafkaUtil.getKafkaStream(ssc, Set(GmallConstants.KAFKA_TOPIC_STARTUP))

        // 2 Convert the stream into the case class and fill in the two time fields
        val startuplogDstream: DStream[StartupLog] = inputDstream.map { record =>
          val jsonStr: String = record._2
          val startupLog: StartupLog = JSON.parseObject(jsonStr, classOf[StartupLog])
          val dateTimeStr: String = new SimpleDateFormat("yyyy-MM-dd HH").format(new Date(startupLog.ts))
          val dateArr: Array[String] = dateTimeStr.split(" ")
          startupLog.logDate = dateArr(0)
          startupLog.logHour = dateArr(1)
          startupLog
        }
        startuplogDstream.cache()

        // 3 Filter against the list of known users: keep only the access records
        //   of users that are not yet in the list
        val filteredDstream: DStream[StartupLog] = startuplogDstream.transform { rdd =>
          // Runs on the driver once per batch interval
          val jedis: Jedis = RedisUtil.getJedisClient
          val dateStr: String = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
          val key = "dau:" + dateStr
          val dauMidSet: util.Set[String] = jedis.smembers(key)
          jedis.close()

          val dauMidBC: Broadcast[util.Set[String]] = ssc.sparkContext.broadcast(dauMidSet)
          println("Before filtering: " + rdd.count())
          val filteredRDD: RDD[StartupLog] = rdd.filter { startuplog =>
            // Runs on the executors
            val dauMidSet: util.Set[String] = dauMidBC.value
            !dauMidSet.contains(startuplog.mid)
          }
          println("After filtering: " + filteredRDD.count())
          filteredRDD
        }

        // 4 Deduplicate within the batch: group by mid and take the first record of each group
        val groupbyMidDstream: DStream[(String, Iterable[StartupLog])] =
          filteredDstream.map(startuplog => (startuplog.mid, startuplog)).groupByKey()
        val distictDstream: DStream[StartupLog] = groupbyMidDstream.flatMap { case (mid, startupLogItr) =>
          startupLogItr.toList.take(1)
        }

        // 5 Save the list of users (mid) who visited today to Redis
        //   1 key type: set   2 key: dau:2019-xx-xx   3 value: mid
        distictDstream.foreachRDD { rdd =>
          // Runs on the driver
          rdd.foreachPartition { startuplogItr =>
            // Runs on the executors
            val jedis: Jedis = RedisUtil.getJedisClient
            for (startuplog <- startuplogItr) {
              val key = "dau:" + startuplog.logDate
              jedis.sadd(key, startuplog.mid)
              println(startuplog)
            }
            jedis.close()
          }
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }

1.5 Code implementation 3 - Saving to HBase

1.5.1 Phoenix: a SQL layer for HBase
For technical details, see "Atguigu Big Data Technology: Phoenix".

1.5.2 Create the table with Phoenix
    create
    table gmall2019_dau (
        mid varchar,
        uid varchar,
        appid varchar,
        area varchar,
        os varchar,
        ch varchar,
        type varchar,
        vs varchar,
        logDate varchar,
        logHour varchar,
        ts bigint
        CONSTRAINT dau_pk PRIMARY KEY (mid, logDate)
    );

1.5.3 Dependencies added to pom.xml
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-spark</artifactId>
        <version>4.14.2-HBase-1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
    </dependency>

1.5.4 Business code for saving
    // Write the data to HBase through Phoenix
    distictDstream.foreachRDD { rdd =>
      rdd.saveToPhoenix(
        "GMALL2019_DAU",
        Seq("MID", "UID", "APPID", "AREA", "OS", "CH", "TYPE", "VS", "LOGDATE", "LOGHOUR", "TS"),
        new Configuration,
        Some("hadoop102,hadoop103,hadoop104:2181"))
    }

Chapter 2 Daily-Active Data Query Interface

2.1 Access paths
    Total          http://localhost:8070/realtime-total?date=2019-09-06
    Hourly stats   http://localhost:8070/realtime-hours?id=dau&date=2019-09-06

2.2 Required data format
    Total
        [{"id":"dau","name":"New daily actives","value":1200},
         {"id":"new_mid","name":"New devices","value":233}]
    Hourly stats
        {"yesterday":{"11":383,"12":123,"17":88,"19":200},
         "today":{"12":38,"13":1233,"17":123,"19":688}}

2.3 Building the publishing project
[Figure: IntelliJ IDEA new-project wizard (Spring Initializr); Group com.atguigu.gmall2019.dw, Artifact dw-publisher, followed by the dependency selection page]

2.4 Configuration files

2.4.1 pom.xml
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.atguigu.gmall2019.dw</groupId>
            <artifactId>dw-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.3.4</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-core</artifactId>
            <version>4.14.2-HBase-1.3</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>20.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

2.4.2 application.properties
    server.port=8070
    logging.level.root=error
    spring.datasource.driver-class-name=org.apache.phoenix.jdbc.PhoenixDriver
    spring.datasource.url=jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181
    spring.datasource.data-username=
    spring.datasource.data-password=
    #mybatis
    #mybatis.typeAliasesPackage=com.example.phoenix.entity
    mybatis.mapperLocations=classpath:mapper/*.xml
    mybatis.configuration.map-underscore-to-camel-case=true

2.5 Code implementation
    Layer          Class / file                  Role
    Controller     PublisherController           Publishes the interface over the web
    Service        PublisherService              Interface for the business queries
                   PublisherServiceImpl          Implementation class of the business queries
    Data           DauMapper                     Interface for the data-layer queries
                   DauMapper.xml                 Configuration that implements the data-layer queries
    Main program   GmallPublisherApplication     Adds the package scan

2.5.1 GmallPublisherApplication: add the package scan
    @SpringBootApplication
    @MapperScan(basePackages = "com.atguigu.gmallXXXXXXX.publisher.mapper")
    public class Gmall2019PublisherApplication {
        public static void main(String[] args) {
            SpringApplication.run(Gmall2019PublisherApplication.class, args);
        }
    }

2.5.2 Controller layer
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.atguigu.gmall2019.dw.publisher.service.PublisherService;
    import org.apache.commons.lang.time.DateUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.*;

    @RestController
    public class PublisherController {

        @Autowired
        PublisherService publisherService;

        @GetMapping("realtime-total")
        public String realtimeHourDate(@RequestParam("date") String date) {
            List<Map> list = new ArrayList<Map>();
            // Total daily actives
            int dauTotal = publisherService.getDauTotal(date);
            Map dauMap = new HashMap<String, Object>();
            dauMap.put("id", "dau");
            dauMap.put("name", "New daily actives");
            dauMap.put("value", dauTotal);
            list.add(dauMap);

            // New users
            int newMidTotal = publisherService.getNewMidTotal(date);
            Map newMidMap = new HashMap<String, Object>();
            newMidMap.put("id", "new_mid");
            newMidMap.put("name", "New users");
            newMidMap.put("value", newMidTotal);
            list.add(newMidMap);

            return JSON.toJSONString(list);
        }

        @GetMapping("realtime-hours")
        public String realtimeHourDate(@RequestParam("id") String id, @RequestParam("date") String date) {
            if ("dau".equals(id)) {
                Map dauHoursToday = publisherService.getDauHours(date);
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("today", dauHoursToday);

                // Work out yesterday's date string from the requested date
                String yesterdayDateString = "";
                try {
                    Date dateToday = new SimpleDateFormat("yyyy-MM-dd").parse(date);
                    Date dateYesterday = DateUtils.addDays(dateToday, -1);
                    yesterdayDateString = new SimpleDateFormat("yyyy-MM-dd").format(dateYesterday);
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                Map dauHoursYesterday = publisherService.getDauHours(yesterdayDateString);
                jsonObject.put("yesterday", dauHoursYesterday);
                return jsonObject.toJSONString();
            }

            if ("new_order_totalamount".equals(id)) {
                String newOrderTotalamountJson = publisherService.getNewOrderTotalAmountHours(d
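The "yesterday" computation and the today/yesterday response shape used by the realtime-hours handler can be exercised without Spring or Phoenix. The sketch below is an illustration under stated assumptions: the class HourlyCompareSketch and its methods are hypothetical, the hourly counts are hard-coded sample values, and java.time.LocalDate is swapped in for the SimpleDateFormat plus DateUtils.addDays combination used in the controller.

```java
import java.time.LocalDate;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration class, not part of the publisher project.
final class HourlyCompareSketch {

    // Day before an ISO yyyy-MM-dd date string.
    // LocalDate.parse throws DateTimeParseException on malformed input,
    // so a bad date is reported instead of being replaced by an empty string.
    static String yesterdayOf(String date) {
        return LocalDate.parse(date).minusDays(1).toString();
    }

    // Put the two hourly maps under the keys required by the interface format.
    static Map<String, Map<String, Long>> compare(Map<String, Long> today,
                                                  Map<String, Long> yesterday) {
        Map<String, Map<String, Long>> result = new LinkedHashMap<>();
        result.put("yesterday", yesterday);
        result.put("today", today);
        return result;
    }

    public static void main(String[] args) {
        // Sample hourly counts; in the real service these come from getDauHours.
        Map<String, Long> today = new LinkedHashMap<>();
        today.put("12", 38L);
        today.put("13", 1233L);
        Map<String, Long> yesterday = new LinkedHashMap<>();
        yesterday.put("11", 383L);
        yesterday.put("12", 123L);

        System.out.println(yesterdayOf("2019-09-06")); // prints 2019-09-05
        System.out.println(compare(today, yesterday));
    }
}
```

One design difference is worth noting: the controller catches ParseException and then queries with an empty date string, whereas this sketch lets the parsing error propagate. Which behavior is preferable depends on how the front end is expected to handle malformed dates.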