Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
902 changes: 862 additions & 40 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ lang-rust = ["dep:tree-sitter-rust"]
lang-python = ["dep:tree-sitter-python"]
lang-js-ts = ["dep:tree-sitter-typescript"]
lang-go = ["dep:tree-sitter-go"]
greptimedb = ["dep:greptimedb-ingester"]

[dependencies]
clap = { version = "4", features = ["derive"] }
Expand Down Expand Up @@ -82,6 +83,7 @@ devbase-registry-relation = { path = "crates/devbase-registry-relation" }
devbase-registry-call-graph = { path = "crates/devbase-registry-call-graph" }
devbase-registry-dead-code = { path = "crates/devbase-registry-dead-code" }
devbase-registry-code-symbols = { path = "crates/devbase-registry-code-symbols" }
greptimedb-ingester = { version = "0.18", optional = true }

[dev-dependencies]
criterion = { version = "0.5", features = ["html_reports"] }
Expand Down
75 changes: 75 additions & 0 deletions docs/plans/greptimedb-integration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# GreptimeDB 集成计划

## 目标
将 devbase 的时序数据(health、code_metrics、stars)从 SQLite 单表迁移到 GreptimeDB,实现趋势分析、Burn-rate 监控与大时间窗口聚合。

## 架构原则
- **SQLite 保留为 Registry OLTP**:repo 元数据、关系图谱、vault notes 继续存 SQLite。
- **GreptimeDB 作为 OLAP 时序库**:health、metrics、stars 等时间序列数据双写。
- **Feature-gated**:`--features greptimedb` 才编译接入层,零成本抽象。
- **异步写入**:使用 `greptimedb-ingester` gRPC 客户端,批量异步提交。

## Schema 设计

### health_metrics
```sql
CREATE TABLE health_metrics (
repo_id STRING,
status STRING,
ahead BIGINT,
behind BIGINT,
checked_at TIMESTAMP,
TIME INDEX (checked_at),
PRIMARY KEY (repo_id, checked_at)
);
```

### code_metrics
```sql
CREATE TABLE code_metrics (
repo_id STRING,
total_lines INT,
source_lines INT,
test_lines INT,
comment_lines INT,
file_count INT,
language_breakdown STRING,
updated_at TIMESTAMP,
TIME INDEX (updated_at),
PRIMARY KEY (repo_id, updated_at)
);
```

### stars_history
```sql
CREATE TABLE stars_history (
repo_id STRING,
stars INT,
fetched_at TIMESTAMP,
TIME INDEX (fetched_at),
PRIMARY KEY (repo_id, fetched_at)
);
```

## 实施阶段

### Phase A: 基础架构(当前)
- [x] Cargo.toml feature gate + `greptimedb-ingester` 依赖
- [x] `GreptimeConfig` 配置结构
- [x] `src/greptime.rs` 空模块与连接管理

### Phase B: Health 双写 PoC
- [ ] `save_health` 后调用 `greptime::write_health`
- [ ] CLI `health` 命令增加 `--write-greptime` 标志

### Phase C: Metrics & Stars
- [ ] `run_metrics` 双写
- [ ] `github-info` stars 双写

### Phase D: 查询适配
- [ ] `query` 命令支持 `trend:` 前缀(从 GreptimeDB 读取时序趋势)
- [ ] Dashboard SQL 模板

## 兼容性
- 无 `greptimedb` feature 时,100% 保持现有 SQLite-only 行为。
- 连接失败时降级为仅 SQLite,打印 warning,不阻塞主流程。
3 changes: 3 additions & 0 deletions src/commands/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub async fn run_health(
};

