大數(shù)據(jù)導(dǎo)論思維、技術(shù)與應(yīng)用第12章SPARKSQL課件_第1頁(yè)
大數(shù)據(jù)導(dǎo)論思維、技術(shù)與應(yīng)用第12章SPARKSQL課件_第2頁(yè)
大數(shù)據(jù)導(dǎo)論思維、技術(shù)與應(yīng)用第12章SPARKSQL課件_第3頁(yè)
大數(shù)據(jù)導(dǎo)論思維、技術(shù)與應(yīng)用第12章SPARKSQL課件_第4頁(yè)
大數(shù)據(jù)導(dǎo)論思維、技術(shù)與應(yīng)用第12章SPARKSQL課件_第5頁(yè)
已閱讀5頁(yè),還剩57頁(yè)未讀 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

1、 大數(shù)據(jù)導(dǎo)論第十二章CONTENTS目錄PART 01 SPARK SQL簡(jiǎn)介PART 02 SPARK SQL執(zhí)行流程PART 03 基礎(chǔ)數(shù)據(jù)模型DATAFRAMEPART 04 使用Spark SQL的方式PART 05 SPARK SQL數(shù)據(jù)源PART 06 SPARK SQL CLI介紹PART 07在Pyspark中使用Spark SQLPART 08 在Java中連接Spark SQLPART 09 習(xí)題PART 01 Spark SQL簡(jiǎn)介Spark SQL是一個(gè)用來(lái)處理結(jié)構(gòu)化數(shù)據(jù)的Spark組件,為Spark提供了查詢結(jié)構(gòu)化數(shù)據(jù)的能力。Spark SQL可被視為一個(gè)分布式的SQ

2、L查詢引擎,可以實(shí)現(xiàn)對(duì)多種數(shù)據(jù)格式和數(shù)據(jù)源進(jìn)行SQL操作,包括Parquet,Hive,MongoDB,JSON、HDFS、JDBC、S3和RDD等。Spark SQL簡(jiǎn)介Spark SQL介紹:Spark SQL是為了處理結(jié)構(gòu)化數(shù)據(jù)的一個(gè)Spark 模塊。不同于Spark RDD的基本API,Spark SQL接口擁有更多關(guān)于數(shù)據(jù)結(jié)構(gòu)本身與執(zhí)行計(jì)劃等更多信息。在Spark內(nèi)部,Spark SQL可以利用這些信息更好地對(duì)操作進(jìn)行優(yōu)化。Spark SQL提供了三種訪問(wèn)接口:SQL,DataFrame API和Dataset API。當(dāng)計(jì)算引擎被用來(lái)執(zhí)行一個(gè)計(jì)算時(shí),有不同的API和語(yǔ)言種類可供選擇

3、。這種統(tǒng)一性意味著開(kāi)發(fā)人員可以來(lái)回輕松切換各種最熟悉的API來(lái)完成同一個(gè)計(jì)算工作。Spark SQL簡(jiǎn)介Spark SQL具有如下特點(diǎn)數(shù)據(jù)兼容方面:能加載和查詢來(lái)自各種來(lái)源的數(shù)據(jù)。 性能優(yōu)化方面:除了采取內(nèi)存列存儲(chǔ)、代碼生成等優(yōu)化技術(shù)外,還引進(jìn)成本模型對(duì)查詢進(jìn)行動(dòng)態(tài)評(píng)估、獲取最佳物理計(jì)劃等; 組件擴(kuò)展方面:無(wú)論是SQL的語(yǔ)法解析器、分析器還是優(yōu)化器都可以重新定義,進(jìn)行擴(kuò)展。標(biāo)準(zhǔn)連接:Spark SQL包括具有行業(yè)標(biāo)準(zhǔn)JDBC和ODBC連接的服務(wù)器模式。Spark SQL簡(jiǎn)介Spark SQL具有如下特點(diǎn)集成:無(wú)縫地將SQL查詢與Spark程序混合。 Spark SQL允許將結(jié)構(gòu)化數(shù)據(jù)作為Spa

