大數(shù)據(jù)可視化 課件 項目4 PyEcharts實戰(zhàn)_第1頁
大數(shù)據(jù)可視化 課件 項目4 PyEcharts實戰(zhàn)_第2頁
大數(shù)據(jù)可視化 課件 項目4 PyEcharts實戰(zhàn)_第3頁
大數(shù)據(jù)可視化 課件 項目4 PyEcharts實戰(zhàn)_第4頁
大數(shù)據(jù)可視化 課件 項目4 PyEcharts實戰(zhàn)_第5頁
已閱讀5頁,還剩81頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)

文檔簡介

項目四:—PyEcharts實戰(zhàn)(用戶行為數(shù)據(jù)可視化)目錄CONTENTS項目概述學(xué)習(xí)目標(biāo)任務(wù)4.1 ***用戶行為數(shù)據(jù)分析任務(wù)4.2 可視化大屏應(yīng)用思考與練習(xí)1項目概述項目概述本項目將致力于使用用戶行為數(shù)據(jù)繪制可視化大屏。用戶行為數(shù)據(jù)是企業(yè)運營過程中的重要參考依據(jù),通常會在數(shù)據(jù)倉庫系統(tǒng)中進(jìn)行分析,生成用戶行為數(shù)據(jù)指標(biāo),并通過報表和可視化大屏等工具展示出來。在大數(shù)據(jù)時代,不管銷售什么商品,都會涉及到對用戶行為數(shù)據(jù)的分析。分析用戶行為數(shù)據(jù)的結(jié)果可以幫助企業(yè)了解用戶的消費習(xí)慣和偏好,并根據(jù)分析結(jié)果改進(jìn)營銷策略和完善產(chǎn)品。本項目將解析用戶行為數(shù)據(jù)指標(biāo),重點是使用PyEcharts將用戶行為數(shù)據(jù)分析結(jié)果繪制成大數(shù)據(jù)可視化大屏,雖然不涉及用戶行為數(shù)據(jù)分析,但讀者仍能對此有所了解。由于僅使用PyEcharts進(jìn)行可視化大屏繪制,并未使用前端組件,所以與常見的可視化產(chǎn)品的效果略有不同。本項目具體工作如下:用戶行為數(shù)據(jù)分析指標(biāo)介紹;用戶行為數(shù)據(jù)集簡介;針對不同指標(biāo)數(shù)據(jù)繪制圖表構(gòu)成可視化大屏。2學(xué)習(xí)目標(biāo)學(xué)習(xí)目標(biāo)通過學(xué)習(xí)本項目的知識,了解用戶行為數(shù)據(jù)分析項目,并對PyEcharts技術(shù)有扎實的了解,學(xué)會靈活使用PyEcharts配置項,并掌握使用PyEcharts繪制地圖、直方圖、散點圖、折線圖等圖表,以及可視化大屏的繪制技術(shù)。具備PyEcharts大數(shù)據(jù)可視化的基本技能,拓展大數(shù)據(jù)專業(yè)學(xué)習(xí)視野。3任務(wù)4.1 ***用戶行為數(shù)據(jù)分析任務(wù)描述經(jīng)過前面幾個項目的學(xué)習(xí),對如何使用PyEcharts進(jìn)行大數(shù)據(jù)可視化有了一定的了解,但是對于實際的業(yè)務(wù)應(yīng)用場景還有所欠缺。本任務(wù)是基于對PyEcharts的理解,使用PyEcharts對用戶行為數(shù)據(jù)進(jìn)行可視化分析實踐。通過這個任務(wù),可以理解用戶行為數(shù)據(jù)分析指標(biāo),將所學(xué)知識應(yīng)用到實際業(yè)務(wù)場景中,通過實踐激發(fā)學(xué)習(xí)興趣。完成本任務(wù)需要掌握用戶行為數(shù)據(jù)指標(biāo)體系,了解為何要做用戶行為數(shù)據(jù)分析,如何做用戶行為數(shù)據(jù)分析,并使用PyEcharts根據(jù)用戶行為數(shù)據(jù)分析指標(biāo)結(jié)果繪制可視化大屏,通過學(xué)習(xí)用戶行為數(shù)據(jù)可視化大屏案例,加深對PyEcharts的應(yīng)用理解。知識與技能一、什么是用戶行為數(shù)據(jù)

用戶行為數(shù)據(jù)在絕大多數(shù)情況下,就是在用戶使用APP、瀏覽網(wǎng)頁過程中的日志數(shù)據(jù)。以電商為例,日志數(shù)據(jù)又分為頁面數(shù)據(jù)、事件數(shù)據(jù)、曝光數(shù)據(jù)、啟動數(shù)據(jù)和錯誤數(shù)據(jù)。

頁面數(shù)據(jù):主要記錄用戶在一個頁面中的訪問情況,如訪問時間、停留時間、訪問路徑等信息。

事件數(shù)據(jù):主要記錄應(yīng)用中具體的操作行為,包括操作類型、操作對象、操作對象描述等信息,如電商點擊商品、添加收藏等。

曝光數(shù)據(jù):使用APP時在首頁區(qū)域滾動的界面,主要記錄頁面曝光的內(nèi)容,包括曝光對象、曝光類型等。

