-- Session setup: create and select the working database, register the
-- hive-hcatalog-core jar (the raw tables reference a SerDe from the
-- org.apache.hive.hcatalog package), and allow inserts in which every
-- partition column is dynamic.
CREATE DATABASE ecommerce;
USE ecommerce;
ADD JAR /user/acadgild/hive/hive-hcatalog-core-0.14.0.jar;
SET hive.exec.dynamic.partition.mode=nonstrict;
-- ****************************************************************************
-- DDL
-- ****************************************************************************
-- External Hive table over the HBase table production_category.
-- The column mapping is positional: id -> HBase row key,
-- prod_id -> prod_details:id, category -> prod_details:category.
-- The statement is terminated explicitly so that it does not run into the
-- CREATE TABLE that follows it.
CREATE EXTERNAL TABLE prod_details (
    id       STRING COMMENT 'from deserializer',
    prod_id  STRING COMMENT 'from deserializer',
    category STRING COMMENT 'from deserializer'
)
ROW FORMAT SERDE
    'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY
    'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
    'hbase.columns.mapping' = ':key,prod_details:id,prod_details:category',
    'serialization.format' = '1'
)
TBLPROPERTIES (
    'hbase.table.name' = 'production_category'
);
-- Raw landing table for product records, parsed as JSON by the HCatalog SerDe.
-- The SerDe class is spelled JsonSerDe, matching the other raw tables (Java
-- class names are case sensitive).
-- The table is unpartitioned, like the other raw tables: the LOAD DATA step for
-- this table names no partition, and the staging insert derives rptg_dt itself
-- rather than reading it from here.
CREATE TABLE products_info_raw (
    id             STRING,
    name           STRING,
    reseller       STRING,
    category       STRING,
    price          BIGINT,
    discount       FLOAT,
    profit_percent FLOAT
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
-- Staging table for product records: ORC, partitioned by reporting date and
-- bucketed on product_id into 8 buckets.
-- The column list and the partition list are each closed with their own
-- parenthesis, which the statement needs in order to parse.
CREATE TABLE products_info_stg (
    product_id     STRING,
    product_name   STRING,
    reseller       STRING,
    category       STRING,
    price          BIGINT,
    discount       FLOAT,
    profit_percent FLOAT
)
PARTITIONED BY (rptg_dt STRING)
CLUSTERED BY (product_id) INTO 8 BUCKETS
STORED AS ORC;
-- Exception table for product records that fail a validation rule. It has the
-- staging columns plus rule_failed, which carries the code of the failed rule.
-- ORC, partitioned by reporting date, bucketed on product_id into 8 buckets.
-- The column list is opened and closed explicitly, which the statement needs
-- in order to parse.
CREATE TABLE products_info_excp (
    product_id     STRING,
    product_name   STRING,
    reseller       STRING,
    category       STRING,
    price          BIGINT,
    discount       FLOAT,
    profit_percent FLOAT,
    rule_failed    STRING
)
PARTITIONED BY (rptg_dt STRING)
CLUSTERED BY (product_id) INTO 8 BUCKETS
STORED AS ORC;
-- Core table for product records that pass validation. Same columns as the
-- staging table. ORC, partitioned by reporting date, bucketed on product_id
-- into 8 buckets.
-- The column list is opened and closed explicitly, which the statement needs
-- in order to parse.
CREATE TABLE products_info_core (
    product_id     STRING,
    product_name   STRING,
    reseller       STRING,
    category       STRING,
    price          BIGINT,
    discount       FLOAT,
    profit_percent FLOAT
)
PARTITIONED BY (rptg_dt STRING)
CLUSTERED BY (product_id) INTO 8 BUCKETS
STORED AS ORC;
-- External Hive table over the HBase table user_location.
-- The column mapping is positional: id -> HBase row key,
-- user_id -> user_details:id, city -> user_details:city,
-- state -> user_details:state.
-- The mapping is kept on a single line with no whitespace between entries:
-- any whitespace or line break inside the literal would be read as part of a
-- column family name.
CREATE EXTERNAL TABLE user_location (
    id      STRING,
    user_id STRING,
    city    STRING,
    state   STRING
)
ROW FORMAT SERDE
    'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY
    'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
    'hbase.columns.mapping' = ':key,user_details:id,user_details:city,user_details:state',
    'serialization.format' = '1'
)
TBLPROPERTIES (
    'hbase.table.name' = 'user_location'
);
-- Raw landing table for user records: text files parsed as JSON by the
-- HCatalog SerDe. location is a nested struct carrying city and state.
CREATE TABLE users_info_raw (
    id       STRING,
    name     STRING,
    location STRUCT<city:STRING,state:STRING>,
    age      BIGINT,
    category STRING
)
ROW FORMAT SERDE
    'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS
    INPUTFORMAT  'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
