




Spark SQL Data Loading and Saving in Practice

I. Prerequisites

Spark SQL mainly operates on DataFrames, and a DataFrame itself provides the save and load operations: load creates a DataFrame by reading data in, save writes the data of a DataFrame out to storage, and format names the concrete format, i.e. the type of file we want to read and the type of file we want to write.

II. Spark SQL read/write code in practice

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

public class SparkSQLLoadSaveOps {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLLoadSaveOps");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);

    /*
     * read() returns a DataFrameReader; load reads the data in as a DataFrame.
     */
    DataFrame peopleDF = sqlContext.read().format("json")
        .load("E:\\Spark\\Sparkinstanll_package\\Big_Data_Software\\spark-1.6.0-bin-hadoop2.6\\examples\\src\\main\\resources\\people.json");

    /*
     * Operate on the DataFrame directly.
     * JSON is a self-describing format. How does the reader know the schema when
     * reading JSON? By scanning the whole JSON input; only after the scan is the
     * metadata known.
     * mode specifies the behavior for existing output; Append creates a new file
     * and appends to the output.
     */
    peopleDF.select("name").write().mode(SaveMode.Append).save("E:\\personNames");
  }
}

Source-code walkthrough of the read path:

1. The read method returns a DataFrameReader, which is used to read data in.

/**
 * :: Experimental ::
 * Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]].
 * {{{
 *   sqlContext.read.parquet("/path/to/file.parquet")
 *   sqlContext.read.schema(schema).json("/path/to/file.json")
 * }}}
 *
 * @group genericdata
 * @since 1.4.0
 */
@Experimental
// creates a DataFrameReader instance, giving us a DataFrameReader reference
def read: DataFrameReader = new DataFrameReader(this)

2. Next, format in the DataFrameReader class specifies the format of the file to read.

/**
 * Specifies the input data source format.
 *
 * @since 1.4.0
 */
def format(source: String): DataFrameReader = {
  this.source = source
  this
}

3. The load method in DataFrameReader turns the input arriving via the path into a DataFrame.

/**
 * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
 * a local or distributed file system).
 *
 * @since 1.4.0
 */
// TODO: Remove this one in Spark 2.0.
def load(path: String): DataFrame = {
  option("path", path).load()
}

At this point the reading of the data is complete; next we operate on the DataFrame.
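For reference, a minimal Scala sketch of the same read chain, assuming an existing SQLContext named sqlContext and the people.json sample shipped with Spark 1.6:

// read() -> format() -> load(): the chain analyzed above
val peopleDF = sqlContext.read
  .format("json")                                    // name the input data source
  .load("examples/src/main/resources/people.json")   // internally: option("path", ...).load()

// JSON is self-describing: the schema printed here was inferred by scanning the input
peopleDF.printSchema()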
8、就是寫(xiě)操作!1. 調(diào)用 DataFrame中 select函數(shù)進(jìn)行對(duì)列篩選/*Selects a set of columns. This is a variant of select that can only selectexisting columns using column names (i.e. cannot construct expressions).*/ The following two are equivalent:df.select(colA, colB)df.select($colA, $colB)group dfopssince 1.3.0*/scala.annot
9、ation.varargsdef select(col: String, cols: String*): DataFrame = select(col +: cols).map(Column(_) : _*)然后通過(guò) write 將結(jié)果寫(xiě)入到外部存儲(chǔ)系統(tǒng)中。/*: Experimental :Interface for saving the content of the DataFrame out into external storage.group outputsince 1.4.0*/Experimentaldef write: DataFrameWriter = new DataFra
10、meWriter(this)在保持文件的時(shí)候 mode 指定追加文件的方式/* Specifies the behavior when data or table already exists. Options include:/ Overwrite是覆蓋- SaveMode.Overwrite: overwrite the existing data.創(chuàng)建新的文件,然后追加- SaveMode.Append: append the data.- SaveMode.Ignore: ignore the operation (i.e. no-op).- SaveMode.ErrorIfExist
11、s: default option, throw an exception at runtime.since 1.4.0*/def mode(saveMode: SaveMode): DataFrameWriter = this.mode = saveModethis最后, save() 方法觸發(fā) action ,將文件輸出到指定文件中。/*Saves the content of the DataFrame at the specified path.since 1.4.0*/def save(path: String): Unit = this.extraOptions += (path
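And the matching write chain as a minimal Scala sketch, reusing peopleDF from the read sketch above; the output path is an arbitrary example:

import org.apache.spark.sql.SaveMode

peopleDF.select("name")        // 1. project the "name" column
  .write                       // 2. obtain a DataFrameWriter
  .mode(SaveMode.Append)       // 3. append if output already exists
  .format("json")              // without this, the default source (parquet) is used
  .save("/tmp/personNames")    // 4. triggers the job that writes the files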
III. The overall Spark SQL read/write flow

[Figure: overall Spark SQL read/write flow diagram; the image is not included in this text version.]

IV. Source details for selected functions in the flow

DataFrameReader.load()

1. load() returns a dataset of type DataFrame, read from the default path.

/**
 * Returns the dataset stored at path as a DataFrame,
 * using the default data source configured by spark.sql.sources.default.
 *
 * @group genericdata
 * @deprecated As of 1.4.0, replaced by `read().load(path)`. This will be removed in Spark 2.0.
 */
@deprecated("Use read.load(path). This will be removed in Spark 2.0.", "1.4.0")
def load(path: String): DataFrame = {
  // here read is the DataFrameReader
  read.load(path)
}

2. Tracing into load, we land in a method of DataFrameReader: load() turns the input arriving via the path into a DataFrame.

/**
 * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
 * a local or distributed file system).
 *
 * @since 1.4.0
 */
// TODO: Remove this one in Spark 2.0.
def load(path: String): DataFrame = {
  option("path", path).load()
}

3. Tracing load one level further:

/**
 * Loads input in as a [[DataFrame]], for data sources that don't require a path (e.g. external
 * key-value stores).
 *
 * @since 1.4.0
 */
def load(): DataFrame = {
  // resolve the source that was passed in
  val resolved = ResolvedDataSource(
    sqlContext,
    userSpecifiedSchema = userSpecifiedSchema,
    partitionColumns = Array.empty[String],
    provider = source,
    options = extraOptions.toMap)
  DataFrame(sqlContext, LogicalRelation(resolved.relation))
}
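To make the three entry points concrete, a small sketch, assuming a SQLContext named sqlContext and an arbitrary example path; all three calls funnel into the same DataFrameReader.load():

val df1 = sqlContext.load("/tmp/people.parquet")        // pre-1.4 entry point, deprecated
val df2 = sqlContext.read.load("/tmp/people.parquet")   // its 1.4+ replacement

val df3 = sqlContext.read
  .option("path", "/tmp/people.parquet")                // what load(path) does internally
  .load()                                               // the path-less load()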
DataFrameReader.format()

1. format specifies the file format explicitly, which yields a big insight: a file read as JSON can, for example, be saved back out as Parquet. When reading a file, Spark SQL lets you name the type of file to read, e.g. json or parquet.

/**
 * Specifies the input data source format.
 * Built-in options include "parquet", "json", etc.
 *
 * @since 1.4.0
 */
def format(source: String): DataFrameReader = {
  this.source = source  // the file type
  this
}
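That insight in miniature — a minimal sketch that reads the JSON sample and saves it back out as Parquet (paths are examples):

// format() on the reader names the input type, format() on the writer the output type
sqlContext.read
  .format("json")
  .load("examples/src/main/resources/people.json")
  .write
  .format("parquet")
  .save("/tmp/people.parquet")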
DataFrame.write()

1. Create the DataFrameWriter instance.

/**
 * :: Experimental ::
 * Interface for saving the content of the [[DataFrame]] out into external storage.
 *
 * @group output
 * @since 1.4.0
 */
@Experimental
def write: DataFrameWriter = new DataFrameWriter(this)

2. Tracing into DataFrameWriter: it writes data out of a DataFrame into external storage systems.

/**
 * :: Experimental ::
 * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
 * key-value stores, etc). Use [[DataFrame.write]] to access this.
 *
 * @since 1.4.0
 */
@Experimental
final class DataFrameWriter private[sql](df: DataFrame)

DataFrameWriter.mode()

Overwrite replaces everything written before. Append appends: for ordinary files the data is appended within one file, but for parquet-formatted output a new file is created for the appended data. ErrorIfExists is the default behavior.

/**
 * Specifies the behavior when data or table already exists. Options include:
 *   - SaveMode.Overwrite: overwrite the existing data.
 *   - SaveMode.Append: append the data.
 *   - SaveMode.Ignore: ignore the operation (i.e. no-op).
 *   - SaveMode.ErrorIfExists: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: SaveMode): DataFrameWriter = {
  this.mode = saveMode
  this
}

The String overload accepts the mode from outside and maps it via pattern matching:

/**
 * Specifies the behavior when data or table already exists. Options include:
 *   - overwrite: overwrite the existing data.
 *   - append: append the data.
 *   - ignore: ignore the operation (i.e. no-op).
 *   - error: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: String): DataFrameWriter = {
  this.mode = saveMode.toLowerCase match {
    case "overwrite" => SaveMode.Overwrite
    case "append" => SaveMode.Append
    case "ignore" => SaveMode.Ignore
    case "error" | "default" => SaveMode.ErrorIfExists
    case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
      "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.")
  }
  this
}
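As a usage note, the String overload means callers can pass the mode as plain text; a minimal sketch, with peopleDF and the path as in the earlier examples:

// "append" is mapped to SaveMode.Append by the pattern match above
peopleDF.write.mode("append").save("/tmp/personNames")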
DataFrameWriter.save()

save writes the result to the path passed in.

/**
 * Saves the content of the [[DataFrame]] at the specified path.
 *
 * @since 1.4.0
 */
def save(path: String): Unit = {
  this.extraOptions += ("path" -> path)
  save()
}

Tracing into the save method:

/**
 * Saves the content of the [[DataFrame]] as the specified table.
 *
 * @since 1.4.0
 */
def save(): Unit = {
  ResolvedDataSource(
    df.sqlContext,
    source,
    partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
    mode,
    extraOptions.toMap,
    df)
}

Here source is SQLConf's defaultDataSourceName:

private var source: String = df.sqlContext.conf.defaultDataSourceName

where the DEFAULT_DATA_SOURCE_NAME parameter defaults to parquet:

// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
  defaultValue = Some("org.apache.spark.sql.parquet"),
  doc = "The default data source to use in input/output.")
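The practical consequence: if you never call format(), save() falls back to spark.sql.sources.default, which can be changed per context. A minimal sketch (paths are arbitrary examples):

// with no explicit format(), the default source (parquet out of the box) is used
peopleDF.write.save("/tmp/out-parquet")        // written as parquet

sqlContext.setConf("spark.sql.sources.default", "json")
peopleDF.write.save("/tmp/out-json")           // now written as json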
Selected functions in DataFrame.scala:

1. toDF: the no-argument toDF on DataFrame returns the object itself (the toDF that converts an RDD into a DataFrame is supplied by implicit conversions).

/**
 * Returns the object itself.
 *
 * @group basic
 * @since 1.3.0
 */
// This is declared with parentheses to prevent the Scala compiler from treating
// `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
def toDF(): DataFrame = this

2. The show() method displays the result in tabular form.

/**
 * Displays the [[DataFrame]] in a tabular form. For example:
 * {{{
 *   year  month AVG('Adj Close) MAX('Adj Close)
 *   1980  12    0.503218        0.595103
 *   1981  01    0.523289        0.570307
 *   1982  02    0.436504        0.475256
 * }}}
 */
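Finally, a minimal sketch of the RDD-to-DataFrame route plus show(), as it would run in the Spark 1.6 shell; Person is a hypothetical case class introduced only for this example:

case class Person(name: String, age: Int)   // hypothetical, for this sketch only

import sqlContext.implicits._                // supplies the rdd.toDF() conversion

val df = sc.parallelize(Seq(Person("Michael", 29), Person("Andy", 30))).toDF()
df.show()
// prints a table like:
// +-------+---+
// |   name|age|
// +-------+---+
// |Michael| 29|
// |   Andy| 30|
// +-------+---+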