啟動數(shù)據(jù):啟動APP時顯示的信息,一般是一些廣告信息,或者活動信息。

錯誤數(shù)據(jù):主要記錄應(yīng)用在使用過程中產(chǎn)生的錯誤信息。知識與技能二、用戶行為數(shù)據(jù)從哪里來

用戶行為數(shù)據(jù)一般都來源于用戶的使用日志數(shù)據(jù),而日志數(shù)據(jù)是怎么產(chǎn)生的呢?

一般而言,日志數(shù)據(jù)是采用頁面埋點的方式獲取的。

不同的日志結(jié)構(gòu)可能會有所區(qū)別,但是涉及用戶行為數(shù)據(jù)的大同小異。一般含有地區(qū)編碼、手機(jī)品牌、渠道、手機(jī)型號、操作系統(tǒng)、動作id、事件時間、用戶標(biāo)識等。知識與技能三、用戶行為數(shù)據(jù)指標(biāo)體系統(tǒng)計周期指標(biāo)說明最近1日PVpageview,頁面點擊量最近1日UVuniqueview,獨立訪客統(tǒng)計周期統(tǒng)計粒度指標(biāo)最近1/7/30日渠道訪客數(shù)最近1/7/30日渠道會話平均停留時長最近1/7/30日渠道會話平均瀏覽頁面數(shù)最近1/7/30日渠道會話總數(shù)最近1/7/30日渠道跳出率統(tǒng)計周期指標(biāo)說明最近1天流失用戶數(shù)之前活躍過的用戶,最近一段時間未活躍,就稱為流失用戶。(統(tǒng)計每天的前7天)知識與技能統(tǒng)計周期指標(biāo)說明最近1日回流用戶數(shù)之前的活躍用戶,一段時間未活躍,今日又活躍了,就稱為回流用戶。統(tǒng)計周期指標(biāo)說明最近1日留存率統(tǒng)計每天的1-7日留存率或者每天的1-30日留存率,統(tǒng)計的是新增留存率統(tǒng)計周期指標(biāo)最近1、7、30日新增用戶數(shù)最近1、7、30日活躍用戶數(shù)統(tǒng)計周期指標(biāo)最近1、7、30日選擇人數(shù)最近1、7、30日掃碼人數(shù)最近1、7、30日領(lǐng)取人數(shù)統(tǒng)計周期統(tǒng)計粒度指標(biāo)最近1、7、30日年齡段下單人數(shù)環(huán)境安裝一、Ubuntu20.4安裝Step1卸載自帶python在Ubuntu中,通常會預(yù)裝Python2.x和Python3.x版本。如果您想要完全卸載所有現(xiàn)有的Python版本,可以使用以下命令:```sudoapt-getremove--purgepython2.7-minimalpython2.7sudoapt-getremove--purgepython3-minimalpython3sudoapt-getautoremove```

在UbuntuServer20.04中安裝Python3.8.10環(huán)境,可以按照以下步驟進(jìn)行:Step2更新系統(tǒng)軟件包sudoapt-getupdate環(huán)境安裝Step3安裝必要的依賴項sudoapt-getinstallbuild-essentiallibssl-devzlib1g-devlibncurses5-devlibncursesw5-dev\libreadline-devlibsqlite3-devlibgdbm-devlibdb5.3-devlibbz2-devlibexpat1-devliblzma-dev\tk-devlibffi-devwgetcurl環(huán)境安裝Step4下載Python3.8源碼包cd~wget/mirrors/python/3.8.10/Python-3.8.10.tgzStep5解壓源碼包并進(jìn)入目錄tar-xzvfPython-3.8.10.tgzcdPython-3.8.10Step6編譯和安裝Python3.8./configure--enable-optimizationsmake-j8sudomakealtinstallStep7驗證Python安裝是否成功python3.8--version如果出現(xiàn)類似以下輸出結(jié)果,則說明Python3.8安裝成功:Python3.8.10環(huán)境安裝環(huán)境安裝Step8更新軟鏈接#先看一看python的鏈接sudofind/-typef-namepython3.8#如果輸出結(jié)果是/usr/local/bin/python3.8#再繼續(xù)執(zhí)行下面的命令sudoln-s/usr/local/bin/python3.8/usr/local/bin/python3刷新Bash中`python`命令的哈希表hash-rpython3#之后嘗試使用python命令,是否可以進(jìn)入環(huán)境環(huán)境安裝Step9安裝pipsudoaptinstallpython3-pipStep10安裝相關(guān)模塊python3-mpipinstalluser_agents-i/simplepython3-mpipinstallpandas-i/simple環(huán)境安裝二、Hadoop安裝Step1:安裝Java8Step2:安裝SSH服務(wù)sudoaptupdatesudoaptinstallsshsudoaptupdatesudoaptinstallopenjdk-8-jdkStep3:下載和解壓縮Hadoop3.3.3wget/hadoop/common/hadoop-3.3.3/hadoop-3.3.3.tar.gz環(huán)境安裝當(dāng)然,教學(xué)資源中提供了hadoop的安裝包,可下載并解壓縮Hadoop3.3.3:tar-xvfhadoop-3.3.3.tar.gz#只是為了改名,變得簡單些sudomvhadoop-3.3.3hadoopStep4:配置環(huán)境變量