4、rk中的分布式數(shù)據(jù)集(RDD)進(jìn)行查詢,在Python,Scala和Java中集成了API。這種緊密的集成使得SQL查詢以及復(fù)雜的分析算法可以輕松地運(yùn)行。可擴(kuò)展性:對(duì)于交互式查詢和長(zhǎng)查詢使用相同的引擎。Spark SQL利用RDD模型來(lái)支持查詢?nèi)蒎e(cuò),使其能夠擴(kuò)展到大型作業(yè),不需擔(dān)心為歷史數(shù)據(jù)使用不同的引擎。PART 02 Spark SQL執(zhí)行流程Spark SQL執(zhí)行流程類似于關(guān)系型數(shù)據(jù)庫(kù),Spark SQL語(yǔ)句也是由Projection(a1,a2,a3)、 Data Source (tableA)、 Filter(condition)三部分組成,分別對(duì)應(yīng)SQL查詢過(guò)程中的Result、D

5、ata Source、 Operation,也就是說(shuō)SQL語(yǔ)句按Result-Data Source-Operation的次序來(lái)描述的。Spark SQL執(zhí)行流程解析(Parse)對(duì)讀入的SQL語(yǔ)句進(jìn)行解析,分辨出SQL語(yǔ)句中哪些詞是關(guān)鍵詞(如SELECT、 FROM、WHERE),哪些是表達(dá)式、哪些是 Projection、哪些是 Data Source 等,從而判斷SQL語(yǔ)句是否規(guī)范; 綁定(Bind)將SQL語(yǔ)句和數(shù)據(jù)庫(kù)的數(shù)據(jù)字典(列、表、視圖等)進(jìn)行綁定,如果相關(guān)的Projection、Data Source等都存在,則這個(gè)SQL語(yǔ)句是可以執(zhí)行的; Spark SQL執(zhí)行流程優(yōu)化(Op

6、timize)一般的數(shù)據(jù)庫(kù)會(huì)提供幾個(gè)執(zhí)行計(jì)劃,這些計(jì)劃一般都有運(yùn)行統(tǒng)計(jì)數(shù)據(jù),數(shù)據(jù)庫(kù)會(huì)在這些計(jì)劃中選擇一個(gè)最優(yōu)計(jì)劃; 執(zhí)行(Execute)按Operation-Data Source-Result 的次序來(lái)執(zhí)行計(jì)劃。在執(zhí)行過(guò)程有時(shí)候甚至不需要讀取物理表就可以返回結(jié)果,比如重新運(yùn)行剛運(yùn)行過(guò)的SQL語(yǔ)句,可能直接從數(shù)據(jù)庫(kù)的緩沖池中獲取返回結(jié)果。PART 03 基礎(chǔ)數(shù)據(jù)模型DataFrameDataFrame是由“命名列”(類似關(guān)系表的字段定義)所組織起來(lái)的一個(gè)分布式數(shù)據(jù)集合,可以把它看成是一個(gè)關(guān)系型數(shù)據(jù)庫(kù)的表?;A(chǔ)數(shù)據(jù)模型DataFrameDataFrame是Spark SQL的核心,它將數(shù)據(jù)保存

7、為行構(gòu)成的集合,行對(duì)應(yīng)列有相應(yīng)的列名。DataFrame與RDD的主要區(qū)別在于,DataFrame帶有Schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱和類型。這使得Spark SQL可以掌握更多的結(jié)構(gòu)信息,從而能夠?qū)ataFrame背后的數(shù)據(jù)源以及作用于DataFrame之上的變換進(jìn)行了針對(duì)性的優(yōu)化,最終達(dá)到大幅提升運(yùn)行時(shí)效率的目標(biāo)?;A(chǔ)數(shù)據(jù)模型DataFrameDataFrame與RDD的對(duì)比:PART 04 使用Spark SQL的方式使用Spark SQL的方式使用Spark SQL,首先利用sqlContext從外部數(shù)據(jù)源加載數(shù)據(jù)為DataFrame;然

