diff --git a/agtech-ops/.dockerignore b/agtech-ops/.dockerignore new file mode 100644 index 0000000000..b55811d1b8 --- /dev/null +++ b/agtech-ops/.dockerignore @@ -0,0 +1,8 @@ +.venv/ +__pycache__/ +*.pyc +*.db +*.db-journal +*.egg-info/ +.pytest_cache/ +tests/ diff --git a/agtech-ops/.gitignore b/agtech-ops/.gitignore new file mode 100644 index 0000000000..7e2898127b --- /dev/null +++ b/agtech-ops/.gitignore @@ -0,0 +1,6 @@ +.venv/ +__pycache__/ +*.pyc +*.db +*.egg-info/ +.pytest_cache/ diff --git a/agtech-ops/Dockerfile b/agtech-ops/Dockerfile new file mode 100644 index 0000000000..8faf72d94d --- /dev/null +++ b/agtech-ops/Dockerfile @@ -0,0 +1,18 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Install the package with file-processing + dashboard extras. +COPY pyproject.toml README.md ./ +COPY agtech_ops ./agtech_ops +RUN pip install --no-cache-dir -e ".[files,dashboard]" + +ENV AGTECH_FORCE_RULE_BASED=1 \ + AGTECH_DATABASE_URL=sqlite:////data/agtech_ops.db + +# SQLite lives on a volume so data survives container restarts. +VOLUME ["/data"] +EXPOSE 8501 + +# $PORT is honored when present (e.g. on Render), else default to 8501. +CMD ["sh", "-c", "streamlit run agtech_ops/dashboard.py --server.port ${PORT:-8501} --server.address 0.0.0.0 --server.headless true --browser.gatherUsageStats false"] diff --git a/agtech-ops/README.md b/agtech-ops/README.md new file mode 100644 index 0000000000..fc4262e5d8 --- /dev/null +++ b/agtech-ops/README.md @@ -0,0 +1,140 @@ +# AgTech Ops Hub + +A contextualized operations hub for farming businesses. It ingests data from +several messy real-world sources, resolves them onto shared farm entities, and +turns the combined picture into **summaries and action items** for ops teams. + +``` +INGEST → NORMALIZE + STORE → CONTEXTUALIZE + SUMMARIZE → DELIVER to ops +``` + +This is **Sprint 1**: a working vertical slice that proves the full loop +end-to-end with zero external services or API keys required. + +## What works today (Sprint 1) + +- **Ingest a range of files** — one intake that dispatches on file type: + - **Tabular** (carry their own farm/asset columns): `.csv`, `.tsv`, + `.xlsx`/`.xls`, `.json`. Flexible/aliased column names, per-row error + reporting (one bad row never aborts the file), extra columns preserved. + - **Free text** (converted into data): `.txt`, `.md`, `.log`, `.pdf`, + `.docx`. Each paragraph becomes an event; an optional leading date is used + as its timestamp; the asset is inferred from known asset names. + - **WhatsApp** chat exports are auto-detected (both `[date, time] Name:` and + `date, time - Name:` formats, multi-line messages supported). +- **Video/clip metadata & tags** — clip metadata (from an upstream vision model, + e.g. YOLO on a Jetson) with `tags`, `duration`, `camera` columns is + auto-detected and ingested as `media` events. Tags become **workflow signals** + and feed action generation (a clip tagged `lame` raises a vet task). +- **Compile & aggregate** — a cross-source roll-up: totals, per-source and + per-asset counts, **top tags**, clip counts, and numeric metric **time series** + assembled from every file (e.g. milk yield from a CSV + a JSON partner feed). +- **AI action-item log agent** — a "Haiku agent" turns incoming bridge data + (messages, files, clip tags) into an append-only, prioritized action-item log, + each entry carrying a **rationale** and the agent that produced it. Uses Claude + Haiku via LiteLLM when a key is set, with a deterministic offline fallback. +- **Contextualize** — every record is resolved onto a canonical model so data + from different sources lines up on the same thing: + + ``` + Farm --< Asset (herd | crop | field) --< Event >-- ActionItem + ``` + +- **Summarize** — a batch of events becomes a `{summary, points, action_items}` + result. Two interchangeable backends: + - `rule_based` (default): deterministic, offline, keyword-triggered action + items with priority, owner (from the message author) and a suggested due + date. No keys needed. + - `llm` (optional): LiteLLM + instructor for structured output from any model, + used automatically when the `ai` extra is installed **and** an API key is set. +- **Deliver** + - **FastAPI** JSON API (`/ingest/csv`, `/ingest/whatsapp`, `/summarize`, + `/action-items`, `/health`). + - **Streamlit** dashboard for ingesting data and reviewing open action items. + +## Quick start + +```bash +cd agtech-ops +python3 -m venv .venv && . .venv/bin/activate # or use your environment +pip install -e ".[dev,files]" # core + tests + Excel/PDF/Word support +# optional: pip install -e ".[ai,dashboard]" + +# Run the API +uvicorn agtech_ops.api:app --reload + +# Ingest a range of files at once (incl. video/clip tag metadata) +curl -F "files=@sample_data/herd.csv" \ + -F "files=@sample_data/partner_feed.json" \ + -F "files=@sample_data/field_notes.txt" \ + -F "files=@sample_data/clips.json" \ + -F "farm=Green Acres" \ + http://localhost:8000/ingest/files + +# Compile / aggregate everything ingested (incl. top tags + clip counts) +curl http://localhost:8000/report + +# Run the action-item-log agent, then read the log +curl -X POST "http://localhost:8000/agent/run" +curl http://localhost:8000/agent/log + +# Or the dashboard (needs the 'dashboard' extra) +streamlit run agtech_ops/dashboard.py +``` + +## Tests + +```bash +pytest # 15 tests, fully offline (forces the rule-based summarizer) +``` + +## Configuration + +All optional; sensible defaults mean it runs with nothing set. + +| Env var | Default | Purpose | +|---|---|---| +| `AGTECH_DATABASE_URL` | `sqlite:///agtech_ops.db` | Any SQLAlchemy URL (e.g. Postgres). | +| `AGTECH_LLM_MODEL` | `anthropic/claude-3-5-haiku-latest` | LiteLLM model id for the agent. | +| `AGTECH_FORCE_RULE_BASED` | `false` | Force the offline agent. | +| `ANTHROPIC_API_KEY` / `OPENAI_API_KEY` / … | – | Enables the Haiku/LLM agent. | + +## Architecture (mapped to the awesome-python catalog) + +| Layer | Library (from this repo's list) | +|---|---| +| API / webhooks | FastAPI, uvicorn | +| Tabular parsing | pandas | +| Validation | pydantic | +| Storage / entity resolution | SQLAlchemy | +| Summaries (AI) | LiteLLM, instructor | +| Dashboard | Streamlit, Plotly | +| Scheduling (future) | APScheduler / Prefect / Dagster | + +## Roadmap + +- **Sprint 1 (done):** CSV + WhatsApp ingest → contextualized action items via + API + dashboard, with offline + LLM summarizers. +- **Sprint 2:** Live **Dropbox** sync and **WhatsApp Business API** webhook; + scheduled pulls (APScheduler); push action items back to chat/email. +- **Sprint 3:** Smarter entity resolution (fuzzy asset matching, aliases), + per-farm dashboards, trend charts, action-item status workflow. + +## Open questions — where I need more detail + +These are the decisions that will most shape Sprints 2–3: + +1. **Partner CSV schemas.** What columns do your real partners send? Are they + stable, or do we need per-partner mappings? Right now I infer common aliases. +2. **Dropbox layout.** Folder structure and file naming for herd/crop/operations + data, and whether to use a service account or per-user OAuth. +3. **WhatsApp source.** Live (WhatsApp Business API / Twilio / Meta Cloud API) + or periodic chat-export uploads? Live changes the auth + webhook design. +4. **Entity naming.** How are farms/herds/fields named across sources so we can + match them reliably? Is there a master list/IDs we should sync from? +5. **Action item destination.** Where do ops want items delivered — dashboard + only, back into WhatsApp, email, or an existing task tool? +6. **AI provider + data policy.** Which model/provider is acceptable, and any + constraints on sending farm/staff messages to a third-party LLM. +7. **Deployment + scale.** Single farm vs. multi-tenant ("agrefine network"?), + expected data volume, and where this should run. diff --git a/agtech-ops/agtech_ops/__init__.py b/agtech-ops/agtech_ops/__init__.py new file mode 100644 index 0000000000..b20bc19f3d --- /dev/null +++ b/agtech-ops/agtech_ops/__init__.py @@ -0,0 +1,3 @@ +"""AgTech Ops Hub: contextualize multi-source farm data into action items.""" + +__version__ = "0.1.0" diff --git a/agtech-ops/agtech_ops/agent.py b/agtech-ops/agtech_ops/agent.py new file mode 100644 index 0000000000..b091360912 --- /dev/null +++ b/agtech-ops/agtech_ops/agent.py @@ -0,0 +1,46 @@ +"""Action-item-log agent. + +This is the "Haiku agent" entry point: as data flows in from the bridge +(WhatsApp, Dropbox, partner files, video/clip tags), it distills an append-only +**action-item log** with provenance — what to do, who owns it, why it was +raised, and which agent produced it. + +It delegates to the configured summarizer backend: + - ``llm`` — Claude Haiku (or any LiteLLM model) when the AI extra + an API + key are present. Cheap, fast, good at turning chatter + tags into tasks. + - ``rule_based`` — deterministic offline fallback, so the log always builds. + +Each run appends to the ``action_items`` table (the log); existing entries are +preserved so the log is a running history, not a snapshot. +""" + +from __future__ import annotations + +import datetime as dt + +from .schemas import SummaryResult +from .service import list_action_items, summarize_and_store +from .summarize import get_summarizer + + +def agent_name() -> str: + """Human-readable name of the active agent backend.""" + return get_summarizer().name + + +def build_action_log( + farm: str | None = None, + since_days: int | None = None, +) -> SummaryResult: + """Process incoming data and append to the action-item log.""" + since = None + if since_days is not None: + since = dt.datetime.now() - dt.timedelta(days=since_days) + return summarize_and_store(farm=farm, since=since, persist=True) + + +def action_log(limit: int | None = None) -> list[dict]: + """Return the current open action-item log (newest first).""" + items = list_action_items() + items.sort(key=lambda a: (a.get("logged_at") or ""), reverse=True) + return items[:limit] if limit else items diff --git a/agtech-ops/agtech_ops/api.py b/agtech-ops/agtech_ops/api.py new file mode 100644 index 0000000000..a602f25adb --- /dev/null +++ b/agtech-ops/agtech_ops/api.py @@ -0,0 +1,137 @@ +"""FastAPI surface for the AgTech Ops Hub.""" + +from __future__ import annotations + +import datetime as dt +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager + +from fastapi import FastAPI, File, Form, HTTPException, Query, UploadFile + +from .agent import action_log, agent_name, build_action_log +from .db import init_db +from .ingest import SUPPORTED_EXTENSIONS, parse_partner_csv, parse_whatsapp_export +from .models import ActionStatus +from .schemas import AggregateReport, FileIngestResult, IngestResult, SummaryResult +from .service import ( + aggregate, + ingest_files, + known_asset_names, + list_action_items, + store_events, + summarize_and_store, +) +from .summarize import get_summarizer + +@asynccontextmanager +async def lifespan(app: FastAPI) -> AsyncIterator[None]: + init_db() + yield + + +app = FastAPI( + title="AgTech Ops Hub", + version="0.1.0", + description=( + "Ingest partner CSVs, Dropbox exports and WhatsApp chatter, contextualize " + "them onto shared farm assets, and turn them into action items for ops teams." + ), + lifespan=lifespan, +) + + +@app.get("/health") +def health() -> dict: + return { + "status": "ok", + "summarizer": get_summarizer().name, + "supported_files": sorted(SUPPORTED_EXTENSIONS), + } + + +@app.post("/ingest/files", response_model=FileIngestResult) +async def ingest_files_endpoint( + files: list[UploadFile] = File(...), + farm: str | None = Form(None), + default_asset: str = Form("General"), +) -> FileIngestResult: + """Ingest a range of files (CSV/TSV/Excel/JSON/TXT/MD/LOG/PDF/DOCX). + + Tabular files carry their own farm/asset columns; free-text files use the + optional ``farm`` for context resolution. + """ + payload = [(f.filename or "upload", await f.read()) for f in files] + result = ingest_files(payload, farm=farm, default_asset=default_asset) + if result.events_ingested == 0 and result.errors: + raise HTTPException(status_code=422, detail=result.errors) + return result + + +@app.get("/report", response_model=AggregateReport) +def report(farm: str | None = Query(None)) -> AggregateReport: + """Compiled, aggregated view across all ingested sources.""" + return aggregate(farm=farm) + + +@app.post("/ingest/csv", response_model=IngestResult) +async def ingest_csv(file: UploadFile = File(...)) -> IngestResult: + raw = await file.read() + events, errors = parse_partner_csv(raw) + if not events and errors: + raise HTTPException(status_code=422, detail=errors) + return store_events(events, errors) + + +@app.post("/ingest/whatsapp", response_model=IngestResult) +async def ingest_whatsapp( + text: str = Form(...), + farm: str = Form(...), + default_asset: str = Form("General"), +) -> IngestResult: + events, errors = parse_whatsapp_export( + text, + farm=farm, + known_assets=known_asset_names(farm), + default_asset=default_asset, + ) + if not events and errors: + raise HTTPException(status_code=422, detail=errors) + return store_events(events, errors) + + +@app.post("/summarize", response_model=SummaryResult) +def summarize( + farm: str | None = Query(None), + since_days: int | None = Query(None, ge=0), + persist: bool = Query(True), +) -> SummaryResult: + since = None + if since_days is not None: + since = dt.datetime.now() - dt.timedelta(days=since_days) + return summarize_and_store(farm=farm, since=since, persist=persist) + + +@app.get("/action-items") +def action_items(status: ActionStatus | None = Query(ActionStatus.open)) -> list[dict]: + return list_action_items(status=status) + + +@app.get("/agent") +def agent_info() -> dict: + """Which action-item-log agent is active (haiku-agent or rule_based).""" + return {"agent": agent_name()} + + +@app.post("/agent/run", response_model=SummaryResult) +def agent_run( + farm: str | None = Query(None), + since_days: int | None = Query(None, ge=0), +) -> SummaryResult: + """Run the agent over incoming data and append to the action-item log.""" + return build_action_log(farm=farm, since_days=since_days) + + +@app.get("/agent/log") +def agent_log(limit: int | None = Query(None, ge=1)) -> list[dict]: + """Read the current action-item log (newest first).""" + return action_log(limit=limit) diff --git a/agtech-ops/agtech_ops/config.py b/agtech-ops/agtech_ops/config.py new file mode 100644 index 0000000000..c2494c03ce --- /dev/null +++ b/agtech-ops/agtech_ops/config.py @@ -0,0 +1,38 @@ +"""Runtime configuration, read from environment variables. + +Everything has a sensible default so the app runs end-to-end with zero setup. +Secrets (LLM keys, Dropbox/WhatsApp tokens) are optional and only needed for +the live integrations. +""" + +from __future__ import annotations + +import os +from dataclasses import dataclass + + +def _env_bool(name: str, default: bool) -> bool: + raw = os.getenv(name) + if raw is None: + return default + return raw.strip().lower() in {"1", "true", "yes", "on"} + + +@dataclass(frozen=True) +class Settings: + # SQLAlchemy URL. Defaults to a local SQLite file so there is nothing to + # provision for a first run. + database_url: str = os.getenv("AGTECH_DATABASE_URL", "sqlite:///agtech_ops.db") + + # LLM model string understood by LiteLLM. Defaults to a small, cheap model + # (Claude Haiku) for the action-item-log agent. Only used when AI extras + + # a key exist; otherwise the deterministic rule-based agent runs. + llm_model: str = os.getenv("AGTECH_LLM_MODEL", "anthropic/claude-3-5-haiku-latest") + + # Force the deterministic summarizer even if AI deps/keys are available. + # Handy for tests and offline demos. + force_rule_based: bool = _env_bool("AGTECH_FORCE_RULE_BASED", False) + + +def get_settings() -> Settings: + return Settings() diff --git a/agtech-ops/agtech_ops/dashboard.py b/agtech-ops/agtech_ops/dashboard.py new file mode 100644 index 0000000000..060d7824c4 --- /dev/null +++ b/agtech-ops/agtech_ops/dashboard.py @@ -0,0 +1,150 @@ +"""Operator-facing Streamlit dashboard. + +Run with: streamlit run agtech_ops/dashboard.py +Requires the 'dashboard' extra (streamlit, plotly). +""" + +from __future__ import annotations + +import pandas as pd +import streamlit as st + +# Absolute imports so the file works when launched directly via +# `streamlit run agtech_ops/dashboard.py` (Streamlit runs it as a script, not +# as part of the package, so relative imports would fail). +from agtech_ops.agent import action_log, agent_name, build_action_log +from agtech_ops.db import init_db +from agtech_ops.ingest import SUPPORTED_EXTENSIONS, parse_whatsapp_export +from agtech_ops.service import ( + aggregate, + ingest_files, + known_asset_names, + store_events, +) + + +def main() -> None: + init_db() + st.set_page_config(page_title="AgTech Ops Hub", layout="wide") + st.title("AgTech Ops Hub") + st.caption( + f"Compile & aggregate multi-source farm data · action-item agent: " + f"**{agent_name()}**" + ) + + farm_default = "Green Acres" + exts = ", ".join(sorted(e.lstrip(".") for e in SUPPORTED_EXTENSIONS)) + + with st.sidebar: + st.header("Intake") + farm = st.text_input("Farm (for text documents)", value=farm_default) + st.caption(f"Accepted: {exts}") + uploads = st.file_uploader( + "Drop a range of files", + type=[e.lstrip(".") for e in SUPPORTED_EXTENSIONS], + accept_multiple_files=True, + ) + if uploads and st.button("Ingest files", type="primary"): + payload = [(u.name, u.getvalue()) for u in uploads] + res = ingest_files(payload, farm=farm) + st.success( + f"Ingested {res.events_ingested} records from " + f"{res.files_processed} file(s)." + ) + with st.expander("Per-file detail"): + st.dataframe(res.per_file, use_container_width=True) + if res.errors: + st.warning("\n".join(res.errors[:20])) + + st.divider() + wa_text = st.text_area("Or paste WhatsApp / notes text") + if wa_text and st.button("Ingest pasted text"): + events, errors = parse_whatsapp_export( + wa_text, farm=farm, known_assets=known_asset_names(farm) + ) + res = store_events(events, errors) + st.success(f"Ingested {res.events_ingested} messages.") + if res.errors: + st.warning("\n".join(res.errors)) + + report = aggregate() + + # --- Compiled overview --- + st.subheader("Compiled overview") + c1, c2, c3, c4, c5 = st.columns(5) + c1.metric("Events", report.total_events) + c2.metric("Assets", report.total_assets) + c3.metric("Clips", report.media_clips) + c4.metric("Farms", report.total_farms) + c5.metric("Open actions", report.open_action_items) + + if report.by_source: + src_df = pd.DataFrame( + {"source": list(report.by_source), "events": list(report.by_source.values())} + ) + st.bar_chart(src_df.set_index("source")) + + # --- Media metadata & tags (workflow signals) --- + if report.top_tags: + st.subheader("Media metadata & tags (workflow signals)") + st.caption( + "Tags extracted from video/camera clips — the most frequent tags " + "indicate where the workflow needs attention." + ) + tdf = pd.DataFrame( + [{"tag": t.tag, "count": t.count} for t in report.top_tags] + ).set_index("tag") + st.bar_chart(tdf) + + st.divider() + st.subheader(f"Action-item log · built by {agent_name()}") + st.caption( + "An AI agent (Claude Haiku when a key is set, deterministic rules " + "otherwise) turns incoming bridge data into a logged, prioritized " + "action list with a rationale for each item." + ) + if st.button("Run agent on incoming data", type="primary"): + result = build_action_log() + st.write(result.summary) + for p in result.points: + st.markdown(f"- {p}") + + log = action_log() + if log: + st.dataframe( + [ + { + "priority": a["priority"], + "task": a["task"], + "owner": a["owner"], + "due": a["due"], + "rationale": a["rationale"], + "by": a["created_by"], + "logged": a["logged_at"], + } + for a in log + ], + use_container_width=True, + ) + else: + st.info("Log is empty. Ingest data, then run the agent.") + + # --- Aggregated metric trends (native charts; one per metric) --- + if report.metric_series: + st.divider() + st.subheader("Aggregated metrics over time") + for metric in sorted(report.metric_series): + pts = report.metric_series[metric] + mdf = pd.DataFrame( + [{"date": p.occurred_at, "value": p.value, "asset": p.asset} for p in pts] + ) + # Pivot so each asset is its own line; native chart avoids heavy deps. + wide = mdf.pivot_table( + index="date", columns="asset", values="value", aggfunc="mean" + ) + st.caption(metric) + st.line_chart(wide) + + +if __name__ == "__main__": + main() diff --git a/agtech-ops/agtech_ops/db.py b/agtech-ops/agtech_ops/db.py new file mode 100644 index 0000000000..030ea700a8 --- /dev/null +++ b/agtech-ops/agtech_ops/db.py @@ -0,0 +1,84 @@ +"""Database engine/session helpers and entity-resolution upserts.""" + +from __future__ import annotations + +from collections.abc import Iterator +from contextlib import contextmanager + +from sqlalchemy import create_engine +from sqlalchemy.orm import Session, sessionmaker + +from .config import get_settings +from .models import Asset, AssetType, Base, Farm + +_engine = None +_SessionLocal: sessionmaker[Session] | None = None + + +def get_engine(): + global _engine + if _engine is None: + url = get_settings().database_url + connect_args = {"check_same_thread": False} if url.startswith("sqlite") else {} + _engine = create_engine(url, connect_args=connect_args, future=True) + return _engine + + +def init_db() -> None: + Base.metadata.create_all(get_engine()) + + +def get_sessionmaker() -> sessionmaker[Session]: + global _SessionLocal + if _SessionLocal is None: + _SessionLocal = sessionmaker( + bind=get_engine(), expire_on_commit=False, future=True + ) + return _SessionLocal + + +@contextmanager +def session_scope() -> Iterator[Session]: + session = get_sessionmaker()() + try: + yield session + session.commit() + except Exception: + session.rollback() + raise + finally: + session.close() + + +def get_or_create_farm(session: Session, name: str) -> Farm: + name = name.strip() + farm = session.query(Farm).filter(Farm.name == name).one_or_none() + if farm is None: + farm = Farm(name=name) + session.add(farm) + session.flush() + return farm + + +def get_or_create_asset( + session: Session, + farm: Farm, + name: str, + asset_type: AssetType = AssetType.other, +) -> Asset: + """Resolve an asset by (farm, name), the core of cross-source context.""" + name = name.strip() + asset = ( + session.query(Asset) + .filter(Asset.farm_id == farm.id, Asset.name == name) + .one_or_none() + ) + if asset is None: + asset = Asset(farm_id=farm.id, name=name, type=asset_type) + session.add(asset) + session.flush() + elif asset_type is not AssetType.other and asset.type is AssetType.other: + # Upgrade a placeholder type once we learn what the asset really is. + asset.type = asset_type + session.flush() + return asset diff --git a/agtech-ops/agtech_ops/ingest/__init__.py b/agtech-ops/agtech_ops/ingest/__init__.py new file mode 100644 index 0000000000..1ec7a62ff7 --- /dev/null +++ b/agtech-ops/agtech_ops/ingest/__init__.py @@ -0,0 +1,18 @@ +"""Ingestors that turn raw source data into validated ``EventIn`` records.""" + +from .csv_ingest import dataframe_to_events, parse_partner_csv +from .registry import SUPPORTED_EXTENSIONS, ingest_file +from .tabular_ingest import parse_excel, parse_json_records +from .text_ingest import parse_text_document +from .whatsapp_ingest import parse_whatsapp_export + +__all__ = [ + "parse_partner_csv", + "dataframe_to_events", + "parse_excel", + "parse_json_records", + "parse_whatsapp_export", + "parse_text_document", + "ingest_file", + "SUPPORTED_EXTENSIONS", +] diff --git a/agtech-ops/agtech_ops/ingest/csv_ingest.py b/agtech-ops/agtech_ops/ingest/csv_ingest.py new file mode 100644 index 0000000000..b9dd546bae --- /dev/null +++ b/agtech-ops/agtech_ops/ingest/csv_ingest.py @@ -0,0 +1,229 @@ +"""Parse partner / Dropbox CSV exports into normalized events. + +Expected (case-insensitive, flexible) columns: + + farm, asset, date, [asset_type], [category], [metric], [value], [notes] + +Partners are messy, so column names are normalized and unknown extras are kept +in ``raw``. Rows that cannot be salvaged are reported as errors rather than +aborting the whole file. +""" + +from __future__ import annotations + +import datetime as dt +import io +import json +import re + +import pandas as pd + +from ..models import AssetType, Source +from ..schemas import EventIn + +# Map common partner header variants onto our canonical names. +_COLUMN_ALIASES = { + "farm": "farm", + "farm_name": "farm", + "site": "farm", + "asset": "asset", + "herd": "asset", + "crop": "asset", + "field": "asset", + "asset_name": "asset", + "paddock": "asset", + "asset_type": "asset_type", + "type": "asset_type", + "date": "date", + "timestamp": "date", + "datetime": "date", + "recorded_at": "date", + "category": "category", + "event": "category", + "metric": "metric", + "measure": "metric", + "value": "value", + "reading": "value", + "amount": "value", + "notes": "notes", + "note": "notes", + "comment": "notes", + "comments": "notes", + "description": "notes", + # Video / clip metadata (tags drive workflow). + "tags": "tags", + "tag": "tags", + "labels": "tags", + "detections": "tags", + "duration": "duration_s", + "duration_s": "duration_s", + "length": "duration_s", + "camera": "camera", + "cam": "camera", + "clip": "clip", + "clip_id": "clip", + "video": "clip", + "filename": "clip", +} + +_REQUIRED = {"farm", "asset", "date"} +# Presence of any of these implies the rows are video/clip metadata. +_MEDIA_COLUMNS = {"tags", "duration_s", "camera", "clip"} + + +def _split_tags(raw: object) -> list[str]: + if raw is None or (isinstance(raw, float) and pd.isna(raw)): + return [] + if isinstance(raw, (list, tuple)): + items = raw + else: + items = re.split(r"[;,|]", str(raw)) + return [t.strip() for t in items if str(t).strip()] + + +def _normalize_columns(df: pd.DataFrame) -> pd.DataFrame: + renamed = {} + for col in df.columns: + key = str(col).strip().lower().replace(" ", "_") + renamed[col] = _COLUMN_ALIASES.get(key, key) + return df.rename(columns=renamed) + + +def _coerce_asset_type(raw: object) -> AssetType: + if raw is None or (isinstance(raw, float) and pd.isna(raw)): + return AssetType.other + try: + return AssetType(str(raw).strip().lower()) + except ValueError: + return AssetType.other + + +def parse_partner_csv( + data: str | bytes, + *, + source: Source = Source.csv_partner, + sep: str | None = None, +) -> tuple[list[EventIn], list[str]]: + """Return ``(events, errors)`` parsed from CSV/TSV ``data``.""" + + if isinstance(data, bytes): + buffer: io.StringIO | io.BytesIO = io.BytesIO(data) + else: + buffer = io.StringIO(data) + + try: + read_kwargs = {"sep": sep} if sep is not None else {} + df = pd.read_csv(buffer, **read_kwargs) + except Exception as exc: # noqa: BLE001 - surfaced to caller + return [], [f"could not read CSV: {exc}"] + + return dataframe_to_events(df, source=source) + + +def dataframe_to_events( + df: pd.DataFrame, + *, + source: Source = Source.csv_partner, +) -> tuple[list[EventIn], list[str]]: + """Convert an already-loaded tabular frame into normalized events. + + Shared by the CSV, Excel and JSON ingestors so column aliasing and row + coercion behave identically regardless of the original file format. + """ + + errors: list[str] = [] + if df is None or df.empty: + return [], ["no rows found"] + + df = _normalize_columns(df) + + missing = _REQUIRED - set(df.columns) + if missing: + return [], [f"missing required column(s): {', '.join(sorted(missing))}"] + + known = { + "farm", "asset", "asset_type", "date", "category", "metric", "value", + "notes", "tags", "duration_s", "camera", "clip", + } + extra_cols = [c for c in df.columns if c not in known] + + # Clip-metadata tables (tags/duration/camera/clip) are auto-tagged as media + # unless the caller explicitly asked for a different source. + is_media = bool(_MEDIA_COLUMNS & set(df.columns)) + if is_media and source is Source.csv_partner: + source = Source.media + + events: list[EventIn] = [] + for idx, row in df.iterrows(): + rownum = int(idx) + 2 # +1 for header, +1 for 1-based humans + try: + occurred_at = pd.to_datetime(row["date"], errors="coerce") + if pd.isna(occurred_at): + errors.append(f"row {rownum}: unparseable date {row['date']!r}") + continue + + value = row.get("value") + if value is not None and not pd.isna(value): + try: + value = float(value) + except (TypeError, ValueError): + value = None + else: + value = None + + tags = _split_tags(row.get("tags")) + + # Clip duration becomes a numeric metric so it aggregates like data. + metric = _clean(row.get("metric")) + duration = row.get("duration_s") + if metric is None and duration is not None and not pd.isna(duration): + try: + value = float(duration) + metric = "clip_duration_s" + except (TypeError, ValueError): + pass + + text = _clean(row.get("notes")) + if text is None and tags: + text = "Clip tagged: " + ", ".join(tags) + + raw_extra = {c: _jsonable(row.get(c)) for c in extra_cols} + + events.append( + EventIn( + farm=str(row["farm"]), + asset=str(row["asset"]), + asset_type=_coerce_asset_type(row.get("asset_type")), + source=source, + occurred_at=occurred_at.to_pydatetime() + if hasattr(occurred_at, "to_pydatetime") + else dt.datetime.fromisoformat(str(occurred_at)), + category=_clean(row.get("category")) + or ("media" if is_media else None), + metric=metric, + value=value, + author=_clean(row.get("camera")), + text=text, + tags=tags, + raw=json.dumps(raw_extra) if raw_extra else None, + ) + ) + except Exception as exc: # noqa: BLE001 - per-row resilience + errors.append(f"row {rownum}: {exc}") + + return events, errors + + +def _clean(v: object) -> str | None: + if v is None or (isinstance(v, float) and pd.isna(v)): + return None + s = str(v).strip() + return s or None + + +def _jsonable(v: object): + if v is None or (isinstance(v, float) and pd.isna(v)): + return None + if isinstance(v, (int, float, str, bool)): + return v + return str(v) diff --git a/agtech-ops/agtech_ops/ingest/registry.py b/agtech-ops/agtech_ops/ingest/registry.py new file mode 100644 index 0000000000..549cff269f --- /dev/null +++ b/agtech-ops/agtech_ops/ingest/registry.py @@ -0,0 +1,96 @@ +"""Single entry point that ingests *any* supported file by dispatching on type. + +Supported today: + - Tabular: .csv, .tsv, .xlsx, .xls, .json + - Free text: .txt, .md, .log, .pdf, .docx +WhatsApp exports (.txt) are auto-detected and routed to the chat parser. + +Tabular files carry their own farm/asset columns. Free-text files do not, so a +``farm`` (and optional known asset list) is supplied for context resolution. +""" + +from __future__ import annotations + +import os + +from ..models import Source +from ..schemas import EventIn +from .csv_ingest import parse_partner_csv +from .tabular_ingest import parse_excel, parse_json_records +from .text_ingest import ( + extract_text_from_docx, + extract_text_from_pdf, + parse_text_document, +) +from .whatsapp_ingest import _LINE_RE, parse_whatsapp_export + +SUPPORTED_EXTENSIONS = { + ".csv", ".tsv", ".xlsx", ".xls", ".json", + ".txt", ".md", ".log", ".pdf", ".docx", +} + + +def _looks_like_whatsapp(text: str) -> bool: + lines = [ln for ln in text.splitlines() if ln.strip()][:30] + if not lines: + return False + hits = sum(1 for ln in lines if _LINE_RE.match(ln.strip())) + return hits >= max(2, len(lines) // 3) + + +def ingest_file( + filename: str, + data: bytes, + *, + farm: str | None = None, + known_assets: list[str] | None = None, + default_asset: str = "General", +) -> tuple[list[EventIn], list[str]]: + """Return ``(events, errors)`` for a single uploaded file.""" + ext = os.path.splitext(filename)[1].lower() + known_assets = known_assets or [] + + # --- Tabular (self-describing: farm/asset in columns) --- + if ext == ".csv": + return parse_partner_csv(data) + if ext == ".tsv": + return parse_partner_csv(data, sep="\t") + if ext in {".xlsx", ".xls"}: + return parse_excel(data) + if ext == ".json": + return parse_json_records(data) + + # --- Free text (needs a farm for context) --- + if ext in {".txt", ".md", ".log", ".pdf", ".docx"}: + if not farm: + return [], [f"{filename}: a farm name is required for text documents"] + + if ext == ".pdf": + text, errors = extract_text_from_pdf(data) + elif ext == ".docx": + text, errors = extract_text_from_docx(data) + else: + errors = [] + try: + text = data.decode("utf-8", errors="replace") + except Exception as exc: # noqa: BLE001 + return [], [f"{filename}: could not decode text ({exc})"] + + if errors: + return [], [f"{filename}: {e}" for e in errors] + + if ext in {".txt", ".log"} and _looks_like_whatsapp(text): + return parse_whatsapp_export( + text, farm=farm, known_assets=known_assets, default_asset=default_asset + ) + + return parse_text_document( + text, + farm=farm, + known_assets=known_assets, + default_asset=default_asset, + source=Source.manual, + doc_name=filename, + ) + + return [], [f"{filename}: unsupported file type '{ext}'"] diff --git a/agtech-ops/agtech_ops/ingest/tabular_ingest.py b/agtech-ops/agtech_ops/ingest/tabular_ingest.py new file mode 100644 index 0000000000..347edd2eee --- /dev/null +++ b/agtech-ops/agtech_ops/ingest/tabular_ingest.py @@ -0,0 +1,68 @@ +"""Excel and JSON ingestion, reusing the shared tabular -> events core.""" + +from __future__ import annotations + +import io +import json as jsonlib + +import pandas as pd + +from ..models import Source +from ..schemas import EventIn +from .csv_ingest import dataframe_to_events + + +def parse_excel( + data: bytes, + *, + source: Source = Source.csv_partner, +) -> tuple[list[EventIn], list[str]]: + """Parse every sheet of an .xlsx/.xls workbook into events.""" + try: + sheets = pd.read_excel(io.BytesIO(data), sheet_name=None) + except ImportError: + return [], [ + "Excel support requires the 'files' extra (pip install -e '.[files]')." + ] + except Exception as exc: # noqa: BLE001 + return [], [f"could not read Excel file: {exc}"] + + all_events: list[EventIn] = [] + all_errors: list[str] = [] + for sheet_name, df in sheets.items(): + events, errors = dataframe_to_events(df, source=source) + all_events.extend(events) + all_errors.extend(f"[sheet {sheet_name}] {e}" for e in errors) + if not all_events and not all_errors: + all_errors.append("workbook contained no rows") + return all_events, all_errors + + +def parse_json_records( + data: str | bytes, + *, + source: Source = Source.csv_partner, +) -> tuple[list[EventIn], list[str]]: + """Parse a JSON array (or {records|data|items: [...]}) of record objects.""" + try: + payload = jsonlib.loads(data) + except Exception as exc: # noqa: BLE001 + return [], [f"could not parse JSON: {exc}"] + + if isinstance(payload, dict): + for key in ("records", "data", "items", "rows"): + if isinstance(payload.get(key), list): + payload = payload[key] + break + else: + payload = [payload] + + if not isinstance(payload, list) or not payload: + return [], ["JSON did not contain a list of records"] + + try: + df = pd.DataFrame(payload) + except Exception as exc: # noqa: BLE001 + return [], [f"could not tabulate JSON records: {exc}"] + + return dataframe_to_events(df, source=source) diff --git a/agtech-ops/agtech_ops/ingest/text_ingest.py b/agtech-ops/agtech_ops/ingest/text_ingest.py new file mode 100644 index 0000000000..7b7f44f0e8 --- /dev/null +++ b/agtech-ops/agtech_ops/ingest/text_ingest.py @@ -0,0 +1,119 @@ +"""Free-text document ingestion: plain text, Markdown, PDF, and Word. + +These formats have no tabular structure, so each meaningful chunk (a paragraph) +becomes a text ``Event`` that the summarizer can mine for action items. An +optional leading date in a chunk is used as its timestamp; otherwise the +ingestion time is used. The asset is inferred from known asset names, falling +back to a default so nothing is dropped. +""" + +from __future__ import annotations + +import datetime as dt +import re + +from ..models import Source +from ..schemas import EventIn + +# Matches a date at the very start of a chunk, e.g. "2026-06-26", "26/06/2026". +_LEADING_DATE = re.compile( + r"^\s*(\d{1,4}[-/.]\d{1,2}[-/.]\d{1,4})\b[\s:,-]*" +) +_DATE_FORMATS = ( + "%Y-%m-%d", "%d-%m-%Y", "%m-%d-%Y", + "%Y/%m/%d", "%d/%m/%Y", "%m/%d/%Y", + "%d.%m.%Y", "%Y.%m.%d", +) + + +def _parse_leading_date(chunk: str) -> tuple[dt.datetime | None, str]: + m = _LEADING_DATE.match(chunk) + if not m: + return None, chunk + raw = m.group(1) + for fmt in _DATE_FORMATS: + try: + return dt.datetime.strptime(raw, fmt), chunk[m.end():] + except ValueError: + continue + return None, chunk + + +def _infer_asset(text: str, known_assets: list[str]) -> str | None: + low = text.lower() + for asset in sorted(known_assets, key=len, reverse=True): + if asset.lower() in low: + return asset + return None + + +def _split_chunks(text: str) -> list[str]: + # Prefer blank-line-separated paragraphs; fall back to non-empty lines. + paras = [p.strip() for p in re.split(r"\n\s*\n", text) if p.strip()] + if len(paras) <= 1: + paras = [ln.strip() for ln in text.splitlines() if ln.strip()] + return paras + + +def parse_text_document( + text: str, + *, + farm: str, + known_assets: list[str] | None = None, + default_asset: str = "General", + source: Source = Source.manual, + doc_name: str | None = None, +) -> tuple[list[EventIn], list[str]]: + known_assets = known_assets or [] + now = dt.datetime.now() + events: list[EventIn] = [] + + for chunk in _split_chunks(text): + when, body = _parse_leading_date(chunk) + body = body.strip() or chunk + events.append( + EventIn( + farm=farm, + asset=_infer_asset(body, known_assets) or default_asset, + source=source, + occurred_at=when or now, + category="document", + author=doc_name, + text=body, + raw=doc_name, + ) + ) + + if not events: + return [], ["document contained no readable text"] + return events, [] + + +def extract_text_from_pdf(data: bytes) -> tuple[str, list[str]]: + try: + from pypdf import PdfReader + except ImportError: + return "", ["PDF support requires the 'files' extra (pip install -e '.[files]')."] + import io + + try: + reader = PdfReader(io.BytesIO(data)) + pages = [page.extract_text() or "" for page in reader.pages] + except Exception as exc: # noqa: BLE001 + return "", [f"could not read PDF: {exc}"] + return "\n\n".join(pages), [] + + +def extract_text_from_docx(data: bytes) -> tuple[str, list[str]]: + try: + import docx # python-docx + except ImportError: + return "", ["DOCX support requires the 'files' extra (pip install -e '.[files]')."] + import io + + try: + document = docx.Document(io.BytesIO(data)) + paras = [p.text for p in document.paragraphs if p.text.strip()] + except Exception as exc: # noqa: BLE001 + return "", [f"could not read DOCX: {exc}"] + return "\n\n".join(paras), [] diff --git a/agtech-ops/agtech_ops/ingest/whatsapp_ingest.py b/agtech-ops/agtech_ops/ingest/whatsapp_ingest.py new file mode 100644 index 0000000000..8575e9699a --- /dev/null +++ b/agtech-ops/agtech_ops/ingest/whatsapp_ingest.py @@ -0,0 +1,121 @@ +"""Parse a WhatsApp chat export into normalized events. + +WhatsApp "Export chat" produces lines like:: + + [2026-06-28, 8:56:01 PM] Alice: North Herd cow #42 looks lame, call vet + 2026/06/28, 20:56 - Bob: Ordered more feed for Field 3 + +Both bracketed and dash formats are supported. Multi-line messages (a line +without a new timestamp header) are appended to the previous message. + +Because a chat does not name the farm/asset in a structured way, the caller +supplies a default ``farm``; the asset is inferred from the message text when a +known asset name is provided, otherwise it falls back to a "General" asset so +nothing is dropped. +""" + +from __future__ import annotations + +import datetime as dt +import re + +from ..models import Source +from ..schemas import EventIn + +# [2026-06-28, 8:56:01 PM] Name: message OR +# 2026/06/28, 20:56 - Name: message +_LINE_RE = re.compile( + r"^\[?\s*" + r"(?P\d{1,4}[-/.]\d{1,2}[-/.]\d{1,4})" + r"[,]?\s+" + r"(?P