歡迎光臨
我們一直在努力

Spark之Java程式設計

文章摘要: 則該運算元返回一個新的資料集包含 (KV) 且引數RDD(otherDataset)包含元素型別(K

Spark是一個用來實現快速而通用的叢集計算的平臺。擴充套件了廣泛使用的MapReduce計算模型,而且高效地支援更多的計算模式,包括互動式查詢和流處理。在處理大規模資料集的時候,速度是非常重要的。Spark的一個重要特點就是能夠在記憶體中計算,因而更快。即使在磁碟上進行的複雜計算,Spark依然比MapReduce更加高效。

Spark執行模式

Spark執行模式中Hadoop YARN的叢集方式最為常用的方式,目前Spark的執行模式主要有以下幾種:

  • local
    主要用於開發除錯Spark應用程式
  • Standlone
    利用Spark自帶的資源管理與排程器執行Spark叢集,採用Master/Slave結構,為解決單點故障,可以採用Xookeeper實現高可靠(High Availability, HA)
  • Apache Mesos
    執行在著名的Mesos資源管理框架基礎之上,該叢集執行模式將資源管理管理交給Mesos,Spark只負責執行任務排程和計算
  • Hadoop YARN
    叢集執行在Yarn資源管理器上,資源管理交給YARN,Spark只負責進行任務排程和計算

Spark的組成

Spark已經發展成為包含眾多子專案的大資料計算平臺。伯克利將Spark的整個生態系統稱為伯克利資料分析棧(BDAS)。其核心框架是Spark,同時BDAS涵蓋支援結構化資料SQL查詢與分析的查詢引擎Spark SQL和Shark,提供機器學習功能的系統MLbase及底層的分散式機器學習庫MLlib、並行圖計算框架GraphX、流計算框架Spark Streaming、取樣近似計算查詢引擎BlinkDB、記憶體分散式檔案系統Tachyon、資源管理框架Mesos等子專案

spark core

Spark Core是Spark的核心元件,其基於核心的RDD(Resilient Distributed DataSet 彈性分散式資料集)抽象,提供了分散式作業分發、排程及豐富的RDD操作,這些操作通過Java、Python、Scala、R語言介面暴露。RDD是分佈於叢集各節點上的、不可變的彈性資料集,容納的是Java/Python/Scala/R的物件例項。Spark定義了RDD之上的兩種操作:Transformation和Action,RDD上執行一個操作之後,生成另一個RDD或者執行某些動作。

Spark Streaming

Spark Streaming基於Spark的計算效能進行流式的分析,將流式資料根據時間切分為小批量的資料(RDD),並在RDD上執行Transformation、Action操作完成分析。這種方式使得現有的很多批處理程式碼可以直接工作在流式處理的模式下。但是這種模式以犧牲一定的延時為代價,相比於其他基於事件或者訊息的流式處理框架(Storm,Samza,Flink),延時比較大。Spark Streaming內建支援來自Kafka,Flume,ZeroMQ,Kinesis,Twitter,TCP/IP Socket的資料。

Spark SQL

Spark SQL引入了稱為DataFrames的資料抽象,提供對結構化和半結構化資料操作的支援。Spark提供了一套DSL用於DataFrame的操作,DSL可以通過Scala,Java,Python來表示。同時Spark SQK提供了對標準SQL語言的支援,包括命令列介面和ODBC/JDBC支援(驅動)

MLib

MLib是基於Spark的機器學習框架,由於Spark的分散式記憶體框架,其實現的常用演算法包括 概要統計、分類和迴歸、協同過濾、聚類分析、降維、特徵提取與轉換函式。

GraphX

GraphX則是基於Spark的分散式圖處理框架,對標到Hadoop體系下基於MapReduce(因此基於磁碟)的圖處理框架Giraph.由於RDD是不可變的,因此GraphX不適合需要更新操作的場景。GraphX提供了兩套API,一套類似於Google Pregel提供的API,另一套則更像是MapReduce的風格。

Spark的程式碼結構

  • scheduler:資料夾中含有負責整體的Spark應用、任務排程的程式碼。
  • broadcast:含有Broadcast(廣播變數)的實現程式碼,API中是Java和Python API的實
  • deploy:含有Spark部署與啟動執行的程式碼。
  • common:不是一個資料夾,而是代表Spark通用的類和邏輯實現,有5000行程式碼。
  • metrics:是執行時狀態監控邏輯程式碼,Executor中含有Worker節點負責計算的邏輯程式碼。
  • partial:含有近似評估程式碼。
  • network:含有叢集通訊模組程式碼。
  • serializer:含有序列化模組的程式碼。
  • storage:含有儲存模組的程式碼。
  • ui:含有監控介面的程式碼邏輯。其他的程式碼模組分別是對Spark生態系統中其他元件的實現。
  • streaming:是Spark Streaming的實現程式碼。
  • YARN:是Spark on YARN的部分實現程式碼。
  • graphx:含有GraphX實現程式碼。
  • interpreter:程式碼互動式Shell的程式碼量為3300行。
  • mllib:代表MLlib演算法實現的程式碼量。
  • sql代表Spark SQL的程式碼量

Spark的工作流程

Spark的整體流程為:Client提交應用,Master找到一個Worker啟動Driver,Driver向Master或者資源管理器申請資源,之後將應用轉化為RDD Graph,再由DAGScheduler將RDD Graph轉化為Stage的有向無環圖提交給TaskScheduler,由TaskScheduler提交任務給Executor執行。在任務執行的過程中,其他元件協同工作,確保整個應用順利執行。

Java SparkCore程式設計

Spark提供了Java程式設計介面,通常可以先獲取JavaSparkContext,讓過建立RDD物件,然後執行響應的操作即可。可以通過Maven加入如下配置

  
    org.apache.spark  
    spark-core_2.11  
    2.0.2  

然後按照如下模式進行相關的業務程式碼開發

import org.apache.spark.SparkConf;  
import org.apache.spark.api.java.JavaRDD;  
import org.apache.spark.api.java.JavaSparkContext;  
  
public class HelloSpark{  
      
    public static void main(String[] args){  
    	// 1 建立一個sparkconf 物件並配置
		// 使用setMaster 可以設定spark叢集可以連結叢集的URL,如果設定local 代表在本地執行而不是在叢集執行
        SparkConf conf = new SparkConf().setMaster("local").setAppName("HelloSpark");

        // 2 建立javasparkContext物件
		// sparkcontext 是一個入口,主要作用就是初始化spark應用程式所需的一些核心元件,例如排程器,task,
		// 還會註冊spark,sparkMaster結點上註冊。
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) { 


            // 3do something here
        }  
    }  
  
}