-- Staging table for user records: ORC, partitioned by reporting date and
-- bucketed on user_id into 8 buckets.
CREATE TABLE users_info_stg (
    user_id    STRING,
    name       STRING,
    location   STRUCT<city:STRING,state:STRING>,
    age        BIGINT,
    occupation STRING
)
PARTITIONED BY (rptg_dt STRING)
CLUSTERED BY (user_id) INTO 8 BUCKETS
STORED AS ORC;
-- Exception table for user records that fail a validation rule. It has the
-- staging columns plus rule_failed, which carries the code of the failed rule.
-- ORC, partitioned by reporting date, bucketed on user_id into 8 buckets.
CREATE TABLE users_info_excp (
    user_id     STRING,
    name        STRING,
    location    STRUCT<city:STRING,state:STRING>,
    age         BIGINT,
    occupation  STRING,
    rule_failed STRING
)
PARTITIONED BY (rptg_dt STRING)
CLUSTERED BY (user_id) INTO 8 BUCKETS
STORED AS ORC;
-- Core table for user records that pass validation. Same columns as the
-- staging table. ORC, partitioned by reporting date, bucketed on user_id
-- into 8 buckets.
CREATE TABLE users_info_core (
    user_id    STRING,
    name       STRING,
    location   STRUCT<city:STRING,state:STRING>,
    age        BIGINT,
    occupation STRING
)
PARTITIONED BY (rptg_dt STRING)
CLUSTERED BY (user_id) INTO 8 BUCKETS
STORED AS ORC;
-- Raw landing table for user activity records: text files parsed as JSON by
-- the HCatalog SerDe. Every field, including the dates, is held as a string.
CREATE TABLE user_activity_raw (
    product_id          STRING,
    user_id             STRING,
    cancellation        STRING,
    return              STRING,
    cancellation_reason STRING,
    return_reason       STRING,
    order_date          STRING,
    shipment_date       STRING,
    delivery_date       STRING,
    cancellation_date   STRING,
    return_date         STRING
)
ROW FORMAT SERDE
    'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS
    INPUTFORMAT  'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
-- Staging table for user activity records: ORC, partitioned by reporting date
-- and bucketed on (product_id, user_id) into 8 buckets.
CREATE TABLE user_activity_stg (
    product_id          STRING,
    user_id             STRING,
    cancellation        STRING,
    return              STRING,
    cancellation_reason STRING,
    return_reason       STRING,
    order_date          STRING,
    shipment_date       STRING,
    delivery_date       STRING,
    cancellation_date   STRING,
    return_date         STRING
)
PARTITIONED BY (rptg_dt STRING)
CLUSTERED BY (product_id, user_id) INTO 8 BUCKETS
STORED AS ORC;
-- Exception table for user activity records that fail a validation rule. It
-- has the staging columns plus rule_failed, which carries the code of the
-- failed rule. ORC, partitioned by reporting date, bucketed on
-- (product_id, user_id) into 8 buckets.
CREATE TABLE user_activity_excp (
    product_id          STRING,
    user_id             STRING,
    cancellation        STRING,
    return              STRING,
    cancellation_reason STRING,
    return_reason       STRING,
    order_date          STRING,
    shipment_date       STRING,
    delivery_date       STRING,
    cancellation_date   STRING,
    return_date         STRING,
    rule_failed         STRING
)
PARTITIONED BY (rptg_dt STRING)
CLUSTERED BY (product_id, user_id) INTO 8 BUCKETS
STORED AS ORC;
-- Core table for user activity records that pass validation. Same columns as
-- the staging table. ORC, partitioned by reporting date, bucketed on
-- (product_id, user_id) into 8 buckets.
CREATE TABLE user_activity_core (
    product_id          STRING,
    user_id             STRING,
    cancellation        STRING,
    return              STRING,
    cancellation_reason STRING,
    return_reason       STRING,
    order_date          STRING,
    shipment_date       STRING,
    delivery_date       STRING,
    cancellation_date   STRING,
    return_date         STRING
)
PARTITIONED BY (rptg_dt STRING)
CLUSTERED BY (product_id, user_id) INTO 8 BUCKETS
STORED AS ORC;
-- ************************************************************************
-- DATA INSERTION
-- ************************************************************************
-- Load the merged JSON extracts from the local filesystem into the three raw
-- tables. No OVERWRITE keyword is given, so existing table data is kept.
LOAD DATA LOCAL INPATH '/home/acadgild/hive/data/merged_data/product_info_merge.json'
    INTO TABLE products_info_raw;

