
Spark Repartition1

This document discusses Spark repartition() and coalesce() methods for RDDs and DataFrames. Repartition is used to increase or decrease partitions by moving all data, while coalesce only decreases partitions by moving minimal data between partitions. The default number of partitions depends on factors like HDFS block size and cluster cores. Repartition shuffles all data, while coalesce optimizes data movement for partition decreases. It is best to minimize repartition operations due to their expense.

Uploaded by

dalalroshan

Spark Repartition() vs Coalesce()

In Spark or PySpark, repartition() is used to increase or decrease the number of partitions of an
RDD, DataFrame, or Dataset, whereas coalesce() is used only to decrease the number of partitions,
and does so efficiently. In this article, you will learn what the Spark repartition() and coalesce()
methods are and how they differ, with Scala examples.
The examples here use RDDs and DataFrames in Scala; the same approach can be used with PySpark
(Spark with Python).

• Spark partitioning
o Local
o Cluster
o Configuration
• RDD Partition
o RDD repartition
o RDD coalesce
• DataFrame Partition
o DataFrame repartition
o DataFrame coalesce
One important point to note: repartition() always performs a full shuffle of the data across
partitions, which is expensive, and coalesce(), although cheaper, still moves data. Try to keep
repartitioning operations to a minimum.
1. How Spark Partitions data files

One main advantage of Spark is that it splits data into multiple partitions and executes
operations on all partitions in parallel, which allows jobs to complete faster. While working
with partitioned data, we often need to increase or decrease the number of partitions based on how
the data is distributed. The repartition() and coalesce() methods let us do this.

When the number of partitions is not specified programmatically or through configuration, Spark
chooses a default based on several factors, and those factors differ depending on where you run
your job and in which mode.

1.1 Local mode

When you run in local mode, Spark uses as many partitions as there are CPU cores on your system
(with local[*]) or as many as you specify in the master URL when creating the SparkSession
object.

import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder()
  .master("local[5]")
  .appName("SparkByExamples.com")
  .getOrCreate()
The above example passes local[5] to the master() method, which runs the job locally with 5
worker threads and therefore 5 default partitions. Even if your system has just 2 cores, Spark
still creates 5 partition tasks.

val df = spark.range(0,20)
println(df.rdd.partitions.length)
The above example prints 5, the number of partitions.

1.2 HDFS cluster mode

When you run Spark jobs on a Hadoop cluster, the default number of partitions is based on
the following.

• On an HDFS cluster, by default, Spark creates one partition for each block of the file.
• In Hadoop version 1 the default HDFS block size is 64 MB, and in Hadoop version 2 it is
128 MB.
• For data that is not read from files, the default is the total number of cores on all executor
nodes in the cluster or 2, whichever is larger.
For example, if you read a 640 MB file on Hadoop version 2, Spark creates 5 partitions, each
consisting of one 128 MB block (5 blocks * 128 MB = 640 MB). If you repartition to 10, each
partition holds roughly 64 MB, about half a block.
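The block arithmetic above can be checked with a small sketch. This is plain Scala rather than Spark API: the names PartitionEstimate and estimatePartitions are hypothetical, and the estimate assumes one partition per HDFS block with no other split settings in play.

```scala
object PartitionEstimate {
  // Estimated number of input partitions for a file, assuming one partition
  // per HDFS block. The last block may be only partially filled.
  def estimatePartitions(fileSizeMb: Long, blockSizeMb: Long): Long = {
    require(fileSizeMb >= 0, "file size must not be negative")
    require(blockSizeMb > 0, "block size must be positive")
    (fileSizeMb + blockSizeMb - 1) / blockSizeMb // ceiling division
  }

  def main(args: Array[String]): Unit = {
    println(estimatePartitions(640, 128)) // 128 MB blocks (Hadoop 2 default): prints 5
    println(estimatePartitions(640, 64))  // 64 MB blocks (Hadoop 1 default): prints 10
  }
}
```

A file whose size is not an exact multiple of the block size gets one extra, smaller partition under this estimate.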

1.3 Spark configuration

• spark.default.parallelism defaults to the total number of cores on all nodes in a cluster; in
local mode it is the number of cores on your system. It applies to RDD operations such as
parallelize(), reduceByKey() and join() when no partition count is passed.
• spark.sql.shuffle.partitions defaults to 200 and is used when a DataFrame or Dataset operation
such as groupBy() or join() shuffles data. This property applies only to the DataFrame API,
not to RDDs.
You can change spark.sql.shuffle.partitions programmatically at runtime using the statement
below; spark.default.parallelism generally has to be set before the SparkContext starts.

<pre><code class="language-scala">
spark.conf.set("spark.sql.shuffle.partitions", "500")
</code></pre>

You can also set these configurations using the spark-submit command.

./bin/spark-submit --conf spark.sql.shuffle.partitions=500 --conf spark.default.parallelism=500
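A third option is to pass both settings while building the session, which is one way to make sure spark.default.parallelism is in place before the SparkContext starts. The sketch below assumes a local run with no session already active; the object name ConfigAtStartup and the values 4 and 500 are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

object ConfigAtStartup {
  // Builds a local session with both partition-related settings applied up front.
  def build(): SparkSession =
    SparkSession.builder()
      .master("local[2]")
      .appName("ConfigAtStartup")
      .config("spark.default.parallelism", "4")      // default for RDD operations
      .config("spark.sql.shuffle.partitions", "500") // DataFrame/Dataset shuffles
      .getOrCreate()

  def main(args: Array[String]): Unit = {
    val spark = build()
    println(spark.sparkContext.defaultParallelism)
    println(spark.conf.get("spark.sql.shuffle.partitions"))
    spark.stop()
  }
}
```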


1. RDD Partition and repartition

In RDD, you can set the parallelism at the time of creating an RDD using parallelize(),
textFile() and wholeTextFiles(). You can download the test.txt file used in this example from
GitHub.

val spark: SparkSession = SparkSession.builder()
  .master("local[5]")
  .appName("SparkByExamples.com")
  .getOrCreate()

// No partition count given: falls back to the default parallelism (5 here)
val rdd = spark.sparkContext.parallelize(Range(0, 20))
println("From local[5] : " + rdd.partitions.size)

// Explicit partition count
val rdd1 = spark.sparkContext.parallelize(Range(0, 20), 6)
println("Parallelize : " + rdd1.partitions.size)

// The second argument is the minimum number of partitions
val rddFromFile = spark.sparkContext.textFile("src/main/resources/test.txt", 10)
println("TextFile : " + rddFromFile.partitions.size)
The above example yields the output below.

From local[5] : 5
Parallelize : 6
TextFile : 10
spark.sparkContext.parallelize(Range(0,20),6) splits the RDD into 6 partitions, and the data is
distributed as below.

rdd1.saveAsTextFile("/tmp/partition")
//Writes 6 part files, one for each partition
Partition 1 : 0 1 2
Partition 2 : 3 4 5
Partition 3 : 6 7 8 9
Partition 4 : 10 11 12
Partition 5 : 13 14 15
Partition 6 : 16 17 18 19
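The uneven 3/3/4/3/3/4 split above is what you get when partition i takes the index range from i * n / k up to (but not including) (i + 1) * n / k using integer division. The following plain-Scala sketch reproduces the listing under that assumption; RangeSlicer and slice are hypothetical names, and this is a model of the behaviour, not Spark's own code.

```scala
object RangeSlicer {
  // Slice i covers indices [i * n / k, (i + 1) * n / k), computed with integer division.
  def slice(data: Seq[Int], numSlices: Int): Seq[Seq[Int]] = {
    require(numSlices > 0, "numSlices must be positive")
    val n = data.length.toLong
    (0 until numSlices).map { i =>
      val start = ((i * n) / numSlices).toInt
      val end = (((i + 1) * n) / numSlices).toInt
      data.slice(start, end)
    }
  }

  def main(args: Array[String]): Unit =
    slice(0 until 20, 6).zipWithIndex.foreach { case (part, i) =>
      println(s"Partition ${i + 1} : ${part.mkString(" ")}")
    }
}
```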
1.1 RDD repartition()

The Spark RDD repartition() method is used to increase or decrease the number of partitions. The
example below decreases the partitions of rdd1 from 6 to 4 by moving data from all partitions.

val rdd2 = rdd1.repartition(4)
println("Repartition size : " + rdd2.partitions.size)
rdd2.saveAsTextFile("/tmp/re-partition")
This prints Repartition size : 4. As shown below, repartition() redistributes the data from all
partitions; this is a full shuffle, which becomes very expensive when dealing with billions or
trillions of records.

Partition 1 : 1 6 10 15 19
Partition 2 : 2 3 7 11 16
Partition 3 : 4 8 12 13 17
Partition 4 : 0 5 9 14 18
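Instead of writing part files, the partition contents can also be inspected in memory with glom(), which turns each partition into one array. The sketch below assumes a local run; the object name InspectRddPartitions and the helper layout are illustrative, and which element lands in which partition may differ from the listing above.

```scala
import org.apache.spark.sql.SparkSession

object InspectRddPartitions {
  // Contents of each partition after repartitioning a 6-partition RDD holding 0..19.
  def layout(spark: SparkSession, numPartitions: Int): Array[Array[Int]] = {
    val rdd1 = spark.sparkContext.parallelize(Range(0, 20), 6)
    // glom() groups the elements of every partition into an array,
    // so collect() returns one array per partition.
    rdd1.repartition(numPartitions).glom().collect()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[5]")
      .appName("InspectRddPartitions")
      .getOrCreate()
    layout(spark, 4).zipWithIndex.foreach { case (part, i) =>
      println(s"Partition ${i + 1} : ${part.mkString(" ")}")
    }
    spark.stop()
  }
}
```

Because collect() brings all data to the driver, this is only suitable for small test data sets.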
1.2 RDD coalesce()

Spark RDD coalesce() is used only to reduce the number of partitions. It is an optimized version
of repartition() for this case, because it moves less data across partitions.

val rdd3 = rdd1.coalesce(4)
println("Coalesce size : " + rdd3.partitions.size)
rdd3.saveAsTextFile("/tmp/coalesce")
If you compare the output below with the original 6-partition layout of rdd1 shown earlier, you
will notice that partition 3 has been merged into partition 2 and partition 6 into partition 5, so
data moved from just 2 partitions.

Partition 1 : 0 1 2
Partition 2 : 3 4 5 6 7 8 9
Partition 4 : 10 11 12
Partition 5 : 13 14 15 16 17 18 19
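The merge pattern in this listing can be modelled by assigning contiguous ranges of parent partitions to each target partition. The plain-Scala sketch below is a simplified model that assumes no data-locality preferences; it is not Spark's actual coalescer, and CoalesceModel and group are hypothetical names.

```scala
object CoalesceModel {
  // Target group i takes parent partitions [i * p / t, (i + 1) * p / t),
  // so neighbouring partitions are merged and no record is re-hashed.
  def group(parentCount: Int, targetCount: Int): Seq[Seq[Int]] = {
    require(parentCount > 0 && targetCount > 0, "counts must be positive")
    // Without a shuffle the partition count can only stay the same or shrink.
    val t = math.min(parentCount, targetCount)
    (0 until t).map { i =>
      val start = ((i.toLong * parentCount) / t).toInt
      val end = (((i.toLong + 1) * parentCount) / t).toInt
      (start until end).map(_ + 1) // 1-based partition numbers, as in the listing
    }
  }

  def main(args: Array[String]): Unit =
    group(6, 4).foreach(g => println(g.mkString("parents ", ", ", "")))
}
```

For 6 parents and 4 targets the groups are {1}, {2, 3}, {4}, {5, 6}, which matches the merged partitions shown above: only partitions 3 and 6 have to move.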
1.3 Complete Example of Spark RDD repartition and coalesce

Below is a complete example of Spark RDD repartition and coalesce in Scala.

package com.sparkbyexamples.spark.rdd

import org.apache.spark.sql.SparkSession

object RDDRepartitionExample extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[5]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  val rdd = spark.sparkContext.parallelize(Range(0, 20))
  println("From local[5] : " + rdd.partitions.size)

  val rdd1 = spark.sparkContext.parallelize(Range(0, 20), 6)
  println("Parallelize : " + rdd1.partitions.size)

  // Print the index of each partition
  rdd1.partitions.foreach(p => println(p.index))

  val rddFromFile = spark.sparkContext.textFile("src/main/resources/test.txt", 9)
  println("TextFile : " + rddFromFile.partitions.size)

  rdd1.saveAsTextFile("c:/tmp/partition")

  // Full shuffle: data from every partition is redistributed
  val rdd2 = rdd1.repartition(4)
  println("Repartition size : " + rdd2.partitions.size)
  rdd2.saveAsTextFile("c:/tmp/re-partition")

  // No shuffle: existing partitions are merged
  val rdd3 = rdd1.coalesce(4)
  println("Coalesce size : " + rdd3.partitions.size)
  rdd3.saveAsTextFile("c:/tmp/coalesce")
}
2. DataFrame Partition and repartition
Unlike with an RDD, you typically don't specify the number of partitions while creating a
DataFrame. By default, a DataFrame or Dataset uses the rules described under "How Spark
Partitions data files" to determine the number of partitions and splits the data for parallelism.

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark: SparkSession = SparkSession.builder()
  .master("local[5]")
  .appName("SparkByExamples.com")
  .getOrCreate()

val df = spark.range(0, 20)
println(df.rdd.partitions.length)

// Writes one part file per partition
df.write.mode(SaveMode.Overwrite).csv("partition.csv")
The above example creates 5 partitions as specified in master("local[5]") and the data is
distributed across all these 5 partitions.

Partition 1 : 0 1 2 3
Partition 2 : 4 5 6 7
Partition 3 : 8 9 10 11
Partition 4 : 12 13 14 15
Partition 5 : 16 17 18 19
2.1 DataFrame repartition()

Similar to RDD, the Spark DataFrame repartition() method is used to increase or decrease the
partitions. The below example increases the partitions from 5 to 6 by moving data from all
partitions.

val df2 = df.repartition(6)
println(df2.rdd.partitions.length)
Adding just 1 partition results in data movement from all partitions.

Partition 1 : 14 1 5
Partition 2 : 4 16 15
Partition 3 : 8 3 18
Partition 4 : 12 2 19
Partition 5 : 6 17 7 0
Partition 6 : 9 10 11 13
Decreasing the number of partitions with repartition() also moves data from all partitions.
Hence, when you want to decrease the number of partitions, the recommendation is to use coalesce().
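To see how rows are spread after such a call without writing files, one option is to count rows per partition with the built-in spark_partition_id() function. The sketch below is an illustration under the assumption of a local run; PartitionSizes and rowsPerPartition are hypothetical names, and empty partitions do not appear in the result.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.spark_partition_id

object PartitionSizes {
  // Number of rows held by each non-empty partition, keyed by partition id.
  def rowsPerPartition(ds: Dataset[_]): Map[Int, Long] =
    ds.groupBy(spark_partition_id().as("pid"))
      .count()
      .collect()
      .map(row => row.getInt(0) -> row.getLong(1))
      .toMap

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[5]")
      .appName("PartitionSizes")
      .getOrCreate()
    val df = spark.range(0, 20)
    println(rowsPerPartition(df.repartition(6))) // after a full shuffle
    println(rowsPerPartition(df.coalesce(2)))    // after merging partitions
    spark.stop()
  }
}
```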

2.2 DataFrame coalesce()

Spark DataFrame coalesce() is used only to decrease the number of partitions. It is an
optimized version of repartition() for this case, because it moves less data across
partitions.

val df3 = df.coalesce(2)
println(df3.rdd.partitions.length)
This prints 2, and the resulting partitions look like this:

Partition 1 : 0 1 2 3 8 9 10 11
Partition 2 : 4 5 6 7 12 13 14 15 16 17 18 19
Since we are reducing from 5 to 2 partitions, data moves only out of 3 partitions, into the
2 partitions that remain.
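The "decrease only" behaviour can be checked directly: asking coalesce() for more partitions than currently exist leaves the count unchanged, while repartition() does increase it. A minimal sketch, assuming a local run; CoalesceLimits and counts are hypothetical names, and the 5-partition input is created explicitly through the numPartitions argument of spark.range.

```scala
import org.apache.spark.sql.SparkSession

object CoalesceLimits {
  // Partition counts after coalesce(2), coalesce(10) and repartition(10)
  // on a Dataset that starts with 5 partitions.
  def counts(spark: SparkSession): (Int, Int, Int) = {
    val df = spark.range(0, 20, 1, 5) // start, end, step, numPartitions
    (df.coalesce(2).rdd.getNumPartitions,     // shrinks to 2
     df.coalesce(10).rdd.getNumPartitions,    // stays at 5: coalesce cannot grow
     df.repartition(10).rdd.getNumPartitions) // grows to 10 via a full shuffle
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[5]")
      .appName("CoalesceLimits")
      .getOrCreate()
    println(counts(spark))
    spark.stop()
  }
}
```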

Default Shuffle Partition

Calling groupBy(), join() and similar functions on a DataFrame shuffles data between multiple
executors, and even machines, and finally repartitions the data into 200 partitions by default.
Spark sets the default number of shuffle partitions to 200 through the
spark.sql.shuffle.partitions configuration. When adaptive query execution is enabled, Spark may
merge small shuffle partitions, so the count you observe can be lower.

val df4 = df.groupBy("id").count()
println(df4.rdd.getNumPartitions)
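The effect of spark.sql.shuffle.partitions is easiest to observe with a small value. The sketch below assumes a local run and turns adaptive query execution off so that Spark does not merge the small shuffle partitions; ShufflePartitionsDemo and shuffleCount are hypothetical names.

```scala
import org.apache.spark.sql.SparkSession

object ShufflePartitionsDemo {
  // Number of partitions produced by a groupBy shuffle when
  // spark.sql.shuffle.partitions is set to n.
  def shuffleCount(spark: SparkSession, n: Int): Int = {
    // Disable adaptive execution for this demo so partitions are not merged.
    spark.conf.set("spark.sql.adaptive.enabled", "false")
    spark.conf.set("spark.sql.shuffle.partitions", n.toString)
    spark.range(0, 20).groupBy("id").count().rdd.getNumPartitions
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[5]")
      .appName("ShufflePartitionsDemo")
      .getOrCreate()
    println(shuffleCount(spark, 8))
    spark.stop()
  }
}
```

For small data sets a value far below 200 usually avoids scheduling many near-empty tasks; the right number depends on data volume and cluster size.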

Creating SparkSession from Scala program

To create a SparkSession in Scala or Python, use the builder pattern: call builder() and then
getOrCreate(). If a SparkSession already exists, it is returned; otherwise a new one is
created.

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("SparkByExamples.com")
  .getOrCreate()

master() – If you are running on a cluster, pass your master name as the argument to master().
Usually it is either yarn or mesos, depending on your cluster setup.
• Use local[x] when running in local mode. x should be an integer greater than 0; it sets the
number of worker threads and so how many partitions Spark creates by default when using RDD,
DataFrame, and Dataset. Ideally, x should be the number of CPU cores you have.
appName() – Sets your application name.
getOrCreate() – Returns the existing SparkSession if there is one; otherwise creates a new one.

Complete UDF Example

Below is a complete UDF example in Scala.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object SparkUDF extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  import spark.implicits._

  val columns = Seq("Seqno", "Quote")
  val data = Seq(
    ("1", "Be the change that you wish to see in the world"),
    ("2", "Everyone thinks of changing the world, but no one thinks of changing himself."),
    ("3", "The purpose of our lives is to be happy.")
  )
  val df = data.toDF(columns: _*)
  df.show(false)

  // Capitalizes the first letter of every word
  val convertCase = (str: String) => {
    val arr = str.split(" ")
    arr.map(f => f.substring(0, 1).toUpperCase + f.substring(1, f.length)).mkString(" ")
  }

  // Using it with a DataFrame
  val convertUDF = udf(convertCase)
  df.select(col("Seqno"),
    convertUDF(col("Quote")).as("Quote")).show(false)

  // Using it in SQL
  spark.udf.register("convertUDF", convertCase)
  df.createOrReplaceTempView("QUOTE_TABLE")
  spark.sql("select Seqno, convertUDF(Quote) from QUOTE_TABLE").show(false)
}

textFile() – Reads single or multiple text or CSV files and returns a single Spark RDD[String].

wholeTextFiles() – Reads single or multiple files and returns a single RDD[Tuple2[String, String]],
where the first value (_1) in each tuple is the file name and the second value (_2) is the content of the file.
