Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,11 @@ POSTGRES_PWD=
POSTGRES_SEEDS=
POSTGRES_USER=

PROXY_URL=
PROXY_URL=

AWS_ENDPOINT_URL=
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_S3_BUCKET=
AWS_REGION=
AWS_SECURE=
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -165,4 +165,7 @@ main.ipynb

*.xml

dump_*
dump_*

minio_data/
dumps/*
41 changes: 41 additions & 0 deletions docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,17 @@ services:
condition: service_healthy
redis:
condition: service_healthy
minio:
condition: service_healthy
networks:
- temporal-network
environment:
- AWS_ENDPOINT_URL=http://minio:9000
- AWS_ACCESS_KEY_ID=minioadmin
- AWS_SECRET_ACCESS_KEY=minioadmin
- AWS_S3_BUCKET=hivemind-etl
- AWS_REGION=us-east-1
- AWS_SECURE=false

temporal:
image: temporalio/auto-setup:1.25.2.0
Expand Down Expand Up @@ -120,6 +129,38 @@ services:
networks:
- temporal-network

# MinIO object store used as the S3-compatible backend for local development.
minio:
image: minio/minio:RELEASE.2025-04-22T22-12-26Z
ports:
- "9000:9000" # API
- "9001:9001" # Console
environment:
# Root credentials fall back to minioadmin/minioadmin when the AWS_* variables are unset.
MINIO_ROOT_USER: ${AWS_ACCESS_KEY_ID:-minioadmin}
MINIO_ROOT_PASSWORD: ${AWS_SECRET_ACCESS_KEY:-minioadmin}
volumes:
# Bucket data is persisted on the host in ./minio_data.
- ./minio_data:/data
command: server /data --console-address ":9001"
healthcheck:
# NOTE(review): this probe needs a curl binary inside the minio image - confirm it exists
# for the pinned release on every target architecture; services that wait on
# `condition: service_healthy` for minio will not start if this probe cannot run.
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
networks:
- temporal-network

# Sidecar container that stays alive in a sleep loop and probes MinIO's liveness
# endpoint over the compose network (http://minio:9000) using the curl image.
minio-healthcheck:
image: curlimages/curl:8.11.0
# Keep the container running forever; the only useful work happens in the healthcheck below.
entrypoint: ["/bin/sh", "-c", "--", "while true; do sleep 30; done;"]
depends_on:
- minio
healthcheck:
test: ["CMD", "curl", "-f", "http://minio:9000/minio/health/live"]
interval: 10s
timeout: 2s
retries: 5
networks:
- temporal-network

networks:
temporal-network:
driver: bridge
Expand Down
65 changes: 53 additions & 12 deletions hivemind_etl/mediawiki/activities.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
with workflow.unsafe.imports_passed_through():
from hivemind_etl.mediawiki.module import ModulesMediaWiki
from hivemind_etl.mediawiki.etl import MediawikiETL
from hivemind_etl.storage.s3_client import S3Client
from llama_index.core import Document


Expand Down Expand Up @@ -53,7 +54,9 @@ async def get_hivemind_mediawiki_platforms(

@activity.defn
async def extract_mediawiki(mediawiki_platform: dict[str, Any]) -> None:
"""Extract data from MediaWiki API URL."""
"""
Extract data from MediaWiki API URL and store in S3.
"""
try:
Comment thread
amindadgar marked this conversation as resolved.
community_id = mediawiki_platform["community_id"]
api_url = mediawiki_platform["base_url"]
Expand All @@ -69,7 +72,8 @@ async def extract_mediawiki(mediawiki_platform: dict[str, Any]) -> None:
platform_id=platform_id,
)
mediawiki_etl.extract(api_url=api_url)
logging.info(f"Completed extraction for community {community_id}")

logging.info(f"Completed extraction for community {community_id}!")
except Exception as e:
community_id = mediawiki_platform["community_id"]
logging.error(f"Error in extraction for community {community_id}: {str(e)}")
Expand All @@ -79,9 +83,20 @@ async def extract_mediawiki(mediawiki_platform: dict[str, Any]) -> None:
@activity.defn
async def transform_mediawiki_data(
mediawiki_platform: dict[str, Any],
) -> list[Document]:
"""Transform the extracted MediaWiki data."""
) -> str:
"""
Transform the extracted MediaWiki data and store in S3.

Parameters
----------
mediawiki_platform : dict[str, Any]
The platform configuration

Returns
-------
str
The S3 key where the transformed data is stored
"""
community_id = mediawiki_platform["community_id"]
platform_id = mediawiki_platform["platform_id"]
try:
Expand All @@ -93,25 +108,51 @@ async def transform_mediawiki_data(
namespaces=namespaces,
platform_id=platform_id,
)
result = mediawiki_etl.transform()
logging.info(f"Completed transformation for community {community_id}")
return result

# Transform data using the extracted data from S3
documents = mediawiki_etl.transform()

s3_client = S3Client()
# Store transformed data in S3
transformed_key = s3_client.store_transformed_data(community_id, documents)

logging.info(
f"Completed transformation for community {community_id} and stored in S3 with key: {transformed_key}"
)
return transformed_key
except Exception as e:
logging.error(f"Error in transformation for community {community_id}: {str(e)}")
raise


@activity.defn
async def load_mediawiki_data(mediawiki_platform: dict[str, Any]) -> None:
"""Load the transformed MediaWiki data into the database."""
async def load_mediawiki_data(
mediawiki_platform: dict[str, Any],
) -> None:
"""
Load the transformed MediaWiki data into the database.

Parameters
----------
mediawiki_platform : dict[str, Any]
The platform configuration
"""
community_id = mediawiki_platform["community_id"]
platform_id = mediawiki_platform["platform_id"]
namespaces = mediawiki_platform["namespaces"]
transformed_data_key = mediawiki_platform["transformed_data_key"]

try:
documents_dict = mediawiki_platform["documents"]
# temporal had converted them to dicts, so we need to convert them back to Document objects
documents = [Document.from_dict(doc) for doc in documents_dict]
# Get transformed data from S3
s3_client = S3Client()
transformed_data = s3_client.get_data_by_key(transformed_data_key)
if not transformed_data:
raise ValueError(
f"No transformed data found in S3 for community {community_id}"
)

# Convert dict data back to Document objects
documents = [Document.from_dict(doc) for doc in transformed_data]

Comment thread
amindadgar marked this conversation as resolved.
logging.info(f"Starting data load for community {community_id}")
mediawiki_etl = MediawikiETL(
Expand Down
11 changes: 6 additions & 5 deletions hivemind_etl/mediawiki/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ async def run(self, platform_id: str | None = None) -> None:
"namespaces": platform["namespaces"],
"platform_id": platform["platform_id"],
}
# Extract data from MediaWiki

# Extract data from MediaWiki and store in S3
await workflow.execute_activity(
extract_mediawiki,
mediawiki_platform,
Expand All @@ -57,8 +58,8 @@ async def run(self, platform_id: str | None = None) -> None:
),
)

# Transform the extracted data
documents = await workflow.execute_activity(
# Transform the extracted data and store in S3
transformed_data_key = await workflow.execute_activity(
transform_mediawiki_data,
mediawiki_platform,
start_to_close_timeout=timedelta(hours=6),
Expand All @@ -68,8 +69,8 @@ async def run(self, platform_id: str | None = None) -> None:
),
)

mediawiki_platform["documents"] = documents
# Load the transformed data
mediawiki_platform["transformed_data_key"] = transformed_data_key
# Load the transformed data from S3
await workflow.execute_activity(
load_mediawiki_data,
mediawiki_platform,
Expand Down
145 changes: 145 additions & 0 deletions hivemind_etl/storage/s3_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import os
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
from llama_index.core import Document


class S3Client:
    """
    JSON-oriented wrapper around a boto3 S3 client.

    Configuration is read from the environment variables
    `AWS_ENDPOINT_URL`, `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`,
    `AWS_S3_BUCKET` (all required), `AWS_REGION` (default ``us-east-1``)
    and `AWS_SECURE` (default ``true``).

    Objects are stored under ``<community_id>/<activity_type>/<timestamp>.json``
    where the timestamp is the UTC upload time in ISO-8601 format.
    """

    def __init__(self):
        """
        Read the configuration, build the boto3 client and ensure the bucket exists.

        Raises
        ------
        ValueError
            If any required environment variable is unset or empty.
        ClientError
            If the bucket cannot be accessed for a reason other than not
            existing, or if creating it fails.
        """
        # Get AWS S3 environment variables
        self.endpoint_url = os.getenv("AWS_ENDPOINT_URL")
        self.access_key = os.getenv("AWS_ACCESS_KEY_ID")
        self.secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
        self.bucket_name = os.getenv("AWS_S3_BUCKET")
        self.region = os.getenv("AWS_REGION", "us-east-1")
        self.secure = os.getenv("AWS_SECURE", "true").lower() == "true"

        # Collect every missing variable so a single error reports them all
        required = {
            "AWS_ENDPOINT_URL": self.endpoint_url,
            "AWS_ACCESS_KEY_ID": self.access_key,
            "AWS_SECRET_ACCESS_KEY": self.secret_key,
            "AWS_S3_BUCKET": self.bucket_name,
        }
        missing_vars = [name for name, value in required.items() if not value]
        if missing_vars:
            error_msg = (
                f"Missing required environment variables: {', '.join(missing_vars)}"
            )
            logging.error(error_msg)
            raise ValueError(error_msg)

        logging.info(
            f"Initializing S3 client with endpoint: {self.endpoint_url}, "
            f"bucket: {self.bucket_name}, region: {self.region}, secure: {self.secure}"
        )

        # Configure S3 client
        config = Config(
            signature_version="s3v4",
            region_name=self.region,
        )

        self.s3_client = boto3.client(
            "s3",
            endpoint_url=self.endpoint_url,
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
            config=config,
            # AWS_SECURE is forwarded as boto3's `verify` flag
            verify=self.secure,
        )

        self._ensure_bucket()

    def _create_bucket_kwargs(self) -> Dict[str, Any]:
        """Build the `create_bucket` arguments for the configured region."""
        kwargs: Dict[str, Any] = {"Bucket": self.bucket_name}
        # us-east-1 is the implicit default location; AWS rejects an explicit
        # LocationConstraint for it, so it is only sent for other regions.
        if self.region != "us-east-1":
            kwargs["CreateBucketConfiguration"] = {"LocationConstraint": self.region}
        return kwargs

    def _ensure_bucket(self) -> None:
        """Check that the configured bucket is reachable, creating it if absent."""
        try:
            self.s3_client.head_bucket(Bucket=self.bucket_name)
        except ClientError as e:
            if e.response["Error"]["Code"] not in ("404", "NoSuchBucket"):
                logging.error(f"Error accessing bucket {self.bucket_name}: {str(e)}")
                raise
            logging.info(f"Creating bucket: {self.bucket_name}")
            self.s3_client.create_bucket(**self._create_bucket_kwargs())
            logging.info(f"Successfully created bucket: {self.bucket_name}")
        else:
            logging.info(f"Successfully connected to bucket: {self.bucket_name}")

    def _get_key(self, community_id: str, activity_type: str, timestamp: str) -> str:
        """Generate a unique S3 key for the data."""
        return f"{community_id}/{activity_type}/{timestamp}.json"

    def _put_json(self, community_id: str, activity_type: str, payload: Any) -> str:
        """Serialize `payload` to JSON, upload it under a fresh timestamped key and return the key."""
        timestamp = datetime.now(tz=timezone.utc).isoformat()
        key = self._get_key(community_id, activity_type, timestamp)
        self.s3_client.put_object(
            Bucket=self.bucket_name,
            Key=key,
            Body=json.dumps(payload),
            ContentType="application/json",
        )
        return key

    def store_extracted_data(self, community_id: str, data: Dict[str, Any]) -> str:
        """
        Store extracted data in S3.

        Parameters
        ----------
        community_id : str
            The community the data belongs to (first key segment)
        data : Dict[str, Any]
            JSON-serializable extracted data

        Returns
        -------
        str
            The S3 key the data was stored under
        """
        return self._put_json(community_id, "extracted", data)

    def store_transformed_data(
        self, community_id: str, documents: List["Document"]
    ) -> str:
        """
        Store transformed documents in S3.

        Parameters
        ----------
        community_id : str
            The community the documents belong to (first key segment)
        documents : List[Document]
            Documents to store; each is converted with `to_dict()`

        Returns
        -------
        str
            The S3 key the documents were stored under
        """
        # Convert Documents to dict for JSON serialization
        docs_data = [doc.to_dict() for doc in documents]
        return self._put_json(community_id, "transformed", docs_data)

    def get_data_by_key(self, key: str) -> Any:
        """
        Get data from S3 using a specific key.

        Parameters
        ----------
        key : str
            The S3 key to read

        Returns
        -------
        Any
            The decoded JSON content (a dict for extracted data, a list of
            dicts for transformed documents)

        Raises
        ------
        ValueError
            If no object exists under `key`.
        ClientError
            For any other S3 error.
        """
        try:
            obj = self.s3_client.get_object(Bucket=self.bucket_name, Key=key)
        except ClientError as e:
            if e.response["Error"]["Code"] == "NoSuchKey":
                logging.error(f"No data found for key: {key}")
                raise ValueError(f"No data found for key: {key}") from e
            logging.error(f"Error retrieving data for key {key}: {str(e)}")
            raise
        return json.loads(obj["Body"].read().decode("utf-8"))

    def get_latest_data(self, community_id: str, activity_type: str) -> Any:
        """
        Get the most recent data for a community and activity type.

        Parameters
        ----------
        community_id : str
            The community to look up
        activity_type : str
            The activity segment of the key, e.g. "extracted" or "transformed"

        Returns
        -------
        Any
            The decoded JSON content of the newest object, or None when no
            object exists under the prefix

        Raises
        ------
        ValueError
            If the newest object disappears between listing and reading.
        ClientError
            For any S3 error raised while listing or reading.
        """
        prefix = f"{community_id}/{activity_type}/"

        # Listings come back in ascending key order, so the first entry is the
        # OLDEST upload. Keys embed the UTC ISO-8601 upload time, which sorts
        # chronologically, so walk every page and keep the greatest key.
        latest_key = None
        request: Dict[str, Any] = {"Bucket": self.bucket_name, "Prefix": prefix}
        while True:
            response = self.s3_client.list_objects_v2(**request)
            for entry in response.get("Contents", []):
                if latest_key is None or entry["Key"] > latest_key:
                    latest_key = entry["Key"]
            if not response.get("IsTruncated"):
                break
            request["ContinuationToken"] = response["NextContinuationToken"]

        if latest_key is None:
            logging.error(f"No data found for prefix: {prefix}")
            return None

        return self.get_data_by_key(latest_key)
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ pydantic==2.9.2
motor>=3.6, <4.0.0
tc-temporal-backend==1.0.0
wikiteam3-fork-proxy==1.0.0
boto3>=1.38.19
botocore>=1.38.19