LOAD DATA LOCAL INPATH '/home/acadgild/hive/data/merged_data/user_info_merge.json'
    INTO TABLE users_info_raw;

LOAD DATA LOCAL INPATH '/home/acadgild/hive/data/merged_data/user_activity_merge.json'
    INTO TABLE user_activity_raw;
-- Copy raw product rows into staging, stamping each row with the current date
-- (yyyy-MM-dd) as its dynamic rptg_dt partition value.
-- Columns are matched by position, so raw id and name feed product_id and
-- product_name. The aliases are for the reader only.
-- unix_timestamp() already returns a bigint, so no cast is needed.
INSERT OVERWRITE TABLE products_info_stg
PARTITION (rptg_dt)
SELECT
    r.id             AS product_id,
    r.name           AS product_name,
    r.reseller       AS reseller,
    r.category       AS category,
    r.price          AS price,
    r.discount       AS discount,
    r.profit_percent AS profit_percent,
    from_unixtime(unix_timestamp(), 'yyyy-MM-dd') AS rptg_dt
FROM products_info_raw r;
-- Copy raw user rows into staging, stamping each row with the current date
-- (yyyy-MM-dd) as its dynamic rptg_dt partition value.
-- Columns are matched by position, so raw id feeds user_id and raw category
-- feeds occupation. The aliases are for the reader only.
-- unix_timestamp() already returns a bigint, so no cast is needed.
INSERT OVERWRITE TABLE users_info_stg
PARTITION (rptg_dt)
SELECT
    r.id       AS user_id,
    r.name     AS name,
    r.location AS location,
    r.age      AS age,
    r.category AS occupation,
    from_unixtime(unix_timestamp(), 'yyyy-MM-dd') AS rptg_dt
FROM users_info_raw r;
-- Copy raw activity rows into staging under the dynamic rptg_dt partition.
-- Unlike the product and user staging loads, which stamp the current date,
-- this load pins rptg_dt to a fixed literal.
-- NOTE(review): confirm the fixed date is intentional before reusing this
-- statement for a regular run.
INSERT OVERWRITE TABLE user_activity_stg
PARTITION (rptg_dt)
SELECT
    r.product_id,
    r.user_id,
    r.cancellation,
    r.return,
    r.cancellation_reason,
    r.return_reason,
    r.order_date,
    r.shipment_date,
    r.delivery_date,
    r.cancellation_date,
    r.return_date,
    '2016-10-12' AS rptg_dt
FROM user_activity_raw r;
-- ===================================================================================
-- ==============================
-- Split product staging rows into the exception and core tables in a single
-- pass over products_info_stg.
--   R1: product_id is missing
--   R2: discount is greater than or equal to price
-- Rows failing a rule go to products_info_excp with the rule code. All other
-- rows go to products_info_core, where a missing category is filled from
-- prod_details.
-- The core filter uses a strict < so that a row with discount = price lands
-- only in the exception table. With <= such a row would be written to both.
-- NOTE(review): the rptg_dt comparison sits in the ON clause of a left outer
-- join, so it limits which staging rows can pick up a prod_details match but
-- does not filter staging rows. Rows from every staging partition flow into
-- both inserts. Confirm that this is intended.
-- NOTE(review): a row with a non-null product_id whose price or discount is
-- NULL satisfies neither WHERE clause and is written to neither table.
FROM products_info_stg p
LEFT OUTER JOIN prod_details l
    ON  p.product_id = l.prod_id
    AND p.rptg_dt = from_unixtime(unix_timestamp(), 'yyyy-MM-dd')
INSERT OVERWRITE TABLE products_info_excp
PARTITION (rptg_dt)
SELECT
    p.product_id,
    p.product_name,
    p.reseller,
    p.category,
    p.price,
    p.discount,
    p.profit_percent,
    CASE
        WHEN p.product_id IS NULL THEN 'R1'
        WHEN p.discount >= p.price THEN 'R2'
    END AS rule_failed,
    p.rptg_dt
WHERE (p.product_id IS NULL)
   OR (p.discount >= p.price)
