
Spark

The document shows how to perform data exploration and linear regression on multiple datasets using Spark SQL and MLlib. It loads climate and disaster datasets, cleans and joins the data, builds a linear regression model to predict natural disasters from other variables, and evaluates the model's performance.


%spark

val sea_level_df = spark.sql("SELECT * FROM csiro_global_mean_sea_level_2013")

val fossil_fuel_df = spark.sql("SELECT * FROM global_carbon_emission_2018")

val temperature_df = spark.sql("SELECT * FROM global_temperature_2019")

val disasters_df = spark.sql("SELECT * FROM global_natural_disaster_events_2018")
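
Before filtering, it can help to confirm each table's column names and types, since the selects below rely on exact names. A minimal sketch using the DataFrames just loaded:

%spark

// Inspect schemas to verify column names before filtering and joining
sea_level_df.printSchema()
fossil_fuel_df.printSchema()
temperature_df.printSchema()
disasters_df.printSchema()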

%spark

// Keep only the aggregate 'All natural disasters' rows

val natural_disasters_df = disasters_df.filter("Entity = 'All natural disasters'")

%spark

// Restrict to the year range covered by all four datasets

val nat_disasters_df = natural_disasters_df.filter("Year >= 1959 AND Year <= 2013")

%spark

// Show the result

nat_disasters_df.show()

%spark

val fossil_fuels_df = fossil_fuel_df.filter("Year >= 1959 AND Year <= 2013")

%spark

fossil_fuels_df.show()

%spark

val global_sea_level_df = sea_level_df.filter("Year >= 1959 AND Year <= 2013")

%spark

global_sea_level_df.show()
%spark

val global_temperature_df = temperature_df.filter("Year >= 1959 AND Year <= 2013")

%spark

global_temperature_df.show()

%spark

val selected_disasters_df = nat_disasters_df.select("year", "total_natural_disasters")

selected_disasters_df.show()

%spark

val selected_fossil_fuels_df = fossil_fuels_df.select("year", "carbon_emissions")

selected_fossil_fuels_df.show()

%spark

val selected_sea_level_df = global_sea_level_df.select("year", "mean_sea_level", "month")

selected_sea_level_df.show()

%spark

val selected_temperature_df = global_temperature_df.select("year", "global_temperature", "month")

selected_temperature_df.show()

%spark

val temp_df = selected_temperature_df.filter("month = 12")

val global_temp_df = temp_df.select("year", "global_temperature")

// Show the result

global_temp_df.show()
%spark

val sealevel_df = selected_sea_level_df.filter("month = 'Dec'")

val global_mean_sea_level_df = sealevel_df.select("year", "mean_sea_level")

// Show the result

global_mean_sea_level_df.show()

%spark

val table_df = global_temp_df

.join(selected_fossil_fuels_df, Seq("year"))

.join(global_mean_sea_level_df, Seq("year"))

.join(selected_disasters_df, Seq("year"))

// Show the result

table_df.show()
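
Inner joins on year silently drop any year missing from one of the inputs, so a quick row-count comparison is a useful sanity check. A minimal sketch using the DataFrames defined above:

%spark

// Compare row counts before and after the join; a large drop means
// some years are missing from one of the inputs
println(s"temperature rows: ${global_temp_df.count()}")
println(s"joined rows: ${table_df.count()}")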

%spark

table_df.createOrReplaceTempView("climate_table")

%sql

SHOW TABLES

%sql

SELECT
  year,
  global_temperature,
  carbon_emissions,
  LOG(mean_sea_level) AS log_mean_sea_level,
  LOG(total_natural_disasters) AS log_total_natural_disasters
FROM climate_table WHERE year BETWEEN 1990 AND 2013


%spark

// Re-register the joined DataFrame under a second view name for the plot query

table_df.createOrReplaceTempView("my_table")

val plotData = spark.sql("""

SELECT carbon_emissions, total_natural_disasters

FROM my_table

""")

// Register the DataFrame as a temporary table for visualization

plotData.createOrReplaceTempView("plot_data")

%sql

SELECT * FROM plot_data
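
Before fitting a model, it is worth checking how strongly the two plotted columns co-vary. A minimal sketch using DataFrame.stat.corr (Pearson correlation by default) on the plotData DataFrame above:

%spark

// Pearson correlation between emissions and disaster counts
val corr = plotData.stat.corr("carbon_emissions", "total_natural_disasters")
println(s"Pearson correlation: $corr")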

%sql

SELECT

year,

carbon_emissions

FROM

climate_table WHERE year BETWEEN 1990 AND 2013

%spark

// Import required Spark libraries

import org.apache.spark.ml.regression.LinearRegression

import org.apache.spark.ml.feature.VectorAssembler

import org.apache.spark.ml.Pipeline

import org.apache.spark.ml.evaluation.RegressionEvaluator
// Drop rows with missing values before modelling

val cleanedData = table_df.na.drop()

// Assemble features into a single vector column

val assembler = new VectorAssembler()

.setInputCols(Array("year", "global_temperature", "carbon_emissions", "mean_sea_level"))

.setOutputCol("features")

// Linear Regression model

val lr = new LinearRegression()

.setLabelCol("total_natural_disasters")

.setFeaturesCol("features")

// Create a pipeline

val pipeline = new Pipeline().setStages(Array(assembler, lr))

// Split the data into training and testing sets (fixed seed for reproducibility)

val Array(trainingData, testData) = cleanedData.randomSplit(Array(0.8, 0.2), seed = 42)

// Fit the model to the training data

val model = pipeline.fit(trainingData)

// Make predictions on the test data

val predictions = model.transform(testData)

// Evaluate the model

val evaluator = new RegressionEvaluator()

.setLabelCol("total_natural_disasters")

.setPredictionCol("prediction")

.setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)

println(s"Root Mean Squared Error (RMSE) on test data: $rmse")

// Show the predictions

predictions.select("year", "total_natural_disasters", "prediction").show()
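
RMSE alone gives limited insight; inspecting the fitted coefficients and the R2 score rounds out the evaluation. A sketch, assuming the pipeline and evaluator from the block above (the pipeline's last stage is the fitted LinearRegressionModel):

%spark

import org.apache.spark.ml.regression.LinearRegressionModel

// Extract the fitted linear model from the pipeline and inspect it
val lrModel = model.stages.last.asInstanceOf[LinearRegressionModel]
println(s"Coefficients: ${lrModel.coefficients}")
println(s"Intercept: ${lrModel.intercept}")

// R-squared on the same test predictions (reuses the evaluator above)
val r2 = evaluator.setMetricName("r2").evaluate(predictions)
println(s"R2 on test data: $r2")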

%spark

// Create a temporary view for predictions

predictions.createOrReplaceTempView("predictions_table")

// Query the predictions data

val predictionsTableDF = spark.sql("SELECT year, total_natural_disasters, prediction FROM predictions_table")

%sql

SELECT

year,

total_natural_disasters,

prediction

FROM predictions_table
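
To see where the model over- or under-predicts, a residual column can be appended to the predictions. A minimal sketch, assuming the predictions DataFrame above:

%spark

import org.apache.spark.sql.functions.col

// Residual = actual - predicted; large magnitudes flag poorly fitted years
val residualsDF = predictions
  .withColumn("residual", col("total_natural_disasters") - col("prediction"))
  .select("year", "total_natural_disasters", "prediction", "residual")

residualsDF.show()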