完成本地測試後可以到響應的程式,提交至spark中執行,通過可以編寫響應的指令碼

/opt/spark/bin/spark-submit                     # 用這個命令啟動
--class com.xxx.HelloSpark     # 配置類名
--num-executors 3                               # 配置在三個結點上執行
--driver-memory 100m                            # drive記憶體
--executor-memory 100m                          # 配置execute記憶體
--executor-cores 3                              # 核心執行單元數
/opt/spark-script/java/HelloSpark-0.0.1-SNAPSHOT.jar      # 執行的jar包

下面是一個使用spark進行詞彙統計的小程式

package com.opslab.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;

public class _01WordCount{
    public static void main(String[] args){

        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCount");
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            /*
* 第3步:根據具體的資料來源(HDFS、HBase、Local FS、DB、S3等)通過SparkContext來建立RDD
* JavaRDD的建立基本有三種方式:根據外部的資料來源(例如HDFS)、根據Scala集合、由其它的RDD操作
* 資料會被JavaRDD劃分成為一系列的Partitions,分配到每個Partition的資料屬於一個Task的處理範疇
*/
            JavaRDD lines = jsc.textFile("D:/workspace/opslabSpark/resources/README.md");
            /*
* 第4步:對初始的JavaRDD進行Transformation級別的處理,例如map、filter等高階函式等的程式設計,來進行具體的資料計算
* 第4.1步:講每一行的字串拆分成單個的單詞
*/

            JavaRDD words = lines.flatMap(new FlatMapFunction() {
                @Override
                public Iterator call(String s)throws Exception {
                    return (new ArrayList(Arrays.asList(s.split(" ")))).iterator();
                }
                //如果是scala由於Sam轉化所以可以寫成一行程式碼

            });

            /*
* 第4步:對初始的JavaRDD進行Transformation級別的處理,例如map、filter等高階函式等的程式設計,來進行具體的資料計算
* 第4.2步:在單詞拆分的基礎上對每個單詞例項計數為1,也就是word => (word, 1)
*/

            JavaPairRDD pairs = words.mapToPair(new PairFunction() {
                @Override
                public Tuple2 call(String word)throws Exception {
                    // TODO Auto-generated method stub
                    return new Tuple2(word, 1);
                }
            });

            /*
* 第4步:對初始的RDD進行Transformation級別的處理,例如map、filter等高階函式等的程式設計,來進行具體的資料計算
* 第4.3步:在每個單詞例項計數為1基礎之上統計每個單詞在檔案中出現的總次數
*/

            JavaPairRDD wordsCount = pairs.reduceByKey(new Function2() {
                 //對相同的Key,進行Value的累計(包括Local和Reducer級別同時Reduce)
                @Override
                public Integer call(Integer v1, Integer v2)throws Exception {
                    // TODO Auto-generated method stub
                    return v1 + v2;
                }

            });

            wordsCount.foreach(new VoidFunction>() {
                @Override
                public void call(Tuple2 pairs)throws Exception {
                    // TODO Auto-generated method stub
                    System.out.println(pairs._1 + " : " + pairs._2);
                }

            });

            // 5. 結果輸出
            // 5.1 結果輸出到HDFS
            //wordsCount.saveAsTextFile("D:/workspace/opslabJava/spark/out/sparkout/wordcount");

            // 單獨出來結果集
            wordsCount.foreachPartition(new VoidFunction>>() {
                @Override
                public void call(Iterator> tuple2Iterator)throws Exception {
                    Tuple2 t2 = tuple2Iterator.next();
                    System.out.println(t2._1());
                    System.out.println(t2._2());

                }
            });

        }
    }
}

