




版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
第5章基本統(tǒng)計第5章基本統(tǒng)計15.1相關性計算兩個數據系列之間的相關性是統(tǒng)計學中的常見操作。用spark.ml可靈活地計算多個系列兩兩之間的相關性。目前Spark支持的相關方法是Pearson方法和Spearman方法。Correlation使用指定的方法計算輸入數據集的相關矩陣。輸出將是一個DataFrame,它包含向量列的相關矩陣?!纠?-1】Correlation的PythonAPI代碼。from__future__importprint_function#$exampleon$frompyspark.ml.linalgimportVectorsfrompyspark.ml.statimportCorrelation#$exampleoff$5.1相關性計算兩個數據系列之間的相關性是統(tǒng)計學中的常見25.1相關性frompyspark.sqlimportSparkSessionif__name__=="__main__":spark=SparkSession\.builder\.appName("CorrelationExample")\.getOrCreate()#$exampleon$data=[(Vectors.sparse(4,[(0,1.0),(3,-2.0)]),),(Vectors.dense([4.0,5.0,0.0,3.0]),),(Vectors.dense([6.0,7.0,0.0,8.0]),),5.1相關性frompyspark.sqlimpor35.1相關性(Vectors.sparse(4,[(0,9.0),(3,1.0)]),)]df=spark.createDataFrame(data,["features"])r1=Correlation.corr(df,"features").head()print("Pearsoncorrelationmatrix:\n"+str(r1[0]))r2=Correlation.corr(df,"features","spearman").head()print("Spearmancorrelationmatrix:\n"+str(r2[0]))#$exampleoff$spark.stop()5.1相關性(Vectors.sparse(4,[(045.1相關性5.1相關性55.2假設檢驗假設檢驗是統(tǒng)計學中一種強有力的工具,用于確定結果是否具有統(tǒng)計顯著性,無論該結果是否偶然發(fā)生。spark.ml目前支持Pearson的卡方(χ2)獨立性測試。ChiSquareTest針對標簽的每個特征進行Pearson獨立測試。對于每個特征,(特征,標簽)對被轉換為列聯(lián)矩陣,對其計算卡方統(tǒng)計量。所有標簽和特征值必須是可分類的?!纠?-2】Pearson卡方獨立性測試的PythonAPI代碼。5.2假設檢驗假設檢驗是統(tǒng)計學中一種強有力的工具,用于65.2假設檢驗from__future__importprint_functionfrompyspark.sqlimportSparkSession#$exampleon$frompyspark.ml.linalgimportVectorsfrompyspark.ml.statimportChiSquareTest#$exampleoff$if__name__=="__main__":spark=SparkSession\5.2假設檢驗from__future__impo75.2假設檢驗builder\.appName("ChiSquareTestExample")\.getOrCreate()#$exampleon$data=[(0.0,Vectors.dense(0.5,10.0)),(0.0,Vectors.dense(1.5,20.0)),(1.0,Vectors.dense(1.5,30.0)),(0.0,Vectors.dense(3.5,30.0)),(0.0,Vectors.dense(3.5,40.0)),(1.0,Vectors.dense(3.5,40.0))]5.2假設檢驗builder\85.2假設檢驗df=spark.createDataFrame(data,["label","features"])r=ChiSquareTest.test(df,"features","label").head()print("pValues:"+str(r.pValues))print("degreesOfFreedom:"+str(r.degreesOfFreedom))print("statistics:"+str(r.statistics))#$exampleoff$spark.stop()5.2假設檢驗df=spark.createDa95.2假設檢驗將上述代碼保存成Pearsonx.py,然后用命令spark-submitPearsonx.py運行,結果如圖5-2所示。
5.2假設檢驗將上述代碼保存成Pearsonx.py105.3累積器通過Summarizer為Dataframe提供向量列摘要統(tǒng)計??捎弥笜耸橇械?大值、 小值、平均值、方差、非零數及總計數?!纠?-3】Summarizer的PythonAPI代碼。from__future__importprint_functionfrompyspark.sqlimportSparkSession#$exampleon$frompyspark.ml.statimportSummarizerfrompyspark.sqlimportRowfrompyspark.ml.linalgimportVectors5.3累積器通過Summarizer為Datafr115.3累積器#$exampleoff$if__name__=="__main__":spark=SparkSession\.builder\.appName("SummarizerExample")\.getOrCreate()sc=spark.sparkContext#$exampleon$5.3累積器#$exampleoff$125.3累積器df=sc.parallelize([Row(weight=1.0,features=Vectors.dense(1.0,1.0,1.0)),Row(weight=0.0,features=Vectors.dense(1.0,2.0,3.0))]).toDF()#createsummarizerformultiplemetrics"mean"and"count"summarizer=Summarizer.metrics("mean","count")#computestatisticsformultiplemetricswithweightdf.select(summarizer.summary(df.features,df.weight)).show(truncate=False)5.3累積器df=sc.parallelize([R135.3累積器#computestatisticsformultiplemetricswithoutweightdf.select(summarizer.summary(df.features)).show(truncate=False)#computestatisticsforsinglemetric"mean"withweightdf.select(Summarizer.mean(df.features,df.weight)).show(truncate=False)#computestatisticsforsinglemetric"mean"withoutweightdf.select(Summarizer.mean(df.features)).show(truncate=False)#$exampleoff$spark.stop()5.3累積器#computestatisticsf145.3累積器將上述代碼保存成Summarizerx.py,然后用命令spark-submitSummarizerx.py運行,結果如圖5-3所示。5.3累積器將上述代碼保存成Summarizerx.p155.4摘要統(tǒng)計【例5-4】通過函數colStats為RDD[Vector]提供列摘要統(tǒng)計信息,colStats()返回一個MultivariateStatisticalSummary實例。importnumpyasnpfrompyspark.mllib.statimportStatisticsmat=sc.parallelize([np.array([1.0,10.0,100.0]),np.array([2.0,20.0,200.0]),np.array([3.0,30.0,300.0])])#anRDDofVectors#Computecolumnsummarystatistics.summary=Statistics.colStats(mat)5.4摘要統(tǒng)計【例5-4】通過函數colStats165.4摘要統(tǒng)計print(summary.mean())#adensevectorcontainingthemeanvalueforeachcolumnprint(summary.variance())#column-wisevarianceprint(summary.numNonzeros())#numberofnonzerosineachcolumn將上述代碼保存成RDDVectorx.py,然后用命令spark-submitRDDVectorx.py運行,結果如圖5-4所示。5.4摘要統(tǒng)計print(summary.mean()175.4摘要統(tǒng)計5.4摘要統(tǒng)計185.4摘要統(tǒng)計與駐留在spark.mllib中的其他統(tǒng)計函數不同,可以對RDD的鍵值對執(zhí)行分層抽樣方法sampleByKey和sampleByKeyExact。對于分層抽樣,可以將鍵視為標簽,將值視為特定屬性。例如,關鍵字可以是男人、女人或文檔ID,并且相應的值可以是人的年齡列表或文檔中的單詞列表。sampleByKey方法將翻轉硬幣以決定是否對樣本進行采樣,因此需要對數據進行一次傳遞,并提供預期的樣本大小。sampleByKeyExact比sampleByKey中使用的每層簡單隨機抽樣需要更多的資源,但是會提供99.99%置信度的精確抽樣大小。5.4摘要統(tǒng)計與駐留在spark.mllib中的其195.4摘要統(tǒng)計【例5-5】分層抽樣。sampleByKey()允許用戶近似采樣。frompysparkimportSparkContextif__name__=="__main__":sc=SparkContext(appName="StratifiedSamplingExample")#SparkContext#$exampleon$#anRDDofanykeyvaluepairsdata=sc.parallelize([(1,'a'),(1,'b'),(2,'c'),(2,'d'),(2,'e'),(3,'f')])
5.4摘要統(tǒng)計【例5-5】分層抽樣。sampleBy205.4摘要統(tǒng)計#specifytheexactfractiondesiredfromeachkeyasadictionaryfractions={1:0.1,2:0.6,3:0.3}approxSample=data.sampleByKey(False,fractions)#$exampleoff$foreachinapproxSample.collect():print(each)sc.stop()5.4摘要統(tǒng)計#specifytheexact215.4摘要統(tǒng)計將上述代碼保存成SparkContext.py,然后用命令spark-submitSparkContext.py運行,結果如圖5-5所示。5.4摘要統(tǒng)計將上述代碼保存成SparkContex225.5分層抽樣與駐留在spark.mllib中的其他統(tǒng)計函數不同,可以對RDD的鍵值對執(zhí)行分層抽樣方法sampleByKey和sampleByKeyExact。對于分層抽樣,可以將鍵視為標簽,將值視為特定屬性。例如,關鍵字可以是男人、女人或文檔ID,并且相應的值可以是人的年齡列表或文檔中的單詞列表。sampleByKey方法將翻轉硬幣以決定是否對樣本進行采樣,因此需要對數據進行一次傳遞,并提供預期的樣本大小。sampleByKeyExact比sampleByKey中使用的每層簡單隨機抽樣需要更多的資源,但是會提供99.99%置信度的精確抽樣大小。5.5分層抽樣與駐留在spark.mllib中的其235.5分層抽樣【例5-5】分層抽樣。sampleByKey()允許用戶近似采樣。frompysparkimportSparkContextif__name__=="__main__":sc=SparkContext(appName="StratifiedSamplingExample")#SparkContext#$exampleon$#anRDDofanykeyvaluepairsdata=sc.parallelize([(1,'a'),(1,'b'),(2,'c'),(2,'d'),(2,'e'),(3,'f')])
5.5分層抽樣【例5-5】分層抽樣。sampleBy245.5分層抽樣#specifytheexactfractiondesiredfromeachkeyasadictionaryfractions={1:0.1,2:0.6,3:0.3}approxSample=data.sampleByKey(False,fractions)#$exampleoff$foreachinapproxSample.collect():print(each)sc.stop()5.5分層抽樣#specifytheexact255.5分層抽樣將上述代碼保存成SparkContext.py,然后用命令spark-submitSparkContext.py運行,結果如圖5-5所示。5.5分層抽樣將上述代碼保存成SparkContex265.6流數據顯著性檢驗Spark提供一些檢驗的在線實現(xiàn),以支持A/B檢驗等用例。這些檢驗可以在SparkStreamingDStream[(Boolean,Double)]上執(zhí)行,其中每個元組的第一個元素表示對照組(false)或實驗組(true),第二個元素是觀測值。流顯著性檢驗支持以下參數:peacePeriod:要忽略的流數據中,初始數據點的數量,用于減輕新異效應。windowSize:執(zhí)行假設檢驗的先前批次數。設置為0將使用所有先前批次執(zhí)行累積處理。5.6流數據顯著性檢驗Spark提供一些檢驗的在線實275.6流數據顯著性檢驗【例5-6】StreamingTest提供的流數據假設檢驗。importorg.apache.spark.SparkConfimportorg.apache.spark.mllib.stat.test.{BinarySample,StreamingTest}importorg.apache.spark.streaming.{Seconds,StreamingContext}importorg.apache.spark.util.Utils5.6流數據顯著性檢驗【例5-6】Streaming285.6流數據顯著性檢驗objectStreamingTestExample{defmain(args:Array[String]){if(args.length!=3){//scalastyle:offprintlnSystem.err.println("Usage:StreamingTestExample"+"<dataDir><batchDuration><numBatchesTimeout>")5.6流數據顯著性檢驗objectStreaming295.6流數據顯著性檢驗//scalastyle:onprintlnSystem.exit(1)}valdataDir=args(0)valbatchDuration=Seconds(args(1).toLong)valnumBatchesTimeout=args(2).toInt5.6流數據顯著性檢驗//scalastyle:305.6流數據顯著性檢驗valconf=newSparkConf().setMaster("local").setAppName("StreamingTestExample")valssc=newStreamingContext(conf,batchDuration)ssc.checkpoint{valdir=Utils.createTempDir()dir.toString}5.6流數據顯著性檢驗valconf=new315.6流數據顯著性檢驗//$exampleon$valdata=ssc.textFileStream(dataDir).map(line=>line.split(",")match{caseArray(label,value)=>BinarySample(label.toBoolean,value.toDouble)})valstreamingTest=newStreamingTest().setPeacePeriod(0).setWindowSize(0).setTestMethod("welch")5.6流數據顯著性檢驗//$exampleon$325.6流數據顯著性檢驗valout=streamingTest.registerStream(data)out.print()//$exampleoff$//StopprocessingiftestbecomessignificantorwetimeoutvartimeoutCounter=numBatchesTimeoutout.foreachRDD{rdd=>timeoutCounter-=1valanySignificant=rdd.map(_.pValue<0.05).fold(false)(_||_)if(timeoutCounter==0||anySignificant)rdd.context.stop()}ssc.start()ssc.awaitTermination()}}5.6流數據顯著性檢驗valout=stream335.6流數據顯著性檢驗將上述代碼保存成StreamingTestExample.scala,然后使用命令spark-submit--classorg.apache.spark.examples.mllib.StreamingTestExamplespark-examples_2.11-2.4.0.jar運行,結果如圖5-6所示。
5.6流數據顯著性檢驗將上述代碼保存成Streami345.6流數據顯著性檢驗5.6流數據顯著性檢驗355.7隨機數據生成隨機數據生成對于隨機化算法、原型設計和性能測試都非常有用。Spark支持從給定分布(均勻分布、標準正態(tài)分布或泊松分布)抽取id值,并生成對應的隨機RDD。【例5-7】生成隨機雙RDD,其值遵循標準正態(tài)分布N(0,1),然后將其映射到N(1,4)。RandomRDDs提供相應方法來生成隨機雙RDD或向量RDD。5.7隨機數據生成隨機數據生成對于隨機化算法、原型設計365.7隨機數據生成frompyspark.mllib.randomimportRandomRDDsfrompysparkimportSparkContext,SparkConfsc=SparkContext(appName="PythonRandomnumberGeneration")#sc=...#SparkContext#GeneratearandomdoubleRDDthatcontains1millioni.i.d.valuesdrawnfromthe#standardnormaldistribution`N(0,1)`,evenlydistributedin10partitions.5.7隨機數據生成frompyspark.mllib375.7隨機數據生成u=RandomRDDs.normalRDD(sc,1000000,10)#ApplyatransformtogetarandomdoubleRDDfollowing`N(1,4)`.v=u.map(lambdax:1.0+2.0*x)print(v)將上述代碼保存成RandomRDDs.py,然后用命令spark-submitRandomRDDs.py運行,結果如圖5-7所示。5.7隨機數據生成u=RandomRDDs.nor385.7隨機數據生成5.7隨機數據生成395.8核密度估計核密度估計是一種可用于可視化經驗概率分布的技術,而無須對觀察到樣本的特定分布進行假設。它可以用來計算隨機變量概率密度函數的估計值,在給定的一組點處進行評估。通過將特定點的經驗分布PDF,表示為以每個樣本為中心的正態(tài)分布PDF的均值來實現(xiàn)該估計?!纠?-8】用KernelDensity從樣本的RDD計算核密度估計。frompysparkimportSparkContext#$exampleon$frompyspark.mllib.statimportKernelDensity5.8核密度估計核密度估計是一種可用于可視化經驗概率分405.8核密度估計#$exampleoff$if__name__=="__main__":sc=SparkContext(appName="KernelDensityEstimationExample")#SparkContext#$exampleon$#anRDDofsampledatadata=sc.parallelize([1.0,1.0,1.0,2.0,3.0,4.0,5.0,5.0,6.0,7.0,8.0,9.0,9.0])#ConstructthedensityestimatorwiththesampledataandastandarddeviationfortheGaussian5.8核密度估計#$exampleoff$if415.8核密度估計#kernelskd=KernelDensity()kd.setSample(data)kd.setBandwidth(3.0)#Finddensityestimatesforthegivenvaluesdensities=kd.estimate([-1.0,2.0,5.0])#$exampleoff$print(densities)sc.stop()將上述代碼保存成KernelDensity.py,然后用命令spark-submitKernelDensity.py運行,結果如圖5-8所示。5.8核密度估計#kernels425.8核密度估計5.8核密度估計43習題1. 用PySpark編程生成隨機雙RDD,其值遵循標準正態(tài)分布N(0,1),并將其映射到N(1,5)。2. 通過sampleByKey()進行近似分層采樣編程。3. 用PySpark編程實現(xiàn)Pearson的卡方(χ2)統(tǒng)計。習題1. 用PySpark編程生成隨機雙RDD,44第5章基本統(tǒng)計課件45第5章基本統(tǒng)計課件46第5章基本統(tǒng)計課件47第5章基本統(tǒng)計第5章基本統(tǒng)計485.1相關性計算兩個數據系列之間的相關性是統(tǒng)計學中的常見操作。用spark.ml可靈活地計算多個系列兩兩之間的相關性。目前Spark支持的相關方法是Pearson方法和Spearman方法。Correlation使用指定的方法計算輸入數據集的相關矩陣。輸出將是一個DataFrame,它包含向量列的相關矩陣?!纠?-1】Correlation的PythonAPI代碼。from__future__importprint_function#$exampleon$frompyspark.ml.linalgimportVectorsfrompyspark.ml.statimportCorrelation#$exampleoff$5.1相關性計算兩個數據系列之間的相關性是統(tǒng)計學中的常見495.1相關性frompyspark.sqlimportSparkSessionif__name__=="__main__":spark=SparkSession\.builder\.appName("CorrelationExample")\.getOrCreate()#$exampleon$data=[(Vectors.sparse(4,[(0,1.0),(3,-2.0)]),),(Vectors.dense([4.0,5.0,0.0,3.0]),),(Vectors.dense([6.0,7.0,0.0,8.0]),),5.1相關性frompyspark.sqlimpor505.1相關性(Vectors.sparse(4,[(0,9.0),(3,1.0)]),)]df=spark.createDataFrame(data,["features"])r1=Correlation.corr(df,"features").head()print("Pearsoncorrelationmatrix:\n"+str(r1[0]))r2=Correlation.corr(df,"features","spearman").head()print("Spearmancorrelationmatrix:\n"+str(r2[0]))#$exampleoff$spark.stop()5.1相關性(Vectors.sparse(4,[(0515.1相關性5.1相關性525.2假設檢驗假設檢驗是統(tǒng)計學中一種強有力的工具,用于確定結果是否具有統(tǒng)計顯著性,無論該結果是否偶然發(fā)生。spark.ml目前支持Pearson的卡方(χ2)獨立性測試。ChiSquareTest針對標簽的每個特征進行Pearson獨立測試。對于每個特征,(特征,標簽)對被轉換為列聯(lián)矩陣,對其計算卡方統(tǒng)計量。所有標簽和特征值必須是可分類的?!纠?-2】Pearson卡方獨立性測試的PythonAPI代碼。5.2假設檢驗假設檢驗是統(tǒng)計學中一種強有力的工具,用于535.2假設檢驗from__future__importprint_functionfrompyspark.sqlimportSparkSession#$exampleon$frompyspark.ml.linalgimportVectorsfrompyspark.ml.statimportChiSquareTest#$exampleoff$if__name__=="__main__":spark=SparkSession\5.2假設檢驗from__future__impo545.2假設檢驗builder\.appName("ChiSquareTestExample")\.getOrCreate()#$exampleon$data=[(0.0,Vectors.dense(0.5,10.0)),(0.0,Vectors.dense(1.5,20.0)),(1.0,Vectors.dense(1.5,30.0)),(0.0,Vectors.dense(3.5,30.0)),(0.0,Vectors.dense(3.5,40.0)),(1.0,Vectors.dense(3.5,40.0))]5.2假設檢驗builder\555.2假設檢驗df=spark.createDataFrame(data,["label","features"])r=ChiSquareTest.test(df,"features","label").head()print("pValues:"+str(r.pValues))print("degreesOfFreedom:"+str(r.degreesOfFreedom))print("statistics:"+str(r.statistics))#$exampleoff$spark.stop()5.2假設檢驗df=spark.createDa565.2假設檢驗將上述代碼保存成Pearsonx.py,然后用命令spark-submitPearsonx.py運行,結果如圖5-2所示。
5.2假設檢驗將上述代碼保存成Pearsonx.py575.3累積器通過Summarizer為Dataframe提供向量列摘要統(tǒng)計??捎弥笜耸橇械?大值、 小值、平均值、方差、非零數及總計數。【例5-3】Summarizer的PythonAPI代碼。from__future__importprint_functionfrompyspark.sqlimportSparkSession#$exampleon$frompyspark.ml.statimportSummarizerfrompyspark.sqlimportRowfrompyspark.ml.linalgimportVectors5.3累積器通過Summarizer為Datafr585.3累積器#$exampleoff$if__name__=="__main__":spark=SparkSession\.builder\.appName("SummarizerExample")\.getOrCreate()sc=spark.sparkContext#$exampleon$5.3累積器#$exampleoff$595.3累積器df=sc.parallelize([Row(weight=1.0,features=Vectors.dense(1.0,1.0,1.0)),Row(weight=0.0,features=Vectors.dense(1.0,2.0,3.0))]).toDF()#createsummarizerformultiplemetrics"mean"and"count"summarizer=Summarizer.metrics("mean","count")#computestatisticsformultiplemetricswithweightdf.select(summarizer.summary(df.features,df.weight)).show(truncate=False)5.3累積器df=sc.parallelize([R605.3累積器#computestatisticsformultiplemetricswithoutweightdf.select(summarizer.summary(df.features)).show(truncate=False)#computestatisticsforsinglemetric"mean"withweightdf.select(Summarizer.mean(df.features,df.weight)).show(truncate=False)#computestatisticsforsinglemetric"mean"withoutweightdf.select(Summarizer.mean(df.features)).show(truncate=False)#$exampleoff$spark.stop()5.3累積器#computestatisticsf615.3累積器將上述代碼保存成Summarizerx.py,然后用命令spark-submitSummarizerx.py運行,結果如圖5-3所示。5.3累積器將上述代碼保存成Summarizerx.p625.4摘要統(tǒng)計【例5-4】通過函數colStats為RDD[Vector]提供列摘要統(tǒng)計信息,colStats()返回一個MultivariateStatisticalSummary實例。importnumpyasnpfrompyspark.mllib.statimportStatisticsmat=sc.parallelize([np.array([1.0,10.0,100.0]),np.array([2.0,20.0,200.0]),np.array([3.0,30.0,300.0])])#anRDDofVectors#Computecolumnsummarystatistics.summary=Statistics.colStats(mat)5.4摘要統(tǒng)計【例5-4】通過函數colStats635.4摘要統(tǒng)計print(summary.mean())#adensevectorcontainingthemeanvalueforeachcolumnprint(summary.variance())#column-wisevarianceprint(summary.numNonzeros())#numberofnonzerosineachcolumn將上述代碼保存成RDDVectorx.py,然后用命令spark-submitRDDVectorx.py運行,結果如圖5-4所示。5.4摘要統(tǒng)計print(summary.mean()645.4摘要統(tǒng)計5.4摘要統(tǒng)計655.4摘要統(tǒng)計與駐留在spark.mllib中的其他統(tǒng)計函數不同,可以對RDD的鍵值對執(zhí)行分層抽樣方法sampleByKey和sampleByKeyExact。對于分層抽樣,可以將鍵視為標簽,將值視為特定屬性。例如,關鍵字可以是男人、女人或文檔ID,并且相應的值可以是人的年齡列表或文檔中的單詞列表。sampleByKey方法將翻轉硬幣以決定是否對樣本進行采樣,因此需要對數據進行一次傳遞,并提供預期的樣本大小。sampleByKeyExact比sampleByKey中使用的每層簡單隨機抽樣需要更多的資源,但是會提供99.99%置信度的精確抽樣大小。5.4摘要統(tǒng)計與駐留在spark.mllib中的其665.4摘要統(tǒng)計【例5-5】分層抽樣。sampleByKey()允許用戶近似采樣。frompysparkimportSparkContextif__name__=="__main__":sc=SparkContext(appName="StratifiedSamplingExample")#SparkContext#$exampleon$#anRDDofanykeyvaluepairsdata=sc.parallelize([(1,'a'),(1,'b'),(2,'c'),(2,'d'),(2,'e'),(3,'f')])
5.4摘要統(tǒng)計【例5-5】分層抽樣。sampleBy675.4摘要統(tǒng)計#specifytheexactfractiondesiredfromeachkeyasadictionaryfractions={1:0.1,2:0.6,3:0.3}approxSample=data.sampleByKey(False,fractions)#$exampleoff$foreachinapproxSample.collect():print(each)sc.stop()5.4摘要統(tǒng)計#specifytheexact685.4摘要統(tǒng)計將上述代碼保存成SparkContext.py,然后用命令spark-submitSparkContext.py運行,結果如圖5-5所示。5.4摘要統(tǒng)計將上述代碼保存成SparkContex695.5分層抽樣與駐留在spark.mllib中的其他統(tǒng)計函數不同,可以對RDD的鍵值對執(zhí)行分層抽樣方法sampleByKey和sampleByKeyExact。對于分層抽樣,可以將鍵視為標簽,將值視為特定屬性。例如,關鍵字可以是男人、女人或文檔ID,并且相應的值可以是人的年齡列表或文檔中的單詞列表。sampleByKey方法將翻轉硬幣以決定是否對樣本進行采樣,因此需要對數據進行一次傳遞,并提供預期的樣本大小。sampleByKeyExact比sampleByKey中使用的每層簡單隨機抽樣需要更多的資源,但是會提供99.99%置信度的精確抽樣大小。5.5分層抽樣與駐留在spark.mllib中的其705.5分層抽樣【例5-5】分層抽樣。sampleByKey()允許用戶近似采樣。frompysparkimportSparkContextif__name__=="__main__":sc=SparkContext(appName="StratifiedSamplingExample")#SparkContext#$exampleon$#anRDDofanykeyvaluepairsdata=sc.parallelize([(1,'a'),(1,'b'),(2,'c'),(2,'d'),(2,'e'),(3,'f')])
5.5分層抽樣【例5-5】分層抽樣。sampleBy715.5分層抽樣#specifytheexactfractiondesiredfromeachkeyasadictionaryfractions={1:0.1,2:0.6,3:0.3}approxSample=data.sampleByKey(False,fractions)#$exampleoff$foreachinapproxSample.collect():print(each)sc.stop()5.5分層抽樣#specifytheexact725.5分層抽樣將上述代碼保存成SparkContext.py,然后用命令spark-submitSparkContext.py運行,結果如圖5-5所示。5.5分層抽樣將上述代碼保存成SparkContex735.6流數據顯著性檢驗Spark提供一些檢驗的在線實現(xiàn),以支持A/B檢驗等用例。這些檢驗可以在SparkStreamingDStream[(Boolean,Double)]上執(zhí)行,其中每個元組的第一個元素表示對照組(false)或實驗組(true),第二個元素是觀測值。流顯著性檢驗支持以下參數:peacePeriod:要忽略的流數據中,初始數據點的數量,用于減輕新異效應。windowSize:執(zhí)行假設檢驗的先前批次數。設置為0將使用所有先前批次執(zhí)行累積處理。5.6流數據顯著性檢驗Spark提供一些檢驗的在線實745.6流數據顯著性檢驗【例5-6】StreamingTest提供的流數據假設檢驗。importorg.apache.spark.SparkConfimportorg.apache.spark.mllib.stat.test.{BinarySample,StreamingTest}importorg.apache.spark.streaming.{Seconds,StreamingContext}importorg.apache.spark.util.Utils5.6流數據顯著性檢驗【例5-6】Streaming755.6流數據顯著性檢驗objectStreamingTestExample{defmain(args:Array[String]){if(args.length!=3){//scalastyle:offprintlnSystem.err.println("Usage:StreamingTestExample"+"<dataDir><batchDuration><numBatchesTimeout>")5.6流數據顯著性檢驗objectStreaming765.6流數據顯著性檢驗//scalastyle:onprintlnSystem.exit(1)}valdataDir=args(0)valbatchDuration=Seconds(args(1).toLong)valnumBatchesTimeout=args(2).toInt5.6流數據顯著性檢驗//scalastyle:775.6流數據顯著性檢驗valconf=newSparkConf().setMaster("local").setAppName("StreamingTestExample")valssc=newStreamingContext(conf,batchDuration)ssc.checkpoint{valdir=Utils.createTempDir()dir.toString}5.6流數據顯著性檢驗valconf=new785.6流數據顯著性檢驗//$exampleon$valdata=ssc.textFileStream(dataDir).map(line=>line.split(",")match{caseArray(label,value)=>BinarySample(label.toBoolean,value.toDouble)})valstreamingTest=newStreamingTest().setPeacePeriod(0).setWindowSize(0).setTestMethod("welch")5.6流數據顯著性檢驗//$exampleon$795.6流數據顯著性檢驗valout=streamingTest.registerStream(data)out.print()//$exampleoff$//StopprocessingiftestbecomessignificantorwetimeoutvartimeoutCounter=numBatchesTimeoutout.foreachRDD{rdd=>timeoutCounter-=1valanySignificant=rdd.map(_.pValue<0.05).fold(false)(_||_)if(timeoutCounter==0||anySignificant)rdd.context.stop()}ssc.start()ssc.awaitTermination()}}5.6流數據顯著性檢驗valout=stream805.6流數據顯著性檢驗將上述代碼保存成StreamingTestExample.scala,然后使用命令spark-submit--classorg.apache.spark.examples.mllib.Stre
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
- 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- Welcome to school (教學設計)-2024-2025學年外研版(三起)(2024)英語三年級上冊
- 11爸爸媽媽在我心中-愛父母在行動(第2課時)(教學設計)2023-2024學年統(tǒng)編版道德與法治三年級上冊
- 滬科版高中信息技術必修教材《信息技術基礎》教學設計:第3章 信息的加工與獲取 綜合活動 資料網站的制作(保護水資源)
- 全國中圖版高中信息技術選修2第二單元第一節(jié)1、《素材獲取》教學設計
- 新型儲能在電動汽車中的應用
- 第五單元《倍的認識》(教學設計)-2024-2025學年三年級數學上學期人教版
- 影劇院外立面裝修合同范本
- 2025二手房裝修合同5篇
- 城市休閑公園土地資源及使用情況
- 辦公樓裝修改造項目投資分析
- 部編人教版六年級道德與法治下冊全冊完整版課件
- 會議紀要督辦管理制度
- 電動車輛動力電池系統(tǒng)及應用技術 第3版 課件全套 王震坡 第1-11章 動力電池及其驅動的電動車輛- 動力電池充電方法與基礎設施
- 2024云南中考數學二輪專題復習 題型五 二次函數性質綜合題(課件)
- JB∕T 9006-2013 起重機 卷筒標準規(guī)范
- 家庭法律服務行業(yè)市場突圍建議書
- 高一數學同步優(yōu)品講練課件(人教A版2019必修第一冊)3.2 函數的基本性質(課時3 函數的奇偶性)(課件)
- 太平洋保險計劃書模板
- 2024年廣東省中考生物+地理試卷(含答案)
- 智能化弱電工程技術方案(完整)
- 有關煤礦生產新技術、新工藝、新設備和新材料及其安全技術要求課件
評論
0/150
提交評論