Learning Apache Spark with Python
28 Chapter 4. An Introduction to Apache Spark
• separate process to execute user applications
• creates a SparkContext to schedule job execution and negotiate with the cluster manager
2. Executors
• run tasks scheduled by driver
• store computation results in memory, on disk or off-heap
• interact with storage systems
3. Cluster Manager
• Mesos
• YARN
• Spark Standalone
The Spark driver contains several components responsible for translating user code into actual jobs executed
on the cluster:
• SparkContext
– represents the connection to a Spark cluster, and can be used to create RDDs,
accumulators and broadcast variables on that cluster
• DAGScheduler
– computes a DAG of stages for each job and submits them to the TaskScheduler; determines
preferred locations for tasks (based on cache status or shuffle file locations)
and finds a minimal schedule to run the jobs
• TaskScheduler
– responsible for sending tasks to the cluster, running them, retrying if there are failures,
and mitigating stragglers
• SchedulerBackend
– backend interface for scheduling systems that allows plugging in different
implementations (Mesos, YARN, Standalone, local)
• BlockManager
– provides interfaces for putting and retrieving blocks both locally and remotely into
various stores (memory, disk, and off-heap)
4.3 Architecture
4.4 How Spark Works?
Spark has a small code base, and the system is divided into several layers. Each layer has its own
responsibilities, and the layers are independent of each other.
The first layer is the interpreter: Spark uses a Scala interpreter with some modifications. As you enter
your code in the Spark console (creating RDDs and applying operators), Spark builds an operator graph. When
the user runs an action (such as collect), the graph is submitted to the DAG scheduler. The DAG scheduler
divides the operator graph into (map and reduce) stages. A stage consists of tasks based on partitions of
the input data. The DAG scheduler pipelines operators together to optimize the graph; for example, many map
operators can be scheduled in a single stage. This optimization is key to Spark's performance. The final
result of the DAG scheduler is a set of stages. The stages are passed on to the task scheduler, which
launches tasks via the cluster manager (Spark Standalone, YARN, or Mesos). The task scheduler does not
know about dependencies among stages.
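The pipelining idea can be illustrated with a toy model. The sketch below is plain Python and is not Spark's DAGScheduler: it assumes a linear chain of operator names and a hand-written set of shuffle ("wide") operators, and it simply starts a new stage whenever a wide operator appears. The names WIDE_OPS and split_into_stages are hypothetical and exist only for this illustration.

```python
# Toy model of stage construction; these names are illustrative, not Spark APIs.
WIDE_OPS = {"reduceByKey", "groupByKey", "join", "sortByKey"}  # assumed shuffle operators


def split_into_stages(operators):
    """Group a linear chain of operator names into stages.

    Consecutive narrow operators (map, filter, ...) are pipelined into one stage.
    A wide operator closes the current stage and, as a simplification, is
    recorded as the first operator of the next stage.
    """
    stages = []
    current = []
    for op in operators:
        if op in WIDE_OPS and current:
            stages.append(current)
            current = []
        current.append(op)
    if current:
        stages.append(current)
    return stages


chain = ["map", "filter", "map", "reduceByKey", "map"]
stages = split_into_stages(chain)
for i, stage in enumerate(stages):
    print(f"stage {i}: {' -> '.join(stage)}")
```

In this model the three narrow operators before reduceByKey share one stage, which mirrors the statement that many map operators can be scheduled together.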
CHAPTER
FIVE
PROGRAMMING WITH RDDS
Chinese proverb
If you only know yourself, but not your opponent, you may win or may lose. If you know neither
yourself nor your enemy, you will always endanger yourself – idiom, from Sunzi’s Art of War
RDD stands for Resilient Distributed Dataset. An RDD in Spark is simply an immutable, distributed
collection of objects. Each RDD is split into multiple partitions (smaller sets with the same structure),
which may be computed on different nodes of the cluster.
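To make the idea of partitions concrete, here is a minimal single-machine sketch in plain Python. It does not use Spark, and the way Spark actually slices a collection may differ; the helper split_into_partitions is a hypothetical name. It only shows that a collection can be cut into contiguous chunks, each chunk can be processed independently, and the partial results can be combined afterwards.

```python
def split_into_partitions(items, num_partitions):
    """Slice a list into num_partitions contiguous chunks of near-equal size."""
    n = len(items)
    return [items[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]


data = list(range(10))
partitions = split_into_partitions(data, 3)

# Each partition can be processed on its own; here we just sum each one.
partial_sums = [sum(part) for part in partitions]
total = sum(partial_sums)

print(partitions)
print(partial_sums, total)
```

The original list is never modified; every step produces a new list, which loosely mirrors the immutability of RDDs.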
5.1 Create RDD
Usually, there are two popular ways to create RDDs: loading an external dataset, or distributing a
collection of objects. The following examples show some of the simplest ways to create RDDs by using the
parallelize() function, which takes an existing collection in your program and passes it to the
SparkContext.
1. By using the parallelize() function
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark create RDD example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
df = spark.sparkContext.parallelize([(1, 2, 3, 'a b c'),
(4, 5, 6, 'd e f'),
(7, 8, 9, 'g h i')]).toDF(['col1', 'col2', 'col3','col4'])
Then you will get the data as a DataFrame (the RDD is converted with toDF()):
df.show()
+----+----+----+-----+
|col1|col2|col3| col4|
+----+----+----+-----+
|   1|   2|   3|a b c|
|   4|   5|   6|d e f|
|   7|   8|   9|g h i|
+----+----+----+-----+
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark create RDD example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
myData = spark.sparkContext.parallelize([(1,2), (3,4), (5,6), (7,8), (9,10)])
Then you will get the RDD data:
myData.collect()
[(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]
2. By using createDataFrame( ) function
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark create RDD example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
Employee = spark.createDataFrame([
('1', 'Joe', '70000', '1'),
('2', 'Henry', '80000', '2'),
('3', 'Sam', '60000', '2'),
('4', 'Max', '90000', '1')],
['Id', 'Name', 'Salary', 'DepartmentId']
)
Employee.show()
Then you will get the DataFrame data:
+---+-----+------+------------+
| Id| Name|Salary|DepartmentId|
+---+-----+------+------------+
|  1|  Joe| 70000|           1|
|  2|Henry| 80000|           2|
|  3|  Sam| 60000|           2|
|  4|  Max| 90000|           1|
+---+-----+------+------------+
3. By using read and load functions
a. Read dataset from .csv file
## set up SparkSession
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark create RDD example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
# Read the CSV file with a header row and let Spark infer the column types.
# 'csv' is the built-in data source name in Spark 2.0 and later.
df = (spark.read.format('csv')
      .options(header='true', inferSchema='true')
      .load("/home/feng/Spark/Code/data/Advertising.csv"))
df.show(5)
df.printSchema()
Then you will get the DataFrame data:
+---+-----+-----+---------+-----+
|_c0|   TV|Radio|Newspaper|Sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
+---+-----+-----+---------+-----+
only showing top 5 rows
root
|-- _c0: integer (nullable = true)
|-- TV: double (nullable = true)
|-- Radio: double (nullable = true)
|-- Newspaper: double (nullable = true)
|-- Sales: double (nullable = true)
Once created, RDDs offer two types of operations: transformations and actions.
b. Read dataset from a database
## set up SparkSession
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark create RDD example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
## User information
user = 'your_username'
pw = 'your_password'
## Database information
table_name = 'table_name'
url = 'jdbc:postgresql://##.###.###.##:5432/dataset?user=' + user + '&password=' + pw
properties = {'driver': 'org.postgresql.Driver', 'password': pw, 'user': user}
df = spark.read.jdbc(url=url, table=table_name, properties=properties)
df.show(5)
df.printSchema()
Then you will get the DataFrame data:
+---+-----+-----+---------+-----+
|_c0|   TV|Radio|Newspaper|Sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
+---+-----+-----+---------+-----+
only showing top 5 rows
root
|-- _c0: integer (nullable = true)
|-- TV: double (nullable = true)
|-- Radio: double (nullable = true)
|-- Newspaper: double (nullable = true)
|-- Sales: double (nullable = true)
Note: Reading tables from a database requires the proper driver for that database. For example,
the above demo needs org.postgresql.Driver; you need to download it and put it in the jars folder
of your Spark installation path. I downloaded postgresql-42.1.1.jar from the official website and put
it in the jars folder.
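The JDBC URL in the database example is assembled by string concatenation, which can produce a malformed URL when the user name or password contains characters such as & or =. The following is a minimal sketch, using only the Python standard library, of one way to percent-encode the query parameters. The helper build_jdbc_url, the host db.example.com, and the sample password are hypothetical placeholders, not part of the original demo.

```python
from urllib.parse import quote, urlencode


def build_jdbc_url(host, port, database, **params):
    """Return a PostgreSQL JDBC URL with percent-encoded query parameters."""
    base = f"jdbc:postgresql://{host}:{port}/{database}"
    if not params:
        return base
    # quote_via=quote encodes reserved characters as %XX (spaces become %20).
    return base + "?" + urlencode(params, quote_via=quote)


# Hypothetical host and credentials, for illustration only.
url = build_jdbc_url("db.example.com", 5432, "dataset",
                     user="your_username", password="p&ss=word")
print(url)
```

Since the demo already passes user and password through the properties dictionary, another option is to leave them out of the URL entirely.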
c. Read dataset from HDFS
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import HiveContext
sc = SparkContext('local', 'example')
hc = HiveContext(sc)
# Read a text file from HDFS as an RDD of lines
tf1 = sc.textFile("hdfs://cdhstltest/user/data/demo.CSV")
print(tf1.first())
# Query a Hive table through the HiveContext
hc.sql("use intg_cme_w")
spf = hc.sql("SELECT * FROM spf LIMIT 100")
# show() prints the rows itself and returns None, so it is not wrapped in print()
spf.show(5)
5.2 Spark Operations
Warning: All the figures below are from Jeffrey Thompson. The interested reader is referred to pyspark
pictures
There are two main types of Spark operations: Transformations and Actions [Karau2015].
Note: Some people define three types of operations: Transformations, Actions and Shuffles.
5.2.1 Spark Transformations
Transformations construct a new RDD from a previous one. For example, one common transformation is
filtering data that matches a predicate.
5.2.2 Spark Actions
Actions, on the other hand, compute a result based on an RDD, and either return it to the driver program or
save it to an external storage system (e.g., HDFS).
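The difference between the two kinds of operations can be sketched without Spark. The ToyRDD class below is a hypothetical single-machine stand-in, not a Spark class: its transformations (map, filter) only record what should be done and return a new object, while its actions (collect, count) actually run the recorded steps and return a result to the caller.

```python
class ToyRDD:
    """A single-machine stand-in for an RDD; not a Spark class."""

    def __init__(self, source, ops=()):
        self._source = source    # underlying Python list (never modified)
        self._ops = tuple(ops)   # recorded transformations, not yet applied

    # Transformations: return a new ToyRDD and do no work.
    def map(self, fn):
        return ToyRDD(self._source, self._ops + (("map", fn),))

    def filter(self, predicate):
        return ToyRDD(self._source, self._ops + (("filter", predicate),))

    # Actions: run the recorded transformations and return a result.
    def collect(self):
        items = iter(self._source)
        for kind, fn in self._ops:
            items = map(fn, items) if kind == "map" else filter(fn, items)
        return list(items)

    def count(self):
        return len(self.collect())


calls = []  # records every element that square() is actually applied to


def square(x):
    calls.append(x)
    return x * x


numbers = ToyRDD([1, 2, 3, 4, 5, 6])
evens_squared = numbers.filter(lambda x: x % 2 == 0).map(square)
calls_before_action = len(calls)   # transformations alone have not run square()
result = evens_squared.collect()   # the action triggers the computation
print(calls_before_action, result, calls)
```

Filtering by a predicate, as mentioned above, appears here as the filter transformation; nothing is computed until collect is called.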
5.3 rdd.DataFrame vs pd.DataFrame
5.3.1 Create DataFrame
1. From List
my_list = [['a', 1, 2], ['b', 2, 3],['c', 3, 4]]
col_name = ['A', 'B', 'C']
:: Python Code:
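import pandas as pd
# pandas: pass the column names through the columns= keyword
pd.DataFrame(my_list, columns=col_name)
# PySpark: the second positional argument is the schema (here, the column names)
spark.createDataFrame(my_list, col_name).show()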
:: Comparison:
                  +---+---+---+
                  |  A|  B|  C|
   A  B  C        +---+---+---+
0  a  1  2        |  a|  1|  2|
1  b  2  3        |  b|  2|  3|
2  c  3  4        |  c|  3|  4|
                  +---+---+---+
Attention: Pay attention to the columns= parameter in pd.DataFrame. Without the keyword, the second
positional argument is interpreted as the index, so the names in the list become row labels instead of column names.
:: Python Code:
# column names passed through the columns= keyword
pd.DataFrame(my_list, columns=col_name)
# second positional argument is the index, so col_name becomes the row labels
pd.DataFrame(my_list, col_name)
:: Comparison:
   A  B  C           0  1  2
0  a  1  2        A  a  1  2
1  b  2  3        B  b  2  3
2  c  3  4        C  c  3  4
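The same comparison can be checked programmatically rather than by reading the printed tables. The short sketch below assumes pandas is installed and reuses my_list and col_name from the example; the variable names with_columns and with_index are introduced here only for illustration.

```python
import pandas as pd

my_list = [['a', 1, 2], ['b', 2, 3], ['c', 3, 4]]
col_name = ['A', 'B', 'C']

# Keyword form: col_name labels the columns, rows get the default 0..n-1 index.
with_columns = pd.DataFrame(my_list, columns=col_name)

# Positional form: the second positional parameter of pd.DataFrame is index,
# so col_name labels the rows and the columns get default integer labels.
with_index = pd.DataFrame(my_list, col_name)

print(list(with_columns.columns), list(with_columns.index))
print(list(with_index.columns), list(with_index.index))
```

Inspecting .columns and .index this way is a quick guard against passing the names in the wrong position.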