[text] python

Viewer

  1. class DictQuery(dict):
  2.     def get(self, path, default=None):
  3.         keys = path.split("/")
  4.         value = None
  5.  
  6.         for key in keys:
  7.             if value:
  8.                 if isinstance(value, list):
  9.                     value = [v.get(key, default) if v else None for v in value]
  10.                 else:
  11.                     value = value.get(key, default)
  12.             else:
  13.                 value = dict.get(self, key, default)
  14.  
  15.             if not value:
  16.                 break
  17.  
  18.         return value
  19.  
  20.  
  21. class Script:
  22.  
  23.     @staticmethod
  24.     def execute(spark, input, execParams):
  25.         from pyspark.sql.functions import udf, col, lit, to_timestamp, from_unixtime, to_date,row_number,when,expr,from_utc_timestamp,date_format
  26.         from datetime import date
  27.         from pyspark.sql.window import Window
  28.  
  29.         # Defining UDF Functions
  30.         policyConfigurationUDF = udf(
  31.             lambda data, dimension: Script.handlePolicyConfiguration(data, dimension))
  32.  
  33.         policyPricingUDF = udf(
  34.             lambda data, field: Script.policyPrice(data, field))
  35.         
  36.         # epochUDF = udf(lambda t: Script.epoch_to_time(t))
  37.             
  38.         windowSpec  = Window.partitionBy("policy_tenure_id","policy_id","addon_id","record_version").orderBy(col("approx_creation_datetime").desc())
  39.  
  40.         output = ['policy_renewal_offer_id', 'record_version', 'current_policy_tenure_id','is_active', 'is_record_active', 'addon_total_price','last_updated_timestamp', 'partner_renewal_offer_id', 'policy_event_id', 'policy_id', 'renewal_action', 'renewal_date_timestamp', 'creation_timestamp', 'renewal_offer_expiry_timestamp', 'contentscover_amount', 'excess_amount', 'payment_frequency', 'base_price_amount', 'total_amount_annual', 'total_amount_monthly', 'firstmonth_amount', 'subsequentmonth_amount', 'apr', 'interest_amount', 'interest_percentage', 'premium', 'auto_renewal_flag', 'existing_contents_cover_limit_amount', 'existing_excess_amount', 'existing_total_amount', 'existing_base_amount', 'existing_payment_frequency', 'existing_addon_id', 'existing_addon_total_amount', 'addon_key', 'creation_date','renewal_date_timestamp_local','creation_timestamp_local','last_updated_timestamp_local','renewal_offer_expiry_timestamp_local', 'addon_id', 'is_prod', 'renewal_no_offer_reason_code']
  41.         df = spark.sql('select * from pr')
  42.         print('read source df')
  43.         print(df.show(),flush=True)
  44.         print("Number of Records (INPUT): " + str(df.count()),flush=True)
  45.         
  46.         df2 = spark.sql("select distinct policy_id as p_policy_id from p where is_prod=0")
  47.  
  48.     #     # Adding Columns to dataframe & handling Data Transformation
  49.        
  50.     #     # Flattening Policy Configuration Column
  51.         df = df.withColumn("currency", policyConfigurationUDF(col("renewal_policy_details"), lit("CURRENCY")))
  52.         df = df.withColumn("excess_amount", policyConfigurationUDF(col("renewal_policy_details"), lit("EXCESS")).cast("double"))
  53.         df = df.withColumn("contentscover_amount", policyConfigurationUDF(col("renewal_policy_details"), lit("CONTENTSCOVER")).cast("double"))
  54.         df = df.withColumn("payment_frequency", policyConfigurationUDF(col("renewal_policy_details"), lit("PAYMENTFREQUENCY")))
  55.         df = df.withColumn("base_price_amount", policyConfigurationUDF(col("renewal_policy_details"),lit("BASE_AMOUNT")).cast("double"))
  56.  
  57.         
  58.         print('policyConfigurationUDF')
  59.         print(df.show(truncate=False))
  60.  
  61.         
  62.         df = df.withColumn("renewal_date_timestamp", from_unixtime(col("renewal_date_timestamp")).cast("timestamp"))
  63.         df = df.withColumn("creation_timestamp", from_unixtime(col("creation_timestamp")).cast("timestamp"))
  64.         df = df.withColumn("renewal_offer_expiry_timestamp", from_unixtime(col("renewal_offer_expiry_timestamp")).cast("timestamp"))
  65.         df = df.withColumn("creation_date", to_date(col("creation_timestamp").cast("timestamp")))
  66.         df = df.withColumn("last_updated_timestamp", from_unixtime(col("last_updated_timestamp")/1000).cast("timestamp"))
  67.        
  68.  
  69.         df = df.withColumn('dw_load_date', lit(date.today()))
  70.  
  71.         renewalDF = Script.createRenewalDeatilsDF(spark,df)
  72.         print("renewalDF")
  73.         print(renewalDF.show(truncate=False))
  74.         existingPolicyPriceDF = Script.createexistingPolicyDetailsDF(spark,df)
  75.         print("existingPolicyPriceDF")
  76.         print(existingPolicyPriceDF.show(truncate=False))
  77.         
  78.         # Joing Addons Dataframe with main Dataframe, checking that PK do not contain null or None (replecing with ""), removing duplicate records by distinct()
  79.         pricedf = renewalDF.join(existingPolicyPriceDF, (renewalDF.id == existingPolicyPriceDF.id) & (renewalDF.version ==existingPolicyPriceDF.version))
  80.         print("join df")
  81.         print(pricedf.show(truncate=False))  
  82.         pricedf=pricedf.drop(renewalDF.id)
  83.         pricedf=pricedf.drop(renewalDF.version)
  84.         print(pricedf.show(truncate=False))
  85.         
  86.         addOnDF = Script.addOnsDF(spark,df)
  87.         pricedf = pricedf.join(addOnDF, (pricedf.id == addOnDF.id) & (pricedf.version == addOnDF.version))
  88.         print(pricedf.show(truncate=False))  
  89.         pricedf=pricedf.drop(addOnDF.id)
  90.         pricedf=pricedf.drop(addOnDF.version)
  91.         print(pricedf.show(truncate=False))
  92.         
  93.         df=df.join(pricedf,(df.policy_renewal_offer_id == pricedf.id) & (df.record_version == pricedf.version))
  94.         print('df')
  95.         print(df.show(truncate=False))
  96.         
  97.         df = df.withColumn("total_amount_annual", col("total_amount_annual").cast("double")).withColumn("total_amount_monthly", col("total_amount_monthly").cast("double")).withColumn("firstmonth_amount", col("firstmonth_amount").cast("double")).withColumn("subsequentmonth_amount", col("subsequentmonths_amount").cast("double")).withColumn("apr", col("apr").cast("double")).withColumn("existing_contents_cover_limit_amount", col("existing_contents_cover_limit_amount").cast("double")).withColumn("existing_excess_amount", col("existing_excess_amount").cast("double")).withColumn("existing_base_amount", col("existing_base_amount").cast("double")).withColumn("existing_total_amount", col("existing_total_amount").cast("double")).withColumn("existing_addon_id", col("existing_addon_id").cast("string")).withColumn("existing_addon_total_amount", col("existing_addon_total_amount").cast("double")) .withColumn("interest_amount", col("interest_amount").cast("double")).withColumn("interest_percentage", col("interest_percentage").cast("double")).withColumn("premium", col("premium").cast("double")).withColumn("addon_total_price", col("addon_amount").cast("double"))
  98.         
  99.         df = df.withColumn('auto_renewal_flag',expr("case when auto_renewal_flag='True' then 1 else 0 end"))
  100.         
  101.     #   df = df.withColumn("approx_creation_datetime",when(col("approx_creation_datetime")==None ,None).otherwise(from_unixtime(epochUDF("approx_creation_datetime")).cast('timestamp'))).withColumn("row_number",row_number().over(windowSpec)).filter(col("row_number")==1)
  102.     
  103.         df= df.join(df2, df.policy_id == df2.p_policy_id, "left") 
  104.         
  105.         df = df.withColumn("is_prod", expr("case when (creation_date >'2023-09-22' or policy_id=p_policy_id) then 0 " + "else 1 end"))
  106.         df.slect(col("is_prpd")).show()
  107.         
  108.         df = df.withColumn("creation_timestamp_local",from_utc_timestamp(col("creation_timestamp"),"Europe/London"))
  109.         df = df.withColumn("last_updated_timestamp_local",from_utc_timestamp(col("last_updated_timestamp"),"Europe/London"))
  110.         df = df.withColumn("renewal_date_timestamp_local",from_utc_timestamp(col("renewal_date_timestamp"),"Europe/London"))
  111.         df = df.withColumn("renewal_offer_expiry_timestamp_local",from_utc_timestamp(col("renewal_offer_expiry_timestamp"),"Europe/London"))
  112.         df = df.withColumn('addon_key',expr("case when addon_id is not null or trim(addon_id)='' then addon_id when existing_addon_id is not null or existing_addon_id='' then existing_addon_id else 'NA' end"))
  113.         df = df.withColumn("addon_key", expr("CASE WHEN  trim(addon_key)='' then 'NA' else addon_key END"))
  114.         
  115.         df = df.select(*output).na.fill("NA", ["addon_key"]).distinct().filter(col("is_record_active") == 1)
  116.         print(df.printSchema())
  117.         
  118.         print('output')
  119.         print("Number of Records (OUTPUT): " + str(df.count()),flush=True)
  120.         print(df.show(),flush=True)
  121.         return df
  122.         
  123.     # @staticmethod
  124.     # def epoch_to_time(time):
  125.     #     if time==None:
  126.     #         return None
  127.     #     time = str(time)[0:10]
  128.     #     return int(time)
  129.         
  130.     # Function to handle values for currency,excess_amount,contentscover_amount,payment_frequency (For no value: it would return None for String & 0 for Number)
  131.     @staticmethod
  132.     def handlePolicyConfiguration(data, dimension):
  133.         # handle if data is None
  134.         if data==None and (dimension=='CURRENCY' or dimension=='PAYMENTFREQUENCY'):
  135.             return None
  136.         elif data==None:
  137.             return 0
  138.             
  139.         import json
  140.         data = json.loads(data)
  141.         if dimension == "CONTENTSCOVER":
  142.             return(DictQuery(data).get('contentsCoverLimit/amountValue',0))
  143.         elif dimension == "EXCESS":
  144.             return(DictQuery(data).get('excessLevel/amountValue',0))
  145.         elif dimension == "PAYMENTFREQUENCY":
  146.             data = data.get('renewalPolicyPrice', [])[0]
  147.             return(DictQuery(data).get('renewalPolicyPrice/paymentFrequency',None))
  148.         elif dimension == "CURRENCY":
  149.             return(DictQuery(data).get('excessLevel/currency',DictQuery(data).get('contentsCoverLimit/currency',None)))
  150.         elif dimension == "BASE_AMOUNT":
  151.             data = data.get('renewalPolicyPrice', [])[0]
  152.             return(DictQuery(data).get('basePrice/totalPrice/amountValue',None))
  153.             
  154.             
  155.     # Handle AddOns
  156.     @staticmethod
  157.     def createexistingPolicyDetailsDF(spark,df):
  158.         import json
  159.         from pyspark.sql.types import StructType, StructField, StringType,IntegerType,DecimalType
  160.         
  161.       
  162.         existingPolicyDetailsSchema =  StructType([StructField('id',StringType(),False),StructField('version',IntegerType(),False), StructField('auto_renewal_flag',StringType(),True),
  163.                                     StructField('existing_contents_cover_limit_amount',StringType(),True),StructField('existing_excess_amount',StringType(),True),
  164.                                     StructField('existing_base_amount',StringType(),True),StructField('existing_payment_frequency',StringType(),True),
  165.                                     StructField('existing_total_amount',StringType(),True)])
  166.         data_collect = df.collect()  
  167.         rows=[]
  168.         for record in data_collect:
  169.             amt_list = []
  170.             data = record["existing_policy_details"]
  171.             amt_list.append(record["policy_renewal_offer_id"])
  172.             amt_list.append(record["record_version"])
  173.             if data == None:
  174.                 amt_list.extend(["False","0","0","0","", "0"])
  175.                 rows.append(tuple(amt_list))
  176.             else:
  177.                 data = json.loads(data)
  178.                 amt_list.extend([str(DictQuery(data).get('autoRenewalFlag', False)),
  179.                                 str(DictQuery(data).get('contentsCoverLimit/amountValue', 0)),
  180.                                 str(DictQuery(data).get('excessLevel/amountValue', 0)),
  181.                                 str(DictQuery(data).get('existingPolicyPrice/basePrice/totalPrice/amountValue', 0)),
  182.                                 str(DictQuery(data).get('existingPolicyPrice/currentPaymentFrequency', 0)),
  183.                                 str(DictQuery(data).get('existingPolicyPrice/totalPrice/totalPrice/amountValue', 0))])
  184.                 rows.append(tuple(amt_list))
  185.         print('rows')
  186.         print(rows)
  187.                 
  188.         new_df = spark.createDataFrame(rows, existingPolicyDetailsSchema)
  189.         return new_df        
  190.  
  191.     # Handle renewal details
  192.     @staticmethod
  193.     def createRenewalDeatilsDF(spark,df):
  194.             import json
  195.             from pyspark.sql.types import StructType, StructField, StringType,IntegerType,DecimalType
  196.           
  197.             addOn_schema =  StructType([StructField('id',StringType(),False),StructField('version',IntegerType(),False),StructField('total_amount_annual',StringType(),True),
  198.                                         StructField('total_amount_monthly',StringType(),True),StructField('firstmonth_amount',StringType(),True),
  199.                                         StructField('subsequentmonths_amount',StringType(),True),StructField('apr',StringType(),True),
  200.                                         StructField('interest_amount',StringType(),True),StructField('interest_percentage',StringType(),True),
  201.                                         StructField('premium',StringType(),True)])
  202.             data_collect = df.collect()  
  203.             rows=[]
  204.             for record in data_collect:
  205.                 amt_list = []
  206.                 data = record["renewal_policy_details"]
  207.                 amt_list.append(record["policy_renewal_offer_id"])
  208.                 amt_list.append(record["record_version"])
  209.                 if data == None:
  210.                     amt_list.extend(["0","0","0","0","0","0","0","0"])
  211.                     rows.append(tuple(amt_list))
  212.                 else:
  213.                     data = json.loads(data)
  214.                     policyPriceData = data.get('renewalPolicyPrice', [])
  215.                     annual_amount, monthly_amount = [], []
  216.                     for obj in policyPriceData:
  217.                         amt = obj.get("totalPrice")
  218.                         paymentFrequency = obj.get("paymentFrequency")
  219.                         if(paymentFrequency == "Annual"):
  220.                             annual_amount = [str(DictQuery(amt).get('annualPrice/totalPrice/amountValue', 0))]
  221.                         elif(paymentFrequency=="EqualMonthly12"):
  222.                             monthly_amount = [str(DictQuery(amt).get('equalMonthlyPriceWithInterest/totalPrice/amountValue', 0)),
  223.                                     str(DictQuery(amt).get('equalMonthlyPriceWithInterest/firstMonthPrice/amountValue', 0)),
  224.                                     str(DictQuery(amt).get('equalMonthlyPriceWithInterest/subsequentMonthsPrice/amountValue', 0)),
  225.                                     str(DictQuery(amt).get('equalMonthlyPriceWithInterest/apr', 0)),
  226.                                     str(DictQuery(amt).get('equalMonthlyPriceWithInterest/interestAmount/amountValue', 0)),
  227.                                     str(DictQuery(amt).get('equalMonthlyPriceWithInterest/interestPercentage', 0)),
  228.                                     str(DictQuery(amt).get('equalMonthlyPriceWithInterest/premium/amountValue', 0))]
  229.                     if(len(annual_amount)==0 and len(monthly_amount)>0):
  230.                         amt_list.extend([0] + monthly_amount)
  231.                     elif(len(annual_amount)>0 and len(monthly_amount)==0):
  232.                         amt_list.extend(annual_amount + [0,0,0,0,0,0,0])
  233.                     elif(len(annual_amount)==0 and len(monthly_amount)==0):
  234.                         amt_list.extend([0,0,0,0,0,0,0,0])
  235.                     else:
  236.                         amt_list.extend(annual_amount + monthly_amount)
  237.                     rows.append(tuple(amt_list))
  238.  
  239.             print('rows')
  240.             print(rows)
  241.                     
  242.             new_df = spark.createDataFrame(rows, addOn_schema)
  243.             return new_df
  244.     
  245.     
  246.     @staticmethod
  247.     def addOnsDF(spark,df):
  248.         import json
  249.         from pyspark.sql.types import StructType, StructField, StringType,IntegerType,DecimalType
  250.           
  251.         addOn_schema =  StructType([StructField('id',StringType(),False),StructField('version',IntegerType(),False),
  252.                                     StructField('addon_id',StringType(),True), StructField('addon_amount',StringType(),True),
  253.                                     StructField('existing_addon_id',StringType(),True),StructField('existing_addon_total_amount',StringType(),True)
  254.                                 ])
  255.         data_collect = df.collect()  
  256.         rows=[]
  257.         for record in data_collect:
  258.             renewal = record["renewal_policy_details"]
  259.             existing = record["existing_policy_details"]
  260.             renewal_map = {}
  261.             existing_map = {}
  262.             if renewal == None and existing == None:
  263.                 rows.append(tuple([record["policy_renewal_offer_id"], record["record_version"], "", "0", "", "0"]))
  264.             
  265.             else:
  266.                 if renewal==None and existing != None:
  267.                     existing = json.loads(existing)
  268.                     renewaladdOnList = []
  269.                     existingaddOnList = existing.get('existingPolicyPrice').get("addOnPrices",[])
  270.                 elif renewal!=None and existing == None:
  271.                     renewal = json.loads(renewal)
  272.                     renewal = renewal.get('renewalPolicyPrice', [])[0]
  273.                     renewaladdOnList = DictQuery(renewal).get("addOnPrices", [])
  274.                     existingaddOnList = []
  275.                 else:
  276.                     renewal = json.loads(renewal)
  277.                     existing = json.loads(existing)
  278.                     renewal = renewal.get('renewalPolicyPrice', [])[0]
  279.                     renewaladdOnList = DictQuery(renewal).get("addOnPrices", [])
  280.                     existingaddOnList = existing.get('existingPolicyPrice').get("addOnPrices",[])
  281.                 
  282.                 if len(renewaladdOnList) == 0 and len(existingaddOnList) == 0:
  283.                     rows.append(tuple([record["policy_renewal_offer_id"], record["record_version"], "", "0", "", "0"]))
  284.                 
  285.                 else:   
  286.                     for row in renewaladdOnList:
  287.                         renewal_map[row.get('addOnType')] = str(DictQuery(row).get('price/totalPrice/amountValue', 0))
  288.                     for row in existingaddOnList:
  289.                         existing_map[row.get('addOnType')] = str(DictQuery(row).get('price/totalPrice/amountValue', 0))
  290.                     for k in renewal_map:
  291.                         if k in existing_map.keys():
  292.                             rows.append(tuple([record["policy_renewal_offer_id"], record["record_version"], k ,renewal_map[k], k, existing_map[k]]))
  293.                         else:
  294.                             rows.append(tuple([record["policy_renewal_offer_id"], record["record_version"], k ,renewal_map[k], "", "0"]))
  295.                     for k in existing_map:
  296.                         if k not in renewal_map.keys():
  297.                             rows.append(tuple([record["policy_renewal_offer_id"], record["record_version"], "" ,"0", k, existing_map[k]]))
  298.                     
  299.         print('rows')
  300.         print(rows)
  301.                     
  302.         new_df = spark.createDataFrame(rows, addOn_schema)
  303.         return new_df  
  304.             
  305.         

Editor

You can edit this paste and save as new:


File Description
  • python
  • Paste Code
  • 03 Oct-2023
  • 18.81 Kb
You can Share it: