33import sys
44import logging
55from pathlib import Path
6+ from typing import Dict , Any
67
78APP_NAME = "DataPipeline"
89VERSION = "1.0.0"
1314import numpy as np
1415import requests
1516from sqlalchemy import create_engine
17+ from sqlalchemy .engine import Engine
1618import boto3
19+ from botocore .client import BaseClient
1720from dotenv import load_dotenv
1821
1922load_dotenv ()
4043class DataPipeline :
4144 def __init__ (self , name : str ):
4245 self .name = name
43- self .engine = create_engine (DATABASE_URL )
44- self .s3_client = boto3 .client ('s3' ) if AWS_ACCESS_KEY else None
46+ self .engine : Engine = create_engine (DATABASE_URL )
47+ self .s3_client : BaseClient | None = boto3 .client ('s3' ) if AWS_ACCESS_KEY else None
4548 logger .info (f"Initialized pipeline: { name } " )
4649
4750 def extract_data (self , source : str ) -> pd .DataFrame :
4851 """Extract data from various sources"""
4952 if source .startswith ('http' ):
5053 headers = {'Authorization' : f'Bearer { API_KEY } ' }
5154 response = requests .get (source , headers = headers )
55+ response .raise_for_status () # Check for HTTP errors
5256 data = response .json ()
5357 return pd .DataFrame (data )
5458 elif source .endswith ('.csv' ):
@@ -68,13 +72,16 @@ def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
6872 logger .info (f"Transformed { len (df )} rows" )
6973 return df
7074
71- def load_data (self , df : pd .DataFrame , destination : str ):
75+ def load_data (self , df : pd .DataFrame , destination : str ) -> None :
7276 """Load data to destination"""
7377 if destination .startswith ('s3://' ):
78+ if self .s3_client is None :
79+ raise ValueError ("S3 client not initialized. Check AWS credentials." )
7480 bucket = destination .replace ('s3://' , '' ).split ('/' )[0 ]
7581 key = '/' .join (destination .replace ('s3://' , '' ).split ('/' )[1 :])
76- df .to_parquet (f'/tmp/{ key } ' )
77- self .s3_client .upload_file (f'/tmp/{ key } ' , bucket , key )
82+ temp_file = f'/tmp/{ key } '
83+ df .to_parquet (temp_file )
84+ self .s3_client .upload_file (temp_file , bucket , key )
7885 elif destination .endswith ('.csv' ):
7986 df .to_csv (destination , index = False )
8087 else :
@@ -83,7 +90,7 @@ def load_data(self, df: pd.DataFrame, destination: str):
8390
8491 logger .info (f"Loaded data to: { destination } " )
8592
86- def run_pipeline (config : dict ) :
93+ def run_pipeline (config : Dict [ str , Any ]) -> None :
8794 """Execute a complete ETL pipeline"""
8895 pipeline = DataPipeline (config ['name' ])
8996
@@ -97,7 +104,7 @@ def run_pipeline(config: dict):
97104 logger .info (f"Pipeline { config ['name' ]} completed successfully" )
98105
99106# 3. Deploy ----
100- def main ():
107+ def main () -> None :
101108 """Main deployment function"""
102109 print (f"🚀 Starting { APP_NAME } v{ VERSION } " )
103110
@@ -122,6 +129,8 @@ def main():
122129 for config in pipelines :
123130 try :
124131 run_pipeline (config )
132+ except Exception as e :
133+ logger .error (f"Pipeline { config ['name' ]} failed: { e } " )
125134
# Script entry point: run the deployment only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()
0 commit comments