8、后,利用DataFrame上豐富的API進(jìn)行查詢、轉(zhuǎn)換;最后,將結(jié)果進(jìn)行展現(xiàn)或存儲(chǔ)為各種外部數(shù)據(jù)形式。SparkSQL為Spark提供了查詢結(jié)構(gòu)化數(shù)據(jù)的能力,查詢時(shí)既可以使用SQL也可以使用DataFrameAPI(RDD)。通過(guò)Thrift Server,SparkSQL支持多語(yǔ)言編程包括Java、Scala、Python及R。使用Spark SQL的方式使用Spark SQL的方式加載數(shù)據(jù). 從Hive中的users表構(gòu)造DataFrame:users = sqlContext.table(users). 加載S3上的JSON文件:logs = sqlContext.load(s3n:/p

9、ath/to/data.json, json). 加載HDFS上的Parquet文件:clicks = sqlContext.load(hdfs:/path/to/data.parquet, parquet)使用Spark SQL的方式加載數(shù)據(jù). 通過(guò)JDBC訪問(wèn)MySQL:comments = sqlContext.jdbc(jdbc:mysql:/localhost/comments, user). 將普通RDD轉(zhuǎn)變?yōu)镈ataFrame:rdd = sparkContext.textFile(“article.txt”) .flatMap(_.split( ) .map(_, 1) .re

10、duceByKey(_+_) wordCounts = sqlContext.createDataFrame(rdd, word, count)使用Spark SQL的方式加載數(shù)據(jù). 將本地?cái)?shù)據(jù)容器轉(zhuǎn)變?yōu)镈ataFrame:data = (Alice, 21), (Bob, 24)people = sqlContext.createDataFrame(data, name, age). 將PandasDataFrame轉(zhuǎn)變?yōu)镾parkDataFrame(PythonAPI特有功能):sparkDF=sqlContext.createDataFrame(pandasDF)使用Spark SQL的

11、方式使用DataFrame. 創(chuàng)建一個(gè)只包含年輕用戶的DataFrame :young = users.filter(users.age 21) . 也可以使用Pandas風(fēng)格的語(yǔ)法: young = usersusers.age = 13 AND age = 19)teenagers.show()Parquet文件數(shù)據(jù)源JSON DataSets 數(shù)據(jù)源JSON DataSets 數(shù)據(jù)源Spark SQL可以自動(dòng)根據(jù)JSON DataSet的格式把其上載為DataFrame。用路徑指定JSON dataset;路徑下可以是一個(gè)文件,也可以是多個(gè)文件:sc = spark.sparkConte

12、xtpath = examples/src/main/resources/people.jsonpeopleDF = spark.read.json(path)使用的結(jié)構(gòu)可以調(diào)用printSchema()方法打?。簆eopleDF.printSchema()利用DataFrame創(chuàng)建一個(gè)臨時(shí)表:使用Spark的sql方法進(jìn)行SQL查詢:peopleDF.createOrReplaceTempView(people)teenagerNamesDF = spark.sql(SELECT name FROM people WHERE age BETWEEN 13 AND 19)teenagerNam

13、esDF.show()JSON DataSets 數(shù)據(jù)源JSON dataset的DataFrame也可以是RDDString 格式,每個(gè)JSON對(duì)象為一個(gè)string:jsonStrings = name:Yin,address:city:Columbus,state:OhiootherPeopleRDD = sc.parallelize(jsonStrings)otherPeople = spark.read.json(otherPeopleRDD)otherPeople.show()JSON DataSets 數(shù)據(jù)源Hive表數(shù)據(jù)源Hive表數(shù)據(jù)源Spark SQL支持對(duì)Hive中的數(shù)據(jù)

14、進(jìn)行讀寫。首先創(chuàng)建一個(gè)支持Hive的SparkSession對(duì)象,包括與Hive metastore的連接,支持Hive的序列化和反序列化操作,支持用戶定義的Hive操作等。warehouse_location = abspath(spark-warehouse)spark = SparkSession .builder .appName(Python Spark SQL Hive integration example) .config(spark.sql.warehouse.dir, warehouse_location) .enableHiveSupport() .getOrCreate

