2022Apache spark中文實(shí)踐手冊指南_第1頁
2022Apache spark中文實(shí)踐手冊指南_第2頁
2022Apache spark中文實(shí)踐手冊指南_第3頁
2022Apache spark中文實(shí)踐手冊指南_第4頁
2022Apache spark中文實(shí)踐手冊指南_第5頁
已閱讀5頁,還剩131頁未讀 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡介

Spark最佳實(shí) 使用Databricks作為分析平 領(lǐng)英如何應(yīng)對ApacheSpark的Scalability挑 利用閃存優(yōu)化在Cosco基礎(chǔ)上的Spark 基于Spark和TensorFlow的機(jī)器學(xué)習(xí)實(shí) 在kubernetes上運(yùn)行apachespark:最佳實(shí)踐和陷 使用RayOnSpark在大數(shù)據(jù)平臺上運(yùn)行新興的人工智能應(yīng) 使用Ray將可擴(kuò)展的自動(dòng)化機(jī)器學(xué)習(xí)(AutoML)用于時(shí)序預(yù) ApacheSpark3.0對Prometheus監(jiān)控的原生支 助力云上開源生態(tài)-阿里云開源大數(shù)據(jù)平臺的發(fā) EMRSpark-SQL性能極致優(yōu)化揭秘概覽 EMRSpark-SQL性能極致優(yōu)化揭秘RuntimeFilter EMRSpark-SQL性能極致優(yōu)化揭秘NativeCodegen SparkCodegen淺 Tablestore結(jié)合Spark的流批一體SQL實(shí) SparkSparkSparkSparkSpark最佳實(shí)踐 簡介:簡介:SPARK+AISUMMIT2020中文精華版線上峰會(huì)將會(huì)帶領(lǐng)大家一起顧2020SPARK又YipitDataDatabricks平臺搭建的分析平臺。YipitData‘sPlatform的相關(guān)介紹。(Whyaplatform)YipitData是一家咨詢公司,其客戶主要是投資基金以及財(cái)富五百強(qiáng)中的一些公司。該公司通過自己的數(shù)據(jù)產(chǎn)品進(jìn)行分析,提供給客戶相應(yīng)的數(shù)據(jù)分析報(bào)告。YipitData的主要產(chǎn)出方式和賺錢方式就是533個(gè)數(shù)據(jù)工程師。數(shù)據(jù)分析的基礎(chǔ)是數(shù)據(jù),所(Whatisinourplatform)OwnTheProductDataCollection、DataExploration、ETLWorkflowsReportGeneration四個(gè)階段。YipitData公司的人員主要包括數(shù)據(jù)分析師和數(shù)據(jù)工程師,其中數(shù)據(jù)分析師來分析數(shù)據(jù)DatabricksWorkspaceNotebookpython、Scala、SQL等語言的代碼,然后交由Databricks平臺去執(zhí)行并返結(jié)果。YipitData'Platform是基于DatabricksDatabricksPython(一)(IngestingYipitData1PBParquet,60KTables1.7K的Databases。他們的數(shù)據(jù)收集使用的是ReadypipeURL之后,將downloadURLsParquetReadypipe對網(wǎng)頁進(jìn)行kinesisFirehose,kinesisFirehoseAWSS3上。在這個(gè)階段所存儲的數(shù)據(jù)都是原始JSONschema的,這類數(shù)據(jù)對于數(shù)據(jù)分析師來JSON文件轉(zhuǎn)換成Bucket,這一步也自帶了壓縮效果。轉(zhuǎn)換完成之后會(huì)有兩個(gè)輸出,如下圖所示,一JSONS3Merge處理,將VariousFilePermissionsDataDatapDaDarkspaut(二)(TableYipitData'sPlatform提供了一些tableutilitiestabletable。比如下圖所示的create_tabletable。YipitData公司來說,上面的過程仍然是一個(gè)比較繁瑣的過程,因?yàn)樵摴咀钪匾娜蝿?wù)是進(jìn)行數(shù)據(jù)分析,且大多數(shù)人員也是數(shù)據(jù)分析師,如果讓數(shù)據(jù)分析師使用SparkAPI去完成上述過程,還是YipitData公司來說,最好是把一些功能進(jìn)行封裝,不要暴露太多的底層功能,create_table函數(shù),大大降低了數(shù)據(jù)分析師的使用難度。(三)(ClusterSparkSpark集群簡單了。為了解決易用性的問題,YipitDataT-ShirtSize劃分巧妙地將集群劃分成SMALL、MEDIUM、LARGE三類,如下圖所示,數(shù)據(jù)分析師在使用的時(shí)候雖然少了靈活性,但是節(jié)省T-Shirt的尺寸一樣做選擇即可,而無需關(guān)心背后的復(fù)雜RESTAPI(四)ETLWorkflow(ETLWorkflowYipitData使用Airflow來實(shí)現(xiàn)ETLWorkflowAirflow來管理ETLWorkflow,ETL的一個(gè)標(biāo)準(zhǔn)工具。對于數(shù)據(jù)工程師來說,Airflow的使用不是很難:首先構(gòu)建一個(gè)DAGTASKTASKS的依賴關(guān)系即可。但是,終究是要寫一段代YipitData來說就不是那么合適了。因此,YipitDataAirflow+databricksAPIDAGs。具體來說,每個(gè)文件夾APIDAGETL的自動(dòng)化,其中用戶只需要指定Notebook中的參數(shù)值即可。YipitData自動(dòng)化創(chuàng)建Workflows的過程如下圖所示,整個(gè)流程都是在Databricks平臺上擴(kuò)展得到的。SparkSpark最佳實(shí)踐 PAGE72 SparkQ1:Databricks和DataworksSpark應(yīng)用更強(qiáng)一些。ETL。ApacheSparkSparkSparkSpark用戶生產(chǎn)力,避免“用SparkSparkSpark用戶生成力以及如何優(yōu)化Spark基礎(chǔ)計(jì)算架構(gòu)。Spark團(tuán)隊(duì)軟件工程師,卡耐基梅隆大學(xué)碩士學(xué)位,專攻分布式系統(tǒng)方向。一、OverviewofSparkEcosystem@二、Scalingchallengeswehave三、SolutionstoscaleourSpark四、SolutionstoscaleSparkcompute一、OverviewofSparkEcosystem@的途徑。在這樣一張龐大的圖譜中分析數(shù)據(jù)獲得insights才能幫助領(lǐng)英平臺中的職場人士。那么大規(guī)ApacheSpark成為了領(lǐng)英的主要計(jì)算引擎。SparkHadoop平臺中100vcores2+PB3Spark應(yīng)用Spark70%shuffle5PB的數(shù)據(jù)。計(jì)算基20193倍以上。Spark是一個(gè)多元化的生態(tài)系統(tǒng),SparkSpark,要么通過AzkabanJupyternotebookSparkAPI60%SparkSQL應(yīng)用,通過豐富的大數(shù)據(jù)應(yīng)用,二、ScalingchallengeswehaveSpark基礎(chǔ)架SparkShuffle,SparkShuffle是最大的規(guī)模擴(kuò)展瓶頸。領(lǐng)英還嘗SQL優(yōu)化技術(shù),SQLSpark應(yīng)用數(shù)量下的計(jì)算工作量,從而進(jìn)一步緩解盡量通過自動(dòng)化的Spark系統(tǒng)答最常見的用戶問題分別是為什么我的Spark運(yùn)行失敗了,為什么我的Spark運(yùn)行很慢,以及如何讓它運(yùn)行的更快。下面分別從用戶生產(chǎn)力及SparkShuffle方面展開詳三、SolutionstoscaleourSparkusersSpark提升用戶生產(chǎn)力,并幫助用戶理解SparkSpark是非??紤]到領(lǐng)英SparkSpark中運(yùn)行時(shí),Spark用戶花費(fèi)了很多功夫終于調(diào)試好Spark團(tuán)GridBench,它可以通過各種報(bào)告幫助用戶理解性能Spark應(yīng)用多次運(yùn)行后的結(jié)果自動(dòng)分析,從而發(fā)現(xiàn)性能瓶頸點(diǎn)。GridBench也可GridBench針對某個(gè)應(yīng)用做出的性能比較報(bào)告,通過對比兩組不同時(shí)間下運(yùn)行間的之間執(zhí)行記錄。GridBench可以確定SparkSparkHistoryServerSpark生態(tài)系統(tǒng)中重要的一Spark應(yīng)用提供歷史日志記錄,通過網(wǎng)頁和RestAPISpark應(yīng)用的詳細(xì)數(shù)據(jù)信息。再通過MetricAPI獲取指標(biāo)數(shù)據(jù)。SparkHistorySparkSparkHistoryServer獲取每個(gè)指標(biāo)數(shù)據(jù),還是遇到了功能Spark應(yīng)用數(shù)量比較大,SparkHistoryServer不能很好的處理大量的并發(fā)請求。其次,SparkHistoryServer在解析歷史日志,提取較大的日志文件SparkHistoryServer,proxyserverworkerserver,通過使用多臺服務(wù)器可以很好的橫向擴(kuò)展。為了應(yīng)用運(yùn)行結(jié)束后才解析歷史日志,IncrementalParsing可以在運(yùn)行時(shí)開始解析,一點(diǎn)點(diǎn)的增量解析日Spark應(yīng)用結(jié)束后,SparkHistoryServerIncrementalParsing可以在很短的時(shí)間內(nèi)提供所需通過對SparkHistoryServerSpark應(yīng)用指標(biāo)的數(shù)據(jù)。為了驅(qū)動(dòng)各種用戶生產(chǎn)力數(shù)據(jù),領(lǐng)英搭建了一套基于KafkasamzaSparkTrackingService?;贙afkasamza是領(lǐng)英開源的流處理系統(tǒng),SparkTrackingServiceResourceManagerSparkIDSparkHistoryServerSpark應(yīng)SparkTrackingService會(huì)進(jìn)一步解析用戶所需要的指標(biāo)數(shù)據(jù)。由此,四、SolutionstoscaleSparkcomputeSparkShuffle有了提升用戶生產(chǎn)力的各種工具之后,Spark團(tuán)隊(duì)可以更多的投入的優(yōu)化計(jì)算引擎之上。Spark本身是一個(gè)復(fù)雜的系統(tǒng),應(yīng)該首先改進(jìn)哪個(gè)組件呢?隨著Spark在領(lǐng)英內(nèi)部使用率的快速增長,SparkShuffleServiceSparkExternalSparkShuffleService管ExecuterSparkShuffleServiceSparkExecuter中ShuffleMapTasksShuffleShuffleShuffleBlock,ShuffleExternalSparkShuffleServiceShuffleReducerTasks開始運(yùn)行時(shí),都會(huì)從遠(yuǎn)程的ShuffleServiceShuffleBlockShuffleService可以輕易ShuffleShuffleReducerTasks。由于SparkShuffleService共享性質(zhì),在大規(guī)模部署應(yīng)用服務(wù)時(shí)遇到了很多問題。SparkShuffleServiceTasks請求陸續(xù)發(fā)出,ShuffleServiceShuffleBlockShuffleServiceShuffle等待時(shí)間。第三個(gè)問題是ShuffleShuffleServiceShuffleBlocks的應(yīng)用,ShuffleBlock時(shí)很容易對ShuffleServiceShuffleShuffleServiceShuffleBlockShuffleBlock5000ShuffleReduceStage,并在圖StageShuffleBlockShuffle等待時(shí)間。數(shù)據(jù)來源是領(lǐng)英ShuffleBlockStage。Stage1ShuffleService升ShuffleServiceShuffle10006000次。ShuffleService背后使用的Next-gen服務(wù)器的問題。在較輕量層RPCRPC隔離開來,在集群高峰時(shí)段,大量的數(shù)據(jù)層的RPCShuffleServiceRPC請求超時(shí)。領(lǐng)英修ShuffleService之后,看到了立桿見影的效果。大大減少了Shuffle組件的可靠性。Stage2:ShuffleServiceShuffleService端限流來幫助解決集群內(nèi)abusive應(yīng)用的影響。這類應(yīng)用所帶ReducerTasksShuffleBlock時(shí),ShuffleShuffle會(huì)開始大量的小數(shù)據(jù)隨機(jī)讀取操作,很容易ServiceShuffleBlockShuffleBlock獲取速率超過閾值時(shí),ShuffleService可以讓相應(yīng)應(yīng)用的ReducerTasks退,通過減少并發(fā)ShuffleBlock獲取數(shù)據(jù)流ShuffleService限流機(jī)制,觀察到集群中所有節(jié)點(diǎn)上的ShuffleServiceBlockabusivejobShuffleService以及相鄰應(yīng)用的影響開始受到了控制。同時(shí),還觀察到集群上Shuffle數(shù)據(jù)傳輸速率并沒有特別明顯的變化。這意味著當(dāng)限制那些少數(shù)abusiveShuffle數(shù)據(jù)吞吐量。ShuffleServiceabusive應(yīng)用的影響,但依然不能根本的解決小ShuffleBlockShuffleVLDB2020Magnet可以關(guān)注后續(xù)工程博客文章。MagnetShuffleShuffle文件之后,MapTasks會(huì)將生成的ShuffleBlockShufflebyte組成的數(shù)據(jù),分組之后另外單獨(dú)的線程會(huì)讀取一整ShuffleBlock傳輸?shù)竭h(yuǎn)程的ShuffleService中。ShuffleServiceShuffleBlock按照不ShuffleShuffleDriverShuffleMapStageShuffleService,MapTasksServicesShuffleBlock始終被分配到同一個(gè)遠(yuǎn)程的ShuffleServiceShuffleService端將以Besteffort方式把收到ShuffleBlockShuffle文件當(dāng)中。ShuffleBlock大小和位置之外,SparkDriver也會(huì)收到這些分區(qū)合并的ShuffleReducerTasksSparkDriverShuffleBlockReducerTasksShuffleBlock數(shù)量,從ShuffleBlockShuffleServiceBesteffortShuffleBlock按分區(qū)合ShuffleBlock沒有被合并,ReducerTasksShuffleBlockReducerTasks大部分輸入數(shù)據(jù)都被合并在集群的一個(gè)節(jié)點(diǎn)之上,SparkDriverReducerTasksShuffle性能。按照不同的ShuffleShuffle當(dāng)中的小數(shù)據(jù)隨機(jī)讀取操作轉(zhuǎn)化為大數(shù)據(jù)的Shuffle的性能。MagnetGridbench性能分析工具,對SparkMagnet后的性能進(jìn)行了分析。下圖中使用了較為復(fù)雜的生成機(jī)器學(xué)習(xí)特Shuffle30%的應(yīng)用運(yùn)行時(shí)間的縮短。目前,領(lǐng)英正在將這種全新Shuffle機(jī)制推廣到生產(chǎn)集群中。簡介:簡介:SPARK+AISUMMIT2020中文精華版線上峰會(huì)將會(huì)帶領(lǐng)大家一起顧2020SPARK又開源項(xiàng)目組的軟件工程師吳一介紹了利用Flash閃存優(yōu)化在Cosco基礎(chǔ)上的SparkShuffleFlashforSparkShufflewithCoscoCoscoFacebookSparkShuffleFlashCoscoSparkShuffleSparkShuffle3I/OQueryCoscoCosco(一)CoscoSparkShuffleMapTaskReduceTaskTaskMapTaskMapOutputFiles,PartitionMapTask執(zhí)行完畢之后,ReduceTaskMapOutputFiles中某個(gè)分區(qū)的數(shù)據(jù),將其合并成某個(gè)大的PartitionI/O性能相關(guān)的問題:第一次是在MapTaskshuffledataShuffleDataSpillMapOutputFiles第三次是在ReduceTasksortSpillsmallIOsShuffle的性能變差?;谏鲜鰡栴},CoscoSparkTask生成一個(gè)自MapOutputFiles,CoscoMapTaskPartition寫入到同一個(gè)內(nèi)存緩存中,緩FlushPartitionFlushReduceTaskHDFS系統(tǒng)中的文件即可,MMxR數(shù)量級。因此,也就解決了小I/OFlashShuffleshuffleCosco中緩存的時(shí)間是可以忽略不計(jì)的。1GB100GB的閃存之間讓我們用來部署集群,我們?nèi)绾尉駬衲??這里有一條不精確的經(jīng)驗(yàn):1GB100GB的閃存這兩種第一種是優(yōu)先緩存內(nèi)存,當(dāng)內(nèi)存達(dá)到一定閾值之后再Flushpartitionpartition用內(nèi)存緩存,對于partition用閃存緩存。250GB的內(nèi)存25TBCosco的優(yōu)勢,用更少的硬件資Flush到閃存中,這樣有兩個(gè)好處:ShuffleDatapartitionpartition用內(nèi)存緩存,對于加載partitionPartition的加載速率,當(dāng)速率小于某個(gè)出,在實(shí)際生產(chǎn)中大多數(shù)Partition的加載速度是比較慢的,少部分加載速度比較快,加載速度比較ReduceTask直接從閃存中讀取緩存的數(shù)據(jù),而不是從HDFS中的文件讀取數(shù)據(jù),這樣子提高了數(shù)據(jù)的讀取速率。另外,在引入閃存之后,Shuffle的數(shù)據(jù)塊會(huì)變得更大,在Reduce端合并數(shù)據(jù)塊的次數(shù)會(huì)變少,讓整個(gè)查詢變得更快。DFS上的讀寫也會(huì)更加高效。DiscreteeventSyntheticloadgenerationonatestShadowtestingonatestSpecialcanaryinaproductionDiscreteeventDiscreteeventsimulation,也就是離散時(shí)間模擬的方法,是一種比較通用的評估方法。我們把每個(gè)ShuffleData到達(dá)閃存的行為作為一個(gè)離散事件,記錄其到達(dá)的時(shí)間、此時(shí)閃存中寫入的數(shù)據(jù)總量以FlushDFS文件的數(shù)據(jù)總量。最終我們會(huì)得到如下圖所示統(tǒng)計(jì)表,包含了最終數(shù)據(jù)塊SpecialcanaryinaproductionCosco中一個(gè)Task可以與多個(gè)ShuffleService進(jìn)行通信,所以很難確定是因?yàn)榧尤肓碎W存提升了性能還是因?yàn)槠渌鸄B保持原來的部署模式。之后,我們再對兩個(gè)子集群進(jìn)行評估,SparkTensorFlow的機(jī)器學(xué)習(xí)AIAI技術(shù)的2019杭州云棲大會(huì)機(jī)器學(xué)習(xí)技術(shù)專場,阿里云高級技術(shù)專家吳威和阿里云技術(shù)專家EMRE-LearningTensorFlowonSpark。EMRE-LearningEMRE-LearningAI技術(shù),通過算法基于歷史數(shù)據(jù)來構(gòu)建機(jī)器學(xué)習(xí)模型,從而AI技術(shù)有了突飛猛進(jìn)的發(fā)展。AI平臺。下圖展示了AIAI訓(xùn)練和評估,包含數(shù)據(jù)存儲;右側(cè)是AI兩套集群運(yùn)維復(fù)雜:從圖中可以看出,AI開發(fā)涉及的兩套集群是分離的,需要單獨(dú)維護(hù),運(yùn)維成EMRGPUCPU機(jī)器;數(shù)據(jù)存儲層包括HDFSOSSKafkaFlumeYARN、K8SZookeeperE-learningSpark,這里的SparkjindoSparkEMRSparkAI場景下的版本,除此之外,還有PAITensorFlowonSpark;最后是計(jì)算分析層,提供了數(shù)據(jù)分析、特征工程、AI訓(xùn)練以及Notebook的功能,方便用戶來使用。EMRCPU、MemGPUYARNSparkDataSourceAPI來方便各類數(shù)據(jù)源的讀取,MLlibpipeline廣泛用Spark+深度學(xué)習(xí)框架:Spark和深度學(xué)習(xí)框架的集成支持,包括高效的SparkTensorFlow之間的數(shù)據(jù)傳輸,Spark資源調(diào)度模型支持分布式深度學(xué)習(xí)訓(xùn)練;資源監(jiān)控與報(bào)警:EMRAPM易用性:JupyternotebookPythonTensorFlowonSpark主要包含了下圖中的六個(gè)具體設(shè)計(jì)目標(biāo)。TensorFlowonSparkPySpark應(yīng)用框架級別的封裝??蚣苤袑?shí)現(xiàn)的主要功能包TensorFlow任務(wù),除此之外還需要將特征工PAITensorFlowRuntime進(jìn)行深度學(xué)習(xí)和機(jī)器學(xué)習(xí)的訓(xùn)練。由于Worker任務(wù))Sparkexecutor,PsWorker任務(wù)通PAITensorFlowRuntimeDataLake中,方便后期的模型發(fā)布。ApacheArrowAPITensorFlowRuntime,從而加速整個(gè)流程TensorFlowonSparkTensorFlowCheckpoints機(jī)制,用戶需要ChenpointDataLakeTensorFlow的時(shí)候,會(huì)讀取最近的CheckpointPs和BarrierExecutiontasktask,重新配置所有環(huán)境變量;TFCheckpoint。TensorFlowonSparkcondapythonvirtualenvTensorFlowTensorFlowPSHorovodMPITensorFlowAPITensorFlowEstimatorAPITensorFlowSessionEMR客戶有很多來自于互聯(lián)網(wǎng)公司,廣告和推送的業(yè)務(wù)場景比較常見,下圖是一個(gè)比較典型的廣告據(jù)進(jìn)行ETLTensorFlow框架高效地喂給PAITensorFlowRuntimeDataLake中。API層面,TensorFlowonSpark提供了一個(gè)基類,該基類中包含了三個(gè)方法需要用戶去實(shí)現(xiàn):SparkDataFrame對象;shutdown方法實(shí)現(xiàn)用戶長連接資源的釋放;train方法是用戶之前在TensorFlowpl_submit命令來提交TensorFlowonSpark的任務(wù)。FM的樣例,F(xiàn)M是一個(gè)比較常見的推薦算法,具體場景是給電影評分,根據(jù)客戶對SparkdatasourceAPISparkjoin、ETL處理等;右側(cè)是TensorFlowGithub。最后總結(jié)一下,EMRE-Learning平臺將大數(shù)據(jù)處理、深度學(xué)習(xí)、機(jī)器學(xué)習(xí)、數(shù)據(jù)湖、GPUs功能特性緊密的結(jié)合,提供一站式大數(shù)據(jù)與機(jī)器學(xué)習(xí)平臺;TensorFlowonSpark提供了高效的數(shù)據(jù)交互流程以E-Learning平臺在公有云服務(wù)不同的客戶,成功案例,CPU1000,GPU集群規(guī)模1000。kubernetesapachespark:最kubernetesapachespark的介紹。內(nèi)容包Summit中文精華版峰會(huì)的精彩內(nèi)容整理。一、DataMechanicsdatamechanicsserverless的平臺,即一個(gè)全托管的平臺,pipeline都會(huì)做得非常的快,已經(jīng)非常地穩(wěn)定。K8S集群。這樣的話,對整個(gè)的安全是一二、Sparkon首先,k8ssparkspark2.3版本以后的事情,在此之前有幾種方式。第一種就是YarnYarn的集群里面了。第四種是Kubernetessparkk8s上面。Sparkonk8sYarnk8s的依賴的管理。這塊是區(qū)分點(diǎn)比較大的一個(gè)地方。Yarn提供一個(gè)全sparkpython的版本,全局的包的依賴,缺少環(huán)境隔離。而k8s是完全的環(huán)境隔離,每一個(gè)應(yīng)用可以跑在完全不同的環(huán)境、版本等。YarnHDFS。K8s的包image中,支持包依賴管理,將包上傳到OSS/HDFS,sparkexecutorsk8snode16G-RAM,4-coreECS,executor都申請不到!如下圖所示。nodecores85%,那么我們應(yīng)該這然后這塊是一個(gè)比較重要的特點(diǎn),就是動(dòng)態(tài)資源。動(dòng)態(tài)資源的完整支持目前做不到。比如說,Kill一pod,shufflefile會(huì)丟失,會(huì)帶來重算。ClusterautoscalingdynamicallocationPPT的某一頁,它有一個(gè)實(shí)線1min~2minexecutor申請過程。Spotinstance75%SLAexecutorkillrecover。如driverkillnodeselectoraffinitiesDrivernode,executorspotinstance。S3Acommitters,JindofsJobCommitter,應(yīng)該設(shè)置ShuffleI/Oshuffleboundworkload的關(guān)鍵點(diǎn),spark2.xdockerfilesystem。Dockerfilesystemvolume來代替。shuffle的提升,中間數(shù)據(jù)的存儲與計(jì)算分離。這塊是一個(gè)比較Nodedecommission,支持上傳python依賴文件等等。k8ssparkonk8s(二)動(dòng)態(tài)資源&shuffleShuffleserviceNAStaskfetchTiered(四)EMRSparkEMREMR集群類型為ONJindoSparkJindoFSService/JindoFSSDKOSSJindoJobCommitterOSSACKEMRHDFSOperator增強(qiáng),DependencyRayOnSparkRayApacheSpark的數(shù)據(jù)處理流水線overhead,支持用戶使用Spark處理的數(shù)據(jù)做新興人工智能應(yīng)用的開Intel大數(shù)據(jù)團(tuán)隊(duì)軟件工程師黃凱為您介紹Ray和Intel的開源項(xiàng)目AnalyticsZoo,RayOnSparkRayOnSpark的落地實(shí)踐。一、OverviewofAnalyticsZoo二、Introductionto三、MotivationsforRayOnApacheSpark四、ImplementationdetailsandAPIdesign五、Real-worldusecases一、OverviewofAnalyticsAIonBig2016年BigDLApacheSpark開發(fā)的分布式高性能的深度學(xué)習(xí)框架,首次將深度學(xué)習(xí)引入到BigDL寫的深度學(xué)習(xí)應(yīng)用是一在性能方面BigDLMKLBigDLCPU能有良好的性能。在可擴(kuò)展性方面,BigDLSpark擴(kuò)展到成百上千個(gè)節(jié)點(diǎn)上做對深度學(xué)習(xí)模型做AnalyticsAnalyticsZooPythonAPI對訓(xùn)練好的模型做線上推理。在流水線之上,AnalyticsZooMLworkflow,幫助用戶Cluster實(shí)際工作中,開發(fā)部署一條數(shù)據(jù)分析和AI的流水線通常需要經(jīng)歷三個(gè)步驟:開發(fā)者首先在筆記本上單機(jī)的代碼無縫地部署在生成環(huán)境中,并且簡化和自動(dòng)化搭建整個(gè)pipeline的過程,這也是開發(fā)AnalyticsZooRayOnSpark的初衷和目的。二、IntroductiontoPythonimportrayray.init()Ray服務(wù)。正常情Python函數(shù)是順序執(zhí)行的,但是如果加上@ray.remote(num_cpus,...)class也能加上@ray.remoteRayactorRay去遠(yuǎn)程地啟動(dòng)。在@ray.remote中還可以指定RayCore實(shí)現(xiàn)簡單的并行之外,Rayhigh-levellibrary,加速人工智能workloadRayTune能自動(dòng)去調(diào)參,RLibAPI去執(zhí)行不同強(qiáng)化學(xué)習(xí)任務(wù),RaySGD在PyTorch和TensorFlow原生的分布式模塊之上實(shí)現(xiàn)了一層wrapper來簡化部署分布式訓(xùn)練的過三、MotivationsforRayOnApacheRayPython環(huán)境和依賴,同時(shí)不給AI任務(wù),不可避免地會(huì)帶overhead,還需要額外的資源去維護(hù)不同的系統(tǒng)和工作流。這些挑戰(zhàn)促使了英特爾開RayOnSparkRay開發(fā)的新興人工智能應(yīng)用。四、ImplementationdetailsandAPIRayOnSparkRayOnSparkRaySpark數(shù)據(jù)處理的流水線中。顧名思義,RayOnSparkRaySparkYARN集群為例,同樣的思路也KubernetesApacheMesosconda-packPython環(huán)境,在運(yùn)行時(shí)分發(fā)到各個(gè)節(jié)點(diǎn)上,這樣一來用戶不需要在每個(gè)節(jié)點(diǎn)上提前裝好Python依賴,程序RayOnSpark整體架構(gòu),SparkDriver節(jié)點(diǎn)上起一SparkContext的實(shí)例,SparkContextSparkExecuterSpark的任務(wù)。除了SparkContext之外,RayOnSparkSparkDriverRayContext的實(shí)例,利用現(xiàn)有的SparkContext將Ray在集群里啟動(dòng)起來,Ray的進(jìn)程會(huì)伴隨著在SparkExecuterRayMasterRaylet進(jìn)程。RayContextSparkExecuterRayManagerRay的RayRayYARN集群上同時(shí)SparkRayin-memorySparkRDDDataFrameRay的應(yīng)用中,Spark的數(shù)據(jù)做新興人工智能應(yīng)用的開發(fā)。RayOnSparkimportAnalyticsZooinit_spark_on_yarnSparkContextobjectcondaPythonSparkExecuterRayContextobjectRaySpark的橋梁,在創(chuàng)建的時(shí)候可以定義五、Real-worlduseRayOnSpark的第一個(gè)應(yīng)用是我們在AnalyticsZoo里基于RayTune和RayOnSpark開發(fā)的AutoML模塊。AutoML可以將這些過程自動(dòng)化,簡化搭建時(shí)間序列模型過程。感興趣的同學(xué)可以參見:/intel-analytics/analytics-zoo/tree/master/pyzoo/zoo/automlusecases。AutoMLRayOnSparkpipeline。用戶可以使用PySparkRay并行進(jìn)行數(shù)據(jù)加載和處理,我們對不同深度學(xué)習(xí)框架使用RayOnSpark實(shí)現(xiàn)了RayOnSpark,通過簡單的代碼修改就可以完成大數(shù)據(jù)合作案例:Drive-thruRecommendationSystematBurger麥克風(fēng)對話),RayOnSpark構(gòu)建了一個(gè)完整的推薦系統(tǒng)流水線。漢堡王作為全球最大的快餐品MXNetGPUMXNetSparkGPU集群上,無疑使得他們耗費(fèi)了很多時(shí)間。RayOnSparkSpark的集群上做分布式的訓(xùn)練,這樣一來數(shù)據(jù)layerMXNetYARN集群上部署。MXNetWorker和ServerRay進(jìn)RayOnSpark的解決方案已經(jīng)被漢堡王部署到了他們的生產(chǎn)環(huán)境中,證明了這種方案更加高效、AutoMLAnalyticsZoo二、ScalableAutoMLforTimeSeries三、UseCaseSharing&Learnings四、FutureWorkAnalyticsBigDLSparkSparkTensorFlow、CaffeBigDLSparkjob的形式跑在大數(shù)深度學(xué)習(xí)的平臺,為用戶提供單機(jī)到集群的無縫式體驗(yàn)。AnalyticsZoo集成了多種軟硬件加速庫,為概覽,AnalyticsZoo可以跑在各種環(huán)境當(dāng)中,包括筆記本、K8S集群、HadoopCluster、SparkCluster等等。AnalyticsZoopipeline的支持,這些流水線組件可以使得用戶更方便的將深度學(xué)習(xí)框架使用到工作學(xué)習(xí)當(dāng)中,比如分布式的TensorFlowPyTorch支持、RayOnSpark、SparkDataFrame、MLpipelinesforDL、InferenceModel等等。流水線之上提供了Workflow方面的支持,如自動(dòng)調(diào)參,ClusterServing等。最上層針對不同的用戶場景,提供了場景的算法,使得用戶模型的搭建更加TimeKPI分析可以用于通信網(wǎng)絡(luò)質(zhì)量檢運(yùn)維方法很難管理大規(guī)模的虛擬機(jī)和服務(wù),AIOpsML、AI的技術(shù)在監(jiān)控?cái)?shù)據(jù)中進(jìn)TimeSeries1個(gè)觀測點(diǎn)到第ty1yt,目標(biāo)是yt+1yt+h,h1,取決于具體場景。理論上,從第一個(gè)觀測點(diǎn)開始的所有itk個(gè)樣本點(diǎn)數(shù)值,如下圖中標(biāo)記的紫色部分,從yt-k+1到y(tǒng)t。很多種方法可以用于時(shí)序預(yù)測,如自歸、指數(shù)平滑歸、ARIMA等時(shí)序預(yù)測要預(yù)測數(shù)值,本質(zhì)上是歸問題,但時(shí)序預(yù)測相比普通的歸問題有一些特殊性,除了模train數(shù)據(jù)時(shí)間戳需要按照順序劃分,在交叉驗(yàn)證時(shí)注模型并不是那么容易的,尤其是那些沒有多少經(jīng)驗(yàn)的用戶,這也就說明了為什么AutoML越來越火。AutoML希望將耗時(shí)的機(jī)器學(xué)習(xí)步驟進(jìn)行自動(dòng)化,以此來減輕建模者調(diào)優(yōu)的工作量,降低機(jī)器學(xué)習(xí)門檻。AutoML初級版本基本思路是將模型參數(shù)進(jìn)行排列組合,逐一跑一遍,找最好的一組參數(shù)?,F(xiàn)代AutoML把自動(dòng)尋找最優(yōu)模型的這個(gè)任務(wù)抽象成了一個(gè)新的優(yōu)化問題,也就是:在一個(gè)給定的預(yù)算之前,AutoML將這些步驟切換成了新的優(yōu)化問題,在給定的范圍之內(nèi)尋找最優(yōu)的目標(biāo)指標(biāo),所使用的方法遠(yuǎn)比簡單的網(wǎng)絡(luò)搜索或隨機(jī)搜索更加復(fù)雜。AutoML不僅可以用于網(wǎng)絡(luò)搜索,還可以用于機(jī)器學(xué)AnalyticsZooAutoML得到更優(yōu)的時(shí)序模型。同時(shí)減少預(yù)處理和特RayandRaySpark大數(shù)據(jù)集群之上。三、ScalableAutoMLforTimeTimeSeriesSolutionInAnalyticsAnalyticsZooTimeSeriesSolutionAutoML框架,框架中對流水法模塊,如時(shí)序預(yù)測和異常檢測。用戶可以基于自己的場景構(gòu)造時(shí)序應(yīng)用。TimeSeriesSolution有三AutoMLAnalyticsZooAIpipeline中的功能,使得整個(gè)自動(dòng)化訓(xùn)練過程更加高SoftwareAutoML框架中包含四個(gè)組件,用藍(lán)色方框表示,。FeatureTransformerModel是成,特征選擇等任務(wù),Model可以做模型構(gòu)建及訓(xùn)練,以及多模型選擇。SearchEngine負(fù)責(zé)超參組合和啟動(dòng)實(shí)驗(yàn),并且負(fù)責(zé)所有實(shí)驗(yàn)在分布式集群上的調(diào)度。當(dāng)超參過程結(jié)束之后,實(shí)驗(yàn)結(jié)果進(jìn)行收,pipeline。Pipeline是端到端的處理過程,包含預(yù)處理,特征工程和模型。Pipeline可以被存儲下來后再加載,做推理和增量式訓(xùn)練,所有模塊都可以被擴(kuò)展用于其AutoML框架中有兩個(gè)灰色的框,在計(jì)劃中但AutoML框架中有兩個(gè)灰色的框,目前還沒有實(shí)現(xiàn)。SearchEngine驅(qū)動(dòng)整個(gè)訓(xùn)練過程。,SearchEngine在構(gòu)造的時(shí)候首先接收一個(gè)FeatureTransformerModelSearchpresets。SearchEngineSearchpresets確定Ray的集群上進(jìn)行調(diào)度。每個(gè)實(shí)驗(yàn)都有不同的參數(shù)組合來被收和分析,根據(jù)最好的參數(shù)組合和訓(xùn)練好的模型構(gòu)建pipeline,最終返給用戶。AglimpseofTimeSequencePredictorfitPredictor,啟動(dòng)自動(dòng)樣分布等等。目前已經(jīng)設(shè)置了一些公用的reciperecipe。另一個(gè)參數(shù)是distributed,表明搜索過程是分布式還是單機(jī)的,在單機(jī)模式下可以適當(dāng)減少開銷。FitAPI用于評估,預(yù)測和增量式訓(xùn)練。UseCaseSharing&ProjectZouwu是在前面所介紹的模塊的加持上實(shí)現(xiàn)的新的項(xiàng)目,專門為電信領(lǐng)域的時(shí)序應(yīng)用所打造的,不KPIKPI的預(yù)測在電信應(yīng)用KPIKPI預(yù)測進(jìn)行有計(jì)劃的實(shí)施資源調(diào)節(jié),比如在特定時(shí)間段關(guān)閉一些閑networkslicingKPIProjectZouwuWIDE項(xiàng)KPIKPI。ProjectZouwu為同一個(gè)用例提供了兩種不ForecasterAutoML加持的AutoTSnotebookKPI的預(yù)測情況。用戶發(fā)現(xiàn)他們只需要對模SK下圖展示了與韓國電信所合作的項(xiàng)目,是KPIKPI預(yù)測檢查小2019Sparktalk,感使用了基于AutoML的時(shí)序預(yù)測方案,在很短很多時(shí)間之內(nèi)就訓(xùn)練出除了達(dá)到準(zhǔn)確度要求的模型。analytics-zooAutoMLpipeline會(huì)獲得自動(dòng)生成額外的特征,如是否處于節(jié)假日,忙時(shí)還是analytics-zooKPI需要AI技術(shù)和產(chǎn)品,從硬件到軟件,以及集成方案。ApacheSpark3.0Prometheus監(jiān)控的EMR技術(shù)專家周康為大家?guī)鞟pacheSpark3.0對Prometheus監(jiān)控的原生支持的spark3.0Prometheusspark3.0是如何實(shí)現(xiàn)對Prometheus更好的本地化的支持。一、用Prometheus監(jiān)控ApacheApacheSparkETLspark程序WebUI。第二塊主要是日些提醒的措施。Metricsspark里邊,同樣spark3.0以前,我們一般怎么去支持Prometheusjavaagent的方式。CustomsinkPushgatewayserverspark3.0中,就提出了新的目標(biāo)。主要有兩個(gè)需要關(guān)注的設(shè)計(jì)點(diǎn)。一個(gè)就是只使用到新的endpointpipeline耦合,不引入對其他一些組件的依賴。另外一塊就是盡量重新使用二、本地化支持PrometheusSupportPrometheusmonitoringspark3.0PrometheusSparkmetricsDropWizardspark3.0中也做了一次升級,帶來jdk11,但是也存在小的負(fù)面點(diǎn),就是它的數(shù)據(jù)格式有所變化,如下圖所示。ExecutorMetricsSourceexecutormemorymetrics做了增為了更好的做一些本地化支持的工作,主要加了兩個(gè)組建。一個(gè)是PrometheusServlet,它會(huì)生成PrometheusResourceexecutormemorymetricsendpoint。3.0spark_infometricdriverserviceannotationPrometheusservicediscovery的特性,從而更方便的進(jìn)行一metrics的采集和監(jiān)控。三、在K8s(MonitoringinK8sPrometheusservicediscovery這一個(gè)特性。如下圖所示,有四個(gè)配置。SparkPispark-submit提交一個(gè)作業(yè)到K8s。Prometheusdriver端采集到監(jiān)控信息,就可以及時(shí)的做一些報(bào)警和預(yù)處理。第二個(gè)場景就是動(dòng)態(tài)調(diào)度。Spark3.0K8s上動(dòng)態(tài)調(diào)度。主要就是把一些配置打開,如下圖中K8s環(huán)境的動(dòng)態(tài)調(diào)度。spark3.0中,添加了spark.sql.streaming.metricsEnabled6種metrics,如下圖所示。對一個(gè)流作業(yè),如果要使用PrometheusnameSpace等提前做好設(shè)置,避免后續(xù)出現(xiàn)數(shù)據(jù)的6metrics對于流作業(yè)都是比較關(guān)鍵的,都應(yīng)該去監(jiān)控并且做一些報(bào)警處理。這里重點(diǎn)States-rowsTotalOOM。Prometheus3.0federation的模式。如下圖所示,左namespace1namespace2下面對流作業(yè)的一個(gè)監(jiān)控。他們都可以同時(shí)發(fā)送到一個(gè)Cluster-wisePrometheusspark_info等等這些信息做一些數(shù)EMRsparkonk8sPrometheus監(jiān)控進(jìn)行了原生化的支持。目前我們javaagent,后續(xù)也會(huì)計(jì)劃去引進(jìn)更多的原生支持。 阿里云開源大數(shù)據(jù)平臺實(shí)踐阿里云開源大數(shù)據(jù)平臺實(shí)踐 PAGE82 阿里云開源大數(shù)據(jù)平E-MapReduce(EMR)是構(gòu)建在阿里云云服務(wù)器ECS上的開源Hadoop、Spark、HBase、Hive、FlinkPaaS產(chǎn)品。提供用戶在云上使用開源技術(shù)建設(shè)數(shù)據(jù)倉庫、離線2019杭州云棲大EMR如何助力云上開源Spark環(huán)境搭建起來,而且這個(gè)版本很快上線并且發(fā)布到E-MapReduce上線之后經(jīng)過四年的時(shí)間發(fā)展到現(xiàn)在,E-MapReduce4.0即將發(fā)布版本,并且在Hadoop3.0以及其他新功能。E-MapReduce將會(huì)為開源生態(tài)提供基礎(chǔ)平臺,在這個(gè)平臺上能夠讓大家選擇各E-MapReduce也希望能夠E-MapReduceE-MapReduce能夠更好地和云原生技術(shù)進(jìn)行結(jié)合。E-MapReduceAWSEMRLike的產(chǎn)品,運(yùn)行一年之AWS的純動(dòng)態(tài)方式并不適合國內(nèi)的場景,因此實(shí)現(xiàn)了第一次調(diào)整,更加重視常駐集群,并且Web控制臺能力,并且支持了集群的高可用和高安全,也在外圍Impala、Kafka、Druid等各個(gè)場景下的軟件,進(jìn)而可以更好地支持各個(gè)業(yè)務(wù)場景。除此之外,HDFSKafka,阿里巴巴則提OSS、SLS、RDSHive、Spark、Flink、Rresto、DataWorks、DataVQuickBI進(jìn)行融合。目前,云上大數(shù)據(jù)方案可以認(rèn)為是半托管的服HadoopHDFS、AlibabaHDFSOSS。HadoopHDFS有三種存儲方式,EBS云盤存儲數(shù)據(jù)可靠,但是后臺有多個(gè)數(shù)據(jù)副本,因此成本較高,同時(shí)通過網(wǎng)絡(luò)AlibabaHDFS,這種方式數(shù)據(jù)可靠,成本中等,并且數(shù)據(jù)全部通過網(wǎng)絡(luò)傳輸,沒有本地計(jì)算。OSS標(biāo)準(zhǔn)存儲經(jīng)過阿里巴巴的改造和優(yōu)化之后可以直接在Hadoop中進(jìn)NativeOSS,NativeOSS存儲數(shù)據(jù)可靠,成本較低,并且通用性比較好,但是性NativeOSSJindoFS,JindoFS做到了數(shù)據(jù)可靠,成MasterTask,Task節(jié)點(diǎn)只進(jìn)行計(jì)算但是不會(huì)進(jìn)行數(shù)據(jù)存儲,因此在云上執(zhí)行計(jì)算任務(wù)時(shí),TaskStopInstanceTask節(jié)點(diǎn),當(dāng)高峰期過去之后,就可以釋放Task節(jié)Hadoop集群,其底層全OSSOSSHive、SparkPresto等,而且這些集群全部都是靈活可銷毀的。右側(cè)同樣建立HadoopGateway以及ClientOSSOSSHDFS,借助這種2015SSD等高效存ECSD12016年,E-MapReduceOSS的結(jié)合,當(dāng)時(shí)因?yàn)閹捪拗?,因此使用的客戶較少。到如今,針對之前的發(fā)展和合作經(jīng)驗(yàn),E-MapReduceJindoFS、AlibabaHDFS等進(jìn)行存儲。IaaSE-MapReduce,IaaS層也經(jīng)歷了多次升級。第一代是D1I1D2OSS上面,所有的計(jì)算都放在動(dòng)態(tài)集群上面進(jìn)行,并且可以隨時(shí)進(jìn)行計(jì)算伸縮,JindoFS為客E-MapReduce希望基于平臺實(shí)現(xiàn)更多的方案,希望能夠更好地賦能客戶的業(yè)務(wù)場景。比如在SparkStreamingSQL中,實(shí)現(xiàn)了將業(yè)務(wù)數(shù)據(jù)庫的數(shù)據(jù)實(shí)時(shí)同步到kudo中,可以實(shí)現(xiàn)OLAP分析的能力。未來,EMRK8S的融合,希望能夠幫助客戶更好地節(jié)約成本,讓用戶在阿里云內(nèi)部可以K8SHadoop節(jié)點(diǎn)中作為計(jì)算E-MapReduce等進(jìn)行動(dòng)態(tài)賦能和線下集群結(jié)合起來使用EMRSpark-SQL簡介:這次的優(yōu)化里面,還有一個(gè)很好玩的優(yōu)化,就是我們引入的簡介:這次的優(yōu)化里面,還有一個(gè)很好玩的優(yōu)化,就是我們引入的NativeRuntime,如果說上Case的殺手锏,NativeRuntime就是一個(gè)廣譜大殺器,根據(jù)我們后期統(tǒng)計(jì),引入NativeRuntime,可以普適性的提高SQLQuery15~20%的E2E耗時(shí),這個(gè)在TPCDSPerf里面也是一個(gè)很大的性能提升點(diǎn)。E-MapReduceTPCDS-Perf榜單中提交了最新成績,相比第二名(2019年提交的記錄)2倍+TPCDSEMapueEMR%E-MapReduceTPCDSPerfSPARK引擎的技術(shù)深度以及技術(shù)實(shí)力,接下來會(huì)有一個(gè)系列的文章,去介紹我們2020年度打榜過程的一些優(yōu)化點(diǎn)還有思sparkspark應(yīng)用開發(fā)者可以關(guān)注我們的系列文章,也歡迎來和我們交流,最關(guān)鍵的是,歡迎多投簡歷,加入阿里云E-MapReduce團(tuán)隊(duì),我們求賢若渴?。。PCDSPerfEMR10TB規(guī)??偣蔡峤涣巳纬煽?。PerfTPCDS關(guān)注的指標(biāo)有Flag,我們要在物理硬件保持不變的條件下,純靠軟件優(yōu)化提升2倍+,這樣子性能指標(biāo)和性價(jià)比SparkSparkV2.4.3TPCDS99QueryLoad3PT6PS.其中社區(qū)SparkV2.4.3版本中Query14以及Query95因?yàn)镺OMSpark200SQueryPS.QueryQuery783X性能提升,Query57100InMemoryTableCacheCTE簡單來說,就是盡量更合理的利用InMemoryTableCache去減少不必要的重復(fù)計(jì)算,比如說Query23A/BCTE優(yōu)化的模式匹配,識別出需要重復(fù)計(jì)算且比較耗時(shí)的操作,并利用InMemoryTable緩存,整體減少E2E時(shí)間FilterFilterDynamicPartition3.02Query64BloomFilterBloomFilterbeforeSMJBloomFilter,JoinSpillDiskPK/FKPK/FKConstraintJoinProjectJoin其GroupByGroupByKeysGroupByKeysGroupBy結(jié)果已經(jīng)沒有影響了,因?yàn)橹麈I列已經(jīng)隱含了Unique的信息GroupByGroupByPushDownFastFastTableAnalyzeStatDecimalLongIntTPCDS99QueryDecimal化都是一些特殊Case的殺手锏,NativeRuntime就是一個(gè)廣譜大殺器,根據(jù)我們后期統(tǒng)計(jì),引入NativeRuntimeSQLQuery15~20%E2ETPCDSPerf里面也是一NativeWholeStageCodeGenerationJavaWeldIR來真實(shí)運(yùn)行。Weld詳細(xì)參考/。在整個(gè)項(xiàng)目里,WeldIR的替換其實(shí)是非常小的WeldIR能夠運(yùn)行起來,我們還需要做以下的工作ExpressionWeldIRCodeGenTPCDS范圍內(nèi)全支持OperatorsWeldIRCodeGenSortMergeJoinC++WeldIR代替(OffHeapUnsafeRowC&WeldBatch(JavaNativeRuntimeJNIWeldRuntime明顯不能這么玩其他高性能NativeSortMergeJoin、PartitionBy、CSVParsingWeldIR提供的接口無法直接實(shí)現(xiàn),我們通過C++來實(shí)現(xiàn)這些算子的Native執(zhí)行Spark-SQL有興趣的同學(xué)們可以多多關(guān)注,多多捧場。同時(shí),EMRHC,請有興趣者聯(lián)系林學(xué)維(峰七也可郵箱xuewei.linxuewei@!!!EMRSpark-SQLRuntimeFilterEMRSpark-SQLRuntimeFilter2019SparkSQLCatalystOptimizer10TB99query35%EMR團(tuán)隊(duì)高級開發(fā)工程師,大數(shù)據(jù)領(lǐng)域技術(shù)愛好Spark、HiveEMR產(chǎn)品中開源計(jì)算引擎的優(yōu)化TPC-DS717storechannel為例,事實(shí)表和維度表的關(guān)聯(lián)關(guān)系如下所示:分析TPC-DS全部99個(gè)查詢語句不難發(fā)現(xiàn),絕大部分語句的過濾條件都不是直接作用于事實(shí)表,join來間接完成。因此,優(yōu)化器很難直接利用事實(shí)表索引2019SparkSQLCatalystOptimizerRuntimeFilter10TB99query35%RuntimeFilter動(dòng)態(tài)分區(qū)裁剪:事實(shí)表以日期列(date_sk)date_dimjoin時(shí),optimizerdate_dimdate_sk取值,并在掃描事實(shí)表前過濾掉非分區(qū)列動(dòng)態(tài)過濾:當(dāng)事實(shí)表與維度表的join列為非分區(qū)列時(shí),optimizer動(dòng)態(tài)構(gòu)建和收集維joinMin-MaxRangeBloomFilter,并在掃描事實(shí)表時(shí)下推至存儲層,利用存儲層索引(如Parquet、ORCFile的zonemap索引)來減少掃描數(shù)據(jù)量。RuntimeFilterquery進(jìn)行了細(xì)致的性queryjoinRuntimeFilter對各個(gè)query的性能提升效果后,我們發(fā)現(xiàn):date_dimjoinRuntimeFilterZoneMapload階段沒有針對非分區(qū)列做任何聚集操作(Clustering),每個(gè)zone的取值一般也稀疏分散在各個(gè)列的值域中。loadjoinZ-Order排序。loadTPC-DS評測總分反而下降。同時(shí),由于joinRuntimeFilter當(dāng)事實(shí)表(lineorder)multi-join時(shí),可將所有維度表的過濾信息下推joinRuntimeFiltermulti-jointree而不是局部binary-jointree。其優(yōu)化效果是即使joinordering為badcase,無用的事實(shí)表數(shù)據(jù)也能夠被盡早過濾掉,即讓查詢執(zhí)行更加robust。SparkCBOJoin-ReorderjoinorderingLIPfiltersSparkBroadcastHashJoin基于過濾條件可以傳遞至復(fù)雜multi-jointree的任意節(jié)點(diǎn)這一思想去發(fā)散思考,我們發(fā)現(xiàn),當(dāng)multi-jointreescan,從而減少后續(xù)事實(shí)表SortMergeJoin等耗時(shí)算子執(zhí)行時(shí)所需處理的數(shù)據(jù)量。以一個(gè)簡化版的query64為例:withcs_uiaswithcs_uias,sum(cs_ext_list_price)assalefromcatalog_saleswherecs_item_sk=andcs_order_number=cr_order_numbergroupbycs_item_sk)selecti_product_name,i_item_skfromstore_saleswheress_item_sk=i_item_skandss_item_sk=sr_item_skandss_ticket_number=sr_ticket_numberandss_item_sk=cs_ui.cs_item_skandi_colorin('almond','indian','sienna','blue','floral','rosy')andi_current_pricebetween19and19+10andi_current_pricebetween19+1and19+15groupbyi_product_nameplantree考慮未實(shí)現(xiàn)維度表過濾廣播的執(zhí)行流程,store_sales數(shù)據(jù)經(jīng)過RuntimeFilter和BroadcastHashJoinjoinSortMergeJoin算子。但如果將LIPfilter下推至4張事實(shí)表的scan算子(無需下推至存儲層),不僅減少了join數(shù)據(jù)量,也減少了catalog_sales和catalog_returns表join后的group-byaggregation數(shù)據(jù)量。LIP在optimizer層,我們在原版RuntimeFilter的SyntheticJoinPredicate規(guī)則后插入PropagateDynamicValueFilter規(guī)則,將合成的動(dòng)態(tài)謂詞廣播至所有合法的join子樹中;同時(shí)結(jié)合原有的謂詞下推邏輯,保證動(dòng)態(tài)謂詞最終傳播到所有相關(guān)的scan算子上。在算子層,LIPfilters的底層實(shí)現(xiàn)可以是HashMap或BloomFilter,針對TPC-DS的數(shù)據(jù)特性,我們選擇BitMap作為廣播過BitMap本身是精確的(ExactFilter),可以結(jié)合主外鍵約束信息進(jìn)一步做semi-join消除優(yōu)化?;谥魍怄I約束的優(yōu)化規(guī)則將在系列后續(xù)文章做詳細(xì)介紹。應(yīng)用該優(yōu)化后,query64177秒降低至632.8Join使用BloomFilter來優(yōu)化大表join是一種常見的查詢優(yōu)化技術(shù),比如在論文《BuildingaHybridWarehouseEfficientJoinsbetweenDataStoredinHDFSandEnterpriseWarehouse》中提出對join兩表BloomFilterzig-zagjoinjoinTPC-DS測試query93為例,store_salesstore_returnsjoinstore_sales原始數(shù)BloomFilter的構(gòu)建和應(yīng)用都存在較高的計(jì)算開銷,對于selectivityjoin,盲目使用這一優(yōu)化可能反而導(dǎo)致性能退?;陟o態(tài)stats的joinselectivity估算往往誤差,Spark現(xiàn)有的CBO優(yōu)化BloomFilterjoinSparkAdaptiveExecution(AE)BloomFilterjoin優(yōu)化規(guī)則。AEstagestagestats信息重新調(diào)整后續(xù)的物理執(zhí)行計(jì)劃。目前主reducestageskewshuffleSortMergeJoinAEstatsjoinsizeBloomFilter(buildside),如果是,則buildside和streamside的scanstage會(huì)依次串行提交執(zhí)行;否則這兩個(gè)stage將并行執(zhí)buildsidescanstage執(zhí)行完成后,AEsizejoinhistogram進(jìn)行代價(jià)估算,并決定最終走BroadcastHashJoin、BloomFilter-SortMergeJoinJoin還是原本的BloomFilter-SortMergeJoinJoin 阿里云開源大數(shù)據(jù)平臺實(shí)踐阿里云開源大數(shù)據(jù)平臺實(shí)踐 PAGE102 sideshuffleBloomFilterstreamsidescanstageBloomFilterBloomFilterBuildBloomFilerNative-InBloomFilterRDDaggregate來合并各個(gè)數(shù)據(jù)分片的BloomFiler會(huì)導(dǎo)致driver成為數(shù)據(jù)傳輸和bitmap合并計(jì)算的性能瓶頸;使用RDDtreeAggregate實(shí)現(xiàn)并行分層合并顯著降低了整體的構(gòu)建延遲。在過濾階段,Native-InBloomFilterscan算子中合SparkSIMDnative函數(shù),CPUBlockedBloomFilter算法實(shí)現(xiàn),該算法通過犧牲少量的bitmap存儲空間來換取訪存時(shí)更低的CPUcachemiss率。應(yīng)用該優(yōu)化后,query93225秒降低至504.5EMRSpark-SQLNativeCodegenFramework作者:周克勇,花名一錘,阿里巴巴計(jì)算平臺事業(yè)部EMR團(tuán)隊(duì)技術(shù)專家,大數(shù)據(jù)領(lǐng)域技術(shù)愛好者,SparkEMR產(chǎn)品中開源計(jì)算引擎的優(yōu)化工作。SparkSQL多年來的性能優(yōu)化集中在Optimizer和Runtime兩個(gè)領(lǐng)域。前者的目的是為了獲得最優(yōu)的執(zhí)世界(Impala,MaxCompute)Batch-Based(Spark,Hive)MPP-Based(Impala,Presto),甚至無HTAP領(lǐng)域(HyPerADB)Optimizer層面考慮的都是非常類似的問題:Stats收集,CostJoinReorderCTE,特定場景下采用不同的空間搜索策略(如遺傳算法vs.動(dòng)態(tài)規(guī)劃),但方法大體是相同的。長期以來,RuntimeMapReduce剛出來時(shí)網(wǎng)絡(luò)帶寬Runtime領(lǐng)域重要的優(yōu)化方向。CPU性能的兩個(gè)主流技術(shù)是以MonetDB/X100[2](VectorWise[3])為代表的向量化(VectorizedProcessing)HyPer[5][6]為代表的代碼生成(CodeGen)技術(shù)(Spark跟進(jìn)的是CodeGen[9])SQLRecord,向:算子融合。簡單來說,CodeGen框架通過打破算子之間的界限把火山模型“壓平”了,把原來迭代器for循環(huán),同時(shí)生成語義相同的代碼(Java/C++/LLVM),緊接著用對應(yīng)的工具鏈編譯生class(Java)so(C++,LLVM)去執(zhí)行,從而把解釋執(zhí)行轉(zhuǎn)變成了編譯執(zhí)行。 阿里云開源大數(shù)據(jù)平臺實(shí)踐阿里云開源大數(shù)據(jù)平臺實(shí)踐 PAGE104 束算子都基本處于寄存器中,不會(huì)物化到內(nèi)存。CodeGenSIMD等優(yōu)化。HyPer[6],Pelonton[7]等。ExpressionCodegen,到后來參考HyPerWholeStageCodegen,再經(jīng)過多年的打磨,SparkSQLCodegen技術(shù)已趨成熟,性能也獲得了兩次數(shù)量級的躍升。然而,也許是出于可維護(hù)性或開發(fā)者接受度的考慮,SparkSQLCodegenJava代碼,并沒有嘗試NativeCode(C/C++,LLVM)JavaNativeCode還是有一定的OverheadSIMD(Javafeature),Prefetch等語義,更重要的是,NativeCode直接操作裸金屬,易于極致壓榨硬件性能,對一些加速器(GPU)或新硬件(AEP)的支持也更方便。基于以上動(dòng)機(jī),EMRSparkSQLNativeCodegenSparkSQL換了引擎,新引20%EMRNativeCodegen框架。如何集成到針對生成什么代碼,結(jié)合調(diào)研的結(jié)果以及開發(fā)同學(xué)的技術(shù)棧,有三個(gè)候選項(xiàng):C/CLLVM,WeldIR。HyPer的測評數(shù)據(jù),C++LLVM高了一個(gè)數(shù)量級。querycase編譯時(shí)間比運(yùn)行時(shí)間還要長?;谶@個(gè)考慮,我們排C/C++LLVMNativeCodeGenHyPer,Impala,MaxCompute,ADBLLVM作為目標(biāo)代碼。LLVM對LLVMHyPer僅把算子核心邏輯用LLVM生成,其他通用功能(如spill,復(fù)雜數(shù)據(jù)結(jié)構(gòu)管理等)C++編寫并提前編譯好。即使LLVM+C++節(jié)省了不少工作量,對我們來說依然不可接受,因此我們把目光轉(zhuǎn)向了第三個(gè)選項(xiàng):WeldIR(IntermediateRepresentation)。WeldWeldShoumikPalkar是MateiZahariaSpark的作者。Weld最初想解決的問題是不同libpandasnumpypandasnumpy讀取內(nèi)存進(jìn)行計(jì)算,對于極度優(yōu)lib來說,內(nèi)存的寫入和讀取的時(shí)間可能會(huì)遠(yuǎn)超計(jì)算本身。針對這個(gè)問題,WeldCommonRuntimeIR,再加上惰性求值的特性,只需(簡單)libWeld的規(guī)范,libWeldRuntime,WeldRuntimelibPipeline,從而省去此外,WeldC代碼,可以方便調(diào)用三方庫。WeldIRRuntime。WeldIR面向數(shù)據(jù)分析進(jìn)行設(shè)計(jì),因此語義上跟SparkSQLUnsafeRowBatchstructvecdict,能較好的表達(dá)SQL里重度Hash結(jié)構(gòu)。操作層面,WeldIRmap,filteriterator等,配合+Project語義,具體含義是若第二列大于10,則返第一列IRgroupBygroupByWeldWeldAPI提供了兩個(gè)核型接口:weld_module_compile,WeldIR編譯成可執(zhí)行模塊(module)weld_module_run,執(zhí)行編譯好的模塊。WeldIRLLVM,C++方便很多)WeldIR作為目標(biāo)代碼。SparkSQL原有的CodeGen/article/727277。我們參考了SparkWholeStageCodegen。Producer-ConsumerWholeStageCodeGenExec負(fù)責(zé)Weld不支持的算子Java,WeldIRvecWeldIRJavaCodegenStageRecord的模式,取而代之FilterProject,F(xiàn)ilter算子生成的IR如下,過濾掉第二列<=10的數(shù)據(jù):Project算子生成的IR如下,返第一列數(shù)據(jù)WeldLoop-Fusion優(yōu)化可以極大簡化CodeGenLoop-Fusion過SQL(3層以上)甚至無法在有限時(shí)間給出結(jié)果。當(dāng)時(shí)面臨兩個(gè)選擇:修改WeldCodeGenLoop-Fusion之后的代碼,我們選擇了后者。重構(gòu)后生成1,2,11行由Scan算子生成,3,4,5,6,8,9,10Filter算子生成,7Project算子|v:vec[{i32,i32}]||v:vec[{i32,i32}]|n.$1>這個(gè)優(yōu)化使得編譯時(shí)間重亞秒級別FallbackWeldWeldSortMergeJoin,Rollup等。即使是NativeCodeGenJavaCodeGen接管。Fallback的粒度:是算子級別還是Stage級別?Pipeline的斷裂。如上文所述,CodeGenStagePipeline化,打破算RecordFallbackStageNativeRuntime,另一部分走JavaRuntime,則兩者連接處無可避免存在中間數(shù)據(jù)物化,這個(gè)開銷通常會(huì)大于NativeRuntime帶來的收益。基于以上考慮,我們選擇了StageFallbackCodeGen階段一旦遇到不支持的算子,則整個(gè)80%。SparkFallbackSpark集成了。Spark的WholeStageCodegenExecTableScan,ShuffleRead,還是Scan)RowIterator,如下圖所示:StageFallbackJavaRuntimeNativeRuntimeRowBatch/RowIteratorWeld認(rèn)識ShuffleReader/BroadCastRowIteratorRowBatch,只不過Spark反序列化后轉(zhuǎn)換成RowIterator后再喂給CodeGenModule,RowBatch包裝成RowIteratorNativeRuntimeRowBatch。解決辦法呼之欲出了:把RowBatchWeldvecRowBatch喂WeldRowBatch也是滿足某種規(guī)范的字節(jié)流而已,Spark也提供了OffHeap模式把內(nèi)存直接存堆外(僅針對ScanStageShuffle數(shù)據(jù)和Broadcast數(shù)據(jù)需要讀到堆外),Weld可以直接訪問。SparkUnsafeRow的內(nèi)存布局大致如下:針對確定的schema,nullbitmapfixed-lengthdata的結(jié)構(gòu)是固定的,可以映射成struct,而針對var-lengthdatacopy到連續(xù)的內(nèi)存地址中。如此一來,針對無變長數(shù)據(jù)的Filter+Project的例子,一條RecordintUnsafeRow的內(nèi)存布局如下(為了對齊,Spark8字節(jié))。顯而易見,這個(gè)結(jié)構(gòu)可以很方便映射成WeldRowBatchWeldInputWeldOutputRowBatch本質(zhì)是以上過程的逆向操作,不再贅述。解決了Java和NativeStage的轉(zhuǎn)換并喂給ShuffleWriter。如下圖所示:本文介紹了EMR團(tuán)隊(duì)在SparkNativeCodegenNativeWeldDict大家感興趣的任何內(nèi)容歡迎溝通MakingSenseofPerformanceinDataAnalyticsFrameworks.KayMonetDB/X100:Hyper-PipeliningQueryExecution.PeterVectorwise:aVectorizedAnal

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論