RDD運算元

RDD支援兩種型別的運算元(operation):transformation運算元 和 action運算元

transformation運算元可以將已有RDD轉換得到一個新的RDD,而action運算元則是基於資料集計算,並將結果返回給驅動器(driver)。例如,map是一個transformation運算元,它將資料集中每個元素傳給一個指定的函式,並將該函式返回結果構建為一個新的RDD;而 reduce是一個action運算元,它可以將RDD中所有元素傳給指定的聚合函式,並將最終的聚合結果返回給驅動器(還有一個reduceByKey運算元,其返回的聚合結果是一個數據集)。

Spark中所有transformation運算元都是懶惰的,也就是說,這些運算元並不立即計算結果,而是記錄下對基礎資料集(如:一個數據檔案)的轉換操作。只有等到某個action運算元需要計算一個結果返回給驅動器的時候,transformation運算元所記錄的操作纔會被計算。這種設計使Spark可以執行得更加高效 – 例如,map運算元建立了一個數據集,同時該資料集下一步會呼叫reduce運算元,那麼Spark將只會返回reduce的最終聚合結果(單獨的一個數據)給驅動器,而不是將map所產生的資料集整個返回給驅動器。預設情況下,每次呼叫action運算元的時候,每個由transformation轉換得到的RDD都會被重新計算。然而,你也可以通過呼叫persist(或者cache)操作來持久化一個RDD,這意味著Spark將會把RDD的元素都儲存在叢集中,因此下一次訪問這些元素的速度將大大提高。同時,Spark還支援將RDD元素持久化到記憶體或者磁碟上,甚至可以支援跨節點多副本。

轉換運算元 – transformation

