|
| 1 | +from networksecurity.exception.exception import NetworkSecurityException |
| 2 | +from networksecurity.logging.logger import logging |
| 3 | +from networksecurity.entity.config_entity import DataIngestionConfig |
| 4 | +from networksecurity.entity.artifact_entity import DataIngestionArtifact |
| 5 | +import os |
| 6 | +import sys |
| 7 | +import numpy as np |
| 8 | +import pandas as pd |
| 9 | +import pymongo |
| 10 | +from typing import List |
| 11 | +from sklearn.model_selection import train_test_split |
| 12 | +from dotenv import load_dotenv |
| 13 | + |
# Pull environment variables from a local .env file so the MongoDB
# connection string stays out of source control.
load_dotenv()

# Connection string for the MongoDB deployment; None if the variable is unset.
uri = os.environ.get("MONGODB_URI")
| 16 | + |
class DataIngestion:
    """Data-ingestion stage of the pipeline.

    Reads the raw dataset from a MongoDB collection, persists it to the
    feature store as CSV, splits it into train/test CSVs, and returns a
    DataIngestionArtifact describing the files produced.
    """

    def __init__(self, data_ingestion_config: DataIngestionConfig):
        """Store the ingestion configuration (DB/collection names, file paths,
        train/test split ratio)."""
        try:
            self.data_ingestion_config = data_ingestion_config
        except Exception as e:
            # Project convention: wrap every failure in the package exception.
            raise NetworkSecurityException(e, sys)

    def export_collection_as_dataframe(self) -> pd.DataFrame:
        """Load the configured MongoDB collection into a pandas DataFrame.

        Drops Mongo's internal ``_id`` column and normalizes empty strings
        to NaN so downstream validation sees them as missing values.

        Returns:
            pd.DataFrame: the full collection contents (may be empty).

        Raises:
            NetworkSecurityException: on any connection or read failure.
        """
        try:
            database_name = self.data_ingestion_config.database_name
            collection_name = self.data_ingestion_config.collection_name
            self.mongo_client = pymongo.MongoClient(uri)
            try:
                collection = self.mongo_client[database_name][collection_name]
                df = pd.DataFrame(list(collection.find()))
            finally:
                # Fix: the original never closed the client, leaking the
                # connection pool on every ingestion run.
                self.mongo_client.close()

            if "_id" in df.columns:
                df = df.drop(columns=["_id"])

            df.replace("", np.nan, inplace=True)
            return df
        except Exception as e:
            raise NetworkSecurityException(e, sys)

    def export_data_into_feature_store(self, df: pd.DataFrame, feature_store_file_path: str) -> pd.DataFrame:
        """Write ``df`` to the feature-store CSV path, creating parent dirs.

        Returns the same DataFrame so callers can chain the pipeline steps.
        """
        try:
            dir_path = os.path.dirname(feature_store_file_path)
            # Guard: dirname is "" for a bare filename, and makedirs("")
            # raises FileNotFoundError.
            if dir_path:
                os.makedirs(dir_path, exist_ok=True)
            df.to_csv(feature_store_file_path, index=False, header=True)
            return df
        except Exception as e:
            raise NetworkSecurityException(e, sys)

    def split_data_as_train_test(self, df: pd.DataFrame) -> None:
        """Split ``df`` into train/test sets per the configured ratio and
        write them to the configured CSV paths."""
        try:
            train_set, test_set = train_test_split(
                df, test_size=self.data_ingestion_config.train_test_split_ratio
            )
            logging.info("Performed train test split on the dataframe")

            dir_path = os.path.dirname(self.data_ingestion_config.training_file_path)
            if dir_path:
                os.makedirs(dir_path, exist_ok=True)

            train_set.to_csv(
                self.data_ingestion_config.training_file_path, index=False, header=True
            )
            test_set.to_csv(
                self.data_ingestion_config.test_file_path, index=False, header=True
            )
        except Exception as e:
            raise NetworkSecurityException(e, sys)

    def initiate_data_ingestion(self) -> DataIngestionArtifact:
        """Run the full ingestion pipeline: export from Mongo, persist to the
        feature store, split train/test, and return the resulting artifact."""
        logging.info("Entered the data ingestion method or component")
        try:
            dataframe = self.export_collection_as_dataframe()
            dataframe = self.export_data_into_feature_store(
                dataframe, self.data_ingestion_config.feature_store_file_path
            )
            self.split_data_as_train_test(dataframe)
            data_ingestion_artifact = DataIngestionArtifact(
                feature_store_file_path=self.data_ingestion_config.feature_store_file_path,
                train_file_path=self.data_ingestion_config.training_file_path,
                test_file_path=self.data_ingestion_config.test_file_path,
            )
            return data_ingestion_artifact
        except Exception as e:
            raise NetworkSecurityException(e, sys)
0 commit comments