DATAFRAME Vs DATASETS

The document discusses operations on RDDs, DataFrames, and Datasets in Spark SQL. It shows how to create RDDs from arrays, transform RDDs using map and reduce, create DataFrames from case classes and JSON data, and convert between DataFrames and Datasets. Examples demonstrate filtering, selecting columns, and writing Datasets to Hive tables. Word count is implemented using DataFrames/Datasets to count the frequency of words from a text file.


RDD OPERATIONS

scala> val data = Array(1,2,3,4,5,6,6,7,8)


data: Array[Int] = Array(1, 2, 3, 4, 5, 6, 6, 7, 8)

scala> val distriData = sc.parallelize(data)


distriData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> distriData.map(_ + 2).collect().mkString("\n")


res0: String =
3
4
5
6
7
8
8
9
10

scala> val sumData = distriData.map(_ + 2) // Transformed RDD


sumData: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:28

scala> sumData.reduce(_+_) // Action


res1: Int = 60

scala>

DATA FRAME (schemaRDD)

A DataFrame is an abstraction that gives a schema view of data. It presents data as named, typed
columns, so we can think of the data in a DataFrame like a table in a database.
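
For instance, a minimal sketch (not from the original session; the Employee case class and its values are illustrative) of how that schema view can be inspected in the Spark shell:

// Build a DataFrame from a hypothetical case class and inspect its schema
case class Employee(name: String, age: Int)

val empDF = Seq(Employee("Asha", 28), Employee("Vikram", 35)).toDF

empDF.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- age: integer (nullable = false)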

DATA FRAME using CASE CLASS

scala> case class Person(name : String , age:Int , address:String)


defined class Person

scala> val df =
List(Person("Raja",21,"HYD"),Person("Ramya",34,"BAN"),Person("Rani",30,"MUM")).toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala> df.collect().mkString("\n")
res2: String =
[Raja,21,HYD]
[Ramya,34,BAN]
[Rani,30,MUM]
scala> df.show
+-----+---+-------+
| name|age|address|
+-----+---+-------+
| Raja| 21| HYD|
|Ramya| 34| BAN|
| Rani| 30| MUM|
+-----+---+-------+

scala> df.filter("age > 25").show


+-----+---+-------+
| name|age|address|
+-----+---+-------+
|Ramya| 34| BAN|
| Rani| 30| MUM|
+-----+---+-------+

scala> df.filter("salary > 25").show


org.apache.spark.sql.AnalysisException: cannot resolve '`salary`' given input columns: [name, age,
address]; line 1 pos 0

DATA SET [DS]


A Dataset is an extension of the DataFrame API. It is the latest abstraction and tries to combine the
best of both RDDs (typed, object-oriented operations) and DataFrames (optimized, schema-aware
operations).
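
As a rough illustration (reusing the Person case class from above; this snippet is not part of the original session), the same Dataset supports both RDD-style typed lambdas and DataFrame-style column expressions:

val people = List(Person("Raja", 21, "HYD"), Person("Ramya", 34, "BAN")).toDS

// RDD-like: a typed lambda over Person objects, checked at compile time
people.filter(p => p.age > 25).map(_.name).show()

// DataFrame-like: an untyped column expression, resolved at analysis time
people.filter("age > 25").select("name").show()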

CONVERT “DATA FRAME(DF)” TO “DATASET(DS)”


NOTE: we can convert a DataFrame into a Dataset at any point by calling the ‘as’ method on the
DataFrame. Example: df.as[MyClass]

i.e. just by providing the case class, we can convert a DATA FRAME into a DATA SET.
scala> val ds = df.as[Person]
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: int ... 1 more field]

scala> ds.show
+-----+---+-------+
| name|age|address|
+-----+---+-------+
| Raja| 21| HYD|
|Ramya| 34| BAN|
| Rani| 30| MUM|
+-----+---+-------+

scala> ds.filter(_.age > 21).show()


+-----+---+-------+
| name|age|address|
+-----+---+-------+
|Ramya| 34| BAN|
| Rani| 30| MUM|
+-----+---+-------+

scala> ds.filter(_.salary > 21).show()


<console>:28: error: value salary is not a member of Person
ds.filter(_.salary > 21).show()

OBSERVATION: Unlike the DataFrame, which raised a runtime AnalysisException (“cannot
resolve salary”), the Dataset reports a COMPILE TIME ERROR.

So the Dataset API provides compile-time type safety, which is not available with DataFrames.

A Dataset can be constructed from JVM objects and then manipulated using functional
transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java; Python
does not support the Dataset API.
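
A short sketch of such functional transformations on the ds Dataset[Person] created above (the specific lambdas are illustrative, not from the original session):

// Typed, functional transformations on Dataset[Person]
ds.filter(_.address == "HYD")        // typed predicate over Person objects
  .map(p => p.name.toUpperCase)      // Dataset[Person] => Dataset[String]
  .show()

// flatMap also yields a typed Dataset: here each name is split into its characters
ds.flatMap(p => p.name.toSeq.map(_.toString)).show()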

CONVERTING “DATASET[DS]” to “DATA FRAME[DF]”


We can directly use the toDF method to convert a Dataset back to a DataFrame; no case class is
needed here.

scala> val newdf = ds.toDF


newdf: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala> newdf.show
+-----+---+-------+
| name|age|address|
+-----+---+-------+
| Raja| 21| HYD|
|Ramya| 34| BAN|
| Rani| 30| MUM|
+-----+---+-------+

scala>

READING “JSON” DATA using “DATA FRAME” & CONVERTING INTO “DATA SET”

scala> case class Emp(name:String,Desg:String,YrsOfExp:Double,Address:String,State:String)
defined class Emp

scala> val df = spark.read.json("file:///home/gopalkrishna/PRAC/SparkSQL/InputData.json")


df: org.apache.spark.sql.DataFrame = [Address: string, Age: bigint ... 4 more fields]

scala> df.show(4)
+---------+----+-----------+---------+--------+-------+
| Address| Age| Desg| State|YrsOfExp| name|
+---------+----+-----------+---------+--------+-------+
|Hyderabad|null| STA|Telangana| 12.5| Gopal|
|Bangalore| 6| null|Karnataka| null|Mounika|
| Chennai| 22|Sw Engineer|TamilNadu| 1.2| Ramya|
|Hyderabad|null| TA|Telangana| 12.5| Sekhar|
+---------+----+-----------+---------+--------+-------+
only showing top 4 rows

scala> val ds =
spark.read.json("file:///home/gopalkrishna/PRAC/SparkSQL/InputData.json").as[Emp]
ds: org.apache.spark.sql.Dataset[Emp] = [Address: string, Age: bigint ... 4 more fields]

scala>
TO CONVERT THE ABOVE “DATA FRAME(df)” to “DATA SET(ds)” – as[case class]

gopalkrishna@ubuntu:~/PRAC/SparkSQL/JDBC-JSON$ pwd
/home/gopalkrishna/PRAC/SparkSQL/JDBC-JSON
gopalkrishna@ubuntu:~/PRAC/SparkSQL/JDBC-JSON$ cat Data.json
{"empid":102,"ename":"Aravind","esal":54000}
{"empid":104,"ename":"Rakesh","esal":84000}
{"empid":105,"ename":"Danya","esal":55000}
{"empid":108,"ename":"Venkat","esal":74000}
{"empid":109,"ename":"RajVardhan","esal":87000}
{"empid":110,"ename":"SekharRaj","esal":56000}
{"empid":112,"ename":"Vardhan","esal":64000}
{"empid":113,"ename":"Richard","esal":68000}
{"empid":114,"ename":"Bruce","esal":96000}
{"empid":115,"ename":"Balamani","esal":49000}
{"empid":117,"ename":"Rajsekhar","esal":88000}
{"empid":118,"ename":"Ravali","esal":68000}
{"empid":119,"ename":"Nasal","esal":50000}
{"empid":125,"ename":"Ramya","esal":42000}
{"empid":126,"ename":"Rama","esal":44000}
{"empid":128,"ename":"RamBabu","esal":46000}
gopalkrishna@ubuntu:~/PRAC/SparkSQL/JDBC-JSON$

LOGIC
scala> case class emp(empid:BigInt,ename:String,esal:Double)
defined class emp

scala> var jsonDS = spark.read.json("file:///home/gopalkrishna/PRAC/SparkSQL/JDBC-JSON/Data.json").as[emp]
jsonDS: org.apache.spark.sql.Dataset[emp] = [empid: bigint, ename: string ... 1 more field]

scala> jsonDS.show(5)
+-----+----------+-----+
|empid| ename| esal|
+-----+----------+-----+
| 102| Aravind|54000|
| 104| Rakesh|84000|
| 105| Danya|55000|
| 108| Venkat|74000|
| 109|RajVardhan|87000|
+-----+----------+-----+
only showing top 5 rows

scala>
scala> case class emp(empid:BigInt,ename:String,esal:Double)
defined class emp

scala> var parDS = spark.read.parquet("file:///home/gopalkrishna/PRAC/SparkSQL/JDBC-PARQUET/Data.parquet").as[emp]
parDS: org.apache.spark.sql.Dataset[emp] = [empid: int, ename: string ... 1 more field]

scala> parDS.show(5)
+-----+----------+-----+
|empid| ename| esal|
+-----+----------+-----+
| 102| Aravind|54000|
| 104| Rakesh|84000|
| 105| Danya|55000|
| 108| Venkat|74000|
| 109|RajVardhan|87000|
+-----+----------+-----+
only showing top 5 rows

scala> parDS.write.    // pressing TAB here lists the available writer methods
bucketBy   csv   format   insertInto   jdbc   json   mode   option   options   orc   parquet
partitionBy   save   saveAsTable   sortBy   text

TO WRITE THE SAME “DATA SET” data to “HIVE TABLE”
scala> parDS.write.saveAsTable("testdb.parquettab")

scala>
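
A hedged sketch of a few of the other writer methods listed above (the output paths and table names are illustrative, not from the original session):

// Write the same Dataset out in a few other formats (illustrative destinations)
parDS.write.mode("overwrite").parquet("file:///tmp/emp_parquet")            // plain Parquet files
parDS.write.mode("append").json("file:///tmp/emp_json")                     // JSON lines
parDS.write.mode("overwrite").format("orc").saveAsTable("testdb.emporctab") // Hive table stored as ORC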

scala> case class Emp(name:String,Desg:String,YrsOfExp:Double,Address:String,State:String)


defined class Emp

scala> val ds =
spark.read.json("file:///home/gopalkrishna/PRAC/SparkSQL/InputData.json").as[Emp]
ds: org.apache.spark.sql.Dataset[Emp] = [Address: string, Age: bigint ... 4 more fields]

scala> ds.columns
res1: Array[String] = Array(Address, Age, Desg, State, YrsOfExp, name)

scala> ds.show(5)
+---------+----+-----------+---------+--------+-------+
| Address| Age| Desg| State|YrsOfExp| name|
+---------+----+-----------+---------+--------+-------+
|Hyderabad|null| STA|Telangana| 12.5| Gopal|
|Bangalore| 6| null|Karnataka| null|Mounika|
| Chennai| 22|Sw Engineer|TamilNadu| 1.2| Ramya|
|Hyderabad|null| TA|Telangana| 12.5| Sekhar|
|Bangalore| 12| null|Karnataka| null| Reshma|
+---------+----+-----------+---------+--------+-------+
only showing top 5 rows

scala> ds.count
res3: Long = 27

scala>
Word Count Use Case through Data Sets

scala> var file = sc.textFile("file:///home/gopalkrishna/PRAC/SPARK/Input.log")


file: org.apache.spark.rdd.RDD[String] = file:///home/gopalkrishna/PRAC/SPARK/Input.log
MapPartitionsRDD[45] at textFile at <console>:24

scala> var splitfile = file.flatMap(_.split(" "))


splitfile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[46] at flatMap at <console>:26

scala> var ds = splitfile.toDS


ds: org.apache.spark.sql.Dataset[String] = [value: string]

scala> ds.orderBy("value").groupBy("value").count.show
+---------+-----+
| value|count|
+---------+-----+
| A| 1|
| ERROR| 9|
| ETA| 2|
| Getting| 2|
| HDFS| 2|
| Hadoop| 2|
| No| 1|
| Perfect| 1|
| a| 2|
|analytics| 2|
| and| 4|
| any| 1|
| as| 2|
| bigdata| 4|
| breaker| 2|
| can| 2|
| code| 1|
| code,| 2|
|component| 2|
|cubersome| 2|
+---------+-----+
only showing top 20 rows

scala>
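
The same word count can also be written without creating an RDD first; a minimal sketch (same input file as above) using spark.read.textFile, which returns a Dataset[String] directly:

// Dataset-only word count: no sc.textFile / RDD step
val lines = spark.read.textFile("file:///home/gopalkrishna/PRAC/SPARK/Input.log")  // Dataset[String] with column "value"
val words = lines.flatMap(_.split(" "))                                            // still Dataset[String]
words.groupBy("value").count().orderBy("value").show()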

scala> var data = sc.makeRDD( 1 to 30 )


data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[66] at makeRDD at <console>:24

scala> var mapdata = data.map(x => (x , (x*x) , (x*x*x) ))


mapdata: org.apache.spark.rdd.RDD[(Int, Int, Int)] = MapPartitionsRDD[67] at map at <console>:26

scala> var ds = mapdata.toDS


ds: org.apache.spark.sql.Dataset[(Int, Int, Int)] = [_1: int, _2: int ... 1 more field]

scala> ds.columns
res13: Array[String] = Array(_1, _2, _3)

scala> var newds = ds.selectExpr("_1 AS num","_2 AS square","_3 as cube")


newds: org.apache.spark.sql.DataFrame = [num: int, square: int ... 1 more field]

scala> newds.columns
res17: Array[String] = Array(num, square, cube)

scala> newds.filter('square > 100 && 'cube < 8000).show


+---+------+----+
|num|square|cube|
+---+------+----+
| 11| 121|1331|
| 12| 144|1728|
| 13| 169|2197|
| 14| 196|2744|
| 15| 225|3375|
| 16| 256|4096|
| 17| 289|4913|
| 18| 324|5832|
| 19| 361|6859|
+---+------+----+

scala>
scala> var dsobj = sc.parallelize(List( ("Spark",20),("Scala",60),("Java",70),("C++",50),("C",50),
("Python",40),("Hadoop",50) )).toDS
dsobj: org.apache.spark.sql.Dataset[(String, Int)] = [_1: string, _2: int]

scala> dsobj.show
+------+---+
| _1| _2|
+------+---+
| Spark| 20|
| Scala| 60|
| Java| 70|
| C++| 50|
| C| 50|
|Python| 40|
|Hadoop| 50|
+------+---+

scala> dsobj.withColumnRenamed("_1","tech").withColumnRenamed("_2","rating").filter('rating > 30).show
+------+------+
| tech|rating|
+------+------+
| Scala| 60|
| Java| 70|
| C++| 50|
| C| 50|
|Python| 40|
|Hadoop| 50|
+------+------+

scala>
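
As a side note (not part of the original session), the same rename can be written more compactly with toDF, which assigns new column names positionally:

// Positional rename of the tuple columns, then the same filter
dsobj.toDF("tech", "rating").filter('rating > 30).show()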
