Spark Development Guide
1. Introduction to Spark
Spark is an open-source cluster computing system based on in-memory computation. Compared with MapReduce, Spark's faster computing engine efficiently supports multiple types of computation, such as interactive queries and stream processing. Spark is designed to be highly accessible and provides a rich set of built-in libraries. Applications can be written in Python, Java, Scala, or SQL.
1.1 Spark Running Mode
Spark can be run in the following ways:
- Local:
Spark runs locally in single-threaded mode, which is generally suitable for local development and testing.
- Standalone Spark:
Spark runs with a Master node and multiple Worker nodes. It comes with a complete set of services and can be deployed independently on a cluster without depending on any other resource management system.
- Spark on Yarn (the mode used by UHadoop):
Built on Hadoop's resource management system Yarn, Spark acts as a client for task submission. All tasks are submitted to Yarn, which is responsible for resource allocation and scheduling. Spark on Yarn is further divided into yarn-cluster and yarn-client modes, which differ as follows:
- Yarn-cluster: The Driver runs in the Application Master (AM). The AM process is responsible for both driving the application and requesting resources; it runs inside a Container, and the client can be shut down after the task is submitted. This mode is generally suitable for production environments but not for interactive tasks.
- Yarn-client: The Driver runs locally. After the task is submitted, the client must keep communicating with the Containers for job scheduling. This mode is suitable for interactive tasks and debugging, since it is easier to see the task results. Example submit commands for both modes are shown after this list.
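For example, the bundled SparkPi program can be submitted in either mode simply by switching the --deploy-mode flag (a minimal sketch; the path to the examples jar depends on your installation):
spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster /home/hadoop/spark/lib/spark-examples*.jar 100
spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode client /home/hadoop/spark/lib/spark-examples*.jar 100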
2. How to Use Spark
2.1 Spark-submit
The most common way to submit Spark tasks is through spark-submit. You can view its detailed usage with spark-submit --help.
Example
- Example: Submit a task to calculate pi in the sample program using spark-submit
spark-submit --master yarn --deploy-mode client --num-executors 2 \
--executor-cores 1 --executor-memory 1G \
$SPARK_HOME/examples/src/main/python/pi.py 100
For more information on submitting tasks, please refer to: https://spark.apache.org/docs/1.6.0/submitting-applications.html
Submitting Tasks from External Machines in the Cluster
- Please refer to section 1 for environment setup.
- HDFS can only use the local username (the name printed by whoami) as the HDFS username, so you need to grant that local username permission to access the target files on HDFS. Tasks submitted by the Spark client use the /user/[username] directory by default, so the corresponding user home directory needs to be created on HDFS.
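For example, an HDFS administrator can create and authorize this directory as follows (replace [username] with the output of whoami):
hdfs dfs -mkdir -p /user/[username]
hdfs dfs -chown [username] /user/[username]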
Test command:
[hadoop@10-10-116-236 bin]$ pwd
/root/testsparkclient/spark/bin
[hadoop@10-10-116-236 bin]$ ./spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster ../lib/spark-examples*.jar 10
View the running results: If the screen displays the following message, it means the execution was successful.
final status: SUCCEEDED
The output of this example is printed to standard output:
System.out.println("Pi is roughly " + 4.0 * count / n)
so it appears on the screen only in client mode. In yarn-cluster mode, you need to check the aggregated logs under hdfs://Ucluster/var/log/hadoop-yarn/apps/hadoop/logs/[applicationId].
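Alternatively, the aggregated logs can be fetched with the YARN CLI, for example:
yarn logs -applicationId [applicationId]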
2.2 Spark-shell
Spark-shell is an interactive Scala shell provided by Spark for quickly running tasks.
Example
- Start the spark-shell client
spark-shell
- Construct a HiveContext
scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
- Create a table src
scala> sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT,value STRING)")
- Load data from local file
scala> sqlContext.sql("LOAD DATA LOCAL INPATH '/home/hadoop/spark/examples/src/main/resources/kv1.txt' INTO TABLE src")
- Table operation, display data from table src
scala> sqlContext.sql("FROM src SELECT key, value").collect().foreach(println);
- Example: word count with a custom HashPartitioner
scala> import org.apache.spark.HashPartitioner
scala> val textFile = sc.textFile("file:///home/mine/data_file")
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[29] at textFile at <console>:16
scala> val counts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).partitionBy(new HashPartitioner(10))
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[32] at partitionBy at <console>:18
scala> counts.reduceByKey(_+_).saveAsTextFile("/home/mine/partition_spark/hash")
2.3 Spark-pyspark shell
The pyspark command line is an interactive Python shell provided by Spark for quickly running tasks.
- Enter the command-line interactive client
pyspark
- Example
>>> logFile="file:////home/hadoop/conf/core-site.xml"
>>> logData=sc.textFile(logFile).cache()
>>> numAs=logData.filter(lambda s:'a' in s).count()
>>> numBs=logData.filter(lambda s:'b' in s).count()
>>> print("Line with a:%i,line with b:%i" % (numAs,numBs))
Line with a:25,line with b:7
2.4 Spark-sql
Spark-sql is a component provided by Spark for processing structured data with SQL. It provides a programming abstraction called DataFrames and can act as a distributed SQL query engine, supporting most commonly used Hive SQL.
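As a brief illustration of the DataFrame abstraction (a minimal sketch run from spark-shell, assuming the src table created in section 2.2 exists):
scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> val df = sqlContext.sql("SELECT key, value FROM src")
scala> df.printSchema()
scala> df.filter(df("key") < 100).count()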
Example
- Start the spark-sql client
spark-sql
- Perform a SQL query
spark-sql> select * from src;
2.5 Spark-Hive
When using Spark with Hive, hive-site.xml must be configured under SPARK_HOME/conf.
Operating the Hive table through spark-shell
We can check the usage of spark-shell with spark-shell --help.
For example:
spark-shell --master yarn --deploy-mode client --num-executors 3
This starts spark-shell in yarn-client mode and sets the number of executors to 3.
After spark-shell starts, we can perform the following operations (see the example after this list):
- Construct a HiveContext
- Create table src
- Load data
- Table operation
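These are the same steps shown in section 2.2, for example:
scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT,value STRING)")
scala> sqlContext.sql("LOAD DATA LOCAL INPATH '/home/hadoop/spark/examples/src/main/resources/kv1.txt' INTO TABLE src")
scala> sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)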
Operating the Hive table through spark-sql
We can check the usage of spark-sql with spark-sql --help.
For example:
spark-sql --master yarn --deploy-mode client --num-executors 3
This starts spark-sql in yarn-client mode and sets the number of executors to 3.
After spark-sql starts, we can operate the Hive table.
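For example (assuming the src table created in section 2.2 exists):
spark-sql> show tables;
spark-sql> select key, value from src limit 10;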
2.6 Spark-ThriftServer
Operating the Hive table via the Thrift JDBC/ODBC server
Example
- Start spark-thriftserver. Execute the following command as the hadoop user on the Master1 node:
/home/hadoop/spark/sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=10000 --hiveconf hive.server2.thrift.bind.host=`hostname` --supervise
- Connect to the Thrift interface using beeline
beeline> !connect jdbc:hive2://uhadoop-******-master1:10000/default;
Note: The username and password should be left blank for this.
- Execute SQL
0: jdbc:hive2://uhadoop-*****-master1:10000/> show tables;
0: jdbc:hive2://uhadoop-*****-master1:10000/> select * from src;
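Applications can also talk to the Thrift server programmatically through the Hive JDBC driver. The following is a minimal Scala sketch, assuming the hive-jdbc driver jar (and its dependencies) is on the classpath, that the server address matches your cluster, and that the src table from section 2.2 exists:
import java.sql.DriverManager

object ThriftServerQuery {
  def main(args: Array[String]): Unit = {
    // Register the Hive JDBC driver (assumes the hive-jdbc jar is on the classpath)
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    // Connect to spark-thriftserver; username and password are left blank, as with beeline above
    val conn = DriverManager.getConnection(
      "jdbc:hive2://uhadoop-******-master1:10000/default", "", "")
    val stmt = conn.createStatement()
    // Query the Hive table and print each row
    val rs = stmt.executeQuery("SELECT key, value FROM src LIMIT 10")
    while (rs.next()) {
      println(s"${rs.getInt(1)}\t${rs.getString(2)}")
    }
    rs.close()
    stmt.close()
    conn.close()
  }
}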
3. Spark Application Development
3.1 JAVA - WordCount Example
Spark provides Java example programs in its examples directory.
Take JavaWordCount as an example to show how to run it:
spark-submit --class org.apache.spark.examples.JavaWordCount --master yarn --deploy-mode cluster /home/hadoop/spark/lib/spark-examples*.jar /input/kv.txt /output/wordcount
Note:
- Prepare the /input/kv.txt data on HDFS; any text will do.
- /input/kv.txt and /output/wordcount are runtime parameters; for another program, replace them with the corresponding parameters.
- The resources to allocate are not specified here. In actual testing, resources should be set reasonably according to the amount of computation involved.
The full process of developing the Spark Java program in Eclipse is shown below:
- Create Java Project
- Create a lib directory, add the dependency spark-assembly-1.4.1-hadoop2.6.0-cdh5.4.4.jar to it, and add it to the build path
- Complete function code
The code content is as follows:
package test.java.spark.job;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
public final class JavaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("Usage: JavaWordCount <input> <output>");
      System.exit(1);
    }

    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    JavaRDD<String> lines = ctx.textFile(args[0], 1);

    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String s) {
        return Arrays.asList(SPACE.split(s));
      }
    });

    JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
      @Override
      public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
      }
    });

    JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
      }
    });

    List<Tuple2<String, Integer>> output = counts.collect();
    counts.saveAsTextFile(args[1]);
    for (Tuple2<?,?> tuple : output) {
      System.out.println(tuple._1() + ": " + tuple._2());
    }
    ctx.stop();
  }
}
- Export project files
- Data preparation
Upload a paragraph of text to HDFS at /input/kv1.txt:
hdfs dfs -put kv1.txt /input/kv1.txt
- Submit task to run
spark-submit --class test.java.spark.job.JavaWordCount --master yarn --deploy-mode client --num-executors 1 --driver-memory 1g --executor-memory 1g --executor-cores 1 /home/hadoop/sparkJavaExample.jar /input/kv1.txt /output/wordcount
Note:
Do not exceed the machine's available resources when setting these values.
3.2 Scala - HiveFromSpark Example
- Install sbt
curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo
sudo mv bintray-sbt-rpm.repo /etc/yum.repos.d/
sudo yum install sbt -y
- Build code
Take HiveFromSpark from the Spark examples as an example:
mkdir -p /data/HiveFromSpark/src/main/scala/com/ucl/spark/examples
cd /data/HiveFromSpark/src/main/scala/com/ucl/spark/examples
touch HiveFromSpark.scala;
For Spark 1.6.0 and Scala 2.10.5, add the following code to HiveFromSpark.scala:
package com.ucl.spark.examples
import com.google.common.io.{ByteStreams, Files}
import java.io.File
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
object HiveFromSpark {
  case class Record(key: Int, value: String)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HiveFromSpark")
    val sc = new SparkContext(sparkConf)
    val hiveContext = new HiveContext(sc)
    import hiveContext.implicits._
    import hiveContext.sql

    // Queries are expressed in HiveQL
    println("Result of 'SELECT *': ")
    sql("SELECT * FROM src").collect().foreach(println)

    // Aggregation queries are also supported.
    val count = sql("SELECT COUNT(*) FROM src").collect().head.getLong(0)
    println(s"COUNT(*): $count")

    // The results of SQL queries are themselves RDDs and support all normal RDD functions. The
    // items in the RDD are of type Row, which allows you to access each column by ordinal.
    val rddFromSql = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

    println("Result of RDD.map:")
    val rddAsStrings = rddFromSql.map {
      case Row(key: Int, value: String) => s"Key: $key, Value: $value"
    }
    rddAsStrings.collect().foreach(println)

    // You can also register RDDs as temporary tables within a HiveContext.
    val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
    rdd.toDF().registerTempTable("records")

    // Queries can then join RDD data with data stored in Hive.
    println("Result of SELECT *:")
    sql("SELECT * FROM records r JOIN src s ON r.key = s.key").collect().foreach(println)

    sc.stop()
  }
}
- Build sbt file
cd /data/HiveFromSpark/
touch HiveFromSpark.sbt;
Add the following content to the file:
name := "HiveFromSpark"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.0"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.0"
- Compile
cd /data/HiveFromSpark/
sbt package
Since sbt needs to connect to Maven repositories to download the related dependencies, compilation time depends on the network environment; please be patient. The compiled jar is located at /data/HiveFromSpark/target/scala-2.10/hivefromspark_2.10-1.0.jar.
- Execute
Client mode:
spark-submit --class com.ucl.spark.examples.HiveFromSpark --master yarn --deploy-mode client --num-executors 4 --executor-cores 1 /data/HiveFromSpark/target/scala-2.10/hivefromspark_2.10-1.0.jar
Cluster mode:
spark-submit --class com.ucl.spark.examples.HiveFromSpark --master yarn --deploy-mode cluster --num-executors 4 --executor-cores 1 --files /home/hadoop/hive/conf/hive-site.xml --jars /home/hadoop/spark/lib/datanucleus-api-jdo-3.2.6.jar,/home/hadoop/spark/lib/datanucleus-rdbms-3.2.9.jar,/home/hadoop/spark/lib/datanucleus-core-3.2.10.jar /data/HiveFromSpark/target/scala-2.10/hivefromspark_2.10-1.0.jar
3.3 Python - Example
3.3.1 PI Example
Note: please refer to the example programs in the examples/src/main/python directory under the Spark installation directory.
Sample Code
from __future__ import print_function

import sys
from random import random
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    """
        Usage: pi [partitions]
    """
    spark = SparkSession\
        .builder\
        .appName("PythonPi")\
        .getOrCreate()

    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    spark.stop()
You can submit the Spark PI task through the following command
spark-submit --master yarn --deploy-mode client --num-executors 4 --executor-cores 1 --executor-memory 2G $SPARK_HOME/examples/src/main/python/pi.py 100
Finally, a result similar to “Pi is roughly 3.141039” will appear in the console log
3.3.2 Package Python Dependencies
If our Python application has many source files (or a directory hierarchy) or relies on third-party modules, they need to be packaged and submitted to the cluster together with the Spark task. For example, to package the dependencies into a zip file:
pip install -t dependencies -r requirements.txt
cd dependencies
zip -r ../dependencies.zip .
Then add the corresponding parameters when running spark-submit:
spark-submit --py-files dependencies.zip,other-libs.zip main.py --job jobname
Pure Python modules can be imported from zip files, but C-extension modules cannot, because they must be loaded by the OS runtime (PyInstaller, py2exe, and similar tools share this limitation). Therefore, such dependencies must be installed on each node of the cluster.