import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("nik").getOrCreate()
# The underlying SparkContext is available from the session if needed
sc = spark.sparkContext
df = spark.read.csv("simple-zipcodes.csv", header=True)
df.printSchema()
root
|-- RecordNumber: string (nullable = true)
|-- Country: string (nullable = true)
|-- City: string (nullable = true)
|-- Zipcode: string (nullable = true)
|-- State: string (nullable = true)
df.show()
+------------+-------+-------------------+-------+-----+
|RecordNumber|Country| City|Zipcode|State|
+------------+-------+-------------------+-------+-----+
| 1| US| PARC PARQUE| 704| PR|
| 2| US|PASEO COSTA DEL SUR| 704| PR|
| 10| US| BDA SAN LUIS| 709| PR|
| 49347| US| HOLT| 32564| FL|
| 49348| US| HOMOSASSA| 34487| FL|
| 61391| US| CINGULAR WIRELESS| 76166| TX|
| 61392| US| FORT WORTH| 76177| TX|
| 61393| US| FT WORTH| 76177| TX|
| 54356| US| SPRUCE PINE| 35585| AL|
| 76511| US| ASH HILL| 27007| NC|
| 4| US| URB EUGENE RICE| 704| PR|
| 39827| US| MESA| 85209| AZ|
| 39828| US| MESA| 85210| AZ|
| 49345| US| HILLIARD| 32046| FL|
| 49346| US| HOLDER| 34445| FL|
| 3| US| SECT LANAUSSE| 704| PR|
| 54354| US| SPRING GARDEN| 36275| AL|
| 54355| US| SPRINGVILLE| 35146| AL|
| 76512| US| ASHEBORO| 27203| NC|
| 76513| US| ASHEBORO| 27204| NC|
+------------+-------+-------------------+-------+-----+
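Note that every column was read as a string. If you want Spark to infer column types instead, a quick sketch (inferSchema is a standard option of the CSV reader; df_typed is just an illustrative name):
# inferSchema=True makes Spark scan the file once more to guess column types,
# e.g. RecordNumber becomes an integer instead of a string
df_typed = spark.read.csv("simple-zipcodes.csv", header=True, inferSchema=True)
df_typed.printSchema()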
# To run SQL queries on Spark, first register the DataFrame as a temporary view
df.createOrReplaceTempView("Zipcodes")
# Spark SQL to Select Columns
# The select() function of the DataFrame API is used to select specific columns from the DataFrame.
df.select("country","city","zipcode","state").show(5)
+-------+-------------------+-------+-----+
|country| city|zipcode|state|
+-------+-------------------+-------+-----+
| US| PARC PARQUE| 704| PR|
| US|PASEO COSTA DEL SUR| 704| PR|
| US| BDA SAN LUIS| 709| PR|
| US| HOLT| 32564| FL|
| US| HOMOSASSA| 34487| FL|
+-------+-------------------+-------+-----+
only showing top 5 rows
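select() also accepts Column expressions, which lets you rename columns on the fly; for example, using col() and alias() from pyspark.sql.functions:
from pyspark.sql.functions import col
# alias() renames the column in the result only; the DataFrame is unchanged
df.select(col("City").alias("city_name"), col("Zipcode")).show(3)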
# In SQL, you can achieve the same using a SELECT ... FROM query, as shown below.
# SQL Select query
spark.sql("SELECT country, city, zipcode, state FROM ZIPCODES").show(5)
# Both examples above yield the output below.
+-------+-------------------+-------+-----+
|country| city|zipcode|state|
+-------+-------------------+-------+-----+
| US| PARC PARQUE| 704| PR|
| US|PASEO COSTA DEL SUR| 704| PR|
| US| BDA SAN LUIS| 709| PR|
| US| HOLT| 32564| FL|
| US| HOMOSASSA| 34487| FL|
+-------+-------------------+-------+-----+
only showing top 5 rows
Filter Rows
To filter the rows of a DataFrame, use the where() function (an alias of filter()) from the DataFrame API.
df.select("country","city","zipcode","state").where("state == 'AZ'").show(5)
+-------+----+-------+-----+
|country|city|zipcode|state|
+-------+----+-------+-----+
| US|MESA| 85209| AZ|
| US|MESA| 85210| AZ|
+-------+----+-------+-----+
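The same filter can also be written with a Column expression instead of a SQL string:
from pyspark.sql.functions import col
# Column-based predicate, equivalent to the SQL-string form above
df.select("country", "city", "zipcode", "state") \
  .where(col("state") == "AZ") \
  .show(5)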
Similarly, in SQL you can use the WHERE clause as follows.
spark.sql(""" SELECT country, city, zipcode, state FROM ZIPCODES WHERE state = 'AZ' """).show(5)
+-------+----+-------+-----+
|country|city|zipcode|state|
+-------+----+-------+-----+
| US|MESA| 85209| AZ|
| US|MESA| 85210| AZ|
+-------+----+-------+-----+
Sorting
To sort rows on a specific column, use the orderBy() function of the DataFrame API.
df.select("country","city","zipcode","state")\
.where("state in ('PR','AZ','FL')") \
.orderBy("state") \
.show(10)
+-------+-------------------+-------+-----+
|country| city|zipcode|state|
+-------+-------------------+-------+-----+
| US| MESA| 85209| AZ|
| US| MESA| 85210| AZ|
| US| HOLT| 32564| FL|
| US| HOMOSASSA| 34487| FL|
| US| HILLIARD| 32046| FL|
| US| HOLDER| 34445| FL|
| US| PARC PARQUE| 704| PR|
| US|PASEO COSTA DEL SUR| 704| PR|
| US| BDA SAN LUIS| 709| PR|
| US| URB EUGENE RICE| 704| PR|
+-------+-------------------+-------+-----+
only showing top 10 rows
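orderBy() sorts ascending by default; to reverse the order, pass a descending column expression:
from pyspark.sql.functions import col
# desc() reverses the sort order for that column
df.select("country", "city", "zipcode", "state") \
  .where("state in ('PR','AZ','FL')") \
  .orderBy(col("state").desc()) \
  .show(10)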
In SQL, you can achieve sorting by using the ORDER BY clause.
spark.sql(""" SELECT country, city, zipcode, state FROM ZIPCODES
WHERE state in ('PR','AZ','FL') order by state """) \
.show(10)
+-------+-------------------+-------+-----+
|country| city|zipcode|state|
+-------+-------------------+-------+-----+
| US| MESA| 85209| AZ|
| US| MESA| 85210| AZ|
| US| HOLT| 32564| FL|
| US| HOMOSASSA| 34487| FL|
| US| HILLIARD| 32046| FL|
| US| HOLDER| 34445| FL|
| US| PARC PARQUE| 704| PR|
| US|PASEO COSTA DEL SUR| 704| PR|
| US| BDA SAN LUIS| 709| PR|
| US| URB EUGENE RICE| 704| PR|
+-------+-------------------+-------+-----+
only showing top 10 rows
Grouping
Use groupBy() followed by an aggregation such as count() to group the rows of a DataFrame.
df.groupBy("state").count() \
.show()
+-----+-----+
|state|count|
+-----+-----+
| AZ| 2|
| NC| 3|
| AL| 3|
| TX| 3|
| FL| 4|
| PR| 5|
+-----+-----+
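groupBy() can also feed agg() to compute several aggregations at once; for example, counting rows and distinct city names per state:
from pyspark.sql.functions import count, countDistinct
# count("*") counts all rows; countDistinct("city") counts unique city names
df.groupBy("state") \
  .agg(count("*").alias("count"), countDistinct("city").alias("cities")) \
  .show()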
You can achieve grouping in Spark SQL by using the GROUP BY clause.
spark.sql(""" SELECT state, count(*) as count FROM ZIPCODES
GROUP BY state""") \
.show()
+-----+-----+
|state|count|
+-----+-----+
| AZ| 2|
| NC| 3|
| AL| 3|
| TX| 3|
| FL| 4|
| PR| 5|
+-----+-----+
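To filter groups after aggregation, SQL provides the HAVING clause; for example, keeping only states with more than two rows:
spark.sql(""" SELECT state, count(*) as count FROM ZIPCODES
          GROUP BY state HAVING count(*) > 2 """) \
          .show()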
SQL Join Operations
Similarly, if you have two tables, you can perform join operations in Spark. Here is an example.
emp = [(1, "Smith", -1, "2018", "10", "M", 3000),
       (2, "Rose", 1, "2010", "20", "M", 4000),
       (3, "Williams", 1, "2010", "10", "M", 1000),
       (4, "Jones", 2, "2005", "10", "F", 2000),
       (5, "Brown", 2, "2010", "40", "", -1),
       (6, "Brown", 2, "2010", "50", "", -1)]
empColumns = ["emp_id", "name", "superior_emp_id", "year_joined", "emp_dept_id", "gender", "salary"]
# SQLContext is deprecated since Spark 3.0; create the DataFrame directly from the session
empDF = spark.createDataFrame(emp, empColumns)
empDF.show()
+------+--------+---------------+-----------+-----------+------+------+
|emp_id| name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
| 1| Smith| -1| 2018| 10| M| 3000|
| 2| Rose| 1| 2010| 20| M| 4000|
| 3|Williams| 1| 2010| 10| M| 1000|
| 4| Jones| 2| 2005| 10| F| 2000|
| 5| Brown| 2| 2010| 40| | -1|
| 6| Brown| 2| 2010| 50| | -1|
+------+--------+---------------+-----------+-----------+------+------+
dept = [("Finance", 10),
        ("Marketing", 20),
        ("Sales", 30),
        ("IT", 40)]
deptColumns = ["dept_name", "dept_id"]
deptDF = spark.createDataFrame(dept, deptColumns)
deptDF.show()
+---------+-------+
|dept_name|dept_id|
+---------+-------+
| Finance| 10|
|Marketing| 20|
| Sales| 30|
| IT| 40|
+---------+-------+
# To run SQL queries against these DataFrames, register them as temporary views
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id")\
.show()
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id| name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
| 1| Smith| -1| 2018| 10| M| 3000| Finance| 10|
| 3|Williams| 1| 2010| 10| M| 1000| Finance| 10|
| 4| Jones| 2| 2005| 10| F| 2000| Finance| 10|
| 2| Rose| 1| 2010| 20| M| 4000|Marketing| 20|
| 5| Brown| 2| 2010| 40| | -1| IT| 40|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
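The comma-separated FROM list above is the implicit join syntax; the same inner join can be written explicitly, in both the DataFrame API and SQL:
# DataFrame API join; the third argument also accepts "left", "right", "full", etc.
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "inner").show()
# Equivalent ANSI SQL join
spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id = d.dept_id").show()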