以下是Spark支援的一些常用transformation運算元。詳細請參考 RDD API doc (Scala, Java, Python, R) 以及 鍵值對 RDD 函式 (Scala, Java) 。Java的相關API可以查閱 http://spark.apache.org/docs/latest/api/java/index.html

  • map(func)
    返回一個新的分散式資料集,其中每個元素都是由源RDD中一個元素經func轉換得到的。
  • filter(func)
    返回一個新的資料集,其中包含的元素來自源RDD中元素經func過濾後(func返回true時才選中)的結果
  • flatMap(func)
    類似於map,但每個輸入元素可以對映到0到n個輸出元素(所以要求func必須返回一個Seq而不是單個元素)
  • mapPartitions(func)
    類似於map,但基於每個RDD分割槽(或者資料block)獨立執行,所以如果RDD包含元素型別為T,則 func 必須是 Iterator

    => Iterator
    的對映函式。
  • mapPartitionsWithIndex(func)
    類似於 mapPartitions,只是func 多了一個整型的分割槽索引值,因此如果RDD包含元素型別為T,則 func 必須是 Iterator

    => Iterator
    的對映函式。
  • sample(withReplacement, fraction, seed)
    取樣部分(比例取決於 fraction )資料,同時可以指定是否使用回置取樣(withReplacement),以及隨機數種子(seed)
  • union(otherDataset)
    返回源資料集和引數資料集(otherDataset)的並集
  • intersection(otherDataset)
    返回源資料集和引數資料集(otherDataset)的交集
  • distinct([numTasks]))
    返回對源資料集做元素去重後的新資料集
  • groupByKey([numTasks])
    只對包含鍵值對的RDD有效,如源RDD包含 (K, V) 對,則該運算元返回一個新的資料集包含 (K, Iterable

    ) 對。

    注意:如果你需要按key分組聚合的話(如sum或average),推薦使用 reduceByKey或者 aggregateByKey 以獲得更好的效能。

    注意:預設情況下,輸出計算的並行度取決於源RDD的分割槽個數。當然,你也可以通過設定可選引數 numTasks 來指定並行任務的個數。

  • reduceByKey(func, [numTasks])
    如果源RDD包含元素型別 (K, V) 對,則該運算元也返回包含(K, V) 對的RDD,只不過每個key對應的value是經過func聚合後的結果,而func本身是一個 (V, V) => V 的對映函式。
    另外,和 groupByKey 類似,可以通過可選引數 numTasks 指定reduce任務的個數。
  • aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
    如果源RDD包含 (K, V) 對,則返回新RDD包含 (K, U) 對,其中每個key對應的value都是由 combOp 函式 和 一個「0」值zeroValue 聚合得到。允許聚合後value型別和輸入value型別不同,避免了不必要的開銷。和 groupByKey 類似,可以通過可選引數 numTasks 指定reduce任務的個數。
  • sortByKey([ascending], [numTasks])
    如果源RDD包含元素型別 (K, V) 對,其中K可排序,則返回新的RDD包含 (K, V) 對,並按照 K 排序(升序還是降序取決於 ascending 引數)
  • join(otherDataset, [numTasks])
    如果源RDD包含元素型別 (K, V) 且引數RDD(otherDataset)包含元素型別(K, W),則返回的新RDD中將包含內關聯後key對應的 (K, (V, W)) 對。外關聯(Outer joins)操作請參考 leftOuterJoin、rightOuterJoin 以及 fullOuterJoin 運算元。
  • cogroup(otherDataset, [numTasks])
    如果源RDD包含元素型別 (K, V) 且引數RDD(otherDataset)包含元素型別(K, W),則返回的新RDD中包含 (K, (Iterable

    , Iterable

    ))。該運算元還有個別名:groupWith
  • cartesian(otherDataset)
    如果源RDD包含元素型別 T 且引數RDD(otherDataset)包含元素型別 U,則返回的新RDD包含前二者的笛卡爾積,其元素型別為 (T, U) 對。
  • pipe(command, [envVars])
    以shell命令列管道處理RDD的每個分割槽,如:Perl 或者 bash 指令碼。
    RDD中每個元素都將依次寫入程序的標準輸入(stdin),然後按行輸出到標準輸出(stdout),每一行輸出字串即成為一個新的RDD元素。
  • coalesce(numPartitions)
    將RDD的分割槽數減少到numPartitions。當以後大資料集被過濾成小資料集後,減少分割槽數,可以提升效率。
  • repartition(numPartitions)
    將RDD資料重新混洗(reshuffle)並隨機分佈到新的分割槽中,使資料分佈更均衡,新的分割槽個數取決於numPartitions。該運算元總是需要通過網路混洗所有資料。
  • repartitionAndSortWithinPartitions(partitioner)
    根據partitioner(spark自帶有HashPartitioner和RangePartitioner等)重新分割槽RDD,並且在每個結果分割槽中按key做排序。這是一個組合運算元,功能上等價於先 repartition 再在每個分割槽內排序,但這個運算元內部做了優化(將排序過程下推到混洗同時進行),因此效能更好。
    例項
    package com.opslab.spark;
    
    import com.google.common.collect.Lists;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    
    import java.util.List;
    
    public class ApiMap{
        public static void main(String[] args){
            //演示api函式map
            SparkConf conf = new SparkConf().setMaster("local").setAppName("APIMap");
            try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
                List list = Lists.newArrayList(1, 2, 3);
    
                JavaRDD javaRDD = jsc.parallelize(list);
    
                //對資料集執行map操作,返回一個新的javaRDD
                JavaRDD mapRDD = javaRDD.map(new Function() {
                    @Override
                    public Integer call(Integer integer)throws Exception {
                        return integer * integer;
                    }
                });
    
                //mapRDD.collect();
                mapRDD.foreach(new VoidFunction() {
                    @Override
                    public void call(Integer integer)throws Exception {
                        System.out.println(integer);
                    }
                });
    
                //與上面的方式相同,只是使用了便捷的lambda
                JavaRDD mapRDD1 = javaRDD.map(s -> s * s);
                //mapRDD.collect();
                mapRDD1.foreach(new VoidFunction() {
                    @Override
                    public void call(Integer integer)throws Exception {
                        System.out.println(integer);
                    }
                });
            }
        }
    }
    

