-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy paths3_client.py
More file actions
129 lines (112 loc) · 4.66 KB
/
s3_client.py
File metadata and controls
129 lines (112 loc) · 4.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import os
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
from llama_index.core import Document
from dotenv import load_dotenv
class S3Client:
    """Wrapper around a boto3 S3 client bound to a single bucket.

    Connection settings come from environment variables (loaded through
    ``load_dotenv``): ``AWS_ENDPOINT_URL``, ``AWS_ACCESS_KEY_ID``,
    ``AWS_SECRET_ACCESS_KEY``, ``AWS_S3_BUCKET`` and ``AWS_REGION`` are
    required; ``AWS_SECURE`` is optional and defaults to ``"true"``.

    Payloads are stored as JSON objects under keys of the form
    ``{community_id}/{activity_type}/{timestamp}.json``.
    """

    def __init__(self):
        """Read settings, build the boto3 client and make sure the bucket exists.

        Raises:
            ValueError: if ``load_dotenv()`` reports that nothing was loaded,
                or if any required environment variable is missing/empty.
            ClientError: if the bucket cannot be accessed or created.
        """
        # NOTE(review): python-dotenv documents load_dotenv() as returning
        # False when it set no variable (e.g. no .env file found), so this
        # presumably also rejects deployments configured purely through real
        # environment variables -- confirm that is intended.
        if not load_dotenv():
            raise ValueError("Failed to load environment variables")

        # Get AWS S3 environment variables
        self.endpoint_url = os.getenv("AWS_ENDPOINT_URL")
        self.access_key = os.getenv("AWS_ACCESS_KEY_ID")
        self.secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
        self.bucket_name = os.getenv("AWS_S3_BUCKET")
        self.region = os.getenv("AWS_REGION")
        # Only the literal string "true" (case-insensitive) enables `verify`.
        self.secure = os.getenv("AWS_SECURE", "true").lower() == "true"

        # Insertion order is preserved, so the error message lists the
        # variables in a stable order.
        required = {
            "AWS_ENDPOINT_URL": self.endpoint_url,
            "AWS_ACCESS_KEY_ID": self.access_key,
            "AWS_SECRET_ACCESS_KEY": self.secret_key,
            "AWS_S3_BUCKET": self.bucket_name,
            "AWS_REGION": self.region,
        }
        missing_vars = [name for name, value in required.items() if not value]
        if missing_vars:
            error_msg = (
                f"Missing required environment variables: {', '.join(missing_vars)}"
            )
            logging.error(error_msg)
            raise ValueError(error_msg)

        logging.info(
            "Initializing S3 client with endpoint: %s, "
            "bucket: %s, region: %s, secure: %s",
            self.endpoint_url,
            self.bucket_name,
            self.region,
            self.secure,
        )

        # Configure S3 client
        config = Config(
            signature_version="s3v4",
            region_name=self.region,
        )
        self.s3_client = boto3.client(
            "s3",
            endpoint_url=self.endpoint_url,
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
            config=config,
            verify=self.secure,
        )

        self._ensure_bucket()

    def _create_bucket_kwargs(self) -> Dict[str, Any]:
        """Return the keyword arguments for ``create_bucket``.

        ``us-east-1`` is the default S3 region and must not be sent as a
        ``LocationConstraint`` (AWS rejects the request), so the
        configuration block is only included for other regions.
        """
        kwargs: Dict[str, Any] = {"Bucket": self.bucket_name}
        if self.region != "us-east-1":
            kwargs["CreateBucketConfiguration"] = {
                "LocationConstraint": self.region
            }
        return kwargs

    def _ensure_bucket(self) -> None:
        """Check that the configured bucket is reachable; create it on a 404.

        Raises:
            ClientError: for any ``head_bucket`` failure other than 404, or
                if creating the bucket fails.
        """
        try:
            self.s3_client.head_bucket(Bucket=self.bucket_name)
            logging.info("Successfully connected to bucket: %s", self.bucket_name)
        except ClientError as e:
            if e.response["Error"]["Code"] != "404":
                logging.error(
                    "Error accessing bucket %s: %s", self.bucket_name, e
                )
                raise
            logging.info("Creating bucket: %s", self.bucket_name)
            self.s3_client.create_bucket(**self._create_bucket_kwargs())
            logging.info("Successfully created bucket: %s", self.bucket_name)

    def _get_key(self, community_id: str, activity_type: str, timestamp: str) -> str:
        """Generate a unique S3 key for the data."""
        return f"{community_id}/{activity_type}/{timestamp}.json"

    def _put_json(self, community_id: str, activity_type: str, payload: Any) -> str:
        """Serialize ``payload`` to JSON and upload it under a timestamped key.

        The timestamp is the current UTC time in ISO-8601 format.

        Returns:
            The S3 key the object was written to.
        """
        timestamp = datetime.now(tz=timezone.utc).isoformat()
        key = self._get_key(community_id, activity_type, timestamp)
        self.s3_client.put_object(
            Bucket=self.bucket_name,
            Key=key,
            Body=json.dumps(payload),
            ContentType="application/json",
        )
        return key

    def store_extracted_data(self, community_id: str, data: Dict[str, Any]) -> str:
        """Store extracted data in S3 and return the key it was written to."""
        return self._put_json(community_id, "extracted", data)

    def store_transformed_data(
        self, community_id: str, documents: "List[Document]"
    ) -> str:
        """Store transformed documents in S3 and return the key.

        Each document is converted with ``to_dict()`` and the resulting list
        is stored as a single JSON array.
        """
        # Convert Documents to dict for JSON serialization
        docs_data = [doc.to_dict() for doc in documents]
        return self._put_json(community_id, "transformed", docs_data)

    def get_data_by_key(self, key: str) -> Dict[str, Any]:
        """Get data from S3 using a specific key.

        Returns:
            The decoded JSON payload. Objects written by
            ``store_transformed_data`` decode to a list of dicts rather than
            a single dict.

        Raises:
            ValueError: if no object exists under ``key`` (``NoSuchKey``).
            ClientError: for any other S3 error.
        """
        try:
            obj = self.s3_client.get_object(Bucket=self.bucket_name, Key=key)
            return json.loads(obj["Body"].read().decode("utf-8"))
        except ClientError as e:
            if e.response["Error"]["Code"] == "NoSuchKey":
                logging.error("No data found for key: %s", key)
                # Chain the original error so the S3 response stays visible.
                raise ValueError(f"No data found for key: {key}") from e
            logging.error("Error retrieving data for key %s: %s", key, e)
            raise