class DictQuery(dict):
    def get(self, path, default=None):
        keys = path.split("/")
        value = None
        for key in keys:
            if value:
                if isinstance(value, list):
                    value = [v.get(key, default) if v else None for v in value]
                else:
                    value = value.get(key, default)
            else:
                value = dict.get(self, key, default)
            if not value:
                break
        return value
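
# Hedged usage sketch for DictQuery (illustrative data, not from the real feed):
#   DictQuery({"excessLevel": {"amountValue": 250}}).get("excessLevel/amountValue")            # -> 250
#   DictQuery({"prices": [{"amountValue": 1}, {"amountValue": 2}]}).get("prices/amountValue")  # -> [1, 2]
# Caveat: traversal stops at the first falsy intermediate value, so a stored
# 0, "" or [] is returned as-is rather than descended into.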

class Script:
    @staticmethod
    def execute(spark, input, execParams):
        from pyspark.sql.functions import (udf, col, lit, to_timestamp, from_unixtime, to_date,
                                           row_number, when, expr, from_utc_timestamp, date_format)
        from datetime import date
        from pyspark.sql.window import Window

        # Defining UDFs
        policyConfigurationUDF = udf(
            lambda data, dimension: Script.handlePolicyConfiguration(data, dimension))
        # NOTE: Script.policyPrice is not defined in this paste, and policyPricingUDF
        # is never invoked below
        policyPricingUDF = udf(
            lambda data, field: Script.policyPrice(data, field))
        # epochUDF = udf(lambda t: Script.epoch_to_time(t))

        # Latest-record-first window; only referenced by the commented-out
        # deduplication step further down
        windowSpec = Window.partitionBy("policy_tenure_id", "policy_id", "addon_id", "record_version").orderBy(col("approx_creation_datetime").desc())
        output = ['policy_renewal_offer_id', 'record_version', 'current_policy_tenure_id', 'is_active', 'is_record_active',
                  'addon_total_price', 'last_updated_timestamp', 'partner_renewal_offer_id', 'policy_event_id', 'policy_id',
                  'renewal_action', 'renewal_date_timestamp', 'creation_timestamp', 'renewal_offer_expiry_timestamp',
                  'contentscover_amount', 'excess_amount', 'payment_frequency', 'base_price_amount', 'total_amount_annual',
                  'total_amount_monthly', 'firstmonth_amount', 'subsequentmonth_amount', 'apr', 'interest_amount',
                  'interest_percentage', 'premium', 'auto_renewal_flag', 'existing_contents_cover_limit_amount',
                  'existing_excess_amount', 'existing_total_amount', 'existing_base_amount', 'existing_payment_frequency',
                  'existing_addon_id', 'existing_addon_total_amount', 'addon_key', 'creation_date',
                  'renewal_date_timestamp_local', 'creation_timestamp_local', 'last_updated_timestamp_local',
                  'renewal_offer_expiry_timestamp_local', 'addon_id', 'is_prod', 'renewal_no_offer_reason_code']
        df = spark.sql('select * from pr')
        print('read source df')
        df.show()
        print("Number of Records (INPUT): " + str(df.count()), flush=True)
        df2 = spark.sql("select distinct policy_id as p_policy_id from p where is_prod = 0")
        # Adding columns to the dataframe & handling data transformation
        # Flattening the policy configuration column
        df = df.withColumn("currency", policyConfigurationUDF(col("renewal_policy_details"), lit("CURRENCY")))
        df = df.withColumn("excess_amount", policyConfigurationUDF(col("renewal_policy_details"), lit("EXCESS")).cast("double"))
        df = df.withColumn("contentscover_amount", policyConfigurationUDF(col("renewal_policy_details"), lit("CONTENTSCOVER")).cast("double"))
        df = df.withColumn("payment_frequency", policyConfigurationUDF(col("renewal_policy_details"), lit("PAYMENTFREQUENCY")))
        df = df.withColumn("base_price_amount", policyConfigurationUDF(col("renewal_policy_details"), lit("BASE_AMOUNT")).cast("double"))
        print('policyConfigurationUDF')
        df.show(truncate=False)
- df = df.withColumn("renewal_date_timestamp", from_unixtime(col("renewal_date_timestamp")).cast("timestamp"))
- df = df.withColumn("creation_timestamp", from_unixtime(col("creation_timestamp")).cast("timestamp"))
- df = df.withColumn("renewal_offer_expiry_timestamp", from_unixtime(col("renewal_offer_expiry_timestamp")).cast("timestamp"))
- df = df.withColumn("creation_date", to_date(col("creation_timestamp").cast("timestamp")))
- df = df.withColumn("last_updated_timestamp", from_unixtime(col("last_updated_timestamp")/1000).cast("timestamp"))
- df = df.withColumn('dw_load_date', lit(date.today()))
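        # Sketch of the epoch handling above (illustrative values; the rendered
        # string assumes a UTC session timezone):
        #   from_unixtime(lit(1695340800))          -> "2023-09-22 00:00:00" (seconds)
        #   from_unixtime(lit(1695340800123/1000))  -> the same instant, from milliseconds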
        renewalDF = Script.createRenewalDetailsDF(spark, df)
        print("renewalDF")
        renewalDF.show(truncate=False)
        existingPolicyPriceDF = Script.createExistingPolicyDetailsDF(spark, df)
        print("existingPolicyPriceDF")
        existingPolicyPriceDF.show(truncate=False)
        # Joining the price dataframes back onto the main dataframe on (id, version);
        # primary-key columns must not contain null/None (empty strings are used
        # instead), and duplicate records are removed later via distinct()
        pricedf = renewalDF.join(existingPolicyPriceDF, (renewalDF.id == existingPolicyPriceDF.id) & (renewalDF.version == existingPolicyPriceDF.version))
        print("join df")
        pricedf.show(truncate=False)
        pricedf = pricedf.drop(renewalDF.id)
        pricedf = pricedf.drop(renewalDF.version)
        pricedf.show(truncate=False)
        addOnDF = Script.addOnsDF(spark, df)
        pricedf = pricedf.join(addOnDF, (pricedf.id == addOnDF.id) & (pricedf.version == addOnDF.version))
        pricedf.show(truncate=False)
        pricedf = pricedf.drop(addOnDF.id)
        pricedf = pricedf.drop(addOnDF.version)
        pricedf.show(truncate=False)
        df = df.join(pricedf, (df.policy_renewal_offer_id == pricedf.id) & (df.record_version == pricedf.version))
        print('df')
        df.show(truncate=False)
- df = df.withColumn("total_amount_annual", col("total_amount_annual").cast("double")).withColumn("total_amount_monthly", col("total_amount_monthly").cast("double")).withColumn("firstmonth_amount", col("firstmonth_amount").cast("double")).withColumn("subsequentmonth_amount", col("subsequentmonths_amount").cast("double")).withColumn("apr", col("apr").cast("double")).withColumn("existing_contents_cover_limit_amount", col("existing_contents_cover_limit_amount").cast("double")).withColumn("existing_excess_amount", col("existing_excess_amount").cast("double")).withColumn("existing_base_amount", col("existing_base_amount").cast("double")).withColumn("existing_total_amount", col("existing_total_amount").cast("double")).withColumn("existing_addon_id", col("existing_addon_id").cast("string")).withColumn("existing_addon_total_amount", col("existing_addon_total_amount").cast("double")) .withColumn("interest_amount", col("interest_amount").cast("double")).withColumn("interest_percentage", col("interest_percentage").cast("double")).withColumn("premium", col("premium").cast("double")).withColumn("addon_total_price", col("addon_amount").cast("double"))
- df = df.withColumn('auto_renewal_flag',expr("case when auto_renewal_flag='True' then 1 else 0 end"))
- # df = df.withColumn("approx_creation_datetime",when(col("approx_creation_datetime")==None ,None).otherwise(from_unixtime(epochUDF("approx_creation_datetime")).cast('timestamp'))).withColumn("row_number",row_number().over(windowSpec)).filter(col("row_number")==1)
        df = df.join(df2, df.policy_id == df2.p_policy_id, "left")
        # is_prod = 0 for offers created after the 2023-09-22 cutover or for
        # policies flagged non-prod in table p
        df = df.withColumn("is_prod", expr("case when (creation_date > '2023-09-22' or policy_id = p_policy_id) then 0 else 1 end"))
        df.select(col("is_prod")).show()
- df = df.withColumn("creation_timestamp_local",from_utc_timestamp(col("creation_timestamp"),"Europe/London"))
- df = df.withColumn("last_updated_timestamp_local",from_utc_timestamp(col("last_updated_timestamp"),"Europe/London"))
- df = df.withColumn("renewal_date_timestamp_local",from_utc_timestamp(col("renewal_date_timestamp"),"Europe/London"))
- df = df.withColumn("renewal_offer_expiry_timestamp_local",from_utc_timestamp(col("renewal_offer_expiry_timestamp"),"Europe/London"))
- df = df.withColumn('addon_key',expr("case when addon_id is not null or trim(addon_id)='' then addon_id when existing_addon_id is not null or existing_addon_id='' then existing_addon_id else 'NA' end"))
- df = df.withColumn("addon_key", expr("CASE WHEN trim(addon_key)='' then 'NA' else addon_key END"))
- df = df.select(*output).na.fill("NA", ["addon_key"]).distinct().filter(col("is_record_active") == 1)
        df.printSchema()
        print('output')
        print("Number of Records (OUTPUT): " + str(df.count()), flush=True)
        df.show()
        return df
    # @staticmethod
    # def epoch_to_time(time):
    #     if time is None:
    #         return None
    #     time = str(time)[0:10]
    #     return int(time)

    # Handles values for currency, excess_amount, contentscover_amount and
    # payment_frequency (when there is no value it returns None for strings
    # and 0 for numbers)
    @staticmethod
    def handlePolicyConfiguration(data, dimension):
        # Handle data being None
        if data is None and (dimension == 'CURRENCY' or dimension == 'PAYMENTFREQUENCY'):
            return None
        elif data is None:
            return 0
        import json
        data = json.loads(data)
        if dimension == "CONTENTSCOVER":
            return DictQuery(data).get('contentsCoverLimit/amountValue', 0)
        elif dimension == "EXCESS":
            return DictQuery(data).get('excessLevel/amountValue', 0)
        elif dimension == "PAYMENTFREQUENCY":
            # data is reassigned to the first renewalPolicyPrice entry, so the
            # lookup path must not repeat the 'renewalPolicyPrice' prefix
            data = data.get('renewalPolicyPrice', [])[0]
            return DictQuery(data).get('paymentFrequency', None)
        elif dimension == "CURRENCY":
            return DictQuery(data).get('excessLevel/currency', DictQuery(data).get('contentsCoverLimit/currency', None))
        elif dimension == "BASE_AMOUNT":
            data = data.get('renewalPolicyPrice', [])[0]
            return DictQuery(data).get('basePrice/totalPrice/amountValue', None)
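
    # Hedged sketch of the payload shape implied by the lookups above (field
    # names inferred from the paths, not confirmed against the real feed):
    #   payload = {
    #       "excessLevel": {"amountValue": 250, "currency": "GBP"},
    #       "contentsCoverLimit": {"amountValue": 50000, "currency": "GBP"},
    #       "renewalPolicyPrice": [{"paymentFrequency": "Annual",
    #                               "basePrice": {"totalPrice": {"amountValue": 312.5}}}]
    #   }
    #   handlePolicyConfiguration(json.dumps(payload), "EXCESS")       -> 250
    #   handlePolicyConfiguration(json.dumps(payload), "BASE_AMOUNT")  -> 312.5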
    # Handles existing policy details
    @staticmethod
    def createExistingPolicyDetailsDF(spark, df):
        import json
        from pyspark.sql.types import StructType, StructField, StringType, IntegerType
        existingPolicyDetailsSchema = StructType([
            StructField('id', StringType(), False),
            StructField('version', IntegerType(), False),
            StructField('auto_renewal_flag', StringType(), True),
            StructField('existing_contents_cover_limit_amount', StringType(), True),
            StructField('existing_excess_amount', StringType(), True),
            StructField('existing_base_amount', StringType(), True),
            StructField('existing_payment_frequency', StringType(), True),
            StructField('existing_total_amount', StringType(), True)])
        data_collect = df.collect()
        rows = []
        for record in data_collect:
            amt_list = []
            data = record["existing_policy_details"]
            amt_list.append(record["policy_renewal_offer_id"])
            amt_list.append(record["record_version"])
            if data is None:
                amt_list.extend(["False", "0", "0", "0", "", "0"])
                rows.append(tuple(amt_list))
            else:
                data = json.loads(data)
                amt_list.extend([str(DictQuery(data).get('autoRenewalFlag', False)),
                                 str(DictQuery(data).get('contentsCoverLimit/amountValue', 0)),
                                 str(DictQuery(data).get('excessLevel/amountValue', 0)),
                                 str(DictQuery(data).get('existingPolicyPrice/basePrice/totalPrice/amountValue', 0)),
                                 str(DictQuery(data).get('existingPolicyPrice/currentPaymentFrequency', 0)),
                                 str(DictQuery(data).get('existingPolicyPrice/totalPrice/totalPrice/amountValue', 0))])
                rows.append(tuple(amt_list))
        print('rows')
        print(rows)
        new_df = spark.createDataFrame(rows, existingPolicyDetailsSchema)
        return new_df
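
    # Note: this helper (and the two below) calls df.collect(), which pulls the
    # full dataframe onto the driver; this sketch assumes the renewal-offer
    # snapshot is small enough for driver memory. Parsing the JSON column with
    # from_json and a declared schema would keep the work distributed instead.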
    # Handles renewal details
    @staticmethod
    def createRenewalDetailsDF(spark, df):
        import json
        from pyspark.sql.types import StructType, StructField, StringType, IntegerType
        renewalDetailsSchema = StructType([
            StructField('id', StringType(), False),
            StructField('version', IntegerType(), False),
            StructField('total_amount_annual', StringType(), True),
            StructField('total_amount_monthly', StringType(), True),
            StructField('firstmonth_amount', StringType(), True),
            StructField('subsequentmonths_amount', StringType(), True),
            StructField('apr', StringType(), True),
            StructField('interest_amount', StringType(), True),
            StructField('interest_percentage', StringType(), True),
            StructField('premium', StringType(), True)])
        data_collect = df.collect()
        rows = []
        for record in data_collect:
            amt_list = []
            data = record["renewal_policy_details"]
            amt_list.append(record["policy_renewal_offer_id"])
            amt_list.append(record["record_version"])
            if data is None:
                amt_list.extend(["0", "0", "0", "0", "0", "0", "0", "0"])
                rows.append(tuple(amt_list))
            else:
                data = json.loads(data)
                policyPriceData = data.get('renewalPolicyPrice', [])
                annual_amount, monthly_amount = [], []
                for obj in policyPriceData:
                    amt = obj.get("totalPrice")
                    paymentFrequency = obj.get("paymentFrequency")
                    if paymentFrequency == "Annual":
                        annual_amount = [str(DictQuery(amt).get('annualPrice/totalPrice/amountValue', 0))]
                    elif paymentFrequency == "EqualMonthly12":
                        monthly_amount = [str(DictQuery(amt).get('equalMonthlyPriceWithInterest/totalPrice/amountValue', 0)),
                                          str(DictQuery(amt).get('equalMonthlyPriceWithInterest/firstMonthPrice/amountValue', 0)),
                                          str(DictQuery(amt).get('equalMonthlyPriceWithInterest/subsequentMonthsPrice/amountValue', 0)),
                                          str(DictQuery(amt).get('equalMonthlyPriceWithInterest/apr', 0)),
                                          str(DictQuery(amt).get('equalMonthlyPriceWithInterest/interestAmount/amountValue', 0)),
                                          str(DictQuery(amt).get('equalMonthlyPriceWithInterest/interestPercentage', 0)),
                                          str(DictQuery(amt).get('equalMonthlyPriceWithInterest/premium/amountValue', 0))]
                # Pad whichever price block is missing with string zeros so the
                # row always matches the all-string schema
                if len(annual_amount) == 0 and len(monthly_amount) > 0:
                    amt_list.extend(["0"] + monthly_amount)
                elif len(annual_amount) > 0 and len(monthly_amount) == 0:
                    amt_list.extend(annual_amount + ["0"] * 7)
                elif len(annual_amount) == 0 and len(monthly_amount) == 0:
                    amt_list.extend(["0"] * 8)
                else:
                    amt_list.extend(annual_amount + monthly_amount)
                rows.append(tuple(amt_list))
        print('rows')
        print(rows)
        new_df = spark.createDataFrame(rows, renewalDetailsSchema)
        return new_df
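
    # Hedged sketch of the renewalPolicyPrice list this helper expects (shape
    # inferred from the lookup paths above, not confirmed against the real feed):
    #   [{"paymentFrequency": "Annual",
    #     "totalPrice": {"annualPrice": {"totalPrice": {"amountValue": 300.0}}}},
    #    {"paymentFrequency": "EqualMonthly12",
    #     "totalPrice": {"equalMonthlyPriceWithInterest": {
    #         "totalPrice": {"amountValue": 315.0},
    #         "firstMonthPrice": {"amountValue": 27.0},
    #         "subsequentMonthsPrice": {"amountValue": 26.2},
    #         "apr": 14.9,
    #         "interestAmount": {"amountValue": 15.0},
    #         "interestPercentage": 5.0,
    #         "premium": {"amountValue": 300.0}}}}]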
    # Handles add-ons: pairs renewal add-on prices with existing add-on prices
    # by addOnType (a full-outer-join over the two maps)
    @staticmethod
    def addOnsDF(spark, df):
        import json
        from pyspark.sql.types import StructType, StructField, StringType, IntegerType
        addOn_schema = StructType([
            StructField('id', StringType(), False),
            StructField('version', IntegerType(), False),
            StructField('addon_id', StringType(), True),
            StructField('addon_amount', StringType(), True),
            StructField('existing_addon_id', StringType(), True),
            StructField('existing_addon_total_amount', StringType(), True)])
        data_collect = df.collect()
        rows = []
        for record in data_collect:
            renewal = record["renewal_policy_details"]
            existing = record["existing_policy_details"]
            renewal_map = {}
            existing_map = {}
            if renewal is None and existing is None:
                rows.append((record["policy_renewal_offer_id"], record["record_version"], "", "0", "", "0"))
            else:
                if renewal is None and existing is not None:
                    existing = json.loads(existing)
                    renewaladdOnList = []
                    existingaddOnList = existing.get('existingPolicyPrice', {}).get("addOnPrices", [])
                elif renewal is not None and existing is None:
                    renewal = json.loads(renewal)
                    renewal = renewal.get('renewalPolicyPrice', [])[0]
                    renewaladdOnList = DictQuery(renewal).get("addOnPrices", [])
                    existingaddOnList = []
                else:
                    renewal = json.loads(renewal)
                    existing = json.loads(existing)
                    renewal = renewal.get('renewalPolicyPrice', [])[0]
                    renewaladdOnList = DictQuery(renewal).get("addOnPrices", [])
                    existingaddOnList = existing.get('existingPolicyPrice', {}).get("addOnPrices", [])
                if len(renewaladdOnList) == 0 and len(existingaddOnList) == 0:
                    rows.append((record["policy_renewal_offer_id"], record["record_version"], "", "0", "", "0"))
                else:
                    for row in renewaladdOnList:
                        renewal_map[row.get('addOnType')] = str(DictQuery(row).get('price/totalPrice/amountValue', 0))
                    for row in existingaddOnList:
                        existing_map[row.get('addOnType')] = str(DictQuery(row).get('price/totalPrice/amountValue', 0))
                    for k in renewal_map:
                        if k in existing_map:
                            rows.append((record["policy_renewal_offer_id"], record["record_version"], k, renewal_map[k], k, existing_map[k]))
                        else:
                            rows.append((record["policy_renewal_offer_id"], record["record_version"], k, renewal_map[k], "", "0"))
                    for k in existing_map:
                        if k not in renewal_map:
                            rows.append((record["policy_renewal_offer_id"], record["record_version"], "", "0", k, existing_map[k]))
        print('rows')
        print(rows)
        new_df = spark.createDataFrame(rows, addOn_schema)
        return new_df
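
    # Worked sketch of the add-on matching above (illustrative types/amounts):
    #   renewal_map  = {"LegalCover": "24.0", "HomeEmergency": "60.0"}
    #   existing_map = {"LegalCover": "20.0", "PestCover": "15.0"}
    # yields, per offer id/version, a full-outer-join-style row set:
    #   ("LegalCover", "24.0", "LegalCover", "20.0")   matched on both sides
    #   ("HomeEmergency", "60.0", "", "0")             renewal only
    #   ("", "0", "PestCover", "15.0")                 existing only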