Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions agtech-ops/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
.venv/
__pycache__/
*.pyc
*.db
*.db-journal
*.egg-info/
.pytest_cache/
tests/
6 changes: 6 additions & 0 deletions agtech-ops/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
.venv/
__pycache__/
*.pyc
*.db
*.egg-info/
.pytest_cache/
18 changes: 18 additions & 0 deletions agtech-ops/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
FROM python:3.12-slim

WORKDIR /app

# Install the package with file-processing + dashboard extras.
COPY pyproject.toml README.md ./
COPY agtech_ops ./agtech_ops
RUN pip install --no-cache-dir -e ".[files,dashboard]"

ENV AGTECH_FORCE_RULE_BASED=1 \
AGTECH_DATABASE_URL=sqlite:////data/agtech_ops.db

# SQLite lives on a volume so data survives container restarts.
VOLUME ["/data"]
EXPOSE 8501

# $PORT is honored when present (e.g. on Render), else default to 8501.
CMD ["sh", "-c", "streamlit run agtech_ops/dashboard.py --server.port ${PORT:-8501} --server.address 0.0.0.0 --server.headless true --browser.gatherUsageStats false"]
140 changes: 140 additions & 0 deletions agtech-ops/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# AgTech Ops Hub

A contextualized operations hub for farming businesses. It ingests data from
several messy real-world sources, resolves them onto shared farm entities, and
turns the combined picture into **summaries and action items** for ops teams.

```
INGEST → NORMALIZE + STORE → CONTEXTUALIZE + SUMMARIZE → DELIVER to ops
```

This is **Sprint 1**: a working vertical slice that proves the full loop
end-to-end with zero external services or API keys required.

## What works today (Sprint 1)

- **Ingest a range of files** — one intake that dispatches on file type:
- **Tabular** (carry their own farm/asset columns): `.csv`, `.tsv`,
`.xlsx`/`.xls`, `.json`. Flexible/aliased column names, per-row error
reporting (one bad row never aborts the file), extra columns preserved.
- **Free text** (converted into data): `.txt`, `.md`, `.log`, `.pdf`,
`.docx`. Each paragraph becomes an event; an optional leading date is used
as its timestamp; the asset is inferred from known asset names.
- **WhatsApp** chat exports are auto-detected (both `[date, time] Name:` and
`date, time - Name:` formats, multi-line messages supported).
- **Video/clip metadata & tags** — clip metadata (from an upstream vision model,
e.g. YOLO on a Jetson) with `tags`, `duration`, `camera` columns is
auto-detected and ingested as `media` events. Tags become **workflow signals**
and feed action generation (a clip tagged `lame` raises a vet task).
- **Compile & aggregate** — a cross-source roll-up: totals, per-source and
per-asset counts, **top tags**, clip counts, and numeric metric **time series**
assembled from every file (e.g. milk yield from a CSV + a JSON partner feed).
- **AI action-item log agent** — a "Haiku agent" turns incoming bridge data
(messages, files, clip tags) into an append-only, prioritized action-item log,
each entry carrying a **rationale** and the agent that produced it. Uses Claude
Haiku via LiteLLM when a key is set, with a deterministic offline fallback.
- **Contextualize** — every record is resolved onto a canonical model so data
from different sources lines up on the same thing:

```
Farm --< Asset (herd | crop | field) --< Event >-- ActionItem
```

- **Summarize** — a batch of events becomes a `{summary, points, action_items}`
result. Two interchangeable backends:
- `rule_based` (default): deterministic, offline, keyword-triggered action
items with priority, owner (from the message author) and a suggested due
date. No keys needed.
- `llm` (optional): LiteLLM + instructor for structured output from any model,
used automatically when the `ai` extra is installed **and** an API key is set.
- **Deliver**
- **FastAPI** JSON API (`/ingest/csv`, `/ingest/whatsapp`, `/summarize`,
`/action-items`, `/health`).
- **Streamlit** dashboard for ingesting data and reviewing open action items.

## Quick start

```bash
cd agtech-ops
python3 -m venv .venv && . .venv/bin/activate # or use your environment
pip install -e ".[dev,files]" # core + tests + Excel/PDF/Word support
# optional: pip install -e ".[ai,dashboard]"

# Run the API
uvicorn agtech_ops.api:app --reload

# Ingest a range of files at once (incl. video/clip tag metadata)
curl -F "files=@sample_data/herd.csv" \
-F "files=@sample_data/partner_feed.json" \
-F "files=@sample_data/field_notes.txt" \
-F "files=@sample_data/clips.json" \
-F "farm=Green Acres" \
http://localhost:8000/ingest/files

# Compile / aggregate everything ingested (incl. top tags + clip counts)
curl http://localhost:8000/report

# Run the action-item-log agent, then read the log
curl -X POST "http://localhost:8000/agent/run"
curl http://localhost:8000/agent/log

# Or the dashboard (needs the 'dashboard' extra)
streamlit run agtech_ops/dashboard.py
```

