
Machine Learning with Spark

Amir H. Payberah
payberah@kth.se
2021-11-04
The Course Web Page

https://id2223kth.github.io
https://tinyurl.com/6s5jy46a

1 / 73
Where Are We?

2 / 73
Big Data

4 / 73
Problem

I Traditional platforms fail to deliver the expected performance on large-scale data.

I We need new systems to store and process large-scale data.

5 / 73
Scale Up vs. Scale Out

I Scale up or scale vertically

I Scale out or scale horizontally

6 / 73
Spark

7 / 73
Spark Execution Model (1/3)

I Spark applications consist of


• A driver process
• A set of executor processes

[M. Zaharia et al., Spark: The Definitive Guide, O’Reilly Media, 2018]

8 / 73
Spark Execution Model (2/3)
I The driver process is the heart of a Spark application

I Sits on a node in the cluster

I Runs the main() function

9 / 73
Spark Execution Model (3/3)

I Executors execute the code assigned to them by the driver.
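
The examples on the following slides assume a SparkContext (sc) and a SparkSession (spark). The spark-shell creates both for you; in a standalone application the driver's main() would typically build them itself, roughly as in this sketch (the application name and master URL are placeholder values, not part of the original slides):

import org.apache.spark.sql.SparkSession

// Build the SparkSession in the driver; appName and master are placeholders.
val spark = SparkSession.builder()
  .appName("id2223-example")
  .master("local[*]")
  .getOrCreate()

// The underlying SparkContext, referred to as sc in the RDD examples.
val sc = spark.sparkContext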

10 / 73
Spark Programming Model

I Jobs are described as directed acyclic graphs (DAGs).

I There are two types of RDD operators: transformations and actions.

11 / 73
Resilient Distributed Datasets (RDD) (1/2)

I A distributed memory abstraction.

I Immutable collections of objects spread across a cluster.


• Like a LinkedList<MyObjects>

12 / 73
Resilient Distributed Datasets (RDD) (2/2)

I An RDD is divided into a number of partitions, which are atomic pieces of information.

I Partitions of an RDD can be stored on different nodes of a cluster.

13 / 73
Creating RDDs

I Turn a collection into an RDD.


val a = sc.parallelize(Array(1, 2, 3))

I Load text file from local FS, HDFS, or S3.

val a = sc.textFile("file.txt")
val b = sc.textFile("directory/*.txt")
val c = sc.textFile("hdfs://namenode:9000/path/file")

14 / 73
RDD Operations

I Transformations: lazy operators that create new RDDs.

I Actions: launch a computation and return a value to the program or write data to external storage (see the sketch below).
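
A minimal sketch of the difference, using a small RDD: map is a transformation and is not evaluated until an action such as count or collect runs.

val a = sc.parallelize(Array(1, 2, 3))

// Transformation: lazy, only describes a new RDD; nothing is computed yet.
val doubled = a.map(_ * 2)

// Actions: launch the computation and return values to the program.
doubled.count()   // 3
doubled.collect() // Array(2, 4, 6)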

15 / 73
Spark and Spark SQL

16 / 73
DataFrame

I A DataFrame is a distributed collection of rows with a homogeneous schema.

I It is equivalent to a table in a relational database.

I It can also be manipulated in similar ways to RDDs.

17 / 73
Adding Schema to RDDs

I Spark + RDD: functional transformations on partitioned collections of opaque objects.

I SQL + DataFrame: declarative transformations on partitioned collections of tuples.

18 / 73
Creating a DataFrame - From an RDD

I You can use toDF to convert an RDD to DataFrame.


val tupleRDD = sc.parallelize(Array(("seif", 65, 0), ("amir", 40, 1)))
val tupleDF = tupleRDD.toDF("name", "age", "id")

I If the RDD contains case class instances, Spark infers the column names from the case class attributes.
case class Person(name: String, age: Int, id: Int)

val peopleRDD = sc.parallelize(Array(Person("seif", 65, 0), Person("amir", 40, 1)))


val peopleDF = peopleRDD.toDF

19 / 73
Creating a DataFrame - From Data Source

I Data sources supported by Spark.


• CSV, JSON, Parquet, ORC, JDBC/ODBC connections, Plain-text files
• Cassandra, HBase, MongoDB, AWS Redshift, XML, etc.

val peopleJson = spark.read.format("json").load("people.json")

val peopleCsv = spark.read.format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("people.csv")

20 / 73
Column

I Different ways to refer to a column.

val people = spark.read.format("json").load("people.json")

people.col("name")

col("name")

column("name")

'name

$"name"

expr("name")

21 / 73
DataFrame Transformations (1/6)

I select lets you do the DataFrame equivalent of SQL queries on a table of data.

people.select("name", "age", "id").show(2)


people.select(col("name"), expr("age + 3")).show()
people.select(expr("name AS username")).show(2)

I filter and where both filter rows.


people.filter(col("age") < 20).show()

people.where("age < 20").show()

I distinct can be used to extract unique rows.

people.select("name").distinct().count()

22 / 73
DataFrame Transformations (2/6)

I withColumn adds a new column to a DataFrame.


people.withColumn("teenager", expr("age < 20")).show()

I withColumnRenamed renames a column.


people.withColumnRenamed("name", "username").columns

I drop removes a column.

people.drop("name").columns

23 / 73
DataFrame Transformations (3/6)

I count returns the total number of values.


people.select(count("age")).show()

I countDistinct returns the number of distinct values.

people.select(countDistinct("name")).show()

I first and last return the first and last value of a DataFrame.
people.select(first("name"), last("age")).show()

24 / 73
DataFrame Transformations (4/6)

I min and max extract the minimum and maximum values from a DataFrame.
people.select(min("name"), max("age"), max("id")).show()

I sum adds all the values in a column.


people.select(sum("age")).show()

I avg calculates the average.

people.select(avg("age")).show()

25 / 73
DataFrame Transformations (5/6)

I groupBy and agg together perform aggregations on groups.

people.groupBy("name").agg(count("age")).show()

I join performs the join operation between two tables.

val t1 = spark.createDataFrame(Seq((0, "a", 0), (1, "b", 1), (2, "c", 1)))
.toDF("num", "name", "id")
val t2 = spark.createDataFrame(Seq((0, "x"), (1, "y"), (2, "z")))
.toDF("id", "group")

val joinExpression = t1.col("id") === t2.col("id")


var joinType = "inner"

t1.join(t2, joinExpression, joinType).show()

26 / 73
DataFrame Transformations (6/6)

I You can use udf to define new column-based functions.


import org.apache.spark.sql.functions.udf

val df = spark.createDataFrame(Seq((0, "hello"), (1, "world"))).toDF("id", "text")

val upper: String => String = _.toUpperCase


val upperUDF = spark.udf.register("upper", upper)

df.withColumn("upper", upperUDF(col("text"))).show

27 / 73
DataFrame Actions

I Like RDDs, DataFrames also have their own set of actions (a short example follows the list).

I collect: returns an array that contains all the rows in this DataFrame.

I count: returns the number of rows in this DataFrame.

I first and head: return the first row of the DataFrame.

I show: displays the top 20 rows of the DataFrame in a tabular form.

I take: returns the first n rows of the DataFrame.
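
A short sketch of these actions, assuming the people DataFrame from the earlier slides:

people.count()                 // number of rows (Long)
people.first()                 // the first Row
people.take(2)                 // Array with the first 2 Rows
people.show()                  // prints the top 20 rows in tabular form
val allRows = people.collect() // all rows on the driver; use with care on large DataFrames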

28 / 73
Machine Learning

29 / 73
Machine Learning with Spark

I Spark provides support for statistics and machine learning.


• Supervised learning
• Unsupervised learning
• Deep learning

30 / 73
Supervised Learning

I Using labeled historical data and training a model to predict the values of those labels
based on various features of the data points.

I Classification (categorical values)


• E.g., predicting disease, classifying images, ...

I Regression (continuous values)


• E.g., predicting sales, predicting height, ...

31 / 73
Unsupervised Learning

I No label to predict.

I Trying to find patterns or discover the underlying structure in a given set of data.
• Clustering, anomaly detection, ...

32 / 73
The Advanced Analytic Process

I Data collection
I Data cleaning
I Feature engineering
I Training models
I Model tuning and
evaluation

33 / 73
What is MLlib? (1/2)

I MLlib is a package built on Spark.


I It provides interfaces for:
• Gathering and cleaning data
• Feature engineering and feature selection
• Training and tuning large-scale supervised and unsupervised machine learning models
• Using those models in production

34 / 73
What is MLlib? (2/2)

I MLlib consists of two packages.

I org.apache.spark.mllib
• Uses RDDs
• It is in maintenance mode (only receives bug fixes, not new features)

I org.apache.spark.ml
• Uses DataFrames
• Offers a high-level interface for building machine learning pipelines

35 / 73
High-Level MLlib Concepts

I ML pipelines (spark.ml) provide a uniform set of high-level APIs built on top of DataFrames to create machine learning pipelines.

36 / 73
Pipeline

I Pipeline is a sequence of algorithms to process and learn from data.

I E.g., a text document processing workflow might include several stages:


• Split each document’s text into words.
• Convert each document’s words into a numerical feature vector.
• Learn a prediction model using the feature vectors and labels.

I Main pipeline components: transformers and estimators

37 / 73
Transformers

I Transformers take a DataFrame as input and produce a new DataFrame as output.


I The class Transformer implements a method transform() that converts one
DataFrame into another.

// transformer: DataFrame =[transform]=> DataFrame

transform(dataset: DataFrame): DataFrame

38 / 73
Estimators

I Estimator is an abstraction of a learning algorithm that fits a model on a dataset.


I The class Estimator implements a method fit(), which accepts a DataFrame and
produces a Model (Transformer).

// estimator: DataFrame =[fit]=> Model

fit(dataset: DataFrame): M

39 / 73
How Does Pipeline Work? (1/3)

I A pipeline is a sequence of stages.

I Stages of a pipeline run in order.


I The input DataFrame is transformed as it passes through each stage.
• Each stage is either a Transformer or an Estimator.

I E.g., a Pipeline with three stages: Tokenizer and HashingTF are Transformers, and
LogisticRegression is an Estimator.

40 / 73
How Does Pipeline Work? (2/3)

I Pipeline.fit(): is called on the original DataFrame


• DataFrame with raw text documents and labels
I Tokenizer.transform(): splits the raw text documents into words
• Adds a new column with words to the DataFrame
I HashingTF.transform(): converts the words column into feature vectors
• Adds a new column with those vectors to the DataFrame
I LogisticRegression.fit(): produces a model (LogisticRegressionModel).

41 / 73
How Does Pipeline Work? (3/3)

I A Pipeline is an Estimator (DataFrame =[fit]=> Model).


I After a Pipeline’s fit() runs, it produces a PipelineModel.
I PipelineModel is a Transformer (DataFrame =[transform]=> DataFrame).
I The PipelineModel is used at test time.


42 / 73
Example - Input DataFrame (1/2)

I Make a DataFrame of the type Article.

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row

case class Article(id: Long, topic: String, text: String)

val articles = spark.createDataFrame(Seq(
  Article(0, "sci.math", "Hello, Math!"),
  Article(1, "alt.religion", "Hello, Religion!"),
  Article(2, "sci.physics", "Hello, Physics!"),
  Article(3, "sci.math", "Hello, Math Revised!"),
  Article(4, "sci.math", "Better Math"),
  Article(5, "alt.religion", "TGIF"))).toDF

articles.show

43 / 73
Example - Input DataFrame (2/2)

I Add a new column label to the DataFrame.


I udf is a feature of Spark SQL to define new Column-based functions.

val topic2Label: Boolean => Double = x => if (x) 1 else 0

val toLabel = spark.udf.register("topic2Label", topic2Label)

val labelled = articles.withColumn("label", toLabel($"topic".like("sci%"))).cache

labelled.show

44 / 73
Example - Transformers (1/2)

I Break each sentence into individual terms (words).

import org.apache.spark.ml.feature.Tokenizer
import org.apache.spark.ml.feature.RegexTokenizer

val tokenizer = new RegexTokenizer().setInputCol("text").setOutputCol("words")

val tokenized = tokenizer.transform(labelled)

tokenized.show(false)

45 / 73
Example - Transformers (2/2)

I HashingTF takes a set of words and converts them into a fixed-length feature vector.
• 5000 in our example
I Uses a hash function to map each word into an index in the feature vector.
I Then computes the term frequencies based on the mapped indices.

import org.apache.spark.ml.feature.HashingTF

val hashingTF = new HashingTF().setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
  .setNumFeatures(5000)

val hashed = hashingTF.transform(tokenized)

hashed.show(false)

46 / 73
Example - Estimator

val Array(trainDF, testDF) = hashed.randomSplit(Array(0.8, 0.2))

trainDF.show

testDF.show

import org.apache.spark.ml.classification.LogisticRegression

val lr = new LogisticRegression().setMaxIter(20).setRegParam(0.01)

val model = lr.fit(trainDF)

val pred = model.transform(testDF).select("topic", "label", "prediction")

pred.show

47 / 73
Example - Pipeline

val Array(trainDF2, testDF2) = labelled.randomSplit(Array(0.8, 0.2))

trainDF2.show

testDF2.show

import org.apache.spark.ml.{Pipeline, PipelineModel}

val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

val model2 = pipeline.fit(trainDF2)

val pred = model2.transform(testDF2).select("topic", "label", "prediction")

pred.show

48 / 73
Parameters

I MLlib Estimators and Transformers use a uniform API for specifying parameters.

I Param: a named parameter

I ParamMap: a set of (parameter, value) pairs


I Two ways to pass parameters to an algorithm:
1. Set parameters for an instance, e.g., lr.setMaxIter(10)
2. Pass a ParamMap to fit() or transform().

49 / 73
Example - ParamMap

// set parameters using setter methods.


val lr = new LogisticRegression()

lr.setMaxIter(10).setRegParam(0.01)

// specify parameters using a ParamMap


val lr = new LogisticRegression()

val paramMap = ParamMap(lr.maxIter -> 20)
  .put(lr.maxIter, 30) // specify one Param
  .put(lr.regParam -> 0.1, lr.threshold -> 0.55) // specify multiple Params

val model = lr.fit(training, paramMap)

50 / 73
Low-Level Data Types - Local Vector

I Stored on a single machine


I Dense and sparse
• Dense (1.0, 0.0, 3.0): [1.0, 0.0, 3.0]
• Sparse (1.0, 0.0, 3.0): (3, [0, 2], [1.0, 3.0])

import org.apache.spark.mllib.linalg.{Vector, Vectors}

val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)

val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))


val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))

51 / 73
Preprocessing and Feature Engineering

52 / 73
Formatting Models

I Most classification and regression algorithms expect the input data in a specific format (a small sketch follows):

• A column to represent the label (Double)
• A column to represent the features (Vector)
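
A minimal sketch of this layout (the values are made up for illustration):

import org.apache.spark.ml.linalg.Vectors

// label: Double, features: Vector -- the format most spark.ml algorithms expect.
val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")

training.show()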

53 / 73
Transformers and Estimators


54 / 73
Transformer Properties

I All transformers require you to specify the input and output columns.
I We can set these with setInputCol and setOutputCol.

val tokenizer = new RegexTokenizer().setInputCol("text").setOutputCol("words")

55 / 73
Vector Assembler

I Concatenate all your features into one vector.

import org.apache.spark.ml.feature.VectorAssembler

case class Nums(val1: Long, val2: Long, val3: Long)

val numsDF = spark.createDataFrame(Seq(Nums(1, 2, 3), Nums(4, 5, 6), Nums(7, 8, 9))).toDF

val va = new VectorAssembler().setInputCols(Array("val1", "val2", "val3"))
  .setOutputCol("features")

va.transform(numsDF).show()

56 / 73
MLlib Transformers

I Continuous features

I Categorical features

I Text data

57 / 73
Continuous Features - Bucketing

I Convert continuous features into categorical features.

import org.apache.spark.ml.feature.Bucketizer

val contDF = spark.range(20).selectExpr("cast(id as double)")


val bucketBorders = Array(-1.0, 5.0, 10.0, 15.0, 20.0)

val bucketer = new Bucketizer().setSplits(bucketBorders).setInputCol("id")

bucketer.transform(contDF).show()
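
Note (not on the original slide): the Bucketizer above does not set an output column, so Spark falls back to a generated default column name; setting one explicitly makes the result easier to read. A possible variant:

val bucketer = new Bucketizer().setSplits(bucketBorders).setInputCol("id").setOutputCol("bucket")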

59 / 73
Continuous Features - Scaling and Normalization

I To scale and normalize continuous data.

import org.apache.spark.ml.feature.VectorAssembler

case class Nums(val1: Long, val2: Long, val3: Long)


val numsDF = spark.createDataFrame(Seq(Nums(1, 2, 3), Nums(4, 5, 6), Nums(7, 8, 9))).toDF
val va = new VectorAssembler().setInputCols(Array("val1", "val2", "val3"))
.setOutputCol("features")
val nums = va.transform(numsDF)

import org.apache.spark.ml.feature.StandardScaler

val scaler = new StandardScaler().setInputCol("features").setOutputCol("scaled")


scaler.fit(nums).transform(nums).show()

60 / 73
Continuous Features - Maximum Absolute Scaler

I Scales the data by dividing each feature by the maximum absolute value in this
feature (column).

import org.apache.spark.ml.feature.VectorAssembler

case class Nums(val1: Long, val2: Long, val3: Long)


val numsDF = spark.createDataFrame(Seq(Nums(1, 2, 3), Nums(4, 5, 6), Nums(7, 8, 9))).toDF
val va = new VectorAssembler().setInputCols(Array("val1", "val2", "val3"))
.setOutputCol("features")
val nums = va.transform(numsDF)

import org.apache.spark.ml.feature.MaxAbsScaler

val maScaler = new MaxAbsScaler().setInputCol("features").setOutputCol("mas")


maScaler.fit(nums).transform(nums).show()

61 / 73
MLlib Transformers

I Continuous features

I Categorical features

I Text data

62 / 73
Categorical Features - String Indexer

I Maps strings to different numerical IDs.

val simpleDF = spark.read.json("simple-ml.json")

import org.apache.spark.ml.feature.StringIndexer

val lblIndxr = new StringIndexer().setInputCol("lab").setOutputCol("labelInd")


val idxRes = lblIndxr.fit(simpleDF).transform(simpleDF)

idxRes.show()

63 / 73
Categorical Features - Converting Indexed Values Back to Text

I Maps back to the original values.

import org.apache.spark.ml.feature.IndexToString

val labelReverse = new IndexToString().setInputCol("labelInd").setOutputCol("original")

labelReverse.transform(idxRes).show()

64 / 73
Categorical Features - One-Hot Encoding

I Converts each distinct value to a boolean flag as a component in a vector.

val simpleDF = spark.read.json("simple-ml.json")

import org.apache.spark.ml.feature.OneHotEncoder

val lblIndxr = new StringIndexer().setInputCol("color").setOutputCol("colorInd")


val colorLab = lblIndxr.fit(simpleDF).transform(simpleDF.select("color"))
val ohe = new OneHotEncoder().setInputCol("colorInd").setOutputCol("one-hot")
// In Spark 3.x, OneHotEncoder is an Estimator and must be fit before transforming.
ohe.fit(colorLab).transform(colorLab).show()

// Since there are three values, the vector is of length 2 and the mapping is as follows:
// 0 -> 10, (2,[0],[1.0])
// 1 -> 01, (2,[1],[1.0])
// 2 -> 00, (2,[],[])
// (2,[0],[1.0]) means a vector of length 2 with 1.0 at position 0 and 0 elsewhere.

65 / 73
MLlib Transformers

I Continuous features

I Categorical features

I Text data

66 / 73
Text Data - Tokenizing Text

I Converting free-form text into a list of tokens or individual words.

val sales = spark.read.format("csv").option("header", "true").load("sales.csv")
  .where("Description IS NOT NULL")

sales.show(false)

import org.apache.spark.ml.feature.Tokenizer

val tkn = new Tokenizer().setInputCol("Description").setOutputCol("DescOut")


val tokenized = tkn.transform(sales.select("Description"))
tokenized.show(false)

67 / 73
Text Data - Removing Common Words

I Filters out stop words, such as "the", "and", and "but".

import org.apache.spark.ml.feature.StopWordsRemover

val df = spark.createDataFrame(Seq((0, Seq("I", "saw", "the", "red", "balloon")),
  (1, Seq("Mary", "had", "a", "little", "lamb")))).toDF("id", "raw")

val englishStopWords = StopWordsRemover.loadDefaultStopWords("english")

val stops = new StopWordsRemover().setStopWords(englishStopWords)
  .setInputCol("raw").setOutputCol("WithoutStops")

stops.transform(df).show(false)

68 / 73
Text Data - Converting Words into Numerical Representations

I Counts the occurrences of words in a column of word arrays.


I Treats every row as a document, every word as a term, and the total collection of all
terms as the vocabulary.

import org.apache.spark.ml.feature.CountVectorizer

val df = spark.createDataFrame(Seq((0, Array("a", "b", "c")),
  (1, Array("a", "b", "b", "c", "a")))).toDF("id", "words")

val cvModel = new CountVectorizer().setInputCol("words").setOutputCol("features")
  .setVocabSize(3).setMinDF(2)

val fittedCV = cvModel.fit(df)

fittedCV.transform(df).show(false)

69 / 73
Summary

70 / 73
Summary

I Spark: RDD

I Spark SQL: DataFrame

I MLlib
• Transformers and Estimators
• Pipeline
• Feature engineering

71 / 73
References

I Matei Zaharia et al., Spark: The Definitive Guide, O'Reilly Media, 2018 (Ch. 24 and 25)

72 / 73
Questions?

73 / 73