let conn = ctx.conn()?;
let greptime_client = crate::greptime::GreptimeClient::new(&ctx.config.greptime);
if json {
let output = health::run_json(
&conn,
Expand All @@ -49,6 +50,7 @@ pub async fn run_health(
ctx.config.cache.ttl_seconds,
&ctx.i18n,
&env_cache,
Some(&greptime_client),
)
.await?;
println!("{}", serde_json::to_string_pretty(&output)?);
Expand All @@ -61,6 +63,7 @@ pub async fn run_health(
ctx.config.cache.ttl_seconds,
&ctx.i18n,
&env_cache,
Some(&greptime_client),
)
.await?;
}
Expand Down
44 changes: 44 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ pub struct Config {
pub arxiv: ArxivConfig,
#[serde(default)]
pub scan: ScanConfig,
#[serde(default)]
pub greptime: GreptimeConfig,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -490,3 +492,45 @@ max_tokens = 400
assert_eq!(cfg.daemon.interval_seconds, 3600);
}
}

/// Settings for the optional GreptimeDB time-series sink.
///
/// Every field carries a serde default, so a missing or partially filled section still
/// deserializes into a usable value.
///
/// `Debug` is implemented by hand (below) instead of derived so that the password never
/// appears in log or panic output.
#[derive(Clone, Serialize, Deserialize)]
pub struct GreptimeConfig {
    /// Whether writes to GreptimeDB are attempted at all; defaults to `false`.
    #[serde(default = "default_greptime_enabled")]
    pub enabled: bool,
    /// Address handed to the ingester client; defaults to `127.0.0.1:4001`.
    #[serde(default = "default_greptime_endpoint")]
    pub endpoint: String,
    /// Target database name; defaults to `devbase`.
    #[serde(default = "default_greptime_dbname")]
    pub dbname: String,
    /// Account name; defaults to `root`.
    // NOTE(review): nothing visible in this change reads `username` or `password` when the
    // client is constructed — confirm whether authentication is wired up elsewhere.
    #[serde(default = "default_greptime_username")]
    pub username: String,
    /// Optional account password; `None` when the key is absent.
    #[serde(default)]
    pub password: Option<String>,
}

impl std::fmt::Debug for GreptimeConfig {
    /// Formats the configuration like the derived implementation would, except that a present
    /// password is printed as `Some("<redacted>")` rather than its real value.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("GreptimeConfig")
            .field("enabled", &self.enabled)
            .field("endpoint", &self.endpoint)
            .field("dbname", &self.dbname)
            .field("username", &self.username)
            .field("password", &self.password.as_ref().map(|_| "<redacted>"))
            .finish()
    }
}

impl Default for GreptimeConfig {
    /// Produces the same values serde fills in when the corresponding keys are missing:
    /// each field comes from its `default_greptime_*` helper and no password is set.
    fn default() -> Self {
        let enabled = default_greptime_enabled();
        let endpoint = default_greptime_endpoint();
        let dbname = default_greptime_dbname();
        let username = default_greptime_username();

        Self {
            enabled,
            endpoint,
            dbname,
            username,
            password: None,
        }
    }
}

/// Serde default for `GreptimeConfig::enabled`: the GreptimeDB sink is off unless the
/// configuration turns it on explicitly.
fn default_greptime_enabled() -> bool {
false
}

/// Serde default for `GreptimeConfig::endpoint`.
fn default_greptime_endpoint() -> String {
    String::from("127.0.0.1:4001")
}

/// Serde default for `GreptimeConfig::dbname`.
fn default_greptime_dbname() -> String {
    String::from("devbase")
}

/// Serde default for `GreptimeConfig::username`.
fn default_greptime_username() -> String {
    String::from("root")
}
118 changes: 118 additions & 0 deletions src/greptime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// SPDX-License-Identifier: MIT
// Copyright (c) 2026 juice094
//! GreptimeDB integration layer (feature-gated).
//!
//! Provides optional async ingestion of time-series data:
//! - repo health (ahead/behind/status)
//! - code metrics (LOC/language breakdown)
//! - GitHub stars history
//!
//! When the `greptimedb` feature is disabled this module compiles to no-ops.

use crate::config::GreptimeConfig;

