From f40765f84c2305dec15d3f8dcc9dc9481077c648 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 28 Jun 2026 21:01:13 +0000 Subject: [PATCH 1/7] =?UTF-8?q?Add=20AgTech=20Ops=20Hub:=20contextualized?= =?UTF-8?q?=20farm=20data=20=E2=86=92=20action=20items=20(Sprint=201)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: leanmachine1209-ui --- agtech-ops/.gitignore | 6 + agtech-ops/README.md | 117 +++++++++++++ agtech-ops/agtech_ops/__init__.py | 3 + agtech-ops/agtech_ops/api.py | 85 ++++++++++ agtech-ops/agtech_ops/config.py | 37 +++++ agtech-ops/agtech_ops/dashboard.py | 72 ++++++++ agtech-ops/agtech_ops/db.py | 84 ++++++++++ agtech-ops/agtech_ops/ingest/__init__.py | 6 + agtech-ops/agtech_ops/ingest/csv_ingest.py | 157 ++++++++++++++++++ .../agtech_ops/ingest/whatsapp_ingest.py | 121 ++++++++++++++ agtech-ops/agtech_ops/models.py | 135 +++++++++++++++ agtech-ops/agtech_ops/schemas.py | 57 +++++++ agtech-ops/agtech_ops/service.py | 142 ++++++++++++++++ agtech-ops/agtech_ops/summarize/__init__.py | 37 +++++ agtech-ops/agtech_ops/summarize/base.py | 32 ++++ agtech-ops/agtech_ops/summarize/llm.py | 48 ++++++ agtech-ops/agtech_ops/summarize/rule_based.py | 130 +++++++++++++++ agtech-ops/pyproject.toml | 41 +++++ agtech-ops/sample_data/herd.csv | 6 + agtech-ops/sample_data/whatsapp_export.txt | 6 + agtech-ops/tests/conftest.py | 31 ++++ agtech-ops/tests/test_api.py | 55 ++++++ agtech-ops/tests/test_csv_ingest.py | 53 ++++++ agtech-ops/tests/test_summarize.py | 51 ++++++ agtech-ops/tests/test_whatsapp_ingest.py | 39 +++++ 25 files changed, 1551 insertions(+) create mode 100644 agtech-ops/.gitignore create mode 100644 agtech-ops/README.md create mode 100644 agtech-ops/agtech_ops/__init__.py create mode 100644 agtech-ops/agtech_ops/api.py create mode 100644 agtech-ops/agtech_ops/config.py create mode 100644 agtech-ops/agtech_ops/dashboard.py create mode 100644 agtech-ops/agtech_ops/db.py create mode 100644 agtech-ops/agtech_ops/ingest/__init__.py create mode 100644 agtech-ops/agtech_ops/ingest/csv_ingest.py create mode 100644 agtech-ops/agtech_ops/ingest/whatsapp_ingest.py create mode 100644 agtech-ops/agtech_ops/models.py create mode 100644 agtech-ops/agtech_ops/schemas.py create mode 100644 agtech-ops/agtech_ops/service.py create mode 100644 agtech-ops/agtech_ops/summarize/__init__.py create mode 100644 agtech-ops/agtech_ops/summarize/base.py create mode 100644 agtech-ops/agtech_ops/summarize/llm.py create mode 100644 agtech-ops/agtech_ops/summarize/rule_based.py create mode 100644 agtech-ops/pyproject.toml create mode 100644 agtech-ops/sample_data/herd.csv create mode 100644 agtech-ops/sample_data/whatsapp_export.txt create mode 100644 agtech-ops/tests/conftest.py create mode 100644 agtech-ops/tests/test_api.py create mode 100644 agtech-ops/tests/test_csv_ingest.py create mode 100644 agtech-ops/tests/test_summarize.py create mode 100644 agtech-ops/tests/test_whatsapp_ingest.py diff --git a/agtech-ops/.gitignore b/agtech-ops/.gitignore new file mode 100644 index 0000000000..7e2898127b --- /dev/null +++ b/agtech-ops/.gitignore @@ -0,0 +1,6 @@ +.venv/ +__pycache__/ +*.pyc +*.db +*.egg-info/ +.pytest_cache/ diff --git a/agtech-ops/README.md b/agtech-ops/README.md new file mode 100644 index 0000000000..270ea037c4 --- /dev/null +++ b/agtech-ops/README.md @@ -0,0 +1,117 @@ +# AgTech Ops Hub + +A contextualized operations hub for farming businesses. It ingests data from +several messy real-world sources, resolves them onto shared farm entities, and +turns the combined picture into **summaries and action items** for ops teams. + +``` +INGEST → NORMALIZE + STORE → CONTEXTUALIZE + SUMMARIZE → DELIVER to ops +``` + +This is **Sprint 1**: a working vertical slice that proves the full loop +end-to-end with zero external services or API keys required. + +## What works today (Sprint 1) + +- **Ingest** + - Partner / Dropbox **CSV** uploads, with flexible/aliased column names and + per-row error reporting (one bad row never aborts the file). + - **WhatsApp** chat exports (both `[date, time] Name:` and `date, time - Name:` + formats, multi-line messages supported). +- **Contextualize** — every record is resolved onto a canonical model so data + from different sources lines up on the same thing: + + ``` + Farm --< Asset (herd | crop | field) --< Event >-- ActionItem + ``` + +- **Summarize** — a batch of events becomes a `{summary, points, action_items}` + result. Two interchangeable backends: + - `rule_based` (default): deterministic, offline, keyword-triggered action + items with priority, owner (from the message author) and a suggested due + date. No keys needed. + - `llm` (optional): LiteLLM + instructor for structured output from any model, + used automatically when the `ai` extra is installed **and** an API key is set. +- **Deliver** + - **FastAPI** JSON API (`/ingest/csv`, `/ingest/whatsapp`, `/summarize`, + `/action-items`, `/health`). + - **Streamlit** dashboard for ingesting data and reviewing open action items. + +## Quick start + +```bash +cd agtech-ops +python3 -m venv .venv && . .venv/bin/activate # or use your environment +pip install -e ".[dev]" # core + test deps +# optional: pip install -e ".[ai,dashboard]" + +# Run the API +uvicorn agtech_ops.api:app --reload + +# Try it with the bundled sample data +curl -F "file=@sample_data/herd.csv" http://localhost:8000/ingest/csv +curl -F "text=$(cat sample_data/whatsapp_export.txt)" -F "farm=Green Acres" \ + http://localhost:8000/ingest/whatsapp +curl -X POST "http://localhost:8000/summarize" +curl http://localhost:8000/action-items + +# Or the dashboard (needs the 'dashboard' extra) +streamlit run agtech_ops/dashboard.py +``` + +## Tests + +```bash +pytest # 15 tests, fully offline (forces the rule-based summarizer) +``` + +## Configuration + +All optional; sensible defaults mean it runs with nothing set. + +| Env var | Default | Purpose | +|---|---|---| +| `AGTECH_DATABASE_URL` | `sqlite:///agtech_ops.db` | Any SQLAlchemy URL (e.g. Postgres). | +| `AGTECH_LLM_MODEL` | `gpt-4o-mini` | LiteLLM model id for the AI backend. | +| `AGTECH_FORCE_RULE_BASED` | `false` | Force the offline summarizer. | +| `OPENAI_API_KEY` / `ANTHROPIC_API_KEY` / … | – | Enables the LLM backend. | + +## Architecture (mapped to the awesome-python catalog) + +| Layer | Library (from this repo's list) | +|---|---| +| API / webhooks | FastAPI, uvicorn | +| Tabular parsing | pandas | +| Validation | pydantic | +| Storage / entity resolution | SQLAlchemy | +| Summaries (AI) | LiteLLM, instructor | +| Dashboard | Streamlit, Plotly | +| Scheduling (future) | APScheduler / Prefect / Dagster | + +## Roadmap + +- **Sprint 1 (done):** CSV + WhatsApp ingest → contextualized action items via + API + dashboard, with offline + LLM summarizers. +- **Sprint 2:** Live **Dropbox** sync and **WhatsApp Business API** webhook; + scheduled pulls (APScheduler); push action items back to chat/email. +- **Sprint 3:** Smarter entity resolution (fuzzy asset matching, aliases), + per-farm dashboards, trend charts, action-item status workflow. + +## Open questions — where I need more detail + +These are the decisions that will most shape Sprints 2–3: + +1. **Partner CSV schemas.** What columns do your real partners send? Are they + stable, or do we need per-partner mappings? Right now I infer common aliases. +2. **Dropbox layout.** Folder structure and file naming for herd/crop/operations + data, and whether to use a service account or per-user OAuth. +3. **WhatsApp source.** Live (WhatsApp Business API / Twilio / Meta Cloud API) + or periodic chat-export uploads? Live changes the auth + webhook design. +4. **Entity naming.** How are farms/herds/fields named across sources so we can + match them reliably? Is there a master list/IDs we should sync from? +5. **Action item destination.** Where do ops want items delivered — dashboard + only, back into WhatsApp, email, or an existing task tool? +6. **AI provider + data policy.** Which model/provider is acceptable, and any + constraints on sending farm/staff messages to a third-party LLM. +7. **Deployment + scale.** Single farm vs. multi-tenant ("agrefine network"?), + expected data volume, and where this should run. diff --git a/agtech-ops/agtech_ops/__init__.py b/agtech-ops/agtech_ops/__init__.py new file mode 100644 index 0000000000..b20bc19f3d --- /dev/null +++ b/agtech-ops/agtech_ops/__init__.py @@ -0,0 +1,3 @@ +"""AgTech Ops Hub: contextualize multi-source farm data into action items.""" + +__version__ = "0.1.0" diff --git a/agtech-ops/agtech_ops/api.py b/agtech-ops/agtech_ops/api.py new file mode 100644 index 0000000000..f1f5dedd93 --- /dev/null +++ b/agtech-ops/agtech_ops/api.py @@ -0,0 +1,85 @@ +"""FastAPI surface for the AgTech Ops Hub.""" + +from __future__ import annotations + +import datetime as dt +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager + +from fastapi import FastAPI, File, Form, HTTPException, Query, UploadFile + +from .db import init_db +from .ingest import parse_partner_csv, parse_whatsapp_export +from .models import ActionStatus +from .schemas import IngestResult, SummaryResult +from .service import ( + known_asset_names, + list_action_items, + store_events, + summarize_and_store, +) +from .summarize import get_summarizer + +@asynccontextmanager +async def lifespan(app: FastAPI) -> AsyncIterator[None]: + init_db() + yield + + +app = FastAPI( + title="AgTech Ops Hub", + version="0.1.0", + description=( + "Ingest partner CSVs, Dropbox exports and WhatsApp chatter, contextualize " + "them onto shared farm assets, and turn them into action items for ops teams." + ), + lifespan=lifespan, +) + + +@app.get("/health") +def health() -> dict: + return {"status": "ok", "summarizer": get_summarizer().name} + + +@app.post("/ingest/csv", response_model=IngestResult) +async def ingest_csv(file: UploadFile = File(...)) -> IngestResult: + raw = await file.read() + events, errors = parse_partner_csv(raw) + if not events and errors: + raise HTTPException(status_code=422, detail=errors) + return store_events(events, errors) + + +@app.post("/ingest/whatsapp", response_model=IngestResult) +async def ingest_whatsapp( + text: str = Form(...), + farm: str = Form(...), + default_asset: str = Form("General"), +) -> IngestResult: + events, errors = parse_whatsapp_export( + text, + farm=farm, + known_assets=known_asset_names(farm), + default_asset=default_asset, + ) + if not events and errors: + raise HTTPException(status_code=422, detail=errors) + return store_events(events, errors) + + +@app.post("/summarize", response_model=SummaryResult) +def summarize( + farm: str | None = Query(None), + since_days: int | None = Query(None, ge=0), + persist: bool = Query(True), +) -> SummaryResult: + since = None + if since_days is not None: + since = dt.datetime.now() - dt.timedelta(days=since_days) + return summarize_and_store(farm=farm, since=since, persist=persist) + + +@app.get("/action-items") +def action_items(status: ActionStatus | None = Query(ActionStatus.open)) -> list[dict]: + return list_action_items(status=status) diff --git a/agtech-ops/agtech_ops/config.py b/agtech-ops/agtech_ops/config.py new file mode 100644 index 0000000000..00b797fc24 --- /dev/null +++ b/agtech-ops/agtech_ops/config.py @@ -0,0 +1,37 @@ +"""Runtime configuration, read from environment variables. + +Everything has a sensible default so the app runs end-to-end with zero setup. +Secrets (LLM keys, Dropbox/WhatsApp tokens) are optional and only needed for +the live integrations. +""" + +from __future__ import annotations + +import os +from dataclasses import dataclass + + +def _env_bool(name: str, default: bool) -> bool: + raw = os.getenv(name) + if raw is None: + return default + return raw.strip().lower() in {"1", "true", "yes", "on"} + + +@dataclass(frozen=True) +class Settings: + # SQLAlchemy URL. Defaults to a local SQLite file so there is nothing to + # provision for a first run. + database_url: str = os.getenv("AGTECH_DATABASE_URL", "sqlite:///agtech_ops.db") + + # LLM model string understood by LiteLLM (e.g. "gpt-4o-mini", + # "anthropic/claude-3-5-sonnet"). Only used when AI extras + a key exist. + llm_model: str = os.getenv("AGTECH_LLM_MODEL", "gpt-4o-mini") + + # Force the deterministic summarizer even if AI deps/keys are available. + # Handy for tests and offline demos. + force_rule_based: bool = _env_bool("AGTECH_FORCE_RULE_BASED", False) + + +def get_settings() -> Settings: + return Settings() diff --git a/agtech-ops/agtech_ops/dashboard.py b/agtech-ops/agtech_ops/dashboard.py new file mode 100644 index 0000000000..9516316d4a --- /dev/null +++ b/agtech-ops/agtech_ops/dashboard.py @@ -0,0 +1,72 @@ +"""Operator-facing Streamlit dashboard. + +Run with: streamlit run agtech_ops/dashboard.py +Requires the 'dashboard' extra (streamlit, plotly). +""" + +from __future__ import annotations + +import streamlit as st + +from .db import init_db +from .ingest import parse_partner_csv, parse_whatsapp_export +from .models import ActionStatus +from .service import ( + known_asset_names, + list_action_items, + store_events, + summarize_and_store, +) +from .summarize import get_summarizer + + +def main() -> None: + init_db() + st.set_page_config(page_title="AgTech Ops Hub", layout="wide") + st.title("AgTech Ops Hub") + st.caption( + f"Contextualized farm operations · summarizer: **{get_summarizer().name}**" + ) + + with st.sidebar: + st.header("Ingest data") + csv_file = st.file_uploader("Partner / Dropbox CSV", type=["csv"]) + if csv_file is not None and st.button("Ingest CSV"): + events, errors = parse_partner_csv(csv_file.getvalue()) + res = store_events(events, errors) + st.success(f"Ingested {res.events_ingested} events.") + if res.errors: + st.warning("\n".join(res.errors)) + + st.divider() + wa_farm = st.text_input("Farm name (for WhatsApp)", value="Green Acres") + wa_text = st.text_area("Paste WhatsApp export") + if wa_text and st.button("Ingest WhatsApp"): + events, errors = parse_whatsapp_export( + wa_text, farm=wa_farm, known_assets=known_asset_names(wa_farm) + ) + res = store_events(events, errors) + st.success(f"Ingested {res.events_ingested} messages.") + if res.errors: + st.warning("\n".join(res.errors)) + + col1, col2 = st.columns([1, 1]) + with col1: + st.subheader("Summary") + if st.button("Generate summary + action items"): + result = summarize_and_store() + st.write(result.summary) + for p in result.points: + st.markdown(f"- {p}") + + with col2: + st.subheader("Open action items") + items = list_action_items(status=ActionStatus.open) + if items: + st.dataframe(items, use_container_width=True) + else: + st.info("No open action items yet. Ingest data and generate a summary.") + + +if __name__ == "__main__": + main() diff --git a/agtech-ops/agtech_ops/db.py b/agtech-ops/agtech_ops/db.py new file mode 100644 index 0000000000..030ea700a8 --- /dev/null +++ b/agtech-ops/agtech_ops/db.py @@ -0,0 +1,84 @@ +"""Database engine/session helpers and entity-resolution upserts.""" + +from __future__ import annotations + +from collections.abc import Iterator +from contextlib import contextmanager + +from sqlalchemy import create_engine +from sqlalchemy.orm import Session, sessionmaker + +from .config import get_settings +from .models import Asset, AssetType, Base, Farm + +_engine = None +_SessionLocal: sessionmaker[Session] | None = None + + +def get_engine(): + global _engine + if _engine is None: + url = get_settings().database_url + connect_args = {"check_same_thread": False} if url.startswith("sqlite") else {} + _engine = create_engine(url, connect_args=connect_args, future=True) + return _engine + + +def init_db() -> None: + Base.metadata.create_all(get_engine()) + + +def get_sessionmaker() -> sessionmaker[Session]: + global _SessionLocal + if _SessionLocal is None: + _SessionLocal = sessionmaker( + bind=get_engine(), expire_on_commit=False, future=True + ) + return _SessionLocal + + +@contextmanager +def session_scope() -> Iterator[Session]: + session = get_sessionmaker()() + try: + yield session + session.commit() + except Exception: + session.rollback() + raise + finally: + session.close() + + +def get_or_create_farm(session: Session, name: str) -> Farm: + name = name.strip() + farm = session.query(Farm).filter(Farm.name == name).one_or_none() + if farm is None: + farm = Farm(name=name) + session.add(farm) + session.flush() + return farm + + +def get_or_create_asset( + session: Session, + farm: Farm, + name: str, + asset_type: AssetType = AssetType.other, +) -> Asset: + """Resolve an asset by (farm, name), the core of cross-source context.""" + name = name.strip() + asset = ( + session.query(Asset) + .filter(Asset.farm_id == farm.id, Asset.name == name) + .one_or_none() + ) + if asset is None: + asset = Asset(farm_id=farm.id, name=name, type=asset_type) + session.add(asset) + session.flush() + elif asset_type is not AssetType.other and asset.type is AssetType.other: + # Upgrade a placeholder type once we learn what the asset really is. + asset.type = asset_type + session.flush() + return asset diff --git a/agtech-ops/agtech_ops/ingest/__init__.py b/agtech-ops/agtech_ops/ingest/__init__.py new file mode 100644 index 0000000000..73c900813d --- /dev/null +++ b/agtech-ops/agtech_ops/ingest/__init__.py @@ -0,0 +1,6 @@ +"""Ingestors that turn raw source data into validated ``EventIn`` records.""" + +from .csv_ingest import parse_partner_csv +from .whatsapp_ingest import parse_whatsapp_export + +__all__ = ["parse_partner_csv", "parse_whatsapp_export"] diff --git a/agtech-ops/agtech_ops/ingest/csv_ingest.py b/agtech-ops/agtech_ops/ingest/csv_ingest.py new file mode 100644 index 0000000000..f888b36945 --- /dev/null +++ b/agtech-ops/agtech_ops/ingest/csv_ingest.py @@ -0,0 +1,157 @@ +"""Parse partner / Dropbox CSV exports into normalized events. + +Expected (case-insensitive, flexible) columns: + + farm, asset, date, [asset_type], [category], [metric], [value], [notes] + +Partners are messy, so column names are normalized and unknown extras are kept +in ``raw``. Rows that cannot be salvaged are reported as errors rather than +aborting the whole file. +""" + +from __future__ import annotations + +import datetime as dt +import io +import json + +import pandas as pd + +from ..models import AssetType, Source +from ..schemas import EventIn + +# Map common partner header variants onto our canonical names. +_COLUMN_ALIASES = { + "farm": "farm", + "farm_name": "farm", + "site": "farm", + "asset": "asset", + "herd": "asset", + "crop": "asset", + "field": "asset", + "asset_name": "asset", + "paddock": "asset", + "asset_type": "asset_type", + "type": "asset_type", + "date": "date", + "timestamp": "date", + "datetime": "date", + "recorded_at": "date", + "category": "category", + "event": "category", + "metric": "metric", + "measure": "metric", + "value": "value", + "reading": "value", + "amount": "value", + "notes": "notes", + "note": "notes", + "comment": "notes", + "comments": "notes", +} + +_REQUIRED = {"farm", "asset", "date"} + + +def _normalize_columns(df: pd.DataFrame) -> pd.DataFrame: + renamed = {} + for col in df.columns: + key = str(col).strip().lower().replace(" ", "_") + renamed[col] = _COLUMN_ALIASES.get(key, key) + return df.rename(columns=renamed) + + +def _coerce_asset_type(raw: object) -> AssetType: + if raw is None or (isinstance(raw, float) and pd.isna(raw)): + return AssetType.other + try: + return AssetType(str(raw).strip().lower()) + except ValueError: + return AssetType.other + + +def parse_partner_csv( + data: str | bytes, + *, + source: Source = Source.csv_partner, +) -> tuple[list[EventIn], list[str]]: + """Return ``(events, errors)`` parsed from CSV ``data``.""" + + if isinstance(data, bytes): + buffer: io.StringIO | io.BytesIO = io.BytesIO(data) + else: + buffer = io.StringIO(data) + + errors: list[str] = [] + try: + df = pd.read_csv(buffer) + except Exception as exc: # noqa: BLE001 - surfaced to caller + return [], [f"could not read CSV: {exc}"] + + if df.empty: + return [], ["CSV contained no rows"] + + df = _normalize_columns(df) + + missing = _REQUIRED - set(df.columns) + if missing: + return [], [f"missing required column(s): {', '.join(sorted(missing))}"] + + known = {"farm", "asset", "asset_type", "date", "category", "metric", "value", "notes"} + extra_cols = [c for c in df.columns if c not in known] + + events: list[EventIn] = [] + for idx, row in df.iterrows(): + rownum = int(idx) + 2 # +1 for header, +1 for 1-based humans + try: + occurred_at = pd.to_datetime(row["date"], errors="coerce") + if pd.isna(occurred_at): + errors.append(f"row {rownum}: unparseable date {row['date']!r}") + continue + + value = row.get("value") + if value is not None and not pd.isna(value): + try: + value = float(value) + except (TypeError, ValueError): + value = None + else: + value = None + + raw_extra = {c: _jsonable(row.get(c)) for c in extra_cols} + + events.append( + EventIn( + farm=str(row["farm"]), + asset=str(row["asset"]), + asset_type=_coerce_asset_type(row.get("asset_type")), + source=source, + occurred_at=occurred_at.to_pydatetime() + if hasattr(occurred_at, "to_pydatetime") + else dt.datetime.fromisoformat(str(occurred_at)), + category=_clean(row.get("category")), + metric=_clean(row.get("metric")), + value=value, + text=_clean(row.get("notes")), + raw=json.dumps(raw_extra) if raw_extra else None, + ) + ) + except Exception as exc: # noqa: BLE001 - per-row resilience + errors.append(f"row {rownum}: {exc}") + + return events, errors + + +def _clean(v: object) -> str | None: + if v is None or (isinstance(v, float) and pd.isna(v)): + return None + s = str(v).strip() + return s or None + + +def _jsonable(v: object): + if v is None or (isinstance(v, float) and pd.isna(v)): + return None + if isinstance(v, (int, float, str, bool)): + return v + return str(v) diff --git a/agtech-ops/agtech_ops/ingest/whatsapp_ingest.py b/agtech-ops/agtech_ops/ingest/whatsapp_ingest.py new file mode 100644 index 0000000000..8575e9699a --- /dev/null +++ b/agtech-ops/agtech_ops/ingest/whatsapp_ingest.py @@ -0,0 +1,121 @@ +"""Parse a WhatsApp chat export into normalized events. + +WhatsApp "Export chat" produces lines like:: + + [2026-06-28, 8:56:01 PM] Alice: North Herd cow #42 looks lame, call vet + 2026/06/28, 20:56 - Bob: Ordered more feed for Field 3 + +Both bracketed and dash formats are supported. Multi-line messages (a line +without a new timestamp header) are appended to the previous message. + +Because a chat does not name the farm/asset in a structured way, the caller +supplies a default ``farm``; the asset is inferred from the message text when a +known asset name is provided, otherwise it falls back to a "General" asset so +nothing is dropped. +""" + +from __future__ import annotations + +import datetime as dt +import re + +from ..models import Source +from ..schemas import EventIn + +# [2026-06-28, 8:56:01 PM] Name: message OR +# 2026/06/28, 20:56 - Name: message +_LINE_RE = re.compile( + r"^\[?\s*" + r"(?P\d{1,4}[-/.]\d{1,2}[-/.]\d{1,4})" + r"[,]?\s+" + r"(?P