## Tests

```bash
pytest # 15 tests, fully offline (forces the rule-based summarizer)
```

## Configuration

All optional; sensible defaults mean it runs with nothing set.

| Env var | Default | Purpose |
|---|---|---|
| `AGTECH_DATABASE_URL` | `sqlite:///agtech_ops.db` | Any SQLAlchemy URL (e.g. Postgres). |
| `AGTECH_LLM_MODEL` | `anthropic/claude-3-5-haiku-latest` | LiteLLM model id for the agent. |
| `AGTECH_FORCE_RULE_BASED` | `false` | Force the offline agent. |
| `ANTHROPIC_API_KEY` / `OPENAI_API_KEY` / … | – | Enables the Haiku/LLM agent. |

## Architecture (mapped to the awesome-python catalog)

| Layer | Library (from this repo's list) |
|---|---|
| API / webhooks | FastAPI, uvicorn |
| Tabular parsing | pandas |
| Validation | pydantic |
| Storage / entity resolution | SQLAlchemy |
| Summaries (AI) | LiteLLM, instructor |
| Dashboard | Streamlit, Plotly |
| Scheduling (future) | APScheduler / Prefect / Dagster |

## Roadmap

- **Sprint 1 (done):** CSV + WhatsApp ingest → contextualized action items via
API + dashboard, with offline + LLM summarizers.
- **Sprint 2:** Live **Dropbox** sync and **WhatsApp Business API** webhook;
scheduled pulls (APScheduler); push action items back to chat/email.
- **Sprint 3:** Smarter entity resolution (fuzzy asset matching, aliases),
per-farm dashboards, trend charts, action-item status workflow.

## Open questions — where I need more detail

These are the decisions that will most shape Sprints 2–3:

1. **Partner CSV schemas.** What columns do your real partners send? Are they
stable, or do we need per-partner mappings? Right now I infer common aliases.
2. **Dropbox layout.** Folder structure and file naming for herd/crop/operations
data, and whether to use a service account or per-user OAuth.
3. **WhatsApp source.** Live (WhatsApp Business API / Twilio / Meta Cloud API)
or periodic chat-export uploads? Live changes the auth + webhook design.
4. **Entity naming.** How are farms/herds/fields named across sources so we can
match them reliably? Is there a master list/IDs we should sync from?
5. **Action item destination.** Where do ops want items delivered — dashboard
only, back into WhatsApp, email, or an existing task tool?
6. **AI provider + data policy.** Which model/provider is acceptable, and any
constraints on sending farm/staff messages to a third-party LLM.
7. **Deployment + scale.** Single farm vs. multi-tenant ("agrefine network"?),
expected data volume, and where this should run.
3 changes: 3 additions & 0 deletions agtech-ops/agtech_ops/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""AgTech Ops Hub: contextualize multi-source farm data into action items."""

__version__ = "0.1.0"
46 changes: 46 additions & 0 deletions agtech-ops/agtech_ops/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""Action-item-log agent.

This is the "Haiku agent" entry point: as data flows in from the bridge
(WhatsApp, Dropbox, partner files, video/clip tags), it distills an append-only
**action-item log** with provenance — what to do, who owns it, why it was
raised, and which agent produced it.

It delegates to the configured summarizer backend:
- ``llm`` — Claude Haiku (or any LiteLLM model) when the AI extra + an API
key are present. Cheap, fast, good at turning chatter + tags into tasks.
- ``rule_based`` — deterministic offline fallback, so the log always builds.

Each run appends to the ``action_items`` table (the log); existing entries are
preserved so the log is a running history, not a snapshot.
"""

from __future__ import annotations

import datetime as dt

from .schemas import SummaryResult
from .service import list_action_items, summarize_and_store
from .summarize import get_summarizer


def agent_name() -> str:
"""Human-readable name of the active agent backend."""
return get_summarizer().name


def build_action_log(
farm: str | None = None,
since_days: int | None = None,
) -> SummaryResult:
"""Process incoming data and append to the action-item log."""
since = None
if since_days is not None:
since = dt.datetime.now() - dt.timedelta(days=since_days)
return summarize_and_store(farm=farm, since=since, persist=True)


def action_log(limit: int | None = None) -> list[dict]:
"""Return the current open action-item log (newest first)."""
items = list_action_items()
items.sort(key=lambda a: (a.get("logged_at") or ""), reverse=True)
return items[:limit] if limit else items
137 changes: 137 additions & 0 deletions agtech-ops/agtech_ops/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
"""FastAPI surface for the AgTech Ops Hub."""

from __future__ import annotations

import datetime as dt
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from fastapi import FastAPI, File, Form, HTTPException, Query, UploadFile

from .agent import action_log, agent_name, build_action_log
from .db import init_db
from .ingest import SUPPORTED_EXTENSIONS, parse_partner_csv, parse_whatsapp_export
from .models import ActionStatus
from .schemas import AggregateReport, FileIngestResult, IngestResult, SummaryResult
from .service import (
aggregate,
ingest_files,
known_asset_names,
list_action_items,
store_events,
summarize_and_store,
)
from .summarize import get_summarizer

@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
init_db()
yield


app = FastAPI(
title="AgTech Ops Hub",
version="0.1.0",
description=(
"Ingest partner CSVs, Dropbox exports and WhatsApp chatter, contextualize "
"them onto shared farm assets, and turn them into action items for ops teams."
),
lifespan=lifespan,
)


@app.get("/health")
def health() -> dict:
return {
"status": "ok",
"summarizer": get_summarizer().name,
"supported_files": sorted(SUPPORTED_EXTENSIONS),
}


@app.post("/ingest/files", response_model=FileIngestResult)
async def ingest_files_endpoint(
files: list[UploadFile] = File(...),
farm: str | None = Form(None),
default_asset: str = Form("General"),
) -> FileIngestResult:
"""Ingest a range of files (CSV/TSV/Excel/JSON/TXT/MD/LOG/PDF/DOCX).

Tabular files carry their own farm/asset columns; free-text files use the
optional ``farm`` for context resolution.
"""
payload = [(f.filename or "upload", await f.read()) for f in files]
result = ingest_files(payload, farm=farm, default_asset=default_asset)
if result.events_ingested == 0 and result.errors:
raise HTTPException(status_code=422, detail=result.errors)
return result


@app.get("/report", response_model=AggregateReport)
def report(farm: str | None = Query(None)) -> AggregateReport:
"""Compiled, aggregated view across all ingested sources."""
return aggregate(farm=farm)


@app.post("/ingest/csv", response_model=IngestResult)
async def ingest_csv(file: UploadFile = File(...)) -> IngestResult:
raw = await file.read()
events, errors = parse_partner_csv(raw)
if not events and errors:
raise HTTPException(status_code=422, detail=errors)
return store_events(events, errors)


@app.post("/ingest/whatsapp", response_model=IngestResult)
async def ingest_whatsapp(
text: str = Form(...),
farm: str = Form(...),
default_asset: str = Form("General"),
) -> IngestResult:
events, errors = parse_whatsapp_export(
text,
farm=farm,
known_assets=known_asset_names(farm),
default_asset=default_asset,
)
if not events and errors:
raise HTTPException(status_code=422, detail=errors)
return store_events(events, errors)


@app.post("/summarize", response_model=SummaryResult)
def summarize(
farm: str | None = Query(None),
since_days: int | None = Query(None, ge=0),
persist: bool = Query(True),
) -> SummaryResult:
since = None
if since_days is not None:
since = dt.datetime.now() - dt.timedelta(days=since_days)
return summarize_and_store(farm=farm, since=since, persist=persist)


@app.get("/action-items")
def action_items(status: ActionStatus | None = Query(ActionStatus.open)) -> list[dict]:
return list_action_items(status=status)


@app.get("/agent")
def agent_info() -> dict:
"""Which action-item-log agent is active (haiku-agent or rule_based)."""
return {"agent": agent_name()}


@app.post("/agent/run", response_model=SummaryResult)
def agent_run(
farm: str | None = Query(None),
since_days: int | None = Query(None, ge=0),
) -> SummaryResult:
"""Run the agent over incoming data and append to the action-item log."""
return build_action_log(farm=farm, since_days=since_days)


@app.get("/agent/log")
def agent_log(limit: int | None = Query(None, ge=1)) -> list[dict]:
"""Read the current action-item log (newest first)."""
return action_log(limit=limit)
Loading