sudovi/etc/profile在文件最后添加以下內(nèi)容:exportHADOOP_HOME=/home/arthas/hadoopexportPATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin環(huán)境安裝然后保存并關(guān)閉文件。接下來,重新加載當(dāng)前的shell環(huán)境:source/etc/profileStep5:修改Hadoop配置文件

打開Hadoop配置文件:sudovi/home/arthas/hadoop/etc/hadoop/hadoop-env.sh修改JAVA_HOME變量的值為Java8的安裝路徑:exportJAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64然后保存并關(guān)閉文件。環(huán)境安裝接下來,打開core-site.xml配置文件:sudovi/home/arthas/hadoop/etc/hadoop/core-site.xml在<configuration>標(biāo)簽內(nèi)添加以下內(nèi)容:<property>

<name>fs.defaultFS</name>

<value>hdfs://localhost:9000</value>

</property>

<property>

<name>hadoop.tmp.dir</name>

<value>/home/arthas/hadoop/tmp</value>

<description>Abaseforothertemporarydirectories.</description>

</property>然后保存并關(guān)閉文件。環(huán)境安裝現(xiàn)在,打開hdfs-site.xml配置文件:sudovi/home/arthas/hadoop/etc/hadoop/hdfs-site.xml在<configuration>標(biāo)簽內(nèi)添加以下內(nèi)容:<property>

<name>dfs.replication</name>

<value>3</value>

</property>

<property>

<name>.dir</name>

<value>file:/home/arthas/hadoop/hadoop_data/hdfs/namenode</value>

</property>

<property>

<name>dfs.datanode.data.dir</name>

<value>file:/home/arthas/hadoop/hadoop_data/hdfs/datanode</value>

</property>

<property>

<name>dfs.http.address</name>

<value>:50070</value>

</property>環(huán)境安裝然后保存并關(guān)閉文件。最后,打開mapred-site.xml:sudovi/home/arthas/hadoop/etc/hadoop/mapred-site.xml在<configuration>標(biāo)簽內(nèi)添加以下內(nèi)容:<property>

<name></name>

<value>yarn</value></property>Step6:格式化HDFS

在啟動Hadoop之前,我們需要格式化HDFS:hdfsnamenode-format環(huán)境安裝Step7:生成

SSH密鑰生成SSH密鑰:

ssh-keygen-trsa-P''-f~/.ssh/id_rsa然后將該密鑰添加到授權(quán)密鑰列表中:cat~/.ssh/id_rsa.pub>>~/.ssh/authorized_keyschmod0600~/.ssh/authorized_keysStep8:開啟密鑰認(rèn)證修改/etc/ssh/sshd_config:sudovi/etc/ssh/sshd_config環(huán)境安裝在其內(nèi)容中找到PubkeyAuthentication改為yesPubkeyAuthenticationyes然后保存并關(guān)閉文件。重新啟動SSH服務(wù)以使更改生效:sudosystemctlrestartsshdStep9:啟動Hadoop現(xiàn)在,進(jìn)入hadoop安裝根目錄,您可以通過以下命令啟動Hadoop:~/hadoop/sbin/start-all.sh啟動過程(正常):arthas@arthas:~$start-all.shWARNING:AttemptingtostartallApacheHadoopdaemonsasarthasin10seconds.WARNING:Thisisnotarecommendedproductiondeploymentconfiguration.WARNING:UseCTRL-Ctoabort.Startingnamenodeson[localhost]StartingdatanodesStartingsecondarynamenodes[arthas]StartingresourcemanagerStartingnodemanagers環(huán)境安裝一、Spark安裝Step1:安裝Scala教學(xué)資源中提供了spark的安裝包,可拷貝并解壓,解壓縮并更名:cd~tar-xvfscala-2.11.12.tgzsudomvscala-2.11.12scalaStep2:安裝Sparkcd~tar-xvfspark-3.3.0-bin-hadoop3.tgzsudomvspark-3.3.0-bin-hadoop3sparkStep3:配置Scala和Spark環(huán)境變量sudovi/etc/profile環(huán)境安裝進(jìn)入配置文件后,輸入“i”進(jìn)入編輯模式,將以下內(nèi)容復(fù)制(重復(fù)內(nèi)容可忽略):exportJAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64exportPATH=$PATH:$JAVA_HOME/bin

exportHADOOP_HOME=/home/arthas/hadoopexportPATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

exportSCALA_HOME=/home/arthas/scalaexportPATH=$PATH:$SCALA_HOME/bin

exportSPARK_HOME=/home/arthas/sparkexportPATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin輸入以下命令將環(huán)境重新生效:source/etc/profile環(huán)境安裝Step4:修改Spark文件進(jìn)入spark/conf/文件夾,復(fù)制配置文件spark-env.sh.template并更名為spark-env.sh之后,編輯該文件:cp~/spark/conf/spark-env.sh.templatespark-env.shsudovispark/conf/spark-env.sh進(jìn)入配置文件后,輸入“i”進(jìn)入編輯模式,將以下內(nèi)容復(fù)制:exportJAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64exportSCALA_HOME=/home/arthas/scalaexportHADOOP_HOME=/home/arthas/hadoopexportHADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoopexportSPARK_MASTER_IP=MasterexportSPARK_LOCAL_DIRS=/home/arthas/scalaexportSPARK_DRIVER_MEMORY=2g任務(wù)實施4.1.1行為數(shù)據(jù)需求分析第一步:分析原始數(shù)據(jù)內(nèi)容hdfsdfs-put/opt/software/data/behavior_202112-part/user/ysedu/hdfsdfs-put/opt/software/data/access_202112-part/user/ysedu/由于數(shù)據(jù)量較大,本任務(wù)使用Hadoop的hdfs存儲數(shù)據(jù),spark處理數(shù)據(jù),可使用資料中已安裝好全部環(huán)境的docker鏡像tar包。也可使用資料包中安裝包或自行下載后安裝,本環(huán)境使用的版本為Hadoop3.3.3和spark3.3.0。本案例我們將基于某小程序2021年12月脫敏后真實數(shù)據(jù)進(jìn)行分析,包括用戶行為數(shù)據(jù)behavior_202112-part和訪問日志access_202112-part。首先使用hdfs將準(zhǔn)備好的原始數(shù)據(jù)導(dǎo)入任務(wù)實施4.1.1行為數(shù)據(jù)需求分析第一步:分析原始數(shù)據(jù)內(nèi)容通過hdfs的cat命令可以查看我們原始數(shù)據(jù)hdfsdfs-cat/user/ysedu/behavior_202112-part/part-00000hdfsdfs-cat/user/ysedu/access_202112-part/di_log.2021122007.209.log

access_202112-part記錄了用戶nginx的請求數(shù)據(jù),但其中ip需要解析為地理位置,瀏覽器信息可以解析為手機(jī)型號,token需要轉(zhuǎn)換為用戶id,以便進(jìn)行用戶行為數(shù)據(jù)的分析和展示。任務(wù)實施4.1.1行為數(shù)據(jù)需求分析第二步:根據(jù)需求制定分析指標(biāo)根據(jù)以上數(shù)據(jù)我們可以利用整體的pv/uv(訪問量/用戶瀏覽量),展示應(yīng)用整體的訪問量情況;也可以根據(jù)不同頁面page得出各頁面的pv/uv,用于了解哪些頁面展示得最多,從而將主要功能放在該頁面。哪些想要展示的頁面因為隱藏得比較深導(dǎo)致瀏覽量不高,需要做操作流程上的優(yōu)化處理。根據(jù)用戶通過不同渠道channel_code訪問進(jìn)入程序,可以展示各渠道的pv/uv,了解哪些優(yōu)質(zhì)渠道可以帶來更多的用戶訪問,哪些渠道效果不佳,從而篩選出更好的渠道進(jìn)行合作。根據(jù)訪問時長spot_val,可以求得平均每次訪問的停留時長。也可以得出平均每個用戶的訪問時長,結(jié)合頁面和日期時間等,可以分析出在不同維度下的用戶黏性,以此為依據(jù)了解什么時間段,什么樣的功能頁面更吸引用戶。任務(wù)實施4.1.1行為數(shù)據(jù)需求分析第二步:根據(jù)需求制定分析指標(biāo)根據(jù)訪問日志,篩選url為登錄接口/di/reg/get_login_token,可以統(tǒng)計出每天各時段的啟動次數(shù)人數(shù)。也可以通過統(tǒng)計用戶登錄時段,了解用戶在什么時候最活躍,從而可以優(yōu)先考慮在該時段做活動或發(fā)文章。根據(jù)省份/城市的pv/uv查找出當(dāng)前最活躍的地區(qū),以及找到有潛力成為下一個活躍地區(qū)的城市。結(jié)合渠道信息,針對當(dāng)?shù)氐那闆r找到合適的渠道,更快獲取新客戶、留住老客戶。還可以分析事件轉(zhuǎn)化,如掃碼領(lǐng)取事件,可以分別分析掃碼事件、選商品事件和領(lǐng)取事件的數(shù)據(jù)量,通過漏斗圖分析用戶流失環(huán)節(jié),從而有針對性地做出優(yōu)化。任務(wù)實施4.1.1行為數(shù)據(jù)需求分析第二步:根據(jù)需求制定分析指標(biāo)最后還可以根據(jù)用戶首次訪問時間和之后再次訪問時間,計算得出用戶訪問留存率,以此為數(shù)據(jù)支撐,對現(xiàn)有的業(yè)務(wù)做出合理評價,便于制定和調(diào)整計劃。留存率是用于反映運營情況的統(tǒng)計指標(biāo),其具體含義為在統(tǒng)計周期(周/月)內(nèi),每日活躍用戶數(shù)在第N日仍啟動該App的用戶數(shù)占比的平均值。其中N通常取2、4、8、15、31,分別對應(yīng)次日留存率、三日留存率、七日留存率、半月留存率和月留存率。4.1.2原始數(shù)據(jù)簡單處理任務(wù)實施上面提到access_202112-part數(shù)據(jù)還需要處理,需要將ip解析為地理位置,瀏覽器信息解析為手機(jī)型號,token解析為用戶id。第一步:創(chuàng)建viuser_behavior_analysis_1.py創(chuàng)建user_behavior_analysis_1.py文件編寫數(shù)據(jù)處理內(nèi)容importsys

frompyspark.sqlimportSparkSession

fromdata_cleaningimportDataCleaning

fromdata_cleaningimportSplitType

fromip_areaimportIpArea

fromuser_agentimportUserAgentParser

importtime

#獲取sparkSession

defget_spark_session():

returnSparkSession.builder.appName("BehaviorLog").getOrCreate()

defmain(argv):

iflen(argv)>1:

input_path=argv[1]

else:

sys.stderr.write("Usage:"+argv[0]+"<inputpath>[<outputpath>]\n")

exit(2)

任務(wù)實施

iflen(argv)>2:

output_path=argv[2]

else:

output_path=None

spark=get_spark_session()

rdd=spark.sparkContext.textFile(input_path)

#設(shè)定正則格式

reg_str='^(\\S+)\\S+\\S+\\[([\\w:/]+\\s[+\\-]\\d{4})\\]\"(\\S+)(\\S+)\\s*\\S+\\s*\"(\\d{3})(\\S+)(\\S+)\"(\\S*)\"\"([^\\\"]+)\"\"\\S*\"\"(\\S*)\"'

colums=["ip","day_time","method","url","rt_code","size","cost","refer_url","ua","token"]

defurl_parse(r):

url=r["url"]

url_path=""

uuid=""

pos=url.find("?")

ifpos>=0:

#切割url參數(shù)

url_path=url[:pos]

url_params=url[pos+1:].split('&')

forone_paraminurl_params:

param_pos=one_param.find("=")

ifparam_pos>=0:

key=one_param[:param_pos]

val=one_param[param_pos+1:]

ifkey=="uuid":

uuid=val

return(url_path,uuid)

ip_area=IpArea()

ip_area.set_dict(ip_area.load_ip_area_dict('./data/ip_region.data'),

ip_area.load_ip_area_dict('./data/ip_country.data'))任務(wù)實施before_converts={

("country","province","city"):lambdar:ip_area.get_area_by_ip(r["ip"]),

"day_time":lambdar:time.strftime('%Y-%m-%d%H:%M:%S',

time.strptime(r["day_time"][:20],'%d/%b/%Y:%H:%M:%S')),

("url_path","uuid"):url_parse,

("brower","os","platform","is_mobile"):lambdar:UserAgentParser().parse(r["ua"])

}

#只篩選/di開頭的url

filters=[lambdar:r["url_path"].startswith("/di")]

after_converts={}

dc=DataCleaning(SplitType.regex,reg_str,colums,before_converts,filters,after_converts)

dc.remove_columns(["url","refer_url","ua"])

rdd=rdd.map(dc.format)

rdd=rdd.map(dc.convert).filter(lambdar:r!=None)

rdd=rdd.map(dc.tab_text)

#輸出數(shù)據(jù)

ifoutput_path:

rdd.saveAsTextFile(output_path)

else:

print('\n'.join([str(item)foriteminrdd.take(10)]))

if__name__=="__main__":

main(sys.argv)任務(wù)實施這里用到了工具類DataCleaning數(shù)據(jù)清洗類,制定數(shù)據(jù)解析類型為正則,reg_str='^(\\S+)\\S+\\S+\\[([\\w:/]+\\s[+\\-]\\d{4})\\]\"(\\S+)(\\S+)\\s*\\S+\\s*\"(\\d{3})(\\S+)(\\S+)\"(\\S*)\"\"([^\\\"]+)\"\"\\S*\"\"(\\S*)\“’解析出來的字段對應(yīng)為colums=["ip","day_time","method","url","rt_code","size","cost","refer_url","ua","token"]字典before_converts定義了各類數(shù)據(jù)和對應(yīng)的處理方法before_converts={

("country","province","city"):lambdar:ip_area.get_area_by_ip(r["ip"]),

"day_time":lambdar:time.strftime('%Y-%m-%d%H:%M:%S',time.strptime(r["day_time"][:20],'%d/%b/%Y:%H:%M:%S')),

("url_path","uuid"):url_parse,

("brower","os","platform","is_mobile"):lambdar:UserAgentParser().parse(r["ua"])

}任務(wù)實施篩選項目指定url為/di開頭的數(shù)據(jù)filters=[lambdar:r["url_path"].startswith("/di")]

最后調(diào)用處理數(shù)據(jù)dc=DataCleaning(SplitType.regex,reg_str,colums,before_converts,filters,after_converts)

#刪除多余列

dc.remove_columns(["url","refer_url","ua"])

rdd=rdd.map(dc.format)

rdd=rdd.map(dc.convert).filter(lambdar:r!=None)

#保存數(shù)據(jù)

rdd=rdd.map(dc.tab_text)任務(wù)實施第二步:ip解析地域("country","province","city"):lambdar:ip_area.get_area_by_ip(r["ip"]),

這里用到的是fromip_areaimportIpArea會用到兩個文件,ip_region.dataip_country.data,用于解析ip所屬國家和所屬地區(qū)。創(chuàng)建ip_area.py文件,寫入如下代碼#-*-coding:utf-8-*-

#ip_area.py

classIpArea():

#指定ip地區(qū)和ip城市字典文件

def__init__(self,ip_region_file=None,ip_country_file=None):

ifip_region_file:

self.ip_region_dict=self.load_ip_area_dict(ip_region_file)

ifip_country_file:

self.ip_country_dict=self.load_ip_area_dict(ip_country_file)

defset_dict(self,ip_region_dict,ip_country_dict):

self.ip_region_dict=ip_region_dict

self.ip_country_dict=ip_country_dict任務(wù)實施

defload_ip_area_dict(self,ip_region_file):

res=[]

fb=open(ip_region_file,'r')

try:

whileTrue:

line=fb.readline()

ifnotline:

break

parts=line.split("\t")

#地域多個,則拼接成一個字符串

res.append([ip_to_long(parts[0]),ip_to_long(parts[1]),','.join(parts[2:]).strip()])

exceptIOErroraserr:

print("IOError:",err)

finally:

fb.close()

#排序

returnsorted(res,key=lambdak:k[:2])

defget_area_by_ip(self,ip_str):

ip=ip_str[7:]ifip_str.startswith("::ffff:")elseip_str

ip_long=ip_to_long(ip_str)

#ip查找城市

ip_region_dic_list=self.ip_region_dict

ip_index=-1

iflen(ip_region_dic_list)>0:

ip_index=ip_search_section(ip_region_dic_list,0,len(ip_region_dic_list)-1,ip_long)任務(wù)實施country=None

province="未知"

city="未知"

if-1!=ip_index:

region_cell=ip_region_dic_list[ip_index]

region_str=region_cell[2]

ifregion_str!="其他":

country="中國"

region_parts=region_str.split(",")

province=region_parts[0]

city=region_parts[1]iflen(region_parts)>1else"其他"

ifNone==country:

#ip查找國家或區(qū)域

ip_country_dic_list=self.ip_country_dict

country_index=-1

iflen(ip_country_dic_list)>0:

country_index=ip_search_section(ip_country_dic_list,0,len(ip_country_dic_list)-1,ip_long)

if-1!=country_index:

country_cell=ip_country_dic_list[country_index]

country_str=country_cell[2]

if"China"==country_str:

country="中國"

elif"Reserved"==country_str:

country="其他"

elif"Taiwan;RepublicofChina(ROC)"==country_str:

country="中國"

province="臺灣"

elif"HongKong"==country_str:

country="中國"

province="香港"

return(country,province,city)任務(wù)實施#二分法查找IP

defip_search_section(ip_dict_list,low,high,ip_long):

iflow<=high:

mid=int((low+high)/2)

dic=ip_dict_list[mid]

s_ip=dic[0]

e_ip=dic[1]

ifip_long>=s_ipandip_long<=e_ip:

returnmid

elifip_long>e_ip:

returnip_search_section(ip_dict_list,mid+1,high,ip_long)

elifip_long<s_ip:

returnip_search_section(ip_dict_list,low,mid-1,ip_long)

return-1

#ip轉(zhuǎn)長整型

defip_to_long(ip_str):

ip_long=[0,0,0,0]

p1=ip_str.find(".")

ifp1==-1:

returnint(ip_str)

p2=ip_str.find(".",p1+1)

p3=ip_str.find(".",p2+1)

ip_long[0]=int(ip_str[:p1])

ip_long[1]=int(ip_str[p1+1:p2])

ip_long[2]=int(ip_str[p2+1:p3])

ip_long[3]=int(ip_str[p3+1:])

return(ip_long[0]<<24)+(ip_long[1]<<16)+(ip_long[2]<<8)+ip_long[3]

if__name__=="__main__":

ip_area=IpArea("./data/ip_region.data","./data/ip_country.data")

print(ip_area.get_area_by_ip("15"))任務(wù)實施定義類IpArea,__init__指定ip_region和ip_country文件路徑load_ip_area_dict用于讀取文件并按ip排序get_area_by_ip為通過ip解析地址的方法,其中ip_to_long為將ip轉(zhuǎn)為long類型數(shù)字,ip_search_section根據(jù)ip通過二分法遞歸查找對應(yīng)所在位置下標(biāo)第三步:解析時間為%Y-%m-%d%H:%M:%S"day_time":lambdar:time.strftime('%Y-%m-%d%H:%M:%S',time.strptime(r["day_time"][:20],'%d/%b/%Y:%H:%M:%S')),例如將日志中數(shù)據(jù)“31/Dec/2021:23:45:12+0800”解析為“2021-12-3123:45:12”返回。任務(wù)實施第四步:解析url中的uuid("url_path","uuid"):url_parse,

url_parse定義在user_behavior_analysis_1.py文件中,例如url為/diclient/user/perfect_info?scene=a45bba09140ffedd96d8719f294b7cd1&uuid=1640965488711&version=8將會被解析返回url_path:"/diclient/user/perfect_info",uuid:"1640965488711"第五步:ua解析手機(jī)型號("brower","os","platform","is_mobile"):lambdar:uap.parse(r["ua"]),

任務(wù)實施這里用到了fromuser_agentimportUserAgentParser需要拷貝手機(jī)型號數(shù)據(jù)文件user_agent_dict.data,用于解析ua中手機(jī)型號。創(chuàng)建user_agent.py文件,寫入如下代碼#-*-coding:utf-8-*-

