SCD Type 2
In data warehousing there is a well-known technique called Slowly Changing Dimension (SCD) Type 2, which keeps the full history of a record by closing the old version and inserting the new one. Below is an implementation of the same in Spark.
Say we have actual (current) data like below:
+---+--------+------+-----+-----+---------------------+---------------------+
|id |fname |lname |zip |amt |start_dt |end_dt |
+---+--------+------+-----+-----+---------------------+---------------------+
|1 |vinyas |shetty|28213|32.23|2017-02-21 00:00:00.0|2018-03-28 00:00:00.0|
|1 |vinyas |shetty|60606|23.34|2018-03-29 00:00:00.0|2099-12-31 00:00:00.0|
|2 |namratha|rao |2811 |22.32|2018-03-31 00:00:00.0|2099-12-31 00:00:00.0|
|3 |harold |finc |2877 |56.43|2018-05-01 00:00:00.0|2099-12-31 00:00:00.0|
+---+--------+------+-----+-----+---------------------+---------------------+
Snapshot data (i.e. the latest data):
+---+--------+------+-----+-----+---------------------+---------------------+
|id |fname |lname |zip |amt |start_dt |end_dt |
+---+--------+------+-----+-----+---------------------+---------------------+
|1 |vinyas |shetty|60611|23.22|2018-04-01 00:00:00.0|2099-12-31 00:00:00.0|
|2 |namratha|shetty|2811 |30.23|2018-04-01 00:00:00.0|2099-12-31 00:00:00.0|
+---+--------+------+-----+-----+---------------------+---------------------+
End result (history after applying the snapshot):
+---+--------+------+-----+-----+---------------------+---------------------+
|id |fname |lname |zip |amt |start_dt |end_dt |
+---+--------+------+-----+-----+---------------------+---------------------+
|1 |vinyas |shetty|28213|32.23|2017-02-21 00:00:00.0|2018-03-28 00:00:00.0|
|1 |vinyas |shetty|60606|23.34|2018-03-29 00:00:00.0|2018-03-31 23:00:00.0|
|1 |vinyas |shetty|60611|23.22|2018-04-01 00:00:00.0|2099-12-31 00:00:00.0|
|2 |namratha|rao |2811 |22.32|2018-03-31 00:00:00.0|2018-03-31 23:00:00.0|
|2 |namratha|shetty|2811 |30.23|2018-04-01 00:00:00.0|2099-12-31 00:00:00.0|
|3 |harold |finc |2877 |56.43|2018-05-01 00:00:00.0|2099-12-31 00:00:00.0|
+---+--------+------+-----+-----+---------------------+---------------------+
Application conf (application.conf) in the resources directory:
cdc-type2 {
start_dt="start_dt"
end_dt="end_dt"
end_value="2099-12-31 00:00:00"
unique_columns="id"
end_dt_logic="1 HOUR"
history_path="<hdfs_location>"
snapshot_path="<hdfs_location>"
output_path="<hdfs_location>"
format="parquet"
}
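cdcMain below loads this block with Typesafe Config's ConfigFactory.load(), which reads application.conf from the classpath. For reference, a minimal sketch of the build dependencies; the versions here are assumptions, not pinned by this project:
//build.sbt (versions are assumptions)
libraryDependencies ++= Seq(
  "com.typesafe" % "config" % "1.3.3",
  "org.apache.spark" %% "spark-sql" % "2.3.0" % "provided"
)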
Certain rules/prerequisites:
The schema of the history and snapshot data must be the same, else the job will fail.
The history should NOT have dups (i.e. for a given set of unique columns there should be only one record where end_dt is end_value).
The snapshot should NOT have dups (same rule as above); a sketch for checking both follows this list.
end_dt_logic : subtracts the end_dt_logic interval from the new record's start_dt
and uses the result as the end_dt of the old record that needs to be closed.
MINUTE, HOUR, SECOND, DAY etc. can be used.
unique_columns : the columns which make a record unique; comma separated if there are multiple. Do NOT include the start_dt and end_dt columns.
If both history and snapshot have an active record with the same unique columns and start_dt, the code
will arbitrarily pick one and drop the other, because it can NOT know which is the correct one.
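As a sanity check for the no-duplicates rules, a small aggregation can be run before the job. This is only an illustrative sketch: it assumes the history is already loaded into a DataFrame named hist and uses the sample config values (unique column id, end_value "2099-12-31 00:00:00").
import org.apache.spark.sql.functions._
//Count active rows per unique key; any count above 1 violates the rule
val dups = hist
  .filter(col("end_dt") === lit("2099-12-31 00:00:00").cast("timestamp"))
  .groupBy(col("id"))   //list all unique_columns here in the general case
  .count()
  .filter(col("count") > 1)
require(dups.count() == 0, "history has duplicate active records for the same key")
The same check applies to the snapshot DataFrame.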
Code :
<Need to Provide GIT Code link>
package com.vin.cdc
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.SparkSession
import com.vin.cdc.cdcType2.cdcType2Main
object cdcMain {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("CDC").enableHiveSupport().getOrCreate()
/*
Input Data and SnapShot data expected to have same schema and order
*/
val conf = ConfigFactory.load()
val start_dt = conf.getString("cdc-type2.start_dt")
val end_dt = conf.getString("cdc-type2.end_dt")
val end_value = conf.getString("cdc-type2.end_value")
val unique_columns = conf.getString("cdc-type2.unique_columns")
val end_dt_logic = conf.getString("cdc-type2.end_dt_logic")
val format = conf.getString("cdc-type2.format")
val hist_path = conf.getString("cdc-type2.history_path")
val snap_path = conf.getString("cdc-type2.snapshot_path")
val opt_path = conf.getString("cdc-type2.output_path")
val hist = spark.read.format(format).load(hist_path)
val snap = spark.read.format(format).load(snap_path)
println("INPUT DF1 :" )
hist.show()
println("INPUT DF2 :")
snap.show()
val final_res = cdcType2Main(spark,hist,snap,start_dt,end_dt,end_value,unique_columns,end_dt_logic)
//Use the configured format for the output as well, instead of hard-coding parquet
final_res.write.mode("overwrite").format(format).save(opt_path)
}
}
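Once packaged, the job can be submitted as below. The jar name and master are assumptions for illustration; only the class name comes from the code above.
spark-submit \
  --class com.vin.cdc.cdcMain \
  --master yarn \
  cdc-assembly-0.1.jar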
package com.vin.cdc
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
object cdcType2 {
def cdcType2Main(spark:SparkSession, hist:DataFrame, snap:DataFrame,
start_dt:String,end_dt:String,end_value:String,unique_columns:String,end_dt_logic:String):DataFrame={
validation(hist,snap,start_dt,end_dt,unique_columns)
val end_value_ts = lit(end_value).cast(TimestampType)
val (active_records,closed_records) = splitHistActiveClosed(hist,start_dt,end_dt,end_value_ts)
println("ACTIVE Records ")
active_records.show(false)
println("CLOSED RECORD ")
closed_records.show(false)
//Just an extra check to make sure the snapshot has only active records
val active_snap = snap.filter(snap(end_dt) <=> end_value_ts)
val active_with_cdc = cdcLogicImpl(spark,active_records,active_snap,unique_columns,
start_dt,end_dt,end_value,end_dt_logic)
println("ACTIVE WITH CDC ")
active_with_cdc.show(false)
//cdcLogicImpl moves end_dt to the last column position, so align columns with closed_records before the union
val final_result = active_with_cdc.select(closed_records.columns.map(col): _*).union(closed_records)
final_result
}
def validation(hist:DataFrame,snap:DataFrame,
start_dt:String,end_dt:String,
unique_columns:String) ={
val hist_schema = hist.schema
val snap_schema = snap.schema
val diff_schema = hist_schema.toSet.union(snap_schema.toSet).diff(hist_schema.toSet.intersect(snap_schema.toSet))
if(diff_schema.nonEmpty){
throw new Exception(
s"""
|Schema between History and SnapShot Did Not Match
|History : ${hist_schema}
|Snapshot : ${snap_schema}
|Difference : ${diff_schema}
""".stripMargin)
}
//We will let Spark throw java.lang.IllegalArgumentException if an element is NOT found;
//the lookup itself is the validation, so the result is discarded
val dummy_val = hist_schema(Set(start_dt,end_dt) ++ unique_columns.split(",").toSet)
}
def splitHistActiveClosed( hist:DataFrame,
start_dt:String,end_dt:String,end_value_ts:Column):(DataFrame,DataFrame)={
val active_records = hist.filter(hist(end_dt) <=> end_value_ts)
val closed_records = hist.filter(!(hist(end_dt) <=> end_value_ts))
(active_records,closed_records)
}
def cdcLogicImpl(spark:SparkSession,active_records:DataFrame,snap:DataFrame,
unique_columns:String,start_dt:String,end_dt:String,
end_value:String,end_dt_logic:String):DataFrame={
import spark.implicits._
/*Need to think about this: should we fail when we have two active records with the same start_dt?
Currently the code just drops one of them arbitrarily
*/
val union_data = active_records.union(snap)
.dropDuplicates(start_dt :: end_dt :: unique_columns.split(",").map(x=>x.trim).toList)
val uniq = unique_columns.split(",").map(x=> col(x))
val wndw = Window.partitionBy(uniq:_*).orderBy(col(start_dt).asc)
//Code will fail if an input column is already named "new_end_date". Need to look at handling a null end_value
//end_dt of each record becomes the next record's start_dt minus the end_dt_logic interval;
//the latest record per key falls back to end_value (the lead default)
val cdc_data = union_data.select($"*",
  lead(col(start_dt) - expr(s"INTERVAL ${end_dt_logic}"), 1, end_value).over(wndw).as("new_end_date"))
cdc_data.drop(end_dt).withColumnRenamed("new_end_date",end_dt)
}
}
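To sanity-check cdcType2Main locally, the sample tables from the top of this post can be rebuilt in a spark-shell session. This is a minimal sketch, assuming a local SparkSession named spark; it should reproduce the end result table above.
import java.sql.Timestamp
import com.vin.cdc.cdcType2.cdcType2Main
import spark.implicits._
def ts(s: String) = Timestamp.valueOf(s)
//History : the actual (current) data table
val hist = Seq(
  (1, "vinyas", "shetty", "28213", 32.23, ts("2017-02-21 00:00:00"), ts("2018-03-28 00:00:00")),
  (1, "vinyas", "shetty", "60606", 23.34, ts("2018-03-29 00:00:00"), ts("2099-12-31 00:00:00")),
  (2, "namratha", "rao", "2811", 22.32, ts("2018-03-31 00:00:00"), ts("2099-12-31 00:00:00")),
  (3, "harold", "finc", "2877", 56.43, ts("2018-05-01 00:00:00"), ts("2099-12-31 00:00:00"))
).toDF("id", "fname", "lname", "zip", "amt", "start_dt", "end_dt")
//Snapshot : the latest data
val snap = Seq(
  (1, "vinyas", "shetty", "60611", 23.22, ts("2018-04-01 00:00:00"), ts("2099-12-31 00:00:00")),
  (2, "namratha", "shetty", "2811", 30.23, ts("2018-04-01 00:00:00"), ts("2099-12-31 00:00:00"))
).toDF("id", "fname", "lname", "zip", "amt", "start_dt", "end_dt")
val result = cdcType2Main(spark, hist, snap,
  "start_dt", "end_dt", "2099-12-31 00:00:00", "id", "1 HOUR")
result.orderBy("id", "start_dt").show(false)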