Spark Development Guide

1. Introduction to Spark

Spark is an open-source cluster computing system based on in-memory computation. Compared to MapReduce, Spark's faster computing engine effectively supports multiple types of computation, such as interactive queries and stream processing. Spark is designed to be highly accessible and provides a rich set of built-in libraries. Tasks can be developed in Python, Java, Scala, or SQL on Spark.

1.1 Spark Running Mode

Spark can be run in the following ways:

  • Local:

Spark runs locally in single-threaded mode, which is generally suitable for local development and testing.

  • Standalone Spark:

Spark runs with a Master node and multiple Worker nodes. It comes with a complete set of services and can be deployed independently on a cluster without depending on any other resource management system.

  • Spark on Yarn (the mode used by UHadoop)

Based on Hadoop's resource management system Yarn, Spark acts only as a client for task submission: all tasks are submitted to Yarn, which is responsible for resource allocation. Spark on Yarn is further divided into yarn-cluster and yarn-client modes, which differ as follows (see the example after the list):

  • Yarn-cluster: The Driver runs in the Application Master (AM). The AM process is responsible for both driving the application and requesting resources, and it runs inside a YARN container. The client can be shut down after task submission. This mode is generally suitable for production environments but not for running interactive tasks.
  • Yarn-client: The Driver runs locally. After task submission, the client needs to communicate with the Container for job scheduling. It is suitable for interactive tasks and debugging, making it easier to see the task results.
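
For example, the same sample application can be submitted in either mode simply by changing --deploy-mode (spark-submit itself is covered in section 2.1); a minimal sketch using the pi.py example shipped with Spark:

    # yarn-client mode: the driver runs locally and prints the result to the console
    spark-submit --master yarn --deploy-mode client $SPARK_HOME/examples/src/main/python/pi.py 100

    # yarn-cluster mode: the driver runs in the AM container; the result goes to the YARN logs
    spark-submit --master yarn --deploy-mode cluster $SPARK_HOME/examples/src/main/python/pi.py 100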

2. How to Use Spark

2.1 Spark-submit

The most common way to submit Spark tasks is through spark-submit. You can view the detailed usage of spark-submit by running spark-submit --help.

Example

  • Example: Submit a task to calculate pi from the sample programs using spark-submit

    spark-submit --master yarn --deploy-mode client --num-executors 2 \
        --executor-cores 1 --executor-memory 1G \
        $SPARK_HOME/examples/src/main/python/pi.py 100

For more information on submitting tasks, please refer to: https://spark.apache.org/docs/1.6.0/submitting-applications.html

Submitting Tasks from Machines Outside the Cluster

  1. Please refer to section 1 for environment setup.
  2. HDFS can only use the local username (the name displayed by whoami) as the HDFS username, so you need to grant the local username permission to access the target files on HDFS. Tasks submitted by the Spark client use the /user/[username] directory by default, so the corresponding user home directory needs to be created on HDFS (see the commands below).
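
For example, assuming the local username is dev (a hypothetical name), the HDFS home directory can be created with the following commands, run as a user with HDFS administrative privileges:

    hdfs dfs -mkdir -p /user/dev
    hdfs dfs -chown dev /user/dev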

Test command:

[hadoop@10-10-116-236 bin]$ pwd
/root/testsparkclient/spark/bin
[hadoop@10-10-116-236 bin]$ ./spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster  ../lib/spark-examples*.jar 10

View the running results: If the screen displays the following message, it means the execution was successful.

final status: SUCCEEDED

The output of this example is printed using standard output:

System.out.println("Pi is roughly " + 4.0 * count / n)

So only yarn-client mode prints it to the screen; for yarn-cluster mode you need to check it in the log at hdfs://Ucluster/var/log/hadoop-yarn/apps/hadoop/logs/[applicationId].
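
The aggregated logs of a yarn-cluster task can also be fetched with the YARN CLI, for example (replace [applicationId] with the ID printed by spark-submit):

    yarn logs -applicationId [applicationId]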

2.2 Spark-shell

Spark-shell is an interactive shell provided by Spark for quickly running tasks written in Scala.

Example

  • Start the spark-shell client
    spark-shell
  • Construct a HiveContext
    scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
  • Create a table src
    scala> sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT,value STRING)")
  • Load data from local file
    scala> sqlContext.sql("LOAD DATA LOCAL INPATH '/home/hadoop/spark/examples/src/main/resources/kv1.txt' INTO TABLE src")
  • Table operation, display data from table src
    scala> sqlContext.sql("FROM src SELECT key, value").collect().foreach(println);
  • Example

    scala> val textFile = sc.textFile("file:///home/mine/data_file")
    textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[29] at textFile at <console>:16
    scala> import org.apache.spark.HashPartitioner
    import org.apache.spark.HashPartitioner
    scala> val counts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).partitionBy(new HashPartitioner(10))
    counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[32] at partitionBy at <console>:18
    scala> counts.reduceByKey(_+_).saveAsTextFile("/home/mine/partition_spark/hash")
    scala>
    

2.3 Spark-pyspark shell

The pyspark command line is an interactive shell provided by Spark for quickly running tasks written in Python.

  • Enter the command line interactive client

    pyspark
    
  • Example

    >>> logFile="file:///home/hadoop/conf/core-site.xml"
    >>> logData=sc.textFile(logFile).cache()
    >>> numAs=logData.filter(lambda s:'a' in s).count()
    >>> numBs=logData.filter(lambda s:'b' in s).count()                             
    >>> print("Line with a:%i,line with b:%i" % (numAs,numBs))
    Line with a:25,line with b:7
    

2.4 Spark-sql

Spark-sql is a component provided by Spark that uses SQL to process structured data. It provides a programmable abstract data model called DataFrames, and it can be regarded as a distributed SQL query engine, supporting most commonly used Hive SQL.

Example

  • Start the spark-sql client

    spark-sql

  • Perform sql query

    spark-sql> select * from src;
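
The DataFrames abstraction mentioned above can also be used programmatically; a minimal sketch in spark-shell for Spark 1.6, assuming the src table from section 2.2 exists:

    scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    scala> val df = sqlContext.table("src")      // load the Hive table as a DataFrame
    scala> df.printSchema()
    scala> df.groupBy("key").count().show(10)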

2.5 Spark-Hive

When using Spark with Hive, hive-site.xml needs to be configured under SPARK_HOME/conf.

Operate the Hive table through spark-shell

We can check the usage of spark-shell through spark-shell --help.

For example:

    spark-shell --master yarn --deploy-mode client --num-executors 3

This starts spark-shell in yarn-client mode with the number of executors set to 3.

After spark-shell starts, we can perform the related operations (see the sketch after this list):

  • Construct a HiveContext
  • Create table src
  • Load data
  • Table operation
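
These steps mirror the spark-shell example in section 2.2; a minimal sketch (the kv1.txt path is the sample file shipped with Spark and may differ on your cluster):

    scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    scala> sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
    scala> sqlContext.sql("LOAD DATA LOCAL INPATH '/home/hadoop/spark/examples/src/main/resources/kv1.txt' INTO TABLE src")
    scala> sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)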

Operate Hive table through spark-sql

We can check the usage of spark-sql through spark-sql --help.

For example:

    spark-sql --master yarn --deploy-mode client --num-executors 3

This starts spark-sql in yarn-client mode with the number of executors set to 3.

After spark-sql starts, we can operate the Hive table.
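
For example, the same statements used in section 2.4 can be run here; a minimal sketch assuming the src table created earlier already exists:

    spark-sql> show tables;
    spark-sql> select key, value from src limit 10;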

2.6 Spark-ThriftServer

Operate the Hive table via the Thrift JDBC/ODBC server.

Example

  • Start spark-thriftserver. Execute the following command as the hadoop user on the Master1 node

    /home/hadoop/spark/sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=10000 --hiveconf hive.server2.thrift.bind.host=`hostname`  --supervise
    
  • Connect to the thrift interface using beeline

    beeline> !connect jdbc:hive2://uhadoop-******-master1:10000/default;
    

    Note: The username and password should be left blank for this.

  • Execute sql

    0: jdbc:hive2://uhadoop-*****-master1:10000/> show tables;
    0: jdbc:hive2://uhadoop-*****-master1:10000/> select * from src;
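
Applications can also connect to the Thrift server over JDBC. A minimal Scala sketch, assuming the hive-jdbc driver is on the classpath and that the placeholder hostname is replaced with your actual Master1 hostname:

    import java.sql.DriverManager

    object ThriftJdbcExample {
      def main(args: Array[String]): Unit = {
        // Register the Hive JDBC driver (shipped with hive-jdbc)
        Class.forName("org.apache.hive.jdbc.HiveDriver")
        // Username and password are left blank, matching the beeline example above
        val conn = DriverManager.getConnection(
          "jdbc:hive2://uhadoop-******-master1:10000/default", "", "")
        val stmt = conn.createStatement()
        val rs = stmt.executeQuery("SELECT key, value FROM src LIMIT 10")
        while (rs.next()) {
          println(s"${rs.getInt(1)}\t${rs.getString(2)}")
        }
        rs.close(); stmt.close(); conn.close()
      }
    }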
    

3. Spark Application Development

3.1 JAVA - WordCount Example

Spark provides a number of Java examples in its examples directory.

Taking wordcount as an example, the running method is as follows:

  spark-submit --class org.apache.spark.examples.JavaWordCount --master yarn --deploy-mode cluster /home/hadoop/spark/lib/spark-examples*.jar /input/kv.txt /output/wordcount

Note:

  1. Prepare the /input/kv.txt data on HDFS; any text will do.
  2. /input/kv.txt and /output/wordcount are the program arguments; for a different program, replace them with the corresponding arguments.
  3. The resources to allocate are not specified here. In actual use, set the resources reasonably according to the amount of computation involved in the test.

The full process is shown below.

The process of developing the Spark Java program with Eclipse:

  1. Create a Java Project
  2. Create a lib directory, add the project dependency spark-assembly-1.4.1-hadoop2.6.0-cdh5.4.4.jar to it, and add it to the build path
  3. Complete the function code

The code content is as follows:

  package test.java.spark.job;
  
  import java.util.Arrays;
  import java.util.List;
  import java.util.regex.Pattern;
  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaPairRDD;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.api.java.function.FlatMapFunction;
  import org.apache.spark.api.java.function.Function2;
  import org.apache.spark.api.java.function.PairFunction;
  
  import scala.Tuple2;
  
  public final class JavaWordCount {
      private static final Pattern SPACE = Pattern.compile(" ");
  
      public static void main(String[] args) throws Exception {
  
        if (args.length < 2) {
          System.err.println("Usage: JavaWordCount <input> <output>");
          System.exit(1);
        }
  
        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        JavaRDD<String> lines = ctx.textFile(args[0], 1);
  
        // Split each line into words on spaces
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
          @Override
          public Iterable<String> call(String s) {
            return Arrays.asList(SPACE.split(s));
          }
        });
  
        // Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        });
  
        // Sum the counts for each word
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        });
  
        List<Tuple2<String, Integer>> output = counts.collect();
        counts.saveAsTextFile(args[1]);
        for (Tuple2<?,?> tuple : output) {
          System.out.println(tuple._1() + ": " + tuple._2());
        }
        ctx.stop();
     } 
  }

  4. Export the project files
  5. Data preparation

Upload a piece of text to /input/kv1.txt on HDFS:

  hdfs dfs -put kv1.txt /input/kv1.txt
  6. Submit the task to run
  spark-submit --class test.java.spark.job.JavaWordCount --master yarn --deploy-mode client --num-executors 1 --driver-memory 1g --executor-memory 1g --executor-cores 1 /home/hadoop/sparkJavaExample.jar /input/kv1.txt /output/wordcount

Note:

Don’t exceed the machine’s quota when setting resources.

3.2 Scala - HiveFromSpark Example

  • Install sbt
curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo
sudo mv bintray-sbt-rpm.repo /etc/yum.repos.d/
sudo yum install sbt -y
  • Build code

Take the HiveFromSpark program from the Spark examples as an example:

mkdir -p /data/HiveFromSpark/src/main/scala/com/ucl/spark/examples
cd /data/HiveFromSpark/src/main/scala/com/ucl/spark/examples
touch HiveFromSpark.scala; 

For Spark 1.6.0 and Scala 2.10.5, you can add the following code to HiveFromSpark.scala:

package com.ucl.spark.examples
 
import com.google.common.io.{ByteStreams, Files}
import java.io.File
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
 
object HiveFromSpark {
  case class Record(key: Int, value: String)
 
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HiveFromSpark")
    val sc = new SparkContext(sparkConf)
 
    val hiveContext = new HiveContext(sc)
    import hiveContext.implicits._
    import hiveContext.sql
 
    // Queries are expressed in HiveQL
    println("Result of 'SELECT *': ")
    sql("SELECT * FROM src").collect().foreach(println)
 
    // Aggregation queries are also supported.
    val count = sql("SELECT COUNT(*) FROM src").collect().head.getLong(0)
    println(s"COUNT(*): $count")
 
    // The results of SQL queries are themselves RDDs and support all normal RDD functions.  The
    // items in the RDD are of type Row, which allows you to access each column by ordinal.
    val rddFromSql = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
 
    println("Result of RDD.map:")
    val rddAsStrings = rddFromSql.map {
      case Row(key: Int, value: String) => s"Key: $key, Value: $value"
    }
 
    // You can also register RDDs as temporary tables within a HiveContext.
    val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
    rdd.toDF().registerTempTable("records")
 
    // Queries can then join RDD data with data stored in Hive.
    println("Result of SELECT *:")
    sql("SELECT * FROM records r JOIN src s ON r.key = s.key").collect().foreach(println)
 
    sc.stop()
  }
}
  • Build sbt file
cd /data/HiveFromSpark/
touch  HiveFromSpark.sbt; 

Add the following content to the file:

name := "HiveFromSpark"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.0"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.0"
  • Compile
cd /data/HiveFromSpark/
sbt package

Since sbt needs to download the related dependencies from Maven, the compilation time depends on the network environment; please be patient. The compiled jar is located at /data/HiveFromSpark/target/scala-2.10/hivefromspark_2.10-1.0.jar.

  • Execute

Client mode:

spark-submit --class com.ucl.spark.examples.HiveFromSpark --master yarn --deploy-mode client --num-executors 4 --executor-cores 1 /data/HiveFromSpark/target/scala-2.10/hivefromspark_2.10-1.0.jar

Cluster mode:

spark-submit --class com.ucl.spark.examples.HiveFromSpark --master yarn --deploy-mode cluster --num-executors 4 --executor-cores 1 --files /home/hadoop/hive/conf/hive-site.xml --jars /home/hadoop/spark/lib/datanucleus-api-jdo-3.2.6.jar,/home/hadoop/spark/lib/datanucleus-rdbms-3.2.9.jar,/home/hadoop/spark/lib/datanucleus-core-3.2.10.jar /data/HiveFromSpark/target/scala-2.10/hivefromspark_2.10-1.0.jar

3.3 Python - Example

3.3.1 PI Example

Note: Please refer to the example program in the examples/src/main/python directory under the spark installation directory.

Sample Code

from __future__ import print_function
 
import sys
from random import random
from operator import add
 
from pyspark.sql import SparkSession
 
if __name__ == "__main__":
    """
        Usage: pi [partitions]
    """
    spark = SparkSession\
        .builder\
        .appName("PythonPi")\
        .getOrCreate()
 
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions
 
    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0
 
    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))
 
    spark.stop()

You can submit the Spark PI task through the following command

spark-submit  --master yarn --deploy-mode client --num-executors 4 --executor-cores 1 --executor-memory 2G $SPARK_HOME/examples/src/main/python/pi.py 100

Finally, a result similar to “Pi is roughly 3.141039” will appear in the console log

3.3.2 Package Python Dependencies

If our Python application has many source files (or a directory hierarchy) or relies on third-party modules, then when submitting the Spark task we need to package them and ship them to the cluster. For example, to package the dependencies into a zip file:

  pip install -t dependencies -r requirements.txt
  cd dependencies
  zip -r ../dependencies.zip .

Then add the corresponding parameter when running spark-submit:

  spark-submit --py-files dependencies.zip,other-libs.zip main.py --job jobname

Pure Python modules can be imported from zip files, but C-extension modules cannot; they must be loaded by the OS runtime (PyInstaller, py2exe, and similar tools have the same limitation). Therefore, such dependent modules must be installed on each node of the cluster beforehand, for example as sketched below.
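
A minimal sketch, assuming passwordless SSH to the worker nodes listed in a hypothetical nodes.txt file and using numpy as an example C-extension module:

  for host in $(cat nodes.txt); do
      ssh "$host" "sudo pip install numpy"
  done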