轉換運算元 – action

以下是Spark支援的一些常用action運算元。詳細請參考 RDD API doc (Scala, Java, Python, R) 以及 鍵值對 RDD 函式 (Scala, Java) 。

  • reduce(func)
    將RDD中元素按func進行聚合(func是一個 (T,T) => T 的對映函式,其中T為源RDD元素型別,並且func需要滿足 交換律 和 結合律 以便支援平行計算)
  • collect()
    將資料集中所有元素以陣列形式返回驅動器(driver)程式。通常用於,在RDD進行了filter或其他過濾操作後,將一個足夠小的資料子集返回到驅動器記憶體中。
  • count()
    返回資料集中元素個數
  • first()
    返回資料集中首個元素(類似於 take(1) )
  • take(n)
    返回資料集中前 n 個元素
  • takeSample(withReplacement,num, [seed])
    返回資料集的隨機取樣子集,最多包含 num 個元素,withReplacement 表示是否使用回置取樣,最後一個引數為可選引數seed,隨機數生成器的種子。
  • takeOrdered(n, [ordering])
    按元素排序(可以通過 ordering 自定義排序規則)後,返回前 n 個元素
  • saveAsTextFile(path)
    將資料集中元素儲存到指定目錄下的文字檔案中(或者多個文字檔案),支援本地檔案系統、HDFS 或者其他任何Hadoop支援的檔案系統。
    儲存過程中,Spark會呼叫每個元素的toString方法,並將結果儲存成檔案中的一行。
  • saveAsSequenceFile(path)
    將資料集中元素儲存到指定目錄下的Hadoop Sequence檔案中,支援本地檔案系統、HDFS 或者其他任何Hadoop支援的檔案系統。適用於實現了Writable介面的鍵值對RDD。在Scala中,同樣也適用於能夠被隱式轉換為Writable的型別(Spark實現了所有基本型別的隱式轉換,如:Int,Double,String 等)
  • saveAsObjectFile(path)
    將RDD元素以Java序列化的格式儲存成檔案,儲存結果檔案可以使用 SparkContext.objectFile 來讀取。
  • countByKey()
    只適用於包含鍵值對(K, V)的RDD,並返回一個雜湊表,包含 (K, Int) 對,表示每個key的個數。
  • foreach(func)
    在RDD的每個元素上執行 func 函式。通常被用於累加操作,如:更新一個累加器(Accumulator ) 或者 和外部儲存系統互操作。
    注意:用 foreach 操作出累加器之外的變數可能導致未定義的行為。更詳細請參考前面的「理解閉包」(Understanding closures )這一小節。
    例項
    package com.opslab.spark;
    import com.google.common.collect.Lists;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;
    import java.util.List;
    public class ApiReduce{
        public static void main(String[] args){
            //演示api函式map
            SparkConf conf = new SparkConf().setMaster("local").setAppName("APIMap");
            try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
                List list = Lists.newArrayList(1, 2, 3);
    
                JavaRDD javaRDD = jsc.parallelize(list);
    
                //對資料集執行map操作,返回一個新的javaRDD
                Integer reduce = javaRDD.reduce(new Function2() {
                    @Override
                    public Integer call(Integer integer, Integer integer2)throws Exception {
                        return integer + integer2;
                    }
                });
                System.out.println(reduce);
            }
        }
    }
    

未經允許不得轉載:頭條楓林網 » Spark之Java程式設計