UNIT-IV
 SPARK Basics: History Of Spark, Spark Architecture
 Spark Shell
 Working With RDDs In Spark: RDD Basics
 Creating RDDs In Spark, RDD Operations
 Passing Functions To Spark, Transformations And Actions In
  Spark
 Spark RDD Persistence
 Working With Key/Value Pairs : Pair RDDs
 Transformations On Pair RDDs, Actions Available On Pair RDDs
What Is Spark?
      Spark is a general-purpose and lightning-fast cluster computing platform.
In other words,
      it is an open-source, wide-range data processing engine.
Spark was taken up by the Apache Software Foundation to speed up Hadoop's computational
processing. Spark includes its own cluster management, so Hadoop is only one of the ways
to deploy Spark.
Spark uses Hadoop in two forms. The first form is STORAGE and the other
one is PROCESSING. Since Spark includes its own computation and cluster management, it
uses Hadoop mainly for storage purposes.
Apache Spark is a distributed, open-source processing system. It is used for
'big data' workloads. Spark utilizes optimized query execution and in-
memory caching for rapid queries across data of any size. It is simply a fast,
general-purpose engine for large-scale data processing.
It is much faster than earlier approaches to big data, such as classical MapReduce,
because Spark executes in memory (RAM), which makes processing faster than
engines that work from disk.
Spark is also versatile: it can be used for many things, such as working
with data streams or graphs, machine learning algorithms, ingesting data into a
database, building data pipelines, executing distributed SQL, and more.
  Apache Spark History/Evolution
Spark is one of the most important sub-projects of Hadoop.
It was developed in the AMPLab of UC Berkeley in 2009 by Matei
Zaharia.
In 2010, it was open-sourced under a BSD license.
In 2013, Spark was donated to the Apache Software Foundation.
Apache Spark has been a top-level Apache project since February
2014.
    Features of Apache Spark
•Fast - It provides high performance for both batch and streaming data, using a state-of-the-
 art DAG scheduler, a query optimizer, and a physical execution engine.
•Easy to use - It facilitates writing applications in Java, Scala, Python, R, and SQL. It also
 provides more than 80 high-level operators.
•Generality - It provides a collection of libraries including SQL and DataFrames, MLlib for
 machine learning, GraphX, and Spark Streaming.
•Lightweight - It is a light unified analytics engine used for large-scale data
 processing.
•Runs everywhere - It can easily run on Hadoop, Apache Mesos, Kubernetes, standalone, or
 in the cloud.
   Usage of Spark
•Data integration: The data generated by systems is not consistent enough to combine for
 analysis. To fetch consistent data from systems we can use processes like extract, transform
 and load (ETL). Spark is used to reduce the cost and time required for this ETL process.
•Stream processing: It is always difficult to handle real-time generated data such as log
 files. Spark is capable of operating on streams of data and can reject potentially fraudulent
 operations.
•Machine learning: Machine learning approaches become more feasible and increasingly
 accurate as the volume of data grows. Since Spark can store data in memory and run
 repeated queries quickly, it makes it easy to work with machine learning
 algorithms.
•Interactive analytics: Spark is able to generate responses rapidly, so instead of running
 only pre-defined queries, we can explore the data interactively.
  Ecosystem of Spark / APACHE Spark COMPONENTS
Let us discuss the Spark components. Spark promises faster data processing
as well as easier development, and this is only possible because of its components.
All these Spark components resolve the issues that occurred while using
Hadoop MapReduce.
Now let’s discuss each spark ecosystem component one by one-
 A. Spark core
       Spark Core is the central point of Spark. Basically, it provides an
 execution platform for all Spark applications. Moreover, to support a
 wide array of applications, Spark provides a generalized platform.
 B. Spark SQL
       On top of Spark Core, Spark SQL enables users to run SQL/HQL
 queries. We can process structured as well as semi-structured data by
 using Spark SQL. Moreover, it can run unmodified queries up to
 100 times faster on existing deployments.
C. Spark streaming
       Spark Streaming enables powerful interactive and analytical
applications across live streaming data. The live streams are
converted into micro-batches that are executed on top of Spark Core.
D. Spark MLlib
        The machine learning library delivers both efficiency and high-
quality algorithms. Moreover, it is a popular choice for data scientists, since
it is capable of in-memory data processing, which improves the performance of
iterative algorithms drastically.
E. Spark graphx
       Basically, Spark GraphX is the graph computation engine built on
top of Apache Spark that enables processing of graph data at scale.
F. Sparkr
       SparkR is an R package that provides a lightweight frontend to use
Apache Spark from R. It allows data scientists to analyze large datasets
and to run jobs interactively on them from the R shell. The main idea
behind SparkR was to explore different techniques to integrate the
usability of R with the scalability of Spark.
  Architecture Of Spark
The architecture of Spark contains three main
elements, which are listed below:
•API
•Data Storage
•Resource Management
    Let's define these elements in detail.
API
This element enables application developers to create
Spark-based applications with a standard API interface. Spark offers APIs for
the Python, Java, and Scala programming languages.
Data storage
Spark uses the Hadoop Distributed File System (HDFS) for various data
storage purposes. It works with any data source that is compatible with Hadoop,
including Cassandra, HBase, HDFS, etc.
Resource management
Spark can be deployed as a stand-alone server, or it can be
deployed on a distributed computing framework such as YARN or Mesos.
•SPARK ARCHITECTURE
Spark follows a master-slave architecture. Its cluster consists of a single master and multiple slaves.
The spark architecture depends upon two abstractions:
         Resilient Distributed Dataset (RDD)
         Directed Acyclic Graph (DAG)
  Resilient distributed datasets (RDD)
 The resilient distributed datasets are the group of data items that can be
 stored in-memory on worker nodes. Here,
   ◦ Resilient: restore the data on failure.
   ◦ Distributed: data is distributed among different nodes.
   ◦ Dataset: group of data.
 We will learn about RDDs in detail later.
Directed acyclic graph (DAG)
A Directed Acyclic Graph is a finite directed graph that represents a sequence of computations performed on data.
Each node is an RDD partition, and each edge is a transformation on top of the data. Here, the graph
refers to the navigation, whereas directed and acyclic refer to how it is done.
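As a small illustration (a sketch, assuming the spark-shell with its built-in SparkContext sc), the lineage of transformations that forms this DAG can be inspected with toDebugString:

scala> val data = sc.parallelize(1 to 100)
scala> val result = data.map(_ * 2).filter(_ > 50)   // two transformations, nothing computed yet
scala> println(result.toDebugString)                 // prints the RDD lineage that Spark turns into a DAG of stages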
Let's understand the spark architecture.
• Driver Program
The Driver Program is a process that runs the main() function of the
application and creates the SparkContext object. The purpose
of the SparkContext is to coordinate the Spark applications, which run as
independent sets of processes on a cluster.
To run on a cluster, the SparkContext connects to one of several types of cluster
managers and then performs the following tasks: -
 • It acquires executors on nodes in the cluster.
 • Then, it sends your application code to the executors. Here, the application
   code can be defined by JAR or Python files passed to the SparkContext.
 • At last, the SparkContext sends tasks to the executors to run.
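Outside the shell (where sc is created for you), the driver program builds the SparkContext itself. The following is a minimal standalone sketch; the application name and the local[*] master URL are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

object SimpleApp {
  def main(args: Array[String]): Unit = {
    // Configure the application; local[*] runs the driver and executors locally using all cores
    val conf = new SparkConf().setAppName("SimpleApp").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val data = sc.parallelize(Seq(1, 2, 3, 4, 5))
    println(data.reduce(_ + _))   // tasks run on the executors; the result returns to the driver

    sc.stop()                     // release the executors and cluster resources
  }
}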
• Cluster Manager
 • The role of the cluster manager is to allocate resources across applications.
   Spark is capable of running on a large number of clusters.
 • There are various types of cluster managers, such as Hadoop YARN,
   Apache Mesos, and the Standalone Scheduler.
 • Here, the Standalone Scheduler is a standalone Spark cluster manager that
   makes it easy to install Spark on an empty set of machines.
• Worker Node
 • The worker node is a slave node.
 • Its role is to run the application code in the cluster.
Executor
 • An executor is a process launched for an application on a worker node.
 • It runs tasks and keeps data in memory or disk storage across them.
 • It reads and writes data to external sources.
 • Every application has its own executors.
Task
 • A unit of work that will be sent to one executor.
SPARK SHELL
The shell acts as an interface to access the operating system's
services.
Apache Spark ships with an interactive shell (a Scala
prompt). With the interactive shell we can run different
commands to process the data.
This is an Apache Spark shell commands guide, with a step-by-
step list of basic Spark commands/operations to interact with
the Spark shell.
Start the spark shell
Apache Spark ships with an interactive shell (a Scala prompt), as Spark is developed in
Scala. Using the interactive shell we will run different commands (RDD transformations/actions)
to process the data.
The command to start the Apache Spark shell:
$ bin/spark-shell
2.1. Create a new RDD
 ◦ a) Read a file from the local filesystem and create an RDD.
   scala> val data = sc.textFile("data.txt")
Note: sc is the object of SparkContext
Note: you need to create a file data.txt in the SPARK_HOME directory
 ◦ b) Create an RDD through a parallelized collection
   scala> val no = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
   scala> val nodata = sc.parallelize(no)
 ◦ c) From existing RDDs
   scala> val newrdd = nodata.map(data => (data * 2))
These are three methods to create an RDD. We can use the first
method when the data is already available in an external system like a
local filesystem, HDFS, HBase, Cassandra, S3, etc. One can create an
RDD by calling the textFile method of SparkContext with a path/URL as
the argument. The second approach can be used with existing
collections, and the third one is a way to create a new RDD from an
existing one.
2.2. Number of items in the RDD
        Count the number of items available in the RDD. To count the items we need to call an action:
        scala> data.count()
2.3. Filter operation
         Filter the RDD and create a new RDD of items which contain the word "dataflair". To filter, we call the
filter transformation, which returns a new RDD with a subset of the items.
        scala> val dfdata = data.filter(line => line.contains("dataflair"))
2.4. Transformation and action together
         For complex requirements, we can chain multiple operations together, like a filter transformation and a count
action:
        scala> data.filter(line => line.contains("dataflair")).count()
2.5. Read the first item from the RDD
        To read the first item from the file, use the following command:
        scala> data.first()
2.6. Read the first 5 items from the RDD
      To read the first 5 items from the file, use the following command:
      scala> data.take(5)
2.7. RDD partitions
      An RDD is made up of multiple partitions; to count the number of partitions:
      scala> data.partitions.length
Note: the minimum number of partitions in an RDD is 2 (by default). When we create an RDD
from an HDFS file, the number of blocks will be equal to the number of partitions.
2.8. Cache the file
Caching is an optimization technique. Once we cache the RDD in memory, all future computation will work on the
in-memory data, which saves disk seeks and improves performance.
scala> data.cache()
The RDD will not be cached just by running the above operation; if you visit the web UI at
http://localhost:4040/storage, it will be blank. RDDs are not cached explicitly as soon as
we run cache(); rather,
RDDs will be cached once we run an action, which actually needs to read the data from
disk.
Let's run some actions:
scala> data.count()
scala> data.collect()
2.9. Read data from HDFS file
          To read data from an HDFS file we can specify the complete HDFS URL, like hdfs://IP:PORT/PATH
          scala> var hfile = sc.textFile("hdfs://localhost:9000/inp")
2.10. Spark wordcount program in scala
          One of the most popular MapReduce operations is wordcount: count all the words available in the file.
scala> val wc = hfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
Read the result on the console:
          scala> wc.take(5)
It will display the first 5 results.
2.11. Write the data to an HDFS file
To write the data to HDFS:
scala> wc.saveAsTextFile("hdfs://localhost:9000/out")
Working With RDDs In Spark: (RDD Basics, Creating RDDs In Spark)
What is RDD?
The RDD (resilient distributed dataset) is the spark's core abstraction. It
is a collection of elements, partitioned across the nodes of the cluster so
that we can execute various parallel operations on it.
There are two ways to create RDDs:
 ◦ Parallelizing an existing collection in the driver program
 ◦ Referencing a dataset in an external storage system, such as a shared
   filesystem, HDFS, HBase, or any data source offering a Hadoop
   InputFormat.
Parallelized collections
To create a parallelized collection, call SparkContext's parallelize method
on an existing collection in the driver program. Each element of the
collection is copied to form a distributed dataset that can be operated on
in parallel.
             val info = Array(1, 2, 3, 4)
             val distinfo = sc.parallelize(info)
Now, we can operate on the distributed dataset (distinfo) in parallel, such as
distinfo.reduce((a, b) => a + b).
External Datasets
In Spark, the distributed datasets can be created from any type of storage sources supported
by Hadoop such as HDFS, Cassandra, HBase and even our local file system. Spark provides
the support for text files, SequenceFiles, and other types of Hadoop InputFormat.
SparkContext's textFile method can be used to create an RDD from a text file. This method takes a
URI for the file (either a local path on the machine or an hdfs:// URI) and reads the data of the file.
Now, we can operate on the data with dataset operations; for example, we can add up the sizes of all
the lines using the map and reduce operations as follows: data.map(s =>
s.length).reduce((a, b) => a + b).
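Putting this together in the shell (a sketch; data.txt is an assumed input file in the working directory):

scala> val data = sc.textFile("data.txt")                                // RDD of lines
scala> val totalChars = data.map(s => s.length).reduce((a, b) => a + b)  // total number of characters in the file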
RDD Operations (Passing Functions To Spark, Transformations And Actions In Spark)
The RDD provides the two types of operations:
 ◦ Transformation
 ◦ Action
Transformation
In Spark, the role of a transformation is to create a new dataset from an
existing one. The transformations are considered lazy, as they are only
computed when an action requires a result to be returned to the driver
program.
Let's see some of the frequently used RDD transformations.
Transformation        Description
map(func)             It returns a new distributed dataset
                      formed by passing each element of the
                      source through a function func.
filter(func)          It returns a new dataset formed by
                      selecting those elements of the source on
                      which func returns true.
flatMap(func)         Here, each input item can be mapped to
                      zero or more output items, so func should
                      return a sequence rather than a single
                      item.
mapPartitions(func)   It is similar to map, but runs separately
                      on each partition (block) of the RDD, so
                      func must be of type Iterator<T> =>
                      Iterator<U> when running on an RDD of
                      type T.
mapPartitionsWithIndex(func)              It is similar to mapPartitions, but also
                                          provides func with an integer value
                                          representing the index of the partition, so
                                          func must be of type (Int, Iterator<T>) =>
                                          Iterator<U> when running on an RDD of
                                          type T.
sample(withReplacement, fraction, seed)   It samples the fraction fraction of the data,
                                          with or without replacement, using a given
                                          random number generator seed.
union(otherDataset)                       It returns a new dataset that contains the
                                          union of the elements in the source dataset
                                          and the argument.
intersection(otherDataset)                It returns a new RDD that contains the
                                          intersection of elements in the source
                                          dataset and the argument.
distinct([numPartitions]))           It returns a new dataset that contains the
                                     distinct elements of the source dataset.
groupByKey([numPartitions])          It returns a dataset of (K, Iterable) pairs
                                     when called on a dataset of (K, V) pairs.
reduceByKey(func, [numPartitions])   When called on a dataset of (K, V) pairs,
                                     returns a dataset of (K, V) pairs where the
                                     values for each key are aggregated using
                                     the given reduce function func, which
                                     must be of type (V,V) => V.
aggregateByKey(zeroValue)(seqOp,     When called on a dataset of (K, V) pairs,
combOp, [numPartitions])             returns a dataset of (K, U) pairs where the
                                     values for each key are aggregated using
                                     the given combine functions and a neutral
                                     "zero" value.
sortByKey([ascending], [numPartitions])   It returns a dataset of key-value pairs
                                          sorted by keys in ascending or descending
                                          order, as specified in the boolean
                                          ascending argument.
join(otherDataset, [numPartitions])       When called on datasets of type (K, V) and
                                          (K, W), returns a dataset of (K, (V, W))
                                          pairs with all pairs of elements for each
                                          key. Outer joins are supported through
                                          leftOuterJoin,     rightOuterJoin,     and
                                          fullOuterJoin.
cogroup(otherDataset, [numPartitions])    When called on datasets of type (K, V) and
                                          (K, W), returns a dataset of (K, (Iterable,
                                          Iterable)) tuples. This operation is also
                                          called groupWith.
cartesian(otherDataset)                   When called on datasets of types T and U,
                                          returns a dataset of (T, U) pairs (all pairs of
                                          elements).
pipe(command, [envVars])                   Pipe each partition of the RDD through a
                                           shell command, e.g. a Perl or bash script.
coalesce(numPartitions)                    It decreases the number of partitions in the
                                           RDD to numPartitions.
repartition(numPartitions)                 It reshuffles the data in the RDD randomly
                                           to create either more or fewer partitions
                                           and balance it across them.
repartitionAndSortWithinPartitions(partiti It repartitions the RDD according to the
oner)                                      given partitioner and, within each resulting
                                           partition, sorts records by their keys.
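A few of these transformations used together in the shell (a minimal sketch; the data is illustrative):

scala> val nums = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
scala> val doubled = nums.map(_ * 2)                        // map: 2, 4, 6, 8, 10, 12
scala> val multiplesOfFour = doubled.filter(_ % 4 == 0)     // filter: 4, 8, 12
scala> val words = sc.parallelize(Seq("big data", "apache spark"))
scala> val tokens = words.flatMap(_.split(" "))             // flatMap: big, data, apache, spark
scala> doubled.union(multiplesOfFour).distinct().collect()  // distinct union of the two RDDs

All of these transformations are lazy; nothing is computed until an action such as collect() is called.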
 Action
 In Spark, the role of action is to return a value to the driver program after running a
 computation on the dataset.
 Let's see some of the frequently used RDD Actions.
Action                                         Description
reduce(func)                                  It aggregates the elements of the dataset using a function
                                              func (which takes two arguments and returns one). The
                                              function should be commutative and associative so that it
                                              can be computed correctly in parallel.
collect()                                     It returns all the elements of the dataset as an array at the
                                              driver program. This is usually useful after a filter or other
                                              operation that returns a sufficiently small subset of the data.
count()                                       It returns the number of elements in the dataset.
first()                                       It returns the first element of the dataset (similar to take(1)).
take(n)                                       It returns an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed]) It returns an array with a random sample of
                                         num elements of the dataset, with or
                                         without replacement, optionally pre-
                                         specifying a random number generator
                                         seed.
takeOrdered(n, [ordering])               It returns the first n elements of the RDD
                                         using either their natural order or a custom
                                         comparator.
saveAsTextFile(path)                     It is used to write the elements of the
                                         dataset as a text file (or set of text files) in a
                                         given directory in the local filesystem,
                                         HDFS or any other Hadoop-supported file
                                         system. Spark calls toString on each
                                         element to convert it to a line of text in the
                                         file.
saveAsSequenceFile(path)                 It is used to write the elements of the
(Java and Scala)                         dataset as a Hadoop SequenceFile in a
                                         given path in the local filesystem, HDFS or
                                         any other Hadoop-supported file system.
saveAsObjectFile(path)   It is used to write the elements of the
(Java and Scala)         dataset in a simple format using Java
                         serialization, which can then be loaded
                         using SparkContext.objectFile().
countByKey()             It is only available on RDDs of type (K,
                         V). Thus, it returns a hashmap of (K, Int)
                         pairs with the count of each key.
foreach(func)            It runs a function func on each element of
                         the dataset for side effects such as
                         updating an Accumulator or interacting
                         with external storage systems.
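A small dataset with a few of these actions applied in the shell (a sketch; the values in the comments are the expected results):

scala> val nums = sc.parallelize(Array(5, 1, 4, 2, 3))
scala> nums.reduce(_ + _)       // 15
scala> nums.count()             // 5
scala> nums.first()             // 5
scala> nums.take(3)             // Array(5, 1, 4)
scala> nums.takeOrdered(3)      // Array(1, 2, 3)
scala> nums.collect()           // Array(5, 1, 4, 2, 3)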
     Spark RDD Persistence
Spark provides a convenient way to work on the dataset by persisting it in memory across
operations. While persisting an RDD, each node stores any partitions of it that it computes
in memory. Now, we can also reuse them in other tasks on that dataset.
We can use either the persist() or the cache() method to mark an RDD to be persisted. Spark's
cache is fault-tolerant: if any partition of an RDD is lost, it will automatically
be recomputed using the transformations that originally created it.
There are different storage levels available for storing persisted RDDs. Use
these levels by passing a StorageLevel object (Scala, Java, Python) to persist(). The
cache() method uses the default storage level, which is StorageLevel.MEMORY_ONLY.
The following are the set of storage levels:
Storage Level         Description
MEMORY_ONLY           It stores the RDD as deserialized Java objects
                      in the JVM. This is the default level. If the
                      RDD doesn't fit in memory, some partitions
                      will not be cached and will be recomputed
                      each time they're needed.
MEMORY_AND_DISK       It stores the RDD as deserialized Java objects
                      in the JVM. If the RDD doesn't fit in memory,
                      store the partitions that don't fit on disk, and
                      read them from there when they're needed.
MEMORY_ONLY_SER       It stores the RDD as serialized Java objects
(Java and Scala)      (i.e. one byte array per partition). This is
                      generally more space-efficient than
                      deserialized objects.
MEMORY_AND_DISK_SER   It is similar to MEMORY_ONLY_SER, but
(Java and Scala)      spill partitions that don't fit in memory to disk
                      instead of recomputing them.
DISK_ONLY                 It stores the RDD partitions only on disk.
MEMORY_ONLY_2,            It is the same as the levels above, but
MEMORY_AND_DISK_2, etc.   replicates each partition on two cluster
                          nodes.
OFF_HEAP (experimental)   It is similar to MEMORY_ONLY_SER,
                          but stores the data in off-heap memory.
                          Off-heap memory must be enabled.
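Choosing a storage level explicitly with persist() (a sketch; cache() is shorthand for persist with MEMORY_ONLY, and data.txt is an assumed input file):

scala> import org.apache.spark.storage.StorageLevel
scala> val logs = sc.textFile("data.txt")
scala> val errors = logs.filter(line => line.contains("error"))
scala> errors.persist(StorageLevel.MEMORY_AND_DISK)   // keep partitions in memory, spill to disk if they don't fit
scala> errors.count()                                 // first action computes and caches the RDD
scala> errors.count()                                 // subsequent actions reuse the cached partitions
scala> errors.unpersist()                             // remove the RDD from the cache when no longer needed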
WORKING WITH KEY/VALUE PAIRS :
PAIR RDDS
TRANSFORMATIONS ON PAIR RDDS
ACTIONS AVAILABLE ON PAIR RDDS
In Apache Spark, key-value pair RDDs are what we call paired RDDs.
There are transformations and actions specific to them: transformation operations
such as groupByKey, reduceByKey, join, and leftOuterJoin/rightOuterJoin,
and actions such as countByKey. However, initially we will give a brief
introduction to Spark paired RDDs.
So, let's start the Spark paired RDD tutorial.
Spark Paired RDDs are nothing but RDDs containing a key-value pair.
Basically, a key-value pair (KVP) consists of two linked data items. Here,
the key is the identifier, whereas the value is the data corresponding to the
key.
Moreover, Spark operations work on RDDs containing any type of objects.
However, key-value pair RDDs gain a few special operations, such as
distributed "shuffle" operations and grouping or aggregating the elements by a key.
In addition, on Spark paired RDDs containing Tuple2 objects in Scala, these
operations are automatically available. Basically, the operations for key-value
pairs are available in the PairRDDFunctions class, which wraps around
a Spark RDD of tuples.
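For example (a small sketch with illustrative data), an RDD of Tuple2 in Scala automatically gains the key/value operations through this wrapper:

scala> val sales = sc.parallelize(Seq(("apple", 2), ("banana", 1), ("apple", 3)))
scala> sales.reduceByKey(_ + _).collect()   // e.g. Array((apple,5), (banana,1)); reduceByKey comes from the pair RDD functions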
How to Create Spark Paired RDD
There are several ways to create a paired RDD in Spark, like running a map() function
that returns key-value pairs. However, the procedure to build the key-value
RDD differs by language, as follows.
a. In Python language
It is a requirement to return an RDD composed of tuples for the functions of keyed data
to work. Moreover, here we create a pair RDD in Spark by using the first word as the key in
the Python programming language.
pairs = lines.map(lambda x: (x.split(" ")[0], x))
b. In Scala language
Similar to the previous example, here also we need to return tuples. Furthermore, this
makes the functions of keyed data available. Also, to offer the extra key or value
functions, an implicit conversion on Spark RDDs of tuples exists.
Let’s revise Data type Mapping between R and Spark
Afterward, again using the first word as the key, we create the Apache Spark pair RDD.
val pairs = lines.map(x => (x.split(" ")(0), x))
c. In Java language
Basically, Java doesn't have a built-in tuple type. Therefore, Spark's Java API has users
create tuples using the scala.Tuple2 class. By writing new Tuple2(elem1, elem2) in Java,
we can create a new tuple, and we can access its elements with the _1() and _2() methods.
Moreover, when we create paired RDDs in Spark with Java, we must call special versions of
Spark's functions. As an example, we use the mapToPair() function in place of
the basic map() function.
Again, here using the first word as the key to create a Spark paired RDD:
Again, here using the first word as the keyword to create a Spark paired RDD,
 PairFunction<String, String, String> keyData = new PairFunction<String, String, String>() {
   public Tuple2<String, String> call(String x) {
     return new Tuple2(x.split(" ")[0], x);
   }
 };
 JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);
  Spark Paired RDD Operations
A. Transformation Operations
Paired RDDs allow the same transformations that are available on standard
RDDs. Moreover, the same rules from "passing functions to
Spark" apply here. Also, in Spark, paired RDDs contain tuples. Basically,
we need to pass functions that operate on tuples, rather than on individual
elements. Let's discuss some of the transformation methods below, like
 • groupByKey
The groupByKey operation groups all the values with the same key.
rdd.groupByKey()
 • reduceByKey(fun)
Here, the reduceByKey operation combines values with the same
key.
rdd.reduceByKey( (x, y) => x + y)
•combineByKey(createCombiner, mergeValue, mergeCombiners,
partitioner)
combineByKey uses a different result type while combining values with the same key; see the sketch after this list.
•mapValues(func)
      Even without changing the key, the mapValues operation applies a function to each value
of a paired RDD of Spark.
rdd.mapValues(x => x + 1)
•keys()
      The keys() operation generally returns a Spark RDD of just the keys.
rdd.keys()
•values()
       The values() operation generally returns an RDD of just the values.
rdd.values()
•sortByKey()
       Similarly, the sortByKey operation generally returns an RDD sorted by the key.
rdd.sortByKey()
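The combineByKey sketch referenced in the list above: computing a per-key average, where the combiner type (a running sum and count) differs from the value type. The data and variable names are illustrative.

val scores = sc.parallelize(Seq(("a", 10), ("b", 20), ("a", 30)))
val sumCount = scores.combineByKey(
  (v: Int) => (v, 1),                                           // createCombiner: start (sum, count) from the first value of a key
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),        // mergeValue: fold another value into the combiner
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)) // mergeCombiners: merge combiners across partitions
val averages = sumCount.mapValues { case (sum, count) => sum.toDouble / count }
averages.collect()   // e.g. Array((a,20.0), (b,20.0))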
B. Action operations
Just as with RDD transformations, the same RDD actions are available on Spark pair RDDs. However,
paired RDDs also gain some additional actions that leverage the key-value
nature of the data. Let's discuss some of the action methods below, like
•countByKey()
Through countByKey operation, we can count the number of elements for each key.
rdd.countByKey()
•collectAsMap()
Here, collectAsMap() operation helps to collect the result as a map to provide easy lookup.
rdd.collectAsMap()
•lookup(key)
Moreover, it returns all values associated with the provided key.
rdd.lookup(key)
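These actions used together in the shell (a sketch with illustrative data):

scala> val pets = sc.parallelize(Seq(("cat", 1), ("dog", 2), ("cat", 3)))
scala> pets.countByKey()       // Map(cat -> 2, dog -> 1): number of elements per key
scala> pets.collectAsMap()     // e.g. Map(dog -> 2, cat -> 3): only one value kept per key
scala> pets.lookup("cat")      // Seq(1, 3): all values for the key "cat"

Note that collectAsMap() keeps only one value per key, so it is mainly useful for lookup tables with unique keys.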