15、()warehouse_location 指定數(shù)據(jù)庫(kù)和表的缺省位置:Hive表數(shù)據(jù)源spark.sql(CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive)spark.sql(LOAD DATA LOCAL INPATH examples/src/main/resources/kv1.txt INTO TABLE src)基于新創(chuàng)建的SparkSession創(chuàng)建表和上載數(shù)據(jù)到表中:spark.sql(SELECT * FROM src).show()spark.sql(SELECT COUNT(*) FROM sr

16、c).show()使用HiveQL進(jìn)行查詢:Hive表數(shù)據(jù)源sqlDF = spark.sql(SELECT key, value FROM src WHERE key val sqlContext = new org.apache.spark.sql.SQLContext(sc)sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext1943a343scala import sqlContext.implicits._import sqlContext.implicits._SQLContext

17、Spark SQL CLI介紹下面的操作基于一個(gè)簡(jiǎn)單的數(shù)據(jù)文件people.json,文件的內(nèi)容如下:name:Michaelname:Andy, age:30name:Justin, age:19數(shù)據(jù)文件下面語(yǔ)句從本地文件people.json讀取數(shù)據(jù)創(chuàng)建DataFrame:val df = sqlContext.read.json(file:/data/people. json)df: org.apache.spark.sql.DataFrame = age: bigint, name: string創(chuàng)建DataFramesPyspark是針對(duì)Spark的Python API。Spark使

18、用py4j來(lái)實(shí)現(xiàn)Python與Java的互操作,從而實(shí)現(xiàn)使用Python編寫Spark程序。Spark也同樣提供了Pyspark,一個(gè)Spark的Python Shell,可以以交互的方式使用Python編寫Spark程序。PART 07 在Pyspark中使用Spark SQL在Pyspark中使用Spark SQL在終端上啟動(dòng)PythonSparkShell:./bin/pyspark使用JSON文件作為數(shù)據(jù)源,創(chuàng)建JSON文件/home/sparksql/courses.json,并輸入下面的內(nèi)容:實(shí)例描述name:Linux, type:basic, length:10name:TCP

19、IP, type:project, length:15name:Python, type:project, length:8name:GO, type:basic, length:2name:Ruby, type:basic, length:5在Pyspark中使用Spark SQL首先使用SQLContext模塊,其作用是提供Spark SQL處理的功能。在Pyspark Shell中逐步輸入下面步驟的內(nèi)容:引入pyspark.sql中的SQLContext:from pyspark.sql import SQLContext創(chuàng)建SQLContext對(duì)象使用pyspark的SparkCont

20、ext對(duì)象,創(chuàng)建SQLContext對(duì)象:sqlContext = SQLContext(sc)在Pyspark中使用Spark SQLDataFrame對(duì)象可以由RDD創(chuàng)建,也可以從Hive表或JSON文件等數(shù)據(jù)源創(chuàng)建。創(chuàng)建DataFrame,指明來(lái)源自JSON文件:df = sqlContext.read.json(/home/shiyanlou/courses.json)創(chuàng)建DataFrame對(duì)象在Pyspark中使用Spark SQL首先打印當(dāng)前DataFrame里的內(nèi)容和數(shù)據(jù)表的格式:df.select(name).show()#展示了所有的課程名df.select(name, le

21、ngth).show()#展示了所有的課程名及課程長(zhǎng)度對(duì)DataFrame進(jìn)行操作show()函數(shù)將打印出JSON文件中存儲(chǔ)的數(shù)據(jù)表;使用printSchema()函數(shù)打印數(shù)據(jù)表的格式。然后對(duì)DataFrame的數(shù)據(jù)進(jìn)行各種操作:df.show() df.printSchema()在Pyspark中使用Spark SQLdf.filter(dftype = basic).select(name, type).show()#展示了課程類型為基礎(chǔ)課(basic)的課程名和課程類型df.groupBy(type).count().show()#計(jì)算所有基礎(chǔ)課和項(xiàng)目課的數(shù)量。首先需要將DataFram