/// Shared GreptimeDB client handle.
///
/// The field set depends on the `greptimedb` cargo feature: with the feature on the handle
/// may hold a database client, with the feature off it holds only a unit placeholder.
pub struct GreptimeClient {
// Database client; `None` when the configuration has GreptimeDB disabled.
#[cfg(feature = "greptimedb")]
inner: Option<greptimedb_ingester::database::Database>,
// Stand-in field used when the feature is not compiled in.
#[cfg(not(feature = "greptimedb"))]
_placeholder: (),
}

impl GreptimeClient {
    /// Builds a client handle from `config`.
    ///
    /// With the `greptimedb` feature compiled in and `config.enabled` set, the handle wraps a
    /// database client for `config.endpoint` / `config.dbname`. In every other case the handle
    /// holds no client and the write methods do nothing.
    // NOTE(review): `config.username` and `config.password` are not read here — confirm
    // whether authentication still has to be wired up.
    pub fn new(config: &GreptimeConfig) -> Self {
        #[cfg(feature = "greptimedb")]
        {
            use greptimedb_ingester::client::Client;
            use greptimedb_ingester::database::Database;

            let inner = config.enabled.then(|| {
                let grpc = Client::with_urls(&[&config.endpoint]);
                Database::new_with_dbname(&config.dbname, grpc)
            });
            Self { inner }
        }
        #[cfg(not(feature = "greptimedb"))]
        {
            let _ = config;
            Self { _placeholder: () }
        }
    }

    /// Assembles the single-row insert request for the `health_metrics` table.
    ///
    /// Columns: `repo_id` (tag), `checked_at` (millisecond timestamp), and the fields
    /// `status`, `ahead`, `behind`.
    #[cfg(feature = "greptimedb")]
    fn health_insert_request(
        repo_id: &str,
        entry: &crate::registry::HealthEntry,
    ) -> greptimedb_ingester::api::v1::RowInsertRequests {
        use greptimedb_ingester::ColumnDataType;
        use greptimedb_ingester::api::v1::{Row, RowInsertRequest, RowInsertRequests, Rows};
        use greptimedb_ingester::helpers::schema::{field, tag, timestamp};
        use greptimedb_ingester::helpers::values::{
            i64_value, string_value, timestamp_millisecond_value,
        };

        let schema = vec![
            tag("repo_id", ColumnDataType::String),
            timestamp("checked_at", ColumnDataType::TimestampMillisecond),
            field("status", ColumnDataType::String),
            field("ahead", ColumnDataType::Int64),
            field("behind", ColumnDataType::Int64),
        ];

        // Value order must mirror the column order of `schema` above.
        let values = vec![
            string_value(repo_id.to_string()),
            timestamp_millisecond_value(entry.checked_at.timestamp_millis()),
            string_value(entry.status.clone()),
            i64_value(entry.ahead as i64),
            i64_value(entry.behind as i64),
        ];

        let insert = RowInsertRequest {
            table_name: "health_metrics".to_string(),
            rows: Some(Rows {
                schema,
                rows: vec![Row { values }],
            }),
        };
        RowInsertRequests {
            inserts: vec![insert],
        }
    }

    /// Records one health snapshot for `repo_id`.
    ///
    /// Does nothing when the feature is not compiled in or when the handle holds no client.
    /// A failed insert is logged at `warn` level and not returned, so this method currently
    /// always yields `Ok(())`.
    pub async fn write_health(
        &self,
        repo_id: &str,
        entry: &crate::registry::HealthEntry,
    ) -> anyhow::Result<()> {
        #[cfg(feature = "greptimedb")]
        {
            if let Some(database) = self.inner.as_ref() {
                let request = Self::health_insert_request(repo_id, entry);
                if let Err(err) = database.insert(request).await {
                    tracing::warn!("GreptimeDB write_health failed for {}: {}", repo_id, err);
                }
            }
        }
        // Mark both arguments as used so the feature-less build stays warning-free.
        let _ = entry;
        let _ = repo_id;
        Ok(())
    }