#user_agent.py

importuser_agents

classUserAgentParser():

def__init__(self):

phone_map={}

#打開字典文件

withopen('./data/user_agent_dict.data','r')asfb:

whileTrue:

#逐行讀取文件

line=fb.readline()

ifnotline:

break

parts=line.split("\t")

iflen(parts)!=2:

print("Unknowlinesplit:"+str(parts))

phone_map[parts[0].strip()]=parts[1].strip()

#記錄map

self.phone_map=phone_map

#根據(jù)map解析

defparse(self,ua_str):

user_agent=user_agents.parse(ua_str)

dev=user_agent.device.family

platform='Other'ifdev=='Other'elseself.phone_map.get(user_agent.device.family,user_agent.device.family)

broswer,os,platform,is_mobile=(user_agent.browser.family,user_agent.os.family,platform,1ifuser_agent.is_mobileelse0)

return(broswer,os,platform,is_mobile)任務(wù)實施第六步:token解析用戶信息("shopid","userid"):lambdar:tui.parse_userinfo(r['token'])

解析token用到了fromtoken_userinfoimportTokenUserinfo需要用到數(shù)據(jù)文件user_token_map.csv,用于將token轉(zhuǎn)化為userid和shopid。將文件拷貝到data目錄下。cp/opt/software/data/user_agent_dict.data/home/ysedu/workspace/data/

任務(wù)實施在/home/ysedu/workspace下創(chuàng)建token_userinfo.py文件,寫入如下代碼:#-*-coding:utf-8-*-

classTokenUserinfo():

def__init__(self,token_userinfo_map_path='data/user_token_map.csv'):

token_userinfo_map={}

#打開用戶token對應(yīng)字典文件

withopen(token_userinfo_map_path,'r')asfb:

whileTrue:

line=fb.readline()

ifnotline:

break

parts=line.strip().split('","')

token=parts[0][1:]

shopid=parts[1]

shopid=shopidifshopid!='0'else''

userid=parts[2][:-1]

userid=useridifuserid!='0'else''

token_userinfo_map[token]=(shopid,userid)

self.token_userinfo_map=token_userinfo_map

defparse_userinfo(self,token):

#根據(jù)map解析

userinfo=self.token_userinfo_map.get(token)

ifuserinfo:

returnuserinfo

else:

return('','')任務(wù)實施第七步:執(zhí)行數(shù)據(jù)處理編寫啟動腳本start_uba_spark_1.sh,內(nèi)容如下#!/bin/bash

source/etc/profile

data_path=/user/ysedu/access_202112-part

res_path=/user/ysedu/uba/access_202112-part

hadoopfs-rm-r$res_path

SPARK_MASTER=local

spark-submit--master$SPARK_MASTER--py-files=data_cleaning.py,ip_area.py,user_agent.py,token_userinfo.pyuser_behavior_analysis_1.py$data_path$res_path執(zhí)行后完成數(shù)據(jù)處理并寫入/user/ysedu/uba/access_202112-part4.1.3用戶行為數(shù)據(jù)統(tǒng)計任務(wù)實施第一步:創(chuàng)建viuser_behavior_analysis_2.py#-*-coding:utf-8-*-

importsys

frompyspark.sqlimportSparkSession

frompyspark.sqlimportSQLContext

frompyspark.sql.typesimport*

importpandasaspd

fromdata_cleaningimportDataCleaning

fromdata_cleaningimportSplitType

fromdata_statimport*

defget_spark_session():

returnSparkSession.builder.appName("BehaviorStat").getOrCreate()

output_path=None

sc=None

defmain(argv):

iflen(argv)>2:

input_request_path=argv[1]

input_accesslog_path=argv[2]

else:

sys.stderr.write("Usage:"+argv[0]+"<input_request_path><input_accesslog_path>[<outputpath>]\n")

exit(2)

globaloutput_path

globalsc任務(wù)實施

iflen(argv)>3:

output_path=argv[2]

spark=get_spark_session()

sc=spark.sparkContext

#清洗請求日志數(shù)據(jù)

rqt=DataCleaning(SplitType.field,"\t",["day_time","url","channel_code","token","spot_code","spot_val","uuid","item_id","action","page","shopid","userid"])

accesslog=DataCleaning(SplitType.field,"\t",["ip","day_time","method","rt_code","size","cost","token","country","province","city","url_path","uuid","brower","os","platform","is_mobile","shopid","userid"])

#讀取rdd數(shù)據(jù)

rqt_rdd=sc.textFile(input_request_path).map(rqt.format)

access_rdd=sc.textFile(input_accesslog_path).map(accesslog.format)

#分別計算各所需數(shù)據(jù)

page_stat(rqt_rdd)

total_stat(rqt_rdd)

channel_stat(rqt_rdd)

total_avg_access_duration_stat(rqt_rdd)

user_avg_access_duration_stat(rqt_rdd)

terminal_stat(access_rdd)

province_city_stat(access_rdd)

daily_stat(rqt_rdd)

hour_stat(access_rdd)

get_gift_convert_stat(access_rdd)

retention_stat(sc.textFile(input_accesslog_path).map(accesslog.format))

