T15 Hand-On Solution Id 80827

This document contains a Python script that uses PySpark to load, clean, and analyze loan data from an S3 bucket. It defines functions to read the data, clean it by dropping nulls and duplicates, compute income-to-installment ratios, flag high-risk borrowers, and calculate default rates by loan purpose. The script also loads the processed results into S3 and Redshift, with placeholders for the bucket name and JDBC URL.

# -*- coding: utf-8 -*-

import os
import shutil
import pyspark
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import traceback

def read_data(spark, customSchema):

    # Mention the bucket name inside the bucket_name variable
    bucket_name = "loan-data1234"  # Replace with your bucket name
    s3_input_path = "s3://" + bucket_name + "/inputfile/loan_data.csv"

    df = spark.read.csv(s3_input_path, header=True, schema=customSchema)

    return df
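
# The schema passed into read_data is not defined in the original listing. The
# definition below is a sketch that covers only the columns referenced by the
# functions in this script; the types are assumptions, so adjust them (and add any
# remaining columns) to match the actual loan_data.csv.
customSchema = StructType([
    StructField("purpose", StringType(), True),
    StructField("int_rate", DoubleType(), True),
    StructField("installment", DoubleType(), True),
    StructField("log_annual_inc", DoubleType(), True),
    StructField("dti", DoubleType(), True),
    StructField("fico", IntegerType(), True),
    StructField("revol_util", DoubleType(), True),
    StructField("not_fully_paid", IntegerType(), True),
])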

def clean_data(input_df):

    # Drop rows with missing values and exact duplicates, then remove rows
    # where purpose holds the literal string 'null'
    df = input_df.dropna().dropDuplicates()
    df = df.filter(df.purpose != 'null')

    return df

def s3_load_data(data, file_name):

    # Mention the bucket name inside the bucket_name variable
    bucket_name = "loan-data1234"
    output_path = "s3://" + bucket_name + "/output" + file_name

    if data.count() != 0:
        print("Loading the data", output_path)
        # Write the s3 load data command here
        data.coalesce(1).write.csv(output_path, header=True, mode="overwrite")
    else:
        print("Empty dataframe, hence cannot save the data", output_path)

def result_1(input_df):

    # Keep only educational and small_business loans
    df = input_df.filter((col("purpose") == "educational") | (col("purpose") == "small_business"))

    # Ratio of (log) annual income to the monthly installment
    df = df.withColumn("income_to_installment_ratio", col("log_annual_inc") / col("installment"))

    # Bucket interest rates: below 0.10 is low, 0.10 to 0.15 is medium, otherwise high
    df = df.withColumn("int_rate_category",
        when(col("int_rate") < 0.1, "low")
        .when((col("int_rate") >= 0.1) & (col("int_rate") < 0.15), "medium")
        .otherwise("high")
    )

    # Flag borrowers with high debt-to-income, low FICO, or high revolving utilization
    df = df.withColumn("high_risk_borrower",
        when((col("dti") > 20) | (col("fico") < 700) | (col("revol_util") > 80), 1)
        .otherwise(0)
    )

    return df

def result_2(input_df):

    # Default rate per purpose = loans not fully paid / total loans, rounded to 2 decimals
    df = input_df.groupBy("purpose").agg(
        (sum(col("not_fully_paid")) / count("*")).alias("default_rate")
    )
    df = df.withColumn("default_rate", round(col("default_rate"), 2))

    return df
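
# For example (hypothetical numbers): if 3 of 10 cleaned "small_business" loans have
# not_fully_paid = 1, result_2 reports purpose = "small_business" with default_rate = 0.30.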

def redshift_load_data(data):

    if data.count() != 0:
        print("Loading the data into Redshift...")
        jdbcUrl = "your-jdbc-url"  # Replace with your Redshift JDBC URL
        username = "awsuser"
        password = "Awsuser1"
        table_name = "result_2"

        # Write the redshift load data command here
        # (the Redshift JDBC driver must be available on the Spark classpath)
        data.write \
            .format("jdbc") \
            .option("url", jdbcUrl) \
            .option("dbtable", table_name) \
            .option("user", username) \
            .option("password", password) \
            .mode("overwrite") \
            .save()
    else:
        print("Empty dataframe, hence cannot load the data")

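# A minimal driver sketch (not part of the original hands-on listing): it wires the
# functions above together in the order described in the summary. The app name, the
# output file names, and the use of traceback for error reporting are assumptions.
if __name__ == "__main__":
    try:
        spark = SparkSession.builder.appName("loan-data-analysis").getOrCreate()

        # Ingest and clean the raw loan data
        raw_df = read_data(spark, customSchema)
        cleaned_df = clean_data(raw_df)

        # Analysis 1: ratios, rate categories and high-risk flags for selected purposes
        # (leading slash because s3_load_data appends file_name directly after "/output")
        result_1_df = result_1(cleaned_df)
        s3_load_data(result_1_df, "/result_1")

        # Analysis 2: default rate per loan purpose, loaded to both S3 and Redshift
        result_2_df = result_2(cleaned_df)
        s3_load_data(result_2_df, "/result_2")
        redshift_load_data(result_2_df)

        spark.stop()
    except Exception:
        traceback.print_exc()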