RDD OPERATIONS
scala> val data = Array(1,2,3,4,5,6,6,7,8)
data: Array[Int] = Array(1, 2, 3, 4, 5, 6, 6, 7, 8)
scala> val distriData = sc.parallelize(data)
distriData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> distriData.map(_ + 2).collect().mkString("\n")
res0: String =
3
4
5
6
7
8
8
9
10
scala> val sumData = distriData.map(_ + 2) // Transformed RDD
sumData: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:28
scala> sumData.reduce(_+_) // Action (returns a value, not an RDD)
res1: Int = 60
scala>
DATA FRAME (schemaRDD)
A DataFrame is an abstraction that gives a schema view of data: it presents the data as columns with
column names and type information, so we can think of the data in a DataFrame like a table in a
database.
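As a small hedged sketch of this schema view (the column names below are only illustrative), a DataFrame can be built from a local collection and its schema inspected:
// build a DataFrame from a local collection; each column gets a name and a type
val people = Seq(("Raja", 21), ("Ramya", 34)).toDF("name", "age")
// printSchema shows the schema view: every column name together with its type
people.printSchema()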
DATA FRAME using CASE CLASS
scala> case class Person(name : String , age:Int , address:String)
defined class Person
scala> val df =
List(Person("Raja",21,"HYD"),Person("Ramya",34,"BAN"),Person("Rani",30,"MUM")).toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
scala> df.collect().mkString("\n")
res2: String =
[Raja,21,HYD]
[Ramya,34,BAN]
[Rani,30,MUM]
scala> df.show
+-----+---+-------+
| name|age|address|
+-----+---+-------+
| Raja| 21| HYD|
|Ramya| 34| BAN|
| Rani| 30| MUM|
+-----+---+-------+
scala> df.filter("age > 25").show
+-----+---+-------+
| name|age|address|
+-----+---+-------+
|Ramya| 34| BAN|
| Rani| 30| MUM|
+-----+---+-------+
scala> df.filter("salary > 25").show
org.apache.spark.sql.AnalysisException: cannot resolve '`salary`' given input columns: [name, age,
address]; line 1 pos 0
DATA SET [DS]
Dataset is an extension of the DataFrame API. It is the latest abstraction, and it tries to provide the
best of both RDD and DataFrame.
CONVERT “DATA FRAME(DF)” TO “DATASET(DS)”
NOTE: a DataFrame can always be converted into a Dataset at any point by calling the ‘as’
method on the DataFrame. Example: df.as[MyClass]
i.e. just by providing a case class we can convert a DATA FRAME into a DATA SET.
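A small hedged note on the environment: spark-shell pre-imports the SparkSession implicits, but in a standalone application they must be imported before toDF or as[...] will compile. A minimal sketch:
// 'spark' is the active SparkSession; its implicits supply the encoders
import spark.implicits._
// the Person case class provides the schema for the typed view
val personDS = df.as[Person]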
scala> val ds = df.as[Person]
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: int ... 1 more field]
scala> ds.show
+-----+---+-------+
| name|age|address|
+-----+---+-------+
| Raja| 21| HYD|
|Ramya| 34| BAN|
| Rani| 30| MUM|
+-----+---+-------+
scala> ds.filter(_.age > 21).show()
+-----+---+-------+
| name|age|address|
+-----+---+-------+
|Ramya| 34| BAN|
| Rani| 30| MUM|
+-----+---+-------+
scala> ds.filter(_.salary > 21).show()
<console>:28: error: value salary is not a member of Person
ds.filter(_.salary > 21).show()
OBSERVATION: Unlike the DataFrame, which raised a runtime AnalysisException (“cannot
resolve salary”), the Dataset reports a COMPILE-TIME ERROR.
So the Dataset API provides compile-time safety, which is not available with DataFrames.
A Dataset can be constructed from JVM objects and then manipulated using functional
transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java; Python
does not support the Dataset API.
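A minimal hedged sketch of that point, reusing the Person case class defined earlier: a Dataset built directly from JVM objects and manipulated with functional transformations.
// Seq of JVM objects -> typed Dataset (encoders come from spark.implicits._)
val peopleDS = Seq(Person("Raja", 21, "HYD"), Person("Rani", 30, "MUM")).toDS()
// map and filter are checked at compile time against the Person fields
peopleDS.map(p => p.age + 1).filter(_ > 25).show()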
CONVERTING “DATASET[DS]” to “DATA FRAME[DF]”
We can directly use the toDF method to convert a Dataset back into a DataFrame; no case class is
needed here.
scala> val newdf = ds.toDF
newdf: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
scala> newdf.show
+-----+---+-------+
| name|age|address|
+-----+---+-------+
| Raja| 21| HYD|
|Ramya| 34| BAN|
| Rani| 30| MUM|
+-----+---+-------+
scala>
READING “JSON” DATA using “DATA FRAME” &
CONVERTING INTO “DATA SET”
scala> case class Emp(name:String,Desg:String,YrsOfExp:Double,Address:String,State:String)
defined class Emp
scala> val df = spark.read.json("file:///home/gopalkrishna/PRAC/SparkSQL/InputData.json")
df: org.apache.spark.sql.DataFrame = [Address: string, Age: bigint ... 4 more fields]
scala> df.show(4)
+---------+----+-----------+---------+--------+-------+
| Address| Age| Desg| State|YrsOfExp| name|
+---------+----+-----------+---------+--------+-------+
|Hyderabad|null| STA|Telangana| 12.5| Gopal|
|Bangalore| 6| null|Karnataka| null|Mounika|
| Chennai| 22|Sw Engineer|TamilNadu| 1.2| Ramya|
|Hyderabad|null| TA|Telangana| 12.5| Sekhar|
+---------+----+-----------+---------+--------+-------+
only showing top 4 rows
scala> val ds =
spark.read.json("file:///home/gopalkrishna/PRAC/SparkSQL/InputData.json").as[Emp]
ds: org.apache.spark.sql.Dataset[Emp] = [Address: string, Age: bigint ... 4 more fields]
scala>
TO CONVERT THE ABOVE “DATA FRAME(df)” to “DATA SET(ds)” – as[case class]
gopalkrishna@ubuntu:~/PRAC/SparkSQL/JDBC-JSON$ pwd
/home/gopalkrishna/PRAC/SparkSQL/JDBC-JSON
gopalkrishna@ubuntu:~/PRAC/SparkSQL/JDBC-JSON$ cat Data.json
{"empid":102,"ename":"Aravind","esal":54000}
{"empid":104,"ename":"Rakesh","esal":84000}
{"empid":105,"ename":"Danya","esal":55000}
{"empid":108,"ename":"Venkat","esal":74000}
{"empid":109,"ename":"RajVardhan","esal":87000}
{"empid":110,"ename":"SekharRaj","esal":56000}
{"empid":112,"ename":"Vardhan","esal":64000}
{"empid":113,"ename":"Richard","esal":68000}
{"empid":114,"ename":"Bruce","esal":96000}
{"empid":115,"ename":"Balamani","esal":49000}
{"empid":117,"ename":"Rajsekhar","esal":88000}
{"empid":118,"ename":"Ravali","esal":68000}
{"empid":119,"ename":"Nasal","esal":50000}
{"empid":125,"ename":"Ramya","esal":42000}
{"empid":126,"ename":"Rama","esal":44000}
{"empid":128,"ename":"RamBabu","esal":46000}
gopalkrishna@ubuntu:~/PRAC/SparkSQL/JDBC-JSON$
LOGIC
scala> case class emp(empid:BigInt,ename:String,esal:Double)
defined class emp
scala> var jsonDS = spark.read.json("file:///home/gopalkrishna/PRAC/SparkSQL/JDBC-JSON/Data.json").as[emp]
jsonDS: org.apache.spark.sql.Dataset[emp] = [empid: bigint, ename: string ... 1 more field]
scala> jsonDS.show(5)
+-----+----------+-----+
|empid| ename| esal|
+-----+----------+-----+
| 102| Aravind|54000|
| 104| Rakesh|84000|
| 105| Danya|55000|
| 108| Venkat|74000|
| 109|RajVardhan|87000|
+-----+----------+-----+
only showing top 5 rows
scala>
scala> case class emp(empid:BigInt,ename:String,esal:Double)
defined class emp
scala> var parDS = spark.read.parquet("file:///home/gopalkrishna/PRAC/SparkSQL/JDBC-PARQUET/Data.parquet").as[emp]
parDS: org.apache.spark.sql.Dataset[emp] = [empid: int, ename: string ... 1 more field]
scala> parDS.show(5)
+-----+----------+-----+
|empid| ename| esal|
+-----+----------+-----+
| 102| Aravind|54000|
| 104| Rakesh|84000|
| 105| Danya|55000|
| 108| Venkat|74000|
| 109|RajVardhan|87000|
+-----+----------+-----+
only showing top 5 rows
Pressing Tab after "parDS.write." lists the available writer methods:
scala> parDS.write.
bucketBy format jdbc mode options parquet save sortBy
csv insertInto json option orc partitionBy saveAsTable text
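As a hedged sketch of a couple of these writer methods (the output paths below are assumptions, not from the transcript), the same Dataset could also be written out as JSON or CSV:
// hypothetical output directories, for illustration only
parDS.write.mode("overwrite").json("file:///tmp/emp_json")
parDS.write.mode("overwrite").option("header", "true").csv("file:///tmp/emp_csv")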
TO WRITE THE SAME “DATA SET” data to “HIVE TABLE”
scala> parDS.write.saveAsTable("testdb.parquettab")
scala>
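A short hedged follow-up (assuming the session was started with Hive support): the table written above can be read back either through the catalog or with plain SQL.
// read the saved Hive table back as a DataFrame
spark.table("testdb.parquettab").show(5)
// or query it with SQL
spark.sql("SELECT * FROM testdb.parquettab WHERE esal > 60000").show()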
scala> case class Emp(name:String,Desg:String,YrsOfExp:Double,Address:String,State:String)
defined class Emp
scala> val ds =
spark.read.json("file:///home/gopalkrishna/PRAC/SparkSQL/InputData.json").as[Emp]
ds: org.apache.spark.sql.Dataset[Emp] = [Address: string, Age: bigint ... 4 more fields]
scala> ds.columns
res1: Array[String] = Array(Address, Age, Desg, State, YrsOfExp, name)
scala> ds.show(5)
+---------+----+-----------+---------+--------+-------+
| Address| Age| Desg| State|YrsOfExp| name|
+---------+----+-----------+---------+--------+-------+
|Hyderabad|null| STA|Telangana| 12.5| Gopal|
|Bangalore| 6| null|Karnataka| null|Mounika|
| Chennai| 22|Sw Engineer|TamilNadu| 1.2| Ramya|
|Hyderabad|null| TA|Telangana| 12.5| Sekhar|
|Bangalore| 12| null|Karnataka| null| Reshma|
+---------+----+-----------+---------+--------+-------+
only showing top 5 rows
scala> ds.count
res3: Long = 27
scala>
Word Count Use Case using Datasets
scala> var file = sc.textFile("file:///home/gopalkrishna/PRAC/SPARK/Input.log")
file: org.apache.spark.rdd.RDD[String] = file:///home/gopalkrishna/PRAC/SPARK/Input.log
MapPartitionsRDD[45] at textFile at <console>:24
scala> var splitfile = file.flatMap(_.split(" "))
splitfile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[46] at flatMap at <console>:26
scala> var ds = splitfile.toDS
ds: org.apache.spark.sql.Dataset[String] = [value: string]
scala> ds.orderBy("value").groupBy("value").count.show
+---------+-----+
| value|count|
+---------+-----+
| A| 1|
| ERROR| 9|
| ETA| 2|
| Getting| 2|
| HDFS| 2|
| Hadoop| 2|
| No| 1|
| Perfect| 1|
| a| 2|
|analytics| 2|
| and| 4|
| any| 1|
| as| 2|
| bigdata| 4|
| breaker| 2|
| can| 2|
| code| 1|
| code,| 2|
|component| 2|
|cubersome| 2|
+---------+-----+
only showing top 20 rows
scala>
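For contrast, a hedged sketch of the same word count written entirely with the typed Dataset API, using groupByKey instead of the untyped groupBy (same input file as above):
// read the file directly as Dataset[String] and split it into words
val words = spark.read.textFile("file:///home/gopalkrishna/PRAC/SPARK/Input.log").flatMap(_.split(" "))
// group by the word itself and count each group -> Dataset[(String, Long)]
val counts = words.groupByKey(identity).count()
counts.toDF("word", "count").orderBy("word").show()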
scala> var data = sc.makeRDD( 1 to 30 )
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[66] at makeRDD at <console>:24
scala> var mapdata = data.map(x => (x , (x*x) , (x*x*x) ))
mapdata: org.apache.spark.rdd.RDD[(Int, Int, Int)] = MapPartitionsRDD[67] at map at <console>:26
scala> var ds = mapdata.toDS
ds: org.apache.spark.sql.Dataset[(Int, Int, Int)] = [_1: int, _2: int ... 1 more field]
scala> ds.columns
res13: Array[String] = Array(_1, _2, _3)
scala> var newds = ds.selectExpr("_1 AS num","_2 AS square","_3 as cube")
newds: org.apache.spark.sql.DataFrame = [num: int, square: int ... 1 more field]
scala> newds.columns
res17: Array[String] = Array(num, square, cube)
scala> newds.filter('square > 100 && 'cube < 8000).show
+---+------+----+
|num|square|cube|
+---+------+----+
| 11| 121|1331|
| 12| 144|1728|
| 13| 169|2197|
| 14| 196|2744|
| 15| 225|3375|
| 16| 256|4096|
| 17| 289|4913|
| 18| 324|5832|
| 19| 361|6859|
+---+------+----+
scala>
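The same filter can also be written with the typed API directly on the tuple Dataset, a hedged alternative to the column expressions above (the column headers then stay _1, _2, _3):
// the predicate is checked at compile time against the tuple fields
ds.filter(t => t._2 > 100 && t._3 < 8000).show()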
scala> var dsobj = sc.parallelize(List( ("Spark",20),("Scala",60),("Java",70),("C++",50),("C",50),
("Python",40),("Hadoop",50) )).toDS
dsobj: org.apache.spark.sql.Dataset[(String, Int)] = [_1: string, _2: int]
scala> dsobj.show
+------+---+
| _1| _2|
+------+---+
| Spark| 20|
| Scala| 60|
| Java| 70|
| C++| 50|
| C| 50|
|Python| 40|
|Hadoop| 50|
+------+---+
scala> dsobj.withColumnRenamed("_1","tech").withColumnRenamed("_2","rating").filter('rating > 30).show
+------+------+
| tech|rating|
+------+------+
| Scala| 60|
| Java| 70|
| C++| 50|
| C| 50|
|Python| 40|
|Hadoop| 50|
+------+------+
scala>
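A hedged closing note: the two withColumnRenamed calls can be collapsed into a single toDF with the new column names.
// toDF with names renames every column in one call
dsobj.toDF("tech", "rating").filter($"rating" > 30).show()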