spark.stop()右圖代碼中分別引入hdfs文件/user/ysedu/data_cleaning/behavior_202112-part和/user/ysedu/uba/access_202112-part作為數(shù)據(jù)源,按以下列解析任務(wù)實施接下來我們分別完成統(tǒng)計的各個函數(shù),并最終調(diào)用輸出數(shù)據(jù)。提前編寫好執(zhí)行文件,每步寫完一個函數(shù),即可執(zhí)行查看結(jié)果。vistart_uba_spark_2.sh#!/bin/bash

source/etc/profile

request_data_path=/user/ysedu/data_cleaning/behavior_202112-part

access_data_path=/user/ysedu/uba/access_202112-part

res_path=/user/ysedu/uba/behavior_202112-part

hadoopfs-rm-r$res_path

SPARK_MASTER=local

spark-submit--master$SPARK_MASTER--py-files=data_cleaning.py,data_stat.pyuser_behavior_analysis_2.py$request_data_path$access_data_path$res_path任務(wù)實施第二步:計算整體pvuv#整體pvuv

deftotal_stat(rdd):

dim_columns=[]

indices=[Count(),CountUniq(["userid"])]

ds=DataStat(dim_columns,indices)

rdd=rdd.filter(lambdar:r["spot_code"]=="spot_interval").map(ds.converts).reduceByKey(ds.calculates).map(ds.map_results)

#保存

fb=open("data/stat_result/"+sys._getframe().f_code.co_name,"w")

fb.write('\n'.join(rdd.map(ds.tab_text).collect()))使用數(shù)據(jù)統(tǒng)計類DataStat,篩選"spot_code"為"spot_interval"訪問數(shù)據(jù),然后分別計算總數(shù)和userid去重后數(shù)量,得到pv和uv。將得到的數(shù)據(jù)保存到data/stat_result/目錄下,文件名為當(dāng)前函數(shù)名,通過sys._getframe().f_code.co_name獲得。任務(wù)實施第三步:頁面pv

uv#頁面pvuv

defpage_stat(rdd):

dim_columns=["page"]

indices=[Count(),CountUniq(["userid"])]

ds=DataStat(dim_columns,indices)

rdd=rdd.filter(lambdar:r["spot_code"]=="spot_interval").map(ds.converts).reduceByKey(ds.calculates).map(ds.map_results)

#根據(jù)pv排序

rdd=rdd.sortBy(lambdar:r[-2],False,1)

#保存

fb=open("data/stat_result/"+sys._getframe().f_code.co_name,"w")

fb.write('\n'.join(rdd.map(ds.tab_text).collect()))使用數(shù)據(jù)統(tǒng)計類DataStat,篩選"spot_code"為"spot_interval"訪問數(shù)據(jù),這里按page進(jìn)行分組,分別計算總數(shù)和userid去重后數(shù)量,得到pv和uv,并使用倒數(shù)第二列,也就是pv進(jìn)行倒序。將得到的數(shù)據(jù)保存到data/stat_result/目錄下,文件名為當(dāng)前函數(shù)名。任務(wù)實施第四步:渠道訪問pvuv#渠道訪問pvuv

defchannel_stat(rdd):

dim_columns=["channel_code"]

indices=[Count(),CountUniq(["userid"])]

ds=DataStat(dim_columns,indices)

rdd=rdd.filter(lambdar:r["spot_code"]=="spot_interval").map(ds.converts).reduceByKey(ds.calculates).map(ds.map_results)

#根據(jù)pv排序

rdd=rdd.sortBy(lambdar:r[-2],False,1)

#保存

fb=open("data/stat_result/"+sys._getframe().f_code.co_name,"w")

fb.write('\n'.join(rdd.map(ds.tab_text).collect()))第五步:平均訪問時長#平均訪問時長

deftotal_avg_access_duration_stat(rdd):

dim_columns=[]

indices=[Avg(["spot_val"])]

ds=DataStat(dim_columns,indices)

rdd=rdd.filter(lambdar:r["spot_code"]=="spot_interval").map(ds.converts).reduceByKey(ds.calculates).map(ds.map_results)

#保存

fb=open("data/stat_result/"+sys._getframe().f_code.co_name,"w")

fb.write('\n'.join(rdd.map(ds.tab_text).collect()))使用數(shù)據(jù)統(tǒng)計類DataStat,篩選"spot_code"為"spot_interval"訪問數(shù)據(jù),計算spot_val也就是停留時長的平均值。將得到的數(shù)據(jù)保存到data/stat_result/目錄下,文件名為當(dāng)前函數(shù)名。使用數(shù)據(jù)統(tǒng)計類DataStat,篩選"spot_code"為"spot_interval"訪問數(shù)據(jù),按channel_code進(jìn)行分組,分別計算總數(shù)和userid去重后數(shù)量,得到pv和uv,并按pv倒序。任務(wù)實施第六步:用戶訪問時長#用戶訪問時長

defuser_avg_access_duration_stat(rdd):

dim_columns=["uuid"]

indices=[Avg(["spot_val"])]

ds=DataStat(dim_columns,indices)

rdd=rdd.filter(lambdar:r["spot_code"]=="spot_interval").map(

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

最新文檔

評論

0/150

提交評論