INSERT OVERWRITE TABLE products_info_core
PARTITION (rptg_dt)
SELECT
    p.product_id,
    p.product_name,
    p.reseller,
    COALESCE(p.category, l.category) AS category,
    p.price,
    p.discount,
    p.profit_percent,
    p.rptg_dt
WHERE (p.product_id IS NOT NULL)
  AND (p.discount < p.price);
-- ===============================================================================
-- Split user staging rows into the exception and core tables in a single pass
-- over users_info_stg.
--   R1: user_id is missing
--   R3: age is below 1
-- Rows failing a rule go to users_info_excp with the rule code. All other rows
-- go to users_info_core, where a missing city or state is filled field by
-- field from user_location.
-- The rptg_dt comparison is part of the left outer join condition, so it
-- restricts which staging rows can pick up a user_location match.
FROM users_info_stg p
LEFT OUTER JOIN user_location l
    ON  p.user_id = l.user_id
    AND p.rptg_dt = from_unixtime(unix_timestamp(), 'yyyy-MM-dd')
INSERT OVERWRITE TABLE users_info_excp
PARTITION (rptg_dt)
SELECT
    p.user_id,
    p.name,
    p.location,
    p.age,
    p.occupation,
    CASE
        WHEN p.user_id IS NULL THEN 'R1'
        WHEN p.age < 1 THEN 'R3'
    END AS rule_failed,
    p.rptg_dt
WHERE (p.user_id IS NULL)
   OR (p.age < 1)
INSERT OVERWRITE TABLE users_info_core
PARTITION (rptg_dt)
SELECT
    p.user_id,
    p.name,
    -- Each field keeps the staging value when present and otherwise takes the
    -- user_location value.
    named_struct(
        'city',  COALESCE(p.location.city,  l.city),
        'state', COALESCE(p.location.state, l.state)
    ) AS location,
    p.age,
    p.occupation,
    p.rptg_dt
WHERE (p.user_id IS NOT NULL)
  AND (p.age >= 1);
-- ===============================================================================
-- Split activity staging rows into the exception and core tables in a single
-- pass over user_activity_stg.
--   R1: product_id or user_id is missing
--   R2: order_date is later than shipment_date
-- Rows failing a rule go to user_activity_excp with the rule code. All other
-- rows go to user_activity_core.
-- Both inserts use a dynamic rptg_dt partition, so each SELECT must end with
-- p.rptg_dt. The exception SELECT now does, which it needs in order to match
-- the target table.
-- NOTE(review): no column from user_location or prod_details is selected, so
-- the two joins only matter if a lookup table holds duplicate keys, in which
-- case they multiply rows. Confirm whether they are needed.
-- NOTE(review): this statement joins prod_details on pd.id, while the product
-- split joins it on prod_id. Confirm which column holds the product id.
-- NOTE(review): a row with both ids present but a NULL order_date or
-- shipment_date satisfies neither WHERE clause and is written to neither table.
FROM user_activity_stg p
LEFT OUTER JOIN user_location l
    ON  p.user_id = l.user_id
    AND p.rptg_dt = from_unixtime(unix_timestamp(), 'yyyy-MM-dd')
LEFT OUTER JOIN prod_details pd
    ON  p.product_id = pd.id
INSERT OVERWRITE TABLE user_activity_excp
PARTITION (rptg_dt)
SELECT
    p.product_id,
    p.user_id,
    p.cancellation,
    p.return,
    p.cancellation_reason,
    p.return_reason,
    p.order_date,
    p.shipment_date,
    p.delivery_date,
    p.cancellation_date,
    p.return_date,
    CASE
        WHEN (p.product_id IS NULL) OR (p.user_id IS NULL) THEN 'R1'
        WHEN (p.order_date > p.shipment_date) THEN 'R2'
        ELSE 'NA'
    END AS rule_failed,
    p.rptg_dt
WHERE (p.user_id IS NULL)
   OR (p.product_id IS NULL)
   OR (p.order_date > p.shipment_date)
INSERT OVERWRITE TABLE user_activity_core
PARTITION (rptg_dt)
SELECT
    p.product_id,
    p.user_id,
    p.cancellation,
    p.return,
    p.cancellation_reason,
    p.return_reason,
    p.order_date,
    p.shipment_date,
    p.delivery_date,
    p.cancellation_date,
    p.return_date,
    p.rptg_dt
WHERE (p.user_id IS NOT NULL)
  AND (p.product_id IS NOT NULL)
  AND (p.order_date <= p.shipment_date);
-- ===================================================================================
-- ====================================