22、e注冊(cè)為Table才可以在該表上執(zhí)行SQL語(yǔ)句:df.registerTempTable(courses)coursesRDD = sqlContext.sql(SELECT name FROM courses WHERE length = 5 and length = 10)names = coursesRDD.rdd.map(lambda p: Name: + )for name in names.collect(): print name執(zhí)行SQL語(yǔ)句在Pyspark中使用Spark SQLParquet是Spark SQL讀取的默認(rèn)數(shù)據(jù)文件格式,把從JSON中讀取的Data

23、Frame保存為Parquet格式,只保存課程名稱和長(zhǎng)度兩項(xiàng)數(shù)據(jù):df.select(name, length).write.save(/tmp/courses.parquet, format=parquet)保存 DataFrame為其他格式將創(chuàng)建hdfs:/master:9000/tmp/courses.parquet文件夾并存入課程名稱和長(zhǎng)度數(shù)據(jù)。Spark SQL實(shí)現(xiàn)了Thrift JDBC/ODBC server,所以Java程序可以通過(guò)JDBC遠(yuǎn)程連接Spark SQL發(fā)送SQL語(yǔ)句并執(zhí)行。PART 08 在Java中連接Spark SQL在Java中連接Spark SQL首先將$

24、HIVE_HOME/conf/hive-site.xml 拷貝到$SPARK_HOME/conf目錄下。另外,因?yàn)镠ive元數(shù)據(jù)信息存儲(chǔ)在MySQL中,所以Spark在訪問(wèn)這些元數(shù)據(jù)信息時(shí)需要MySQL連接驅(qū)動(dòng)的支持。添加驅(qū)動(dòng)的方式有三種:在$SPARK_HOME/conf目錄下的spark-defaults.conf中添加:spark.jars /opt/lib2/mysql-connector-java-5.1.26-bin.jar;可以實(shí)現(xiàn)添加多個(gè)依賴jar比較方便:spark.driver.extraClassPath /opt/lib2/mysql-connector-java-5.

25、1.26-bin.jar;設(shè)置配置在運(yùn)行時(shí)添加 -jars/opt/lib2/mysql-connector-java-5.1.26-bin.jar做完上面的準(zhǔn)備工作后,Spark SQL和Hive就繼承在一起了,Spark SQL可以讀取Hive中的數(shù)據(jù)。設(shè)置配置啟動(dòng)Thrift在Spark根目錄下執(zhí)行:./sbin/start-thriftserver.sh開(kāi)啟thrift服務(wù)器,它可以接受所有spark-submit的參數(shù),并且還可以接受-hiveconf 參數(shù)。不添加任何參數(shù)表示以local方式運(yùn)行,默認(rèn)的監(jiān)聽(tīng)端口為10000 在Java中連接Spark SQL添加依賴打開(kāi)Eclips

26、e用JDBC連接Hive Server2。新建一個(gè)Maven項(xiàng)目,在pom.xml添加以下依賴:org.apache.hivehive-jdbc1.2.1org.apache.hadoophadoop-common2.4.1在Java中連接Spark SQL添加依賴jdk.toolsjdk.tools1.6system$JAVA_HOME/lib/tools.jar在Java中連接Spark SQLJDBC連接Hive Server2的相關(guān)參數(shù):驅(qū)動(dòng):org.apache.hive.jdbc.HiveDriverurl:jdbc:hive2:/31:10000/default用戶名:hadoop (啟動(dòng)thriftserver的linux用戶名)密碼:“”(默認(rèn)密碼為空)JDBC連接參數(shù)在Java中連接Spark SQLimportjava.sql.Connection;importjava.sql.DriverManager;importjava.sql.ResultSet;importjava.sql.SQLException;importjava.sql.Statement;publicclassTest1publicstaticvoidmain(Stringargs)throwsSQLExceptionStringurl=jdbc:hive2:

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論