Skip to content

Commit 8916dc2

Browse files
Merge pull request #7 from DataLabTechTV/dev
feat: support for end-to-end ML workflows
2 parents a9999d3 + f14d51e commit 8916dc2

39 files changed

Lines changed: 3260 additions & 100 deletions

.env.example

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ S3_REGION=eu-west-1
1717
S3_BUCKET=lakehouse
1818
S3_INGEST_PREFIX=raw
1919
S3_STAGE_PREFIX=stage
20+
S3_SECURE_STAGE_PREFIX=secure-stage
2021
S3_GRAPHS_MART_PREFIX=marts/graphs
2122
S3_ANALYTICS_MART_PREFIX=marts/analytics
2223
S3_EXPORTS_PREFIX=exports
@@ -30,6 +31,7 @@ S3_BACKUPS_PREFIX=backups
3031

3132
ENGINE_DB=engine.duckdb
3233
STAGE_DB=stage.sqlite
34+
SECURE_STAGE_DB=secure_stage.sqlite
3335
GRAPHS_MART_DB=marts/graphs.sqlite
3436
ANALYTICS_MART_DB=marts/analytics.sqlite
3537

@@ -43,3 +45,17 @@ ECON_COMP_GRAPH_DB=graphs/econ_comp.kuzu
4345
# =====================
4446

4547
OLLAMA_MODELS=gemma3:latest,phi4:latest
48+
49+
# MLflow configurations
50+
# =====================
51+
52+
MLFLOW_TRACKING_URI=http://localhost:5000
53+
MLFLOW_TRACKING_USERNAME=datalabtech
54+
S3_MLFLOW_BUCKET=mlflow
55+
S3_MLFLOW_ARTIFACTS_PREFIX=artifacts
56+
57+
# Kafka configurations
58+
# ====================
59+
60+
KAFKA_BROKER_ENDPOINT=localhost:9092
61+
KAFKA_GROUP_TOPIC_LIST=ml_inference_results:lakehouse-inference-result-consumer,ml_inference_feedback:lakehouse-inference-feedback-consumer

dlctl/cli.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from export.cli import export
1414
from graph.cli import graph
1515
from ingest.cli import ingest
16+
from ml.cli import ml
1617
from shared.cache import cache_usage, expunge_cache
1718
from shared.settings import LOCAL_DIR, MART_DB_VARS, env
1819
from shared.storage import Storage, StoragePrefix
@@ -70,6 +71,7 @@ def dlctl(ctx: click.Context, debug: bool, logfile_enabled: bool, show_version:
7071
dlctl.add_command(ingest)
7172
dlctl.add_command(export)
7273
dlctl.add_command(graph)
74+
dlctl.add_command(ml)
7375

7476
# Backups
7577
# =======

docker-compose.yml

Lines changed: 119 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,19 @@ services:
99
- MINIO_ROOT_PASSWORD=${S3_SECRET_ACCESS_KEY}
1010
volumes:
1111
- minio:/data
12+
networks:
13+
- minio
1214
healthcheck:
1315
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
1416
interval: 10s
1517
retries: 5
1618
restart: unless-stopped
1719
command: server /data --console-address ":9001"
1820

19-
minio-mc:
21+
minio-init:
2022
image: minio/mc:RELEASE.2025-04-16T18-13-26Z
23+
networks:
24+
- minio
2125
depends_on:
2226
minio:
2327
condition: service_healthy
@@ -34,20 +38,25 @@ services:
3438
- "11434:11434"
3539
volumes:
3640
- ollama:/root/.ollama
41+
networks:
42+
- ollama
3743
deploy:
3844
resources:
3945
reservations:
4046
devices:
41-
- capabilities: [gpu]
42-
runtime: nvidia
47+
- driver: nvidia
48+
count: all
49+
capabilities: [gpu]
4350
healthcheck:
4451
test: ["CMD", "ollama", "ls"]
4552
interval: 10s
4653
retries: 3
4754
restart: unless-stopped
4855

49-
ollama-models:
56+
ollama-init:
5057
image: alpine/curl:latest
58+
networks:
59+
- ollama
5160
depends_on:
5261
ollama:
5362
condition: service_healthy
@@ -64,6 +73,112 @@ services:
6473
done
6574
'
6675
restart: no
76+
77+
mlflow:
78+
build:
79+
context: ./docker/mlflow
80+
dockerfile: Dockerfile
81+
ports:
82+
- "5000:5000"
83+
volumes:
84+
- mlflow:/mlflow
85+
networks:
86+
- mlflow
87+
environment:
88+
MLFLOW_S3_ENDPOINT_URL: http://${S3_ENDPOINT}
89+
AWS_ACCESS_KEY_ID: ${S3_ACCESS_KEY_ID}
90+
AWS_SECRET_ACCESS_KEY: ${S3_SECRET_ACCESS_KEY}
91+
AWS_DEFAULT_REGION: ${S3_REGION}
92+
AWS_S3_ADDRESSING_STYLE: ${S3_URL_STYLE}
93+
command: >
94+
mlflow server
95+
--backend-store-uri sqlite:///mlflow/mlflow.db
96+
--serve-artifacts
97+
--artifacts-destination s3://${S3_MLFLOW_BUCKET}/${S3_MLFLOW_ARTIFACTS_PREFIX}
98+
--host 0.0.0.0
99+
--port 5000
100+
healthcheck:
101+
test: >
102+
python -c "import urllib.request;
103+
urllib.request.urlopen('http://localhost:5000')"
104+
interval: 10s
105+
retries: 5
106+
restart: unless-stopped
107+
108+
kafka:
109+
image: apache/kafka:4.0.0
110+
ports:
111+
- "9092:9092"
112+
environment:
113+
KAFKA_NODE_ID: 1
114+
KAFKA_PROCESS_ROLES: broker,controller
115+
116+
KAFKA_LISTENERS: EXTERNAL://:9092,INTERNAL://:29092,CONTROLLER://:9093
117+
KAFKA_ADVERTISED_LISTENERS: EXTERNAL://localhost:9092,INTERNAL://kafka:29092
118+
119+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
120+
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
121+
122+
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
123+
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
124+
125+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
126+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
127+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
128+
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
129+
130+
KAFKA_LOG_DIRS: /var/lib/kafka/data
131+
volumes:
132+
- kafka:/var/lib/kafka/data
133+
networks:
134+
- kafka
135+
healthcheck:
136+
test: [
137+
"CMD", "bash", "-c",
138+
"/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 --list"
139+
]
140+
interval: 10s
141+
retries: 5
142+
restart: unless-stopped
143+
144+
kafka-init:
145+
image: apache/kafka:4.0.0
146+
environment:
147+
KAFKA_GROUP_TOPIC_LIST: ${KAFKA_GROUP_TOPIC_LIST}
148+
networks:
149+
- kafka
150+
depends_on:
151+
kafka:
152+
condition: service_healthy
153+
command: |
154+
/bin/bash -c '
155+
for topic_group in $${KAFKA_GROUP_TOPIC_LIST//,/ }; do
156+
IFS=':' read -r topic group <<< "$$topic_group"
157+
158+
echo "Creating topic: $$topic"
159+
/opt/kafka/bin/kafka-topics.sh \
160+
--bootstrap-server kafka:29092 \
161+
--create --if-not-exists --topic $$topic \
162+
--partitions 1 --replication-factor 1
163+
164+
echo "Initializing consumer for topic $$topic and group $$group"
165+
/opt/kafka/bin/kafka-console-consumer.sh \
166+
--bootstrap-server kafka:29092 \
167+
--topic $$topic \
168+
--group $$group \
169+
--timeout-ms 5000
170+
done
171+
'
172+
restart: no
173+
67174
volumes:
68175
minio:
69176
ollama:
177+
mlflow:
178+
kafka:
179+
180+
networks:
181+
ollama:
182+
minio:
183+
mlflow:
184+
kafka:

docker/mlflow/Dockerfile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
FROM ghcr.io/mlflow/mlflow:v3.2.0
2+
3+
RUN pip install boto3

graph/visualization.py

Lines changed: 1 addition & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,12 @@
33

44
import geopandas as gpd
55
import kagglehub
6-
import matplotlib as mpl
76
import matplotlib.pyplot as plt
87
import networkx as nx
98
import numpy as np
109
import pandas as pd
11-
from matplotlib.colors import LinearSegmentedColormap, to_hex
1210

13-
COLOR_PALETTE = [
14-
"#42b0f9",
15-
"#ff5c92",
16-
"#ffcc00",
17-
"#9900ff",
18-
"#92ff5c",
19-
"#f98242",
20-
]
11+
from shared.color import COLOR_PALETTE, get_palette
2112

2213

2314
def set_labels(G: nx.Graph, label_props: dict[str, str]):
@@ -31,29 +22,6 @@ def set_labels(G: nx.Graph, label_props: dict[str, str]):
3122
data["label"] = data[prop]
3223

3324

34-
def darken_color(color, amount=0.5) -> tuple[float, float, float]:
35-
c = mpl.colors.to_rgb(color)
36-
return tuple(max(0, min(1, channel * (1 - amount))) for channel in c)
37-
38-
39-
def get_palette(n_colors: int = 3, darken: bool = False, reverse: bool = False):
40-
color_palette = list(COLOR_PALETTE)
41-
42-
if reverse:
43-
color_palette = reversed(color_palette)
44-
45-
if darken:
46-
color_palette = [darken_color(c) for c in color_palette]
47-
else:
48-
color_palette = list(color_palette)
49-
50-
if n_colors <= len(color_palette):
51-
return color_palette
52-
53-
cmap = LinearSegmentedColormap.from_list("custom", color_palette)
54-
return [to_hex(cmap(i)) for i in np.linspace(0, 1, n_colors)]
55-
56-
5725
def plot(
5826
G: nx.Graph,
5927
name_prop: str = "label",

ingest/fetcher.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
from loguru import logger as log
88
from tqdm import tqdm
99

10-
from shared.cache import get_requests_cache_session
1110
from shared.storage import Storage, StoragePrefix
1211

1312
DATACITE_API_URL = "https://api.datacite.org/"
@@ -17,7 +16,7 @@ class DataCiteFetcher:
1716
def __init__(self, s3_dir_path: str):
1817
self.s3_dir_path = s3_dir_path
1918
self.storage = Storage(StoragePrefix.INGEST)
20-
self.session = get_requests_cache_session("datacite")
19+
self.session = requests.Session()
2120

2221
def to_canonical_doi(self, doi: str) -> str:
2322
rel_path = urlsplit(doi).path.removeprefix("/")

ingest/handler.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,17 @@
33
import git
44
import kagglehub as kh
55
from loguru import logger as log
6-
from slugify import slugify
76

87
from ingest.fetcher import DataCiteFetcher
98
from ingest.parser import DatasetURL
109
from ingest.template.base import DataCiteTemplate, DatasetTemplate, DatasetTemplateID
1110
from shared.cache import get_cache_dir
1211
from shared.storage import Storage, StoragePrefix
12+
from shared.utils import fn_sanitize
1313

1414

1515
def handle_standalone(dataset: str):
16-
ds_name = slugify(dataset, separator="_")
16+
ds_name = fn_sanitize(dataset)
1717
log.info("Standalone detected, creating dataset: {}", ds_name)
1818

1919
try:
@@ -25,7 +25,7 @@ def handle_standalone(dataset: str):
2525

2626

2727
def handle_template(dataset: str, template_id: DatasetTemplateID):
28-
ds_name = slugify(dataset, separator="_")
28+
ds_name = fn_sanitize(dataset)
2929
template = DatasetTemplate.from_id(template_id)
3030

3131
log.info(

ingest/parser.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from typing import Self
33
from urllib.parse import urlparse
44

5-
from slugify import slugify
5+
from shared.utils import fn_sanitize
66

77

88
@dataclass
@@ -20,7 +20,7 @@ def parse(cls, dataset_url: str) -> Self:
2020
author = path[-2]
2121
slug = path[-1]
2222
handle = f"{author}/{slug}"
23-
name = slugify(slug, separator="_")
23+
name = fn_sanitize(slug)
2424

2525
ds_url = cls(
2626
author=author,

0 commit comments

Comments (0)