spark交流0804內(nèi)訓(xùn)介紹下_第1頁
spark交流0804內(nèi)訓(xùn)介紹下_第2頁
spark交流0804內(nèi)訓(xùn)介紹下_第3頁
spark交流0804內(nèi)訓(xùn)介紹下_第4頁
spark交流0804內(nèi)訓(xùn)介紹下_第5頁
已閱讀5頁,還剩43頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

Spark介紹(下)陳希富2015年07月目錄123初識(shí)ScalaSpark流計(jì)算Spark安裝及基本操作概述曾經(jīng)有人問Java的創(chuàng)始人高斯林這樣一個(gè)問題,“除了Java語言以外,您現(xiàn)在還使用JVM平臺(tái)上的哪種編程語言?”他毫不猶豫的說是Scala。在目前眾多的JVM語言當(dāng)中,Scala無疑是最引人注意的語言之一。連Java8也引入了FP。

從某種程度上來說,Java認(rèn)可了Scala的做法。Scala是一門多范式編程語言,綜合了多門語言的風(fēng)格和思想,志在以簡(jiǎn)練、優(yōu)雅及類型安全的方式來表達(dá)常用編程模式。它平滑地集成了面向?qū)ο蠛秃瘮?shù)語言的特性??梢哉f,Scala是面向函數(shù)與面向?qū)ο蟮幕旌蟂cala能否成為Java殺手?一個(gè)Twitter的開發(fā)人員說:Scala將會(huì)成為現(xiàn)代Web2.0的發(fā)起語言Scala是運(yùn)行在JVM之上的,Scala的代碼也被編譯為.class形式,性能接近Java。Scala是一個(gè)靜態(tài)語言,更適合大型工程項(xiàng)目。Scala是針對(duì)“并發(fā)性問題”的解決方案之一,讓開發(fā)人員能夠更加輕松地專注于問題的實(shí)質(zhì),而不用考慮并發(fā)編程的低級(jí)細(xì)節(jié)。為什么要使用ScalaSpark是用Scala開發(fā)的,如果出現(xiàn)異常,在排查問題時(shí)對(duì)Spark源碼比較熟悉,可以起到事半功倍的效果。Scala是一門優(yōu)雅的語言技術(shù),使用Scala開發(fā)Spark應(yīng)用程序開發(fā)效率更高,代碼更精簡(jiǎn)。Actor編程模式讓高度并行應(yīng)用程序的開發(fā)更加簡(jiǎn)單,而不必依照復(fù)雜的Java線程模型來編寫程序。有人說:熟悉Scala之后再看Java代碼,有種讀匯編的感覺!兩個(gè)簡(jiǎn)單的Scala例子1)對(duì)集合求2的整數(shù)倍:varlists=List(1,2,3,4,5)2)JavaBean:publicclassPeople{ privateStringname; privateStringsex; publicStringgetName(){returnname;} publicvoidsetName(Stringname){=name;} publicStringgetSex(){returnsex;} publicvoidsetSex(Stringsex){this.sex=sex;} }Scala:classPeople(varname:String,varsex:String){}lists.filter(n=>n%2==0)或lists.filter(_%2==0)Scala的特性Scala是面向?qū)ο蟮腟cala是一個(gè)純面向?qū)ο笳Z言,在某種意義上來講所有數(shù)值都是對(duì)象。對(duì)象的類型和行為是由class和trait來描述的。Class的抽象可由子類化和一種靈活的基于mixin的組合機(jī)制(它可作為多重繼承的簡(jiǎn)單替代方案)來擴(kuò)展。Scala是函數(shù)式的Scala還是一個(gè)函數(shù)式語言,在某種意義上來講所有函數(shù)都是數(shù)值。Scala為定義匿名函數(shù)提供了一種輕量級(jí)的語法,它支持高階(higher-order)函數(shù)、允許函數(shù)嵌套、支持局部套用(currying)。Scala的case類及其內(nèi)置支持的模式匹配模型在許多函數(shù)式編程語言中都被使用。Scala是靜態(tài)類型的Scala配備了一套富有表現(xiàn)力的類型系統(tǒng),該抽象概念以一種安全的和一致的方式被使用。Scala是可擴(kuò)展的Scala的設(shè)計(jì)承認(rèn)了實(shí)踐事實(shí),領(lǐng)域特定應(yīng)用開發(fā)通常需要領(lǐng)域特定語言擴(kuò)展。Scala提供了一個(gè)獨(dú)特的語言組合機(jī)制,這可以更加容易地以類庫的形式增加新的語言結(jié)構(gòu):任何方式可以被用作中綴(infix)或后綴(postfix)操作符閉包按照所期望的類型(目標(biāo)類型)自動(dòng)地被構(gòu)造兩者結(jié)合使用可方便地定義新語句,無需擴(kuò)展語法,也無需使用類似宏的元編程工具。Scala可與Java和.NET進(jìn)行互操作這一點(diǎn),在Scala設(shè)計(jì)之初就已經(jīng)有所考慮。Scala和java代碼上的比較1、告別return語句。

