%spark
// Load the four source tables from the catalog as DataFrames.
val sea_level_df   = spark.table("csiro_global_mean_sea_level_2013")
val fossil_fuel_df = spark.table("global_carbon_emission_2018")
val temperature_df = spark.table("global_temperature_2019")
val disasters_df   = spark.table("global_natural_disaster_events_2018")
%spark
// Keep only the rows whose Entity is the aggregate "All natural disasters".
val natural_disasters_df = disasters_df.where("Entity == 'All natural disasters'")
%spark
// Restrict the aggregate disaster rows to the 1959-2013 window (bounds inclusive).
val nat_disasters_df = natural_disasters_df.where("Year BETWEEN 1959 AND 2013")
%spark
// Preview the filtered disaster rows.
nat_disasters_df.show()
%spark
// Carbon-emission rows restricted to the 1959-2013 window (bounds inclusive).
val fossil_fuels_df = fossil_fuel_df.where("Year BETWEEN 1959 AND 2013")
%spark
// Preview the filtered emission rows.
fossil_fuels_df.show()
%spark
// Sea-level rows restricted to the same 1959-2013 window.
val global_sea_level_df = sea_level_df.where("Year BETWEEN 1959 AND 2013")
%spark
// Preview the filtered sea-level rows.
global_sea_level_df.show()
%spark
// Temperature rows restricted to the same 1959-2013 window.
val global_temperature_df = temperature_df.where("Year BETWEEN 1959 AND 2013")
%spark
// Preview the filtered temperature rows.
global_temperature_df.show()
%spark
// Project the disaster data down to the year and the disaster count.
val selected_disasters_df =
  nat_disasters_df.select("year", "total_natural_disasters")
selected_disasters_df.show()
%spark
// Project the emission data down to the year and the emission value.
val selected_fossil_fuels_df =
  fossil_fuels_df.select("year", "carbon_emissions")
selected_fossil_fuels_df.show()
%spark
// Sea level keeps the month column because a later paragraph filters on it.
val selected_sea_level_df =
  global_sea_level_df.select("year", "mean_sea_level", "month")
selected_sea_level_df.show()
%spark
// Temperature keeps the month column because a later paragraph filters on it.
val selected_temperature_df =
  global_temperature_df.select("year", "global_temperature", "month")
selected_temperature_df.show()
%spark
// Keep one temperature row per year by taking the rows where month equals 12,
// then drop the month column.
// NOTE(review): month is compared to the number 12 here but to the string 'Dec'
// for sea level below - confirm both encodings against the table schemas.
val temp_df = selected_temperature_df.where("month = 12")
val global_temp_df = temp_df.select("year", "global_temperature")
// Preview the yearly temperature rows.
global_temp_df.show()
%spark
// Keep the sea-level rows where month equals 'Dec', then drop the month column.
val sealevel_df = selected_sea_level_df.where("month = 'Dec'")
val global_mean_sea_level_df = sealevel_df.select("year", "mean_sea_level")
// Preview the yearly sea-level rows.
global_mean_sea_level_df.show()
%spark
// Inner-join the four yearly frames on "year", starting from temperature and
// adding emissions, sea level and disasters in that order.
val table_df = Seq(selected_fossil_fuels_df, global_mean_sea_level_df, selected_disasters_df)
  .foldLeft(global_temp_df)((joined, next) => joined.join(next, Seq("year")))
// Preview the combined table.
table_df.show()
%spark
// Register the joined DataFrame as the temp view "table" so the %sql paragraphs can query it.
// NOTE(review): "table" is also a SQL keyword; presumably it is accepted as an
// identifier only while Spark's ANSI mode is off - confirm for this cluster.
table_df.createOrReplaceTempView("table")
%sql
-- List the tables and temp views visible to this session.
SHOW TABLES
%sql
-- Temperature and emissions next to the logs of sea level and disaster count,
-- limited to the years 1990 through 2013 (bounds inclusive).
SELECT
  year,
  global_temperature,
  carbon_emissions,
  LOG(mean_sea_level) AS log_mean_sea_level,
  LOG(total_natural_disasters) AS log_total_natural_disasters
FROM table
WHERE year >= 1990 AND year <= 2013
%spark
// Expose the joined table under the view name "my_table".
table_df.createOrReplaceTempView("my_table")
// Keep only the two columns that the emissions-vs-disasters plot needs.
val plotData = spark.table("my_table").select("carbon_emissions", "total_natural_disasters")
// Register the two-column frame as "plot_data" for the %sql visualization paragraph.
plotData.createOrReplaceTempView("plot_data")
%sql
-- Every column of the plot_data view.
SELECT
  *
FROM plot_data
%sql
-- Carbon emissions per year for 1990 through 2013 (bounds inclusive).
SELECT year, carbon_emissions
FROM table
WHERE year >= 1990 AND year <= 2013
%spark
// Import required Spark libraries
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
// Fit a linear regression that predicts total_natural_disasters from year,
// temperature, emissions and sea level, then report RMSE on a held-out split.

// Drop rows containing a null in any column before assembling features.
val cleanedData = table_df.na.drop()

// Pack the four predictor columns into the single vector column "features".
val assembler = new VectorAssembler()
  .setInputCols(Array("year", "global_temperature", "carbon_emissions", "mean_sea_level"))
  .setOutputCol("features")

// Linear regression with the disaster count as the label.
val lr = new LinearRegression()
  .setLabelCol("total_natural_disasters")
  .setFeaturesCol("features")

// Run feature assembly and regression as one pipeline.
val pipeline = new Pipeline().setStages(Array(assembler, lr))

// 80/20 train/test split. The seed is fixed so that re-running the paragraph
// yields the same partition and therefore a comparable RMSE.
val Array(trainingData, testData) = cleanedData.randomSplit(Array(0.8, 0.2), seed = 42L)

// Fit on the training rows only.
val model = pipeline.fit(trainingData)

// Score the held-out rows; transform adds the "prediction" column.
val predictions = model.transform(testData)

// Root mean squared error between the label and the prediction.
val evaluator = new RegressionEvaluator()
  .setLabelCol("total_natural_disasters")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data: $rmse")

// Show actual vs. predicted counts for the held-out years.
predictions.select("year", "total_natural_disasters", "prediction").show()
%spark
// Register the scored test rows as "predictions_table" so %sql paragraphs can query them.
predictions.createOrReplaceTempView("predictions_table")
// Year, actual count and predicted count, read back through the view.
// The query is kept on one line because an ordinary double-quoted Scala string
// literal cannot contain a line break.
val predictionsTableDF = spark.sql(
  "SELECT year, total_natural_disasters, prediction FROM predictions_table"
)
%sql
-- Actual vs. predicted disaster counts for the held-out test rows.
SELECT year, total_natural_disasters, prediction
FROM predictions_table