    /// Records code metrics for a repository.
    ///
    /// Not implemented yet: returns `Ok(())` without writing anything, regardless of the
    /// feature flag.
    pub async fn write_metrics(
        &self,
        _repo_id: &str,
        _metrics: &crate::registry::CodeMetrics,
    ) -> anyhow::Result<()> {
        // Phase C: convert CodeMetrics to GreptimeDB row batch.
        Ok(())
    }

    /// Records a star-count snapshot for a repository.
    ///
    /// Not implemented yet: returns `Ok(())` without writing anything, regardless of the
    /// feature flag.
    pub async fn write_stars(&self, _repo_id: &str, _stars: u64) -> anyhow::Result<()> {
        // Phase C: convert stars to GreptimeDB row batch.
        Ok(())
    }
}
24 changes: 22 additions & 2 deletions src/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub fn compute_workspace_hash(root: &Path) -> anyhow::Result<String> {
Ok(hasher.finalize().to_hex().to_string())
}

#[allow(clippy::too_many_arguments)]
pub async fn run_json(
conn: &rusqlite::Connection,
detail: bool,
Expand All @@ -71,6 +72,7 @@ pub async fn run_json(
ttl_seconds: i64,
i18n: &I18n,
env_cache: &EnvVersionCache,
greptime_client: Option<&crate::greptime::GreptimeClient>,
) -> anyhow::Result<serde_json::Value> {
let start = std::time::Instant::now();
let (total_repos, dirty_repos, behind_upstream, no_upstream_count, repo_details) = {
Expand Down Expand Up @@ -133,6 +135,15 @@ pub async fn run_json(
if let Err(e) = reg_health::save_health(conn, &repo.id, &new_health) {
tracing::warn!("Failed to save health for {}: {}", repo.id, e);
}
if let Some(client) = greptime_client
&& let Err(e) = client.write_health(&repo.id, &new_health).await
{
tracing::warn!(
"GreptimeDB health write failed for {}: {}",
repo.id,
e
);
}
(status, ahead, behind)
}
}
Expand All @@ -153,6 +164,11 @@ pub async fn run_json(
{
tracing::warn!("Failed to save health for {}: {}", repo.id, e);
}
if let Some(client) = greptime_client
&& let Err(e) = client.write_health(&repo.id, &new_health).await
{
tracing::warn!("GreptimeDB health write failed for {}: {}", repo.id, e);
}
(status, ahead, behind)
}
}
Expand Down Expand Up @@ -288,6 +304,7 @@ pub async fn run_json(
}))
}

#[allow(clippy::too_many_arguments)]
pub async fn run(
conn: &rusqlite::Connection,
detail: bool,
Expand All @@ -296,8 +313,10 @@ pub async fn run(
ttl_seconds: i64,
i18n: &I18n,
env_cache: &EnvVersionCache,
greptime_client: Option<&crate::greptime::GreptimeClient>,
) -> anyhow::Result<()> {
let result = run_json(conn, detail, limit, page, ttl_seconds, i18n, env_cache).await?;
let result =
run_json(conn, detail, limit, page, ttl_seconds, i18n, env_cache, greptime_client).await?;

let summary = result["summary"]
.as_object()
Expand Down Expand Up @@ -471,7 +490,8 @@ impl HealthClient for AppContext {
self.set_env_cache(fresh.clone())?;
fresh
};
run_json(&conn, detail, 0, 1, self.config.cache.ttl_seconds, &self.i18n, &env_cache).await
run_json(&conn, detail, 0, 1, self.config.cache.ttl_seconds, &self.i18n, &env_cache, None)
.await
}
}

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod dependency_graph;
pub mod digest;
pub mod discovery_engine;
pub mod embedding;
pub mod greptime;
pub mod health;
pub mod i18n;
pub mod knowledge_engine;
Expand Down
Loading
Loading