[text] val

Viewer

  1. from pyspark.sql import SparkSession
  2. import argparse
  3.  
  4. import argparse
  5. parser=argparse.ArgumentParser(description="args for validation script")
  6.  
  7. parser.add_argument('--input_gcs_path',type=str)
  8. parser.add_argument('--input_header',type=str) //True or False
  9. parser.add_argument('--dest_bq_project',type=str)
  10. parser.add_argument('--dest_bq_dataset',type=str)
  11. parser.add_argument('--dest_bq_table',type=str)
  12. parser.add_argument('--dest_bq_filter',type=str)
  13. parser.add_argument('--col_list',type=str)
  14. parser.add_argument('--bq_drop_col_list',type=str) //'ed_ts', 'ed_date'
  15. parser.add_argument('--compare_columns_flag',type=str)
  16. parser.add_argument('--compare_rows_flag',type=str)
  17. parser.add_argument('--bad_rec_gcs_dir',type=str)
  18.  
  19.  
  20. args=parser.parse_args()
  21. input_gcs_path=args.input_gcs_path
  22. input_header=args.input_header
  23. dest_bq_project=args.dest_bq_project
  24. dest_bq_dataset=args.dest_bq_dataset
  25. dest_bq_table=args.dest_bq_table
  26. dest_bq_filter=args.dest_bq_filter
  27. col_list=args.col_list
  28. bq_drop_col_list=args.bq_drop_col_list
  29. compare_columns_flag=args.compare_columns_flag
  30. compare_rows_flag=args.compare_rows_flag
  31. bad_rec_gcs_dir=args.bad_rec_gcs_dir
  32.  
  33.  
  34. # Create a Spark session
  35. spark = SparkSession.builder \
  36.     .appName("data validation") \
  37.     .getOrCreate()
  38.  
  39. if compare_rows_flag=="True":
  40.     print("comparing rowwise dataframe")
  41.     df_gcs = spark.read.csv(input_gcs_path, header=input_header)
  42.     df_gcs.show()
  43.  
  44.     # Read data from BigQuery into a DataFrame
  45.     df_bq = spark.read \
  46.         .format("bigquery") \
  47.         .option("table", dest_bq_table) \
  48.         .option("filter", dest_bq_filter) \
  49.         .option("project", dest_bq_project) \
  50.         .option("dataset", dest_bq_dataset) \
  51.         .load()
  52.     df_bq.show()
  53.     df_bq_new=df_bq.drop(bq_drop_col_list)
  54.     df_bq_new.show()
  55.  
  56.     diff_df_gcs_bq=df_gcs.subtract(df_bq_new)
  57.     diff_df_bq_gcs=df_bq_new.subtract(df_gcs)
  58.     # Save DataFrame to GCS as CSV
  59.     diff_df_gcs_bq.write.csv(bad_rec_gcs_dir+"/"+dest_bq_dataset+"/"+dest_bq_table+"/diff_df_gcs_bq/", mode="overwrite", header=True)
  60.     diff_df_bq_gcs.write.csv(bad_rec_gcs_dir+"/"dest_bq_dataset+"/"+dest_bq_table+"/diff_df_bq_gcs/", mode="overwrite", header=True)
  61.  
  62. if compare_columns_flag=="True":
  63.  
  64.  
  65.  
  66.  

Editor

You can edit this paste and save as new:


File Description
  • val
  • Paste Code
  • 20 May-2024
  • 2.23 Kb
You can Share it: