Spark
Apache Spark
• Apache Spark is a general-purpose and unified computing engine and a set of
libraries for parallel data processing and distributed querying on computer
clusters. Spark can be used for interactive queries with sub-second latency and can be up to 100 times faster than Apache Hadoop.
• Spark philosophy:
• Unified: a unified platform (the same computing engine with a consistent
and composable set of structured APIs, e.g., DataFrames and Datasets) for writing big data applications, to support a wide range of data analytics tasks efficiently via optimization.
• Computing engine: handles loading data from persistent storage systems (cloud storage, DFS, NoSQL, message queues) and performing computation on it, not permanent storage as the end in itself. A cluster, or group of machines, pools the resources of many machines together, allowing us to use all the cumulative resources as if they were one.
• Libraries: standard libraries and third-party libraries, including interactive or ad-hoc queries with SQL and structured data (Spark SQL), machine learning (MLlib and the ML library), graph analytics (GraphX and GraphFrames), and stream processing (Spark Streaming and Structured Streaming).
• Spark is a tool for managing and coordinating the execution of tasks on data across a cluster of computers with the standalone cluster manager, YARN, or Mesos.
• Spark is written in Scala and runs on the Java Virtual Machine (JVM). Run Spark either on your laptop (local mode) or on a cluster (cluster mode).
• Spark's programming model is functional programming, where the same inputs always result in the same outputs when the transformations on that data stay constant; it also allows external developers to extend the optimizer.
• Spark architecture: Spark cluster (on-premise or cloud)
• A Spark master JVM acts as a cluster manager in a standalone deployment mode, to which Spark workers register themselves as part of a quorum.
• A Spark driver program distributes Spark tasks to each worker's executors and receives computed results from each executor's tasks.
• Spark workers launch executors.
• Spark executors (JVM containers) execute Spark tasks and store and cache data partitions in their memories.
• Spark Applications consist of a single driver process (on the master node) and a set of executor processes (on worker nodes). The driver runs your main() function as a SparkSession and is responsible for a) maintaining information about the application; b) responding to a user's program or input; and c) creating a DAG for the job, analyzing, distributing, and scheduling work across the executors based on data locality using delay scheduling. The executors are responsible for executing the work (fetching the bytecode before executing with a static set of variables and methods within the executor's context; static means that the executor can change those variables or overwrite the methods without affecting other executors or the driver) and for reporting the state of the computation back to the driver. Spark broadcast can quickly distribute large models to workers.
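• A minimal PySpark sketch of the driver/executor relationship and a broadcast variable (the lookup table and names here are made up for illustration):
from pyspark.sql import SparkSession

# Driver: create the SparkSession (local mode here; on a cluster the master
# would be a standalone master, YARN, or Mesos URL instead).
spark = SparkSession.builder.appName("BroadcastSketch").master("local[*]").getOrCreate()
sc = spark.sparkContext

# Broadcast a read-only lookup table once; each executor keeps a local copy
# instead of receiving it with every task.
country_names = {"US": "United States", "DE": "Germany"}
bc_names = sc.broadcast(country_names)

# The lambda runs on the executors; bc_names.value is the broadcast copy.
codes = sc.parallelize(["US", "DE", "US"])
print(codes.map(lambda c: bc_names.value[c]).collect())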
• Spark parallel operations:
• Transformation: Spark will not act on transformations until we call an action. Transformations can have either narrow dependencies (only one output partition) or wide dependencies (multiple output partitions, and thus a shuffle across the nodes).
• map(), filter(), flatMap(), sample(), groupByKey(), reduceByKey(), union(), join(), cogroup(), crossProduct(), mapValues(), sort(), partitionBy(), etc.
• Lazy evaluation: Spark builds up a plan of transformations, waits until the last minute to execute the code, then compiles this plan from RDD or DataFrame transformations into an efficient physical plan (as a directed acyclic graph, DAG, with the DAGScheduler) that will run as efficiently (e.g., avoiding shuffles) as possible across the cluster. This provides immense benefits to the end user because Spark can optimize the entire data flow from end to end, e.g., "predicate pushdown" (see the sketch below).
• Cache-aware computation (e.g., in-memory encoding, etc.)
• Whole-stage code generation via compilers
• No virtual function dispatches
• Intermediate data in memory vs. CPU registers
• Loop unrolling and SIMD
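• A small PySpark sketch of transformations versus actions and lazy evaluation (the toy data is made up): the narrow filter() and the wide reduceByKey() only extend the DAG; nothing executes until the collect() action.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("LazySketch").getOrCreate()
sc = spark.sparkContext

pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])

# Narrow transformation: each output partition depends on one input partition.
filtered = pairs.filter(lambda kv: kv[1] > 1)

# Wide transformation: reduceByKey() requires a shuffle across the nodes.
summed = filtered.reduceByKey(lambda x, y: x + y)

# Nothing has run yet; the action below triggers the whole DAG.
print(summed.collect())   # e.g., [('b', 2), ('a', 3)] (order may vary)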
• Spark core immutable abstractions: Datasets, DataFrames, SQL Tables, Resilient
Distributed Datasets (RDD)
• Spark’s APIs
• Low level “unstructured” API
• High level structured API
SparkSQL
• Formerly Hive on Spark (Shark), now SparkSQL.
• Using RDDs, Python query speeds were often twice as slow as the same Scala queries.
• SparkSQL allows a user to register any DataFrame as a table or view (a temporary table) and query it using pure SQL. There is no performance difference between writing SQL queries and writing DataFrame code; they both "compile" to the same underlying plan via the Catalyst optimizer (see the sketch below).
• SparkSQL provides common and uniform access to Hive, Avro, Parquet (a columnar format), ORC, JSON, JDBC/ODBC, etc.
High Level Structured API
• Both DataFrames and Datasets are built upon RDDs and the Spark SQL engine, form the core high-level, structured, distributed data abstraction, and provide a uniform API across libraries and components in Spark.
• Spark DataFrames (2013): the most common structured API; a DataFrame represents a table of data with rows and columns/schema.
• A Spark DataFrame is an immutable distributed collection of data organized into named columns (resembling tables in a relational database, imposing a structure and schema) and can span multiple computers, whereas Python Pandas' DataFrame (or R's data.frames or data.tables) exists on one machine.
• We usually do not manipulate chunks/partitions manually. We simply specify high-level transformations of data in the physical partitions, and Spark determines how this work will actually execute on the cluster.
• Spark Datasets (2015): a distributed collection of data that can be constructed from JVM objects and then manipulated using functional transformations, using type-safe structured APIs (i.e., object-oriented, with strict compile-time type safety and errors caught at compile time), available in Scala and Java but not in Python and R because those are dynamically typed (not type-safe) languages.
• The Dataset API allows users to work with semi-structured data (like JSON or key-value pairs) by assigning a Java class to the records inside a DataFrame.
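• A short PySpark sketch of the SQL/DataFrame equivalence described above (the flight rows and column names are made up): both queries are compiled by Catalyst to the same underlying plan.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SqlVsDataFrame").getOrCreate()

flights = spark.createDataFrame(
    [("JFK", "SEA", 45), ("JFK", "LAX", 10)],
    ["origin", "dest", "delay"])

# Register the DataFrame as a temporary view and query it with pure SQL.
flights.createOrReplaceTempView("flights")
sql_way = spark.sql("SELECT origin, count(*) FROM flights GROUP BY origin")

# The equivalent DataFrame code.
df_way = flights.groupBy("origin").count()

sql_way.explain()   # compare the physical plans
df_way.explain()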
MLlib and ML
• MLlib targets large-scale learning with data-parallelism or model-parallelism on RDDs. MLlib allows for preprocessing, munging, training of models, and making predictions at scale on data.
• Data preparation: feature discretization and extraction, transformation, selection, hashing of categorical features, natural language processing methods, etc.
• Machine learning algorithms: classification, regression, collaborative filtering, clustering, dimensionality reduction, etc.
• Utilities: statistics, descriptive statistics, chi-square testing (for categorical features), C++-based linear algebra (dense or sparse), model evaluation methods, I/O formats, optimization primitives, etc.
• Descriptive statistics: count(), max(), mean(), min(), normL1(), normL2(), numNonzeros(), variance(), etc.
• ML (spark.ml) operates on DataFrames with 3 main abstract classes:
• Transformer: every new Transformer needs to implement a .transform() method. The spark.ml.feature module contains Binarizer, Bucketizer, ChiSqSelector, CountVectorizer, DCT (discrete cosine transform), ElementwiseProduct, HashingTF (hashing-trick transformer), IDF (inverse document frequency), IndexToString, MaxAbsScaler ([-1.0, 1.0]), MinMaxScaler ([0.0, 1.0]), NGram, Normalizer, OneHotEncoder, PCA (principal component analysis), PolynomialExpansion, QuantileDiscretizer, RegexTokenizer, RFormula, SQLTransformer, StandardScaler (mean of 0 and standard deviation of 1), StopWordsRemover, StringIndexer, Tokenizer, VectorAssembler, VectorIndexer, VectorSlicer, Word2Vec, etc.
• Estimator: makes predictions or performs classification. Every new Estimator needs to implement a .fit() method.
• Classification: LogisticRegression, DecisionTreeClassifier, GBTClassifier (gradient-boosted trees classifier), RandomForestClassifier, NaiveBayes, MultilayerPerceptronClassifier, OneVsRest, etc.
• Regression: AFTSurvivalRegression (accelerated failure time survival regression), DecisionTreeRegressor, GBTRegressor, GeneralizedLinearRegression (supports Gaussian, binomial, and Poisson distributions with link functions), IsotonicRegression, LinearRegression, RandomForestRegressor, etc.
• Clustering: BisectingKMeans (hierarchical), KMeans, GaussianMixture, LDA (latent Dirichlet allocation), etc.
• Analytic models use statistics or machine learning algorithms, which are intensive, iterative processes; a pipeline (better implemented in C++/Go/Java) can make this process efficient.
• Pipeline: an end-to-end transformation-estimation process consisting of discrete stages: a sequence of data pre-processing, feature extraction, model fitting, and validation stages. Using the stages parameter, the output of a preceding stage (obtained with .getOutputCol()) is the input for the following stage.
• The .fit() method returns a PipelineModel object, which can be used to predict with the .transform() method.
• You can save the Pipeline definition for later use with the .save() method and load it back with the .load() method. Virtually all models returned by .fit() on an Estimator or Transformer can be saved and loaded back for reuse.
• Training machine learning models is a two-phase process. First, we initialize an untrained model, then we train it.
• The pyspark.ml.evaluation module contains evaluators for evaluating the performance of a model.
• Parameter hyper-tuning (pyspark.ml.tuning): find the best parameters of the model.
• Grid search: an exhaustive algorithm; it may take a lot of time to select the best model.
• Train-validation splitting: evaluate each parameter combination on a single held-out validation set instead of a full cross-validation.
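• A compact pyspark.ml sketch of the Transformer/Estimator/Pipeline pattern with grid search and train-validation splitting (the toy data, column names, and parameter grid are made up):
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

spark = SparkSession.builder.appName("PipelineSketch").getOrCreate()
train = spark.createDataFrame(
    [("spark is great", 1.0), ("pyspark ml pipelines", 1.0),
     ("structured streaming rocks", 1.0), ("dataframes and datasets", 1.0),
     ("legacy batch jobs", 0.0), ("plain mapreduce code", 0.0),
     ("old hadoop cluster", 0.0), ("manual etl scripts", 0.0)],
    ["text", "label"])

# Transformers chain via input/output columns; the Estimator ends the pipeline.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tf = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, tf, lr])

# Grid search over hyperparameters, evaluated on a train/validation split.
grid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1]).build()
tvs = TrainValidationSplit(estimator=pipeline,
                           estimatorParamMaps=grid,
                           evaluator=BinaryClassificationEvaluator(),
                           trainRatio=0.75)
model = tvs.fit(train)                      # fit() returns the fitted model
model.transform(train).select("text", "prediction").show()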
GraphX and GraphFrames
• Graph structures: vertices, edges, and properties:
• Social networks: the vertices are the people, while the edges are the connections between them. Restaurant recommendations: the vertices involve the location, cuisine type, and restaurants, while the edges are the connections between them. Create a social network + restaurant recommendation graph based on the reviews of friends within a social circle.
• Analysis of flight data: airports are represented by vertices and flights between those airports are represented by edges. Properties: departure delays, plane type, carrier, etc.
• GraphX and GraphFrames for graph processing: pageRank (regular and personalized), connected components, label propagation algorithm (LPA), SVD++, strongly connected components, shortest paths, breadth-first search, triangle count, etc. GraphX has no Java or Python APIs and is based on low-level RDDs.
• GraphX unifies ETL, exploratory analysis, and iterative graph computation within a single system.
• GraphFrames
• GraphFrames utilizes the power of Apache Spark DataFrames to support general graph processing similar to Spark's GraphX library but with high-level, expressive and declarative APIs in Scala, Java, and Python. GraphFrames can be seamlessly converted into GraphX.
• GraphFrames leverage the distribution and expression capabilities of the DataFrame API to both simplify your queries and leverage the performance optimizations of the Apache Spark SQL engine.
• GraphFrames consist of DataFrames of vertices and edges.
import org.graphframes.GraphFrame
import org.apache.spark.sql.functions.desc
// create a Vertices DataFrame
val vertices = spark.createDataFrame(
  List(("JFK", "New York", "NY"))).toDF("id", "city", "state")
// create an Edges DataFrame
val edges = spark.createDataFrame(List(("JFK", "SEA", 45,
  1058923))).toDF("src", "dst", "delay", "tripID")
// create a GraphFrame and use its APIs
val airportGF = GraphFrame(vertices, edges)
// filter all edges from the GraphFrame with delays greater than
// 30 mins
val delayDF = airportGF.edges.filter("delay > 30")
// Using the PageRank algorithm, determine the airport ranking of
// importance
val pageRanksGF =
  airportGF.pageRank.resetProbability(0.15).maxIter(5).run()
display(pageRanksGF.vertices.orderBy(desc("pagerank")))
• 3 kinds of queries: SQL-type queries on vertices and edges, graph-type queries, and motif queries (providing a structural pattern); see the sketch after the comparison table.
• Finding structural motifs, airport ranking using PageRank, shortest paths between cities, breadth-first search (BFS), strongly connected components, saving and loading graphs, etc.
• GraphFrames vs. GraphX
                           GraphFrames                       GraphX
  Built on                 DataFrames                        RDDs
  Languages                Scala, Java, Python               Scala
  Use cases                Queries and algorithms            Algorithms
  Vertex IDs               Any type (in Catalyst)            Long
  Vertex/edge attributes   Any number of DataFrame columns   Any type (VD, ED)
  Return types             GraphFrame or DataFrame           Graph[VD, ED], or RDD[(Long, VD)]
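• A minimal GraphFrames motif-query sketch in PySpark (assumes the graphframes package is available; the toy airports mirror the example above):
from pyspark.sql import SparkSession
from graphframes import GraphFrame

spark = SparkSession.builder.appName("MotifSketch").getOrCreate()

vertices = spark.createDataFrame(
    [("JFK", "New York"), ("SEA", "Seattle"), ("SFO", "San Francisco")],
    ["id", "city"])
edges = spark.createDataFrame(
    [("JFK", "SEA", 45), ("SEA", "JFK", 30), ("SEA", "SFO", 5), ("SFO", "JFK", 0)],
    ["src", "dst", "delay"])
g = GraphFrame(vertices, edges)

# Motif query: a structural pattern describing round trips a -> b -> a.
g.find("(a)-[ab]->(b); (b)-[ba]->(a)").show()

# Graph-type query: breadth-first search from JFK to SFO.
g.bfs(fromExpr="id = 'JFK'", toExpr="id = 'SFO'").show()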
Deep Learning and TensorFrames (Deprecated)
• Traditional algorithmic approach: programming known steps or quantities; that is, you already know the steps to solve a specific problem.
• The human brain has about 100 billion neurons, each connected to approximately 10,000 other neurons, resulting in a mind-boggling 10^15 synaptic connections.
• Neural networks learn by example and are not actually programmed to perform a specific task.
• Learning processes: modifications of the connections between the interconnected elements.
• A multilayer perceptron (MLP) has three layers: input, hidden, and output.
• Machine learning needs feature engineering.
• Feature engineering is about determining which of these features (independent variables) are important in defining the model. Coming up with features is difficult, time-consuming, and requires expert knowledge.
• Feature selection: based on domain knowledge, select a useful subset of the features.
• Feature extraction: transform the data from a high-dimensional space to a smaller space of fewer dimensions, e.g., principal component analysis (PCA).
• Deep learning is part of a family of machine learning methods. Deep learning replaces or minimizes the need for manual feature engineering, automating feature engineering or teaching machines to learn how to learn (unsupervised feature learning).
• Deep learning succeeded thanks to:
• Advances and availability of distributed computing (e.g., Apache Spark) and hardware (especially GPUs, graphics processing units).
• Advances in deep learning research and deep learning libraries: TensorFlow, Theano, Torch, Caffe, Microsoft Cognitive Toolkit (CNTK), mxnet, and DL4J.
• Applications: facial recognition, handwritten digit identification, game playing, speech recognition, language translation, and object classification.
• TensorFlow (the flow of tensors) is an open source software library for numerical computation using data flow graphs, created by Google's Brain Team. It is built in C++ with a Python interface.
• TensorFrames
• With TensorFrames, one can manipulate Spark DataFrames with TensorFlow programs.
• TensorFrames provides a bridge between Spark DataFrames and TensorFlow. This allows you to take your DataFrames and apply them as input into your TensorFlow computation graph. TensorFrames also allows you to take the TensorFlow computation graph output and push it back into DataFrames.
• Parallel training to determine optimal hyperparameters (configuration, learning rate, number of neurons in each layer, etc.).
Polyglot Persistence with Blaze
• Solve your problems with the tools that are designed to solve them easily; forcing one tool onto every problem does not make your company agile and is prone to errors and lots of tweaking and hacking.
• Our world is complex and no single approach exists that solves all problems. There is no such thing as a one-size-fits-all solution.
• Polyglot persistence
• Polyglot programming: using multiple programming languages that are more suitable for certain problems.
• In the parallel world of data, adopt a range of technologies that allow solving the problems in minimal time, thus minimizing the costs.
• Polyglot persistence: always choose the right tool for the right job instead of trying to coerce a single technology into solving all problems, e.g., persisting transaction records in a relational database.
• Blaze abstracts most of these technologies and exposes a simple and elegant data structure and API.
• Working with databases: Blaze can read from SQL databases such as PostgreSQL (http://localhost:5432/) or SQLite, and from NoSQL databases such as MongoDB (http://localhost:27017).
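• A minimal Blaze sketch (a sketch only: the connection URIs, database, table, and collection names are hypothetical, and the Data constructor follows Blaze's 0.x interactive API):
import blaze as bz

# The same Blaze expressions work whether the backend is SQL or NoSQL.
# PostgreSQL table behind an SQLAlchemy-style URI.
traffic = bz.Data('postgresql://localhost:5432/mydb::traffic')
fast = traffic[traffic.delay < 10]      # the filter is pushed to the backend

# MongoDB collection exposed through the same interface.
logs = bz.Data('mongodb://localhost:27017/mydb::logs')
n = bz.compute(logs.count())            # evaluate the expression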
Spark Streaming
• Spark Streaming: a single API addresses both batch and streaming (including late events, partial outputs to the final data source, state recovery on failure, and distributed reads/writes); the benefits are performance, event time, windowing, sessions, sources, and sinks.
• Spark Streaming is a scalable, fault-tolerant streaming system that takes the RDD batch paradigm (in Scala) and speeds it up.
• Spark Streaming is a high-level API that provides fault-tolerant exactly-once semantics for stateful operations.
• Spark Streaming operates in mini-batches or batch intervals (from 500 ms to larger interval windows) at scale and in real time.
• Microbatches: Spark runs each streaming computation as a series of short batch jobs.
• The key abstraction for Spark Streaming is the Discretized Stream (DStream), built on RDDs.
• Spark Streaming integrates with MLlib, Spark SQL, DataFrames, and GraphX. Apache Spark unifies all of these disparate data processing paradigms within the same framework.
• Spark Streaming has built-in receivers that can take on many sources, the most common being Apache Kafka, Flume, HDFS/S3, Kinesis, and Twitter.
• Currently, there are four broad use cases surrounding Spark Streaming:
• Streaming ETL: data is continuously being cleansed and aggregated prior to being pushed downstream.
• Triggers: real-time detection of behavioral or anomaly events triggers immediate and downstream actions.
• Data enrichment: real-time data is joined to other datasets, allowing for richer analysis.
• Complex sessions and continuous learning: multiple sets of events associated with real-time streams are continuously analyzed and/or used to update machine learning models.
• Spark Streaming application data flow:
1. The application starts with the Spark Streaming Context, ssc.start().
2. When the Spark Streaming Context starts, the driver will execute a long-running task on the executors (that is, the Spark workers). The Receiver on the executors (Executor 1 in this diagram) receives a data stream from the Streaming Sources. With the incoming data stream, the receiver divides the stream into blocks and keeps these blocks in memory.
3. These blocks are also replicated to another executor to avoid data loss.
4. The block ID information is transmitted to the Block Management Master on the driver.
5. For every batch interval configured within the Spark Streaming Context (commonly this is every 1 second), the driver will launch Spark tasks to process the blocks. Those blocks are then persisted to any number of target data stores.
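• A classic DStream sketch tying these steps together (the socket source, host, and port are illustrative; run `nc -lk 9999` locally to feed it):
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "DStreamSketch")
ssc = StreamingContext(sc, 1)               # 1-second batch interval

# The receiver on an executor ingests the stream and splits it into blocks.
lines = ssc.socketTextStream("localhost", 9999)

# Each batch interval becomes an RDD, so ordinary RDD operations apply.
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()                                 # starts the Spark Streaming Context
ssc.awaitTermination()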
• End-to-end applications that continuously react to data in real time:
• Structured Streaming (Spark 2.0)
• Structured Streaming extended support for data sources and data sinks, and buttressed streaming operations, including event-time processing, watermarking, and checkpointing.
• Structured Streaming utilizes Spark DataFrames. It allows you to take the same operations that you perform in batch mode using Spark's structured APIs and run them in a streaming fashion. This can reduce latency and allow for incremental processing.
• Incremental execution plan: Structured Streaming repeatedly applies the execution plan for every new set of blocks it receives.
• When is a stream not a stream: treat a stream of data not as a stream but as an unbounded table. As new data arrives from the stream, new rows of the DataFrame are appended to an unbounded table. Developers can express their streaming computations just like batch computations, and Spark will automatically execute them incrementally as data arrives in the stream.
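• A minimal Structured Streaming sketch of the unbounded-table idea (the socket source and console sink are illustrative): the same batch-style DataFrame aggregation runs incrementally as new rows arrive.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("UnboundedTable").getOrCreate()

# The streaming DataFrame behaves like a table whose rows keep arriving.
lines = (spark.readStream
              .format("socket")
              .option("host", "localhost")
              .option("port", 9999)
              .load())

# An ordinary batch-style aggregation, executed incrementally by the engine.
counts = lines.groupBy("value").count()

query = (counts.writeStream
               .outputMode("complete")
               .format("console")
               .start())
query.awaitTermination()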
• 3 built-in data sources:
• File source (local drive, HDFS, S3 bucket): implements the DataStreamReader interface, which supports data formats such as Avro, JSON, and CSV.
• Apache Kafka source: poll or read data from subscribed topics, adhering to all Kafka semantics.
• Network socket source: read UTF-8 text data from a socket connection (IP and port pair); it does not guarantee end-to-end fault tolerance as the other two sources do.
• 4 built-in data sinks:
• File sink: directories or files.
• Foreach sink: implemented by the developer; a ForeachWriter interface writes your data to the destination (NoSQL/JDBC sink, listening socket, REST call to an external service) using open(), process(), and close(); see the sketch after this list.
• Console/stdout sink for debugging (incurs heavy memory usage on the driver side).
• Memory sink for debugging: data is stored as an in-memory table.
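• A PySpark sketch of the foreach sink (available since Spark 2.4; the print call stands in for a NoSQL/JDBC write or a REST call): the writer object supplies open(), process(), and close().
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ForeachSinkSketch").getOrCreate()

# The built-in "rate" test source emits a few rows per second.
stream = (spark.readStream
               .format("rate")
               .option("rowsPerSecond", 1)
               .load())

class PrintWriter:
    def open(self, partition_id, epoch_id):
        # e.g., open a database connection here
        return True
    def process(self, row):
        # e.g., write the row to the destination (NoSQL/JDBC, socket, REST)
        print(row)
    def close(self, error):
        # e.g., close the connection
        pass

query = stream.writeStream.foreach(PrintWriter()).start()
query.awaitTermination()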
• Streaming operations on DataFrames and Datasets: select, projection, aggregation
# streaming DataFrame with IOT device data with schema
# {device: string, type: string, signal: double, time: DateType}
devicesDF = ...
# Select the devices which have signal more than 10
devicesDF.select("device").where("signal > 10")
# Running count of the number of updates for each device type
devicesDF.groupBy("type").count()
• Windowing event-time aggregations to handle out-of-order or late data:
import spark.implicits._
// streaming DataFrame of schema
// { timestamp: Timestamp, type: String }
val devices = ...
// Group the data by window and device type and compute the count of
// each group
val deviceCounts = devices.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"), $"type"
).count()
• Watermarking: mark or specify a threshold in your processing timeline interval beyond which any data's event time is deemed useless (i.e., retain only late data within that time threshold or interval); older data can be discarded without ramifications.
val deviceCounts = devices
  .withWatermark("timestamp", "15 minutes")
  .groupBy(
    window($"timestamp", "10 minutes", "5 minutes"), $"type")
  .count()
• Continuous applications problems: late events, partial outputs, state recovery on failure, distributed reads and writes, and so on.
SparkR
PySpark