# sbdp.py
import os
import uuid
PYTHON_EXE = r"D:\codes\py-spark\spark-bulk-data-processing\.venv\Scripts\python.exe"
os.environ["PYSPARK_PYTHON"] = PYTHON_EXE
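# PYSPARK_PYTHON tells Spark which interpreter to launch for worker processes;
# it must be set before the SparkSession is created so executors pick it up.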
from pyspark.sql.functions import col, to_json, struct
from src import ConfigLoader, DataLoader, Transformations
from src.config import Utils
from src.logger import Log4J
# Ensure log directory exists
os.makedirs("logs", exist_ok=True)
if __name__ == '__main__':
    # if len(sys.argv) < 3:
    #     print("Usage: SBDP {dev|qa|prod} {load_date} - missing argument")
    #     sys.exit(-1)
    job_run_env = "LOCAL"  # sys.argv[1].upper()
    # load_date = sys.argv[2]
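    # NOTE: the CLI argument handling above is bypassed for local runs;
    # restore it (and `import sys`) when submitting to a cluster.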
    job_run_id = "SBDP-" + str(uuid.uuid4())
    print(f"[INITIALISED] SBDP Spark job for env {job_run_env} and job id {job_run_id}")
    config = ConfigLoader.get_config(env=job_run_env)
    enable_hive = config['enable.hive'] == True
    hive_db = config['hive.database']
    print("Creating Spark session...")
    spark = Utils.get_spark_session(job_run_env)
    spark.sparkContext.setLogLevel("INFO")
    logger = Log4J(spark=spark)
    logger.info("[START] :: Spark Bulk Data Processing started")
    logger.info("Reading SBDP parties CSV to DF")
    parties_df = DataLoader.read_parties(spark=spark, env=job_run_env, enable_hive=enable_hive, hive_db=hive_db)
    relations_df = Transformations.get_relations(df=parties_df)
    logger.info("Reading SBDP address CSV to DF")
    address_df = DataLoader.read_address(spark=spark, env=job_run_env, enable_hive=enable_hive, hive_db=hive_db)
    relation_address_df = Transformations.get_address(df=address_df)
    logger.info("Joining party relation and address DFs")
    party_address_df = Transformations.join_party_address(party_df=relations_df, address_df=relation_address_df)
    logger.info("Reading SBDP account CSV to DF")
    account_df = DataLoader.read_account(spark=spark, env=job_run_env, enable_hive=enable_hive, hive_db=hive_db)
    contract_df = Transformations.get_contract(df=account_df)
    logger.info("Joining account and parties")
    data_df = Transformations.join_contract_party(contract_df=contract_df, party_df=party_address_df)
    logger.info("Applying header and creating event")
    final_df = Transformations.apply_header(spark=spark, df=data_df)
    # final_df.write.parquet("data.parquet")
    # TODO: write the key/value events to Kafka (see the sketch below)
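    # The Spark Kafka sink expects string/binary 'key' and 'value' columns;
    # use the contract identifier as the key and serialise each row to JSON.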
    kafka_kv_df = final_df.select(col("payload.contractIdentifier.newValue").alias("key"),
                                  to_json(struct("*")).alias("value"))
input("TT")
    # kafka_key = config['kafka.key']
    # kafka_secret = config['kafka.api_secret']
    #
    # Minimal sketch of the Kafka sink, kept commented like the original.
    # 'kafka.bootstrap.servers' and 'kafka.topic' are assumed config keys,
    # and SASL_SSL/PLAIN auth is a typical Confluent-style setup:
    # (kafka_kv_df.write.format("kafka")
    #  .option("kafka.bootstrap.servers", config['kafka.bootstrap.servers'])
    #  .option("kafka.security.protocol", "SASL_SSL")
    #  .option("kafka.sasl.mechanism", "PLAIN")
    #  .option("kafka.sasl.jaas.config", f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_key}" password="{kafka_secret}";')
    #  .option("topic", config['kafka.topic'])
    #  .save())
    logger.info("Finished sending data to Kafka")