在Scala中無需return語句,代碼最后執(zhí)行的結(jié)果為返回值。2、類型推斷,你只需要使用val或var。vararg1=“abc”valarg2=2valarg3:Int=43、Tuples(元組)讓定義Map<String,Map<String,Integer>>時(shí)哭出來。vartuple=(1,”abc”,2)varmap=Map(1->”a”,2->”b”,3->”c”)4、for里還能寫iffor(file<-filesiffile.isFile )println(file)5、if語句也有返回值valfilename=if(!args.isEmpty)args(0)else"default.txt"6、多重繼承。Trait(類似java的接口)中可以有具體的實(shí)現(xiàn)。7、……Scala基本語法變量的定義通過val和var定義val類似java中的final無需;結(jié)尾類型推斷valmsg="Hello,World"valmsg2:String="Helloagain"函數(shù)的定義defadd(x:Int,y:Int):Int={x+y}每個(gè)Scala函數(shù)都有返回值,只是有些返回值類型為Unit,類似Java中的void類型函數(shù)的最后一個(gè)表達(dá)式的值就可以作為函數(shù)的結(jié)果作為返回值Ifelse語句也有返回值(其實(shí)if也是一個(gè)函數(shù))迭代for:for(arg<-args)println(arg)foreach:args.foreach(arg=>println(arg))Scala基本語法數(shù)組的操作數(shù)組的定義:valargs=newArray[String](3)數(shù)組的訪問:args(0)=“hello”或args(0)集合的操作List:valnumList=List(1,2)元組(Tuples):valtp=(123,“hello")Set:varscalaSet=Set(“hello",”hi")Map:valscalaMap=Map(1->“I”,2->“II”)Scala基本語法-if和whilevarfilename="default.txt"if(!args.isEmpty)filename=args(0)valfilename=if(!args.isEmpty)args(0)else"default.txt“(使用val更為函數(shù)式編程風(fēng)格)while(a!=0){……}do{...}while(a!=0)(while基本和java類似,且沒有返回值,所以要盡量避免使用while)Scala基本語法-for(瑞士軍刀)語法格式:valfilesHere=(newjava.io.File(".")).listFilesfor(file<-filesHere)println(file)條件過濾:valfilesHere=(newjava.io.File(".")).listFilesfor(file<-filesHereiffile.isFileiffile.getName.endsWith(".scala"))println(file)Scala基本語法-for(瑞士軍刀)嵌套迭代:valfilesHere=(newjava.io.File(".")).listFilesdeffileLines(file:java.io.File)=scala.io.Source.fromFile(file).getLines().toListdefgrep(pattern:String)=for{file<-filesHereiffile.getName.endsWith(".scala")

line<-fileLines(file)ifline.trim.matches(pattern)}println(file+":"+line.trim)grep(".*gcd.*")Scala基本語法-for(瑞士軍刀)生成新集合:valfilesHere=(newjava.io.File(".")).listFilesdefscalaFiles=for{file<-filesHereiffile.getName.endsWith(".scala")}yieldfileScala基本語法-matchvalfirstArg=if(args.length>0)args(0)else""valfriend=firstArgmatch{case"salt"=>"pepper"case"chips"=>"salsa"case"eggs"=>"bacon"case_=>"huh?"}println(friend)Scala基本語法-try拋出異常:thrownewRuntimeException("nmustbeeven")捕獲異常:try{valf=newFileReader("input.txt")}catch{caseex:FileNotFoundException=>//handlemissingfilecaseex:IOException=>//handleotherI/Oerror}finally:try{//usethefile}finally{file.close()}try…catch…也有返回值

類和對(duì)象-定義objectChecksumAccumulator{

privatevalcache=Map[String,Int]()

defcalculate(s:String):Int=

if(cache.contains(s))

cache(s)

else{

valacc=newChecksumAccumulator

for(c<-s)

acc.add(c.toByte)

valcs=acc.checksum()

cache+=(s->cs)

cs

}

}classChecksumAccumulator{

privatevarsum=0

defadd(b:Byte):Unit=sum+=b

defchecksum():Int=~(sum&0xFF)+1

}ChecksumAccumulator.calculate("helloScala")類和對(duì)象-操作有理數(shù)為例,定義其加減乘除,了解類的操作:有理數(shù)(rational)可以表示成個(gè)分?jǐn)?shù)形式:n/d,其中n和d都是整數(shù)(d不可以為0),n稱為分子(numberator),d為分母(denominator)。1)定義:classRational(n:Int,d:Int)2)重寫:classRational(n:Int,d:Int){

overridedeftoString=n+"/"+d}3)前提條件檢查:classRational(n:Int,d:Int){ require(d!=0) overridedeftoString=n+"/"+d}類和對(duì)象-操作4)添加成員變量和重載:classRational(n:Int,d:Int){require(d!=0)valnumber=nvaldenom=doverridedeftoString=number+"/"+denom

defadd(that:Rational)=newRational(number*that.denom+that.number*denom,denom*that.denom)defadd(i:Int)=newRational(number+i*denom,denom)}那么就可以使用a+b或a+Int的形式進(jìn)行運(yùn)算了如何實(shí)現(xiàn)Int+a形式?implicitdefintToRational(x:Int)=newRational(x,1)頭等公民-函數(shù)-類成員函數(shù)當(dāng)程序越來越大,你需要使用函數(shù)將代碼細(xì)化為小的容易管理的模塊。和Java相比,Scala提供了多種Java不支持的方法來定義函數(shù),除了類成員函數(shù)外,Scala還支持嵌套函數(shù),函數(shù)字面量,函數(shù)變量等。importscala.io.SourceobjectLongLines{defprocessFile(filename:String,width:Int){valsource=Source.fromFile(filename)for(line<-source.getLines())processLine(filename,width,line)}privatedefprocessLine(filename:String,width:Int,line:String){if(line.length>width)println(filename+":"+line.trim)}}頭等公民-函數(shù)-局部函數(shù)局部函數(shù):importscala.io.SourceobjectLongLines{defprocessFile(filename:String,width:Int){defprocessLine(line:String){if(line.length>width)println(filename+":"+line.trim)}valsource=Source.fromFile(filename)for(line<-source.getLines())processLine(line)}}1)函數(shù)嵌套函數(shù)。2)局部函數(shù)可以直接訪問上層函數(shù)的參數(shù)頭等公民-函數(shù)-函數(shù)字面量函數(shù)字面量:字面量:(x:Int)=>x+1附值給變量:varincrease=(x:Int)=>x+1多行:varincrease=(x:Int)=>{println("We")println("are")println("here")x+1}函數(shù)做為參數(shù):valsomeNumbers=List(-11,-10,-5,0,5,10)someNumbers.foreach((x:Int)=>println(x))someNumbers.filter(x=>x>0)簡(jiǎn)化寫法:valf=(_:Int)+(_:Int)+(_:Int)f(5,10,13)頭等公民-函數(shù)-閉包閉包:閉包是可以包含自由(未綁定到特定對(duì)象)變量的代碼塊;這些變量不是在這個(gè)代碼塊內(nèi)或者任何全局上下文中定義的,而是在定義代碼塊的環(huán)境中定義(局部變量)。scala>varmore=1more:Int=1scala>valaddMore=(x:Int)=>x+moreaddMore:Int=>Int=<function1>scala>addMore(100)res1:Int=101(當(dāng)自由變量改變時(shí),scala也能捕獲到這個(gè)變化。同樣scala改變變量時(shí),也能反映到外面。)scala>defmakeIncreaser(more:Int)=(x:Int)=>x+moremakeIncreaser:(more:Int)Int=>Intscala>valinc1=makeIncreaser(1)inc1:Int=>Int=<function1>scala>valinc9999=makeIncreaser(9999)inc9999:Int=>Int=<function1>scala>inc1(10)res5:Int=11scala>inc9999(10)res6:Int=10009頭等公民-函數(shù)-參數(shù)重復(fù)參數(shù):defecho(args:String*)=for(arg<-args)println(arg)echo("One")echo("Hello","World")命名參數(shù):defspeed(distance:Float,time:Float):Float=distance/timespeed(time=10,distance=100)speed(distance=100,time=10)缺省參數(shù)值:defspeed(distance:Float=100,time:Float):Float=distance/timespeed(time=10)頭等公民-函數(shù)-柯里化函數(shù)Scala允許程序員自己新創(chuàng)建一些控制結(jié)構(gòu),并且可以使得這些控制結(jié)構(gòu)在語法看起來和Scala內(nèi)置的控制結(jié)構(gòu)一樣,在Scala中需要借助于柯里化(Currying),柯里化是把接受多個(gè)參數(shù)的函數(shù)變換成接受一個(gè)單一參數(shù)(最初函數(shù)的第一個(gè)參數(shù))的函數(shù),并且返回接受余下的參數(shù)而且返回結(jié)果的新函數(shù)的技術(shù)。普通函數(shù):defplainOldSum(x:Int,y:Int)=x+yplainOldSum(1,2)柯里化函數(shù):defcurriedSum(x:Int)(y:Int)=x+ycurriedSum(1)(2)deffirst(x:Int)=(y:Int)=>x+yvalsecond=first(1)second(2)valonePlus=curriedSum(1)_onePlus(2)Scala編程風(fēng)格和思想-一個(gè)簡(jiǎn)單例子一個(gè)簡(jiǎn)單的原則,如果代碼中含有var類型的變量,這段代碼就是傳統(tǒng)的指令式編程,如果代碼只有val變量,這段代碼就很有可能是函數(shù)式代碼,因此學(xué)會(huì)函數(shù)式編程關(guān)鍵是不使用vars來編寫代碼。defprintArgs(args:Array[String]):Unit={ vari=0 while(i<args.length){ println(args(i)) i+=1 }}defprintArgs(args:Array[String]):Unit={ for(arg<-args) println(arg)}defprintArgs(args:Array[String]):Unit={ args.foreach(println)}Scala編程風(fēng)格和思想-另一個(gè)簡(jiǎn)單例子packagecom.cxf.testimportjava.io.FileobjectClosureTest{privatedeffiles=(newFile("c:\\deleteme")).listFiles()deffileEndFilger(query:String)={for(file<-filesif(file.getName.endsWith(query)))yieldfile}deffileContainsFilger(query:String)={for(file<-filesif(file.getName.contains(query)))yieldfile}deffileRegexFilger(query:String)={for(file<-filesif(file.getName.matches(query)))yieldfile}}

Scala編程風(fēng)格和思想-另一個(gè)簡(jiǎn)單例子packagecom.cxf.testimportjava.io.FileobjectClosureTest{privatedeffiles=(newFile("c:\\deleteme")).listFiles()

deffileFilter(matcher:(String)=>Boolean)={for(file<-filesif(matcher(file.getName)))yieldfile}defendFilter(query:String)=fileFilter(_.endsWith(query))defcontainsFilter(query:String)=fileFilter(_.contains(query))defregexFilter(query:String)=fileFilter(_.matches(query))}

高級(jí)應(yīng)用-抽象類abstractclassElement{defcontents:Array[String]}一個(gè)含有抽象方法的類必須定義成抽象類,也就是使用abstract關(guān)鍵字來定義類。一個(gè)沒有定義實(shí)現(xiàn)的方法就是抽象方法,只要這個(gè)方法沒有具體實(shí)現(xiàn),就是抽象方法。無參函數(shù):abstractclassElement{defcontents:Array[String]valheight=contents.lengthvalwidth=if(height==0)0elsecontents(0).length}高級(jí)應(yīng)用-擴(kuò)展類classArrayElement(conts:Array[String])extendsElement{defcontents:Array[String]=conts}其中extends具有兩個(gè)功效,一是讓ArrayElement繼承所有Element類的非私有成員,第二使得ArrayElement成為Element的一個(gè)子類。而Element稱為ArrayElement的父類。ArrayElement繼承了Element的width和height方法,因此你可以使用ArrayElement.width來查詢寬度,比如:valae=newArrayElement(Array("hello","world"))派生也意味著子類的值可以用在任何可以使用同名父類值的地方,比如:vale:Element=newArrayElement(Array("hello"))組合與繼承-類成員關(guān)系高級(jí)應(yīng)用-Trait在Scala中Trait為重用代碼的一個(gè)基本單位。一個(gè)Traits封裝了方法和變量,和Interface相比,它的方法可以有實(shí)現(xiàn),這一點(diǎn)有點(diǎn)和抽象類定義類似。但和類繼承不同的是,Scala中類繼承為單一繼承,也就是說子類只能有一個(gè)父類。當(dāng)一個(gè)類可以和多個(gè)Trait混合,這些Trait定義的成員變量和方法也就變成了該類的成員變量和方法,由此可以看出Trait集合了Interface和抽象類的優(yōu)點(diǎn),同時(shí)又沒有破壞單一繼承的原則。traitPhilosophical{defphilosophize(){println("Iconsumememeory,thereforIam!")}}這可以使用extends或with來混合一個(gè)traitclassFrogextendsPhilosophical{overridedeftoString="gree"}classFrogextendsAnimalwithPhilosophicalwithHasLegs總結(jié)Scala可以說是:“麻雀雖小,五臟俱全”。Scala也可以簡(jiǎn)單理解為:大量語法糖的Java。重要的是學(xué)習(xí)Scala的語言風(fēng)格及思想。語法更簡(jiǎn)潔,讓開發(fā)人員更關(guān)注具體實(shí)現(xiàn),而不是代碼細(xì)節(jié)更適合于高并發(fā)大型項(xiàng)目應(yīng)用函數(shù)式編程更適合分布式計(jì)算目錄123初識(shí)ScalaSpark流計(jì)算Spark安裝及基本操作伯克利大學(xué)-BDAS軟件棧復(fù)雜的批量數(shù)據(jù)處理(batchdataprocessing)10'~5H?;跉v史數(shù)據(jù)的交互式查詢(interactivequery)10s~5'基于實(shí)時(shí)數(shù)據(jù)流的數(shù)據(jù)處理(streamingdataprocessing)500ms~5s什么是SparkStreaming

將Spark擴(kuò)展為大規(guī)模流處理系統(tǒng)

可以擴(kuò)展到100節(jié)點(diǎn)規(guī)模,達(dá)到秒級(jí)延遲

高效且具有良好的容錯(cuò)性

提供了類似批處理的API,可很容易實(shí)現(xiàn)復(fù)雜算法

電子商務(wù):統(tǒng)計(jì)過去1分鐘內(nèi)訪問最多的5件商品SparkStreaming設(shè)計(jì)動(dòng)機(jī)

很多重要的應(yīng)用要處理大量在線流式數(shù)據(jù),并返回近似實(shí)時(shí)的結(jié)果

社交網(wǎng)絡(luò)趨勢(shì)追蹤

網(wǎng)站指標(biāo)統(tǒng)計(jì)

廣告系統(tǒng)

具備分布式流式處理框架的基本特征

良好的擴(kuò)展性(百級(jí)別節(jié)點(diǎn))

低延遲(秒級(jí)別)有狀態(tài)的流式處理

傳統(tǒng)流式系統(tǒng)采用了“record-at-a-time”的處理模型

每個(gè)節(jié)點(diǎn)狀態(tài)是變化的

對(duì)于每條記錄,修改狀態(tài)后發(fā)射新的記錄

節(jié)點(diǎn)宕機(jī)后,狀態(tài)丟失

在流式處理系統(tǒng)中,讓可變狀態(tài)具有容錯(cuò)性是很具有挑戰(zhàn)性的工作mutablestatenode

1node

3input

recordsnode

2input

records已存在的流式系統(tǒng):Storm

重發(fā)未被處理的數(shù)據(jù)記錄

每條數(shù)據(jù)至少被處理一次

狀態(tài)可能被修改多次

狀態(tài)信息可能因?yàn)椋ㄓ布蜍浖┕收隙鴣G失SparkStreaming基本思想將流式計(jì)算轉(zhuǎn)化為一批很小的、確定的批處理作業(yè)。以X秒為單位將數(shù)據(jù)流切分成離散的作業(yè)將每批數(shù)據(jù)看做RDD,使用RDD操作符處理最終結(jié)果以RDD為單位返回(寫入HDFS或者其他系統(tǒng))SparkSpark

StreamingbatchesofXsecondslivedatastreamprocessedresultsDStream(Discretized

Streams)

表示連續(xù)的數(shù)據(jù)流,可能是輸入流,或通過輸入流轉(zhuǎn)化而成的數(shù)據(jù)流;

內(nèi)部由一系列離散的RDD組成;

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論