|
1 | 1 | use anyhow::{Result, Context}; |
2 | | -use tracing::{info, debug}; |
| 2 | +use tracing::{info, debug, warn}; |
3 | 3 |
|
4 | | -pub async fn handle(app: Option<String>) -> Result<()> { |
5 | | - let appn = app.unwrap_or_else(|| std::env::var("AETHER_DEFAULT_APP").unwrap_or_else(|_| "sample-app".into())); |
| 4 | +#[derive(Debug, Clone, Default)] |
| 5 | +pub struct LogsOptions { |
| 6 | + pub app: Option<String>, |
| 7 | + pub follow: bool, |
| 8 | + pub since: Option<String>, |
| 9 | + pub container: Option<String>, |
| 10 | + pub format: Option<String>, |
| 11 | + pub color: bool, |
| 12 | +} |
| 13 | + |
| 14 | +pub async fn handle_opts(opts: LogsOptions) -> Result<()> { |
| 15 | + let appn = opts.app.unwrap_or_else(|| std::env::var("AETHER_DEFAULT_APP").unwrap_or_else(|_| "sample-app".into())); |
6 | 16 | let base = std::env::var("AETHER_API_BASE").unwrap_or_else(|_| "http://localhost:8080".into()); |
7 | | - let follow = std::env::var("AETHER_LOGS_FOLLOW").ok().map(|v| v=="1" || v.eq_ignore_ascii_case("true")).unwrap_or(true); |
8 | | - let since = std::env::var("AETHER_LOGS_SINCE").ok(); |
9 | | - let container = std::env::var("AETHER_LOGS_CONTAINER").ok(); |
10 | | - let format = std::env::var("AETHER_LOGS_FORMAT").unwrap_or_else(|_| "text".into()); // default to human text |
| 17 | + let follow_env = std::env::var("AETHER_LOGS_FOLLOW").ok().map(|v| v=="1" || v.eq_ignore_ascii_case("true")); |
| 18 | + let follow = opts.follow || follow_env.unwrap_or(true); |
| 19 | + let since = opts.since.or_else(|| std::env::var("AETHER_LOGS_SINCE").ok()); |
| 20 | + let container = opts.container.or_else(|| std::env::var("AETHER_LOGS_CONTAINER").ok()); |
| 21 | + let format = opts.format.unwrap_or_else(|| std::env::var("AETHER_LOGS_FORMAT").unwrap_or_else(|_| "text".into())); // default to human text |
| 22 | + let color = opts.color || std::env::var("AETHER_COLOR").ok().map(|v| v=="1" || v.eq_ignore_ascii_case("true")).unwrap_or(false); |
11 | 23 | let tail: u32 = std::env::var("AETHER_LOGS_TAIL").ok().and_then(|v| v.parse().ok()).unwrap_or(100); |
12 | 24 |
|
13 | 25 | // Mock mode: allow tests/dev to bypass network entirely. Triggered if: |
@@ -42,25 +54,52 @@ pub async fn handle(app: Option<String>) -> Result<()> { |
42 | 54 | if let Some(c) = container { url.push_str("&container="); url.push_str(&urlencoding::encode(&c)); } |
43 | 55 |
|
44 | 56 | debug!(%url, "logs.request"); |
45 | | - let client = reqwest::Client::builder().build()?; |
46 | | - let resp = client.get(&url).send().await.context("request logs")?; |
47 | | - if !resp.status().is_success() { |
48 | | - anyhow::bail!("logs fetch failed: {}", resp.status()); |
49 | | - } |
50 | | - let ct = resp.headers().get(reqwest::header::CONTENT_TYPE).and_then(|v| v.to_str().ok()).unwrap_or(""); |
51 | | - let is_json_lines = ct.starts_with("application/x-ndjson") || format.eq_ignore_ascii_case("json"); |
52 | | - let mut stream = resp.bytes_stream(); |
53 | | - use futures_util::StreamExt; |
54 | | - use tokio::io::AsyncWriteExt; |
55 | | - let mut stdout = tokio::io::stdout(); |
56 | | - while let Some(chunk) = stream.next().await { |
57 | | - let bytes = chunk.context("read chunk")?; |
58 | | - if is_json_lines { |
59 | | - stdout.write_all(&bytes).await?; // already newline delimited |
60 | | - } else { |
61 | | - stdout.write_all(&bytes).await?; // text lines already framed by server |
| 57 | + let client = reqwest::Client::builder() |
| 58 | + .pool_idle_timeout(std::time::Duration::from_secs(30)) |
| 59 | + .build()?; |
| 60 | + |
| 61 | + // reconnecting loop for follow=true |
| 62 | + let mut attempt: u32 = 0; |
| 63 | + let max_reconnects = std::env::var("AETHER_LOGS_MAX_RECONNECTS").ok().and_then(|v| v.parse::<u32>().ok()); |
| 64 | + loop { |
| 65 | + let resp = client.get(&url).send().await.context("request logs")?; |
| 66 | + if !resp.status().is_success() { |
| 67 | + anyhow::bail!("logs fetch failed: {}", resp.status()); |
62 | 68 | } |
63 | | - stdout.flush().await.ok(); |
| 69 | + let ct = resp.headers().get(reqwest::header::CONTENT_TYPE).and_then(|v| v.to_str().ok()).unwrap_or(""); |
| 70 | + let is_json_lines = ct.starts_with("application/x-ndjson") || format.eq_ignore_ascii_case("json"); |
| 71 | + let mut stream = resp.bytes_stream(); |
| 72 | + use futures_util::StreamExt; |
| 73 | + use tokio::io::AsyncWriteExt; |
| 74 | + let mut stdout = tokio::io::stdout(); |
| 75 | + while let Some(chunk) = stream.next().await { |
| 76 | + match chunk { |
| 77 | + Ok(bytes) => { |
| 78 | + if is_json_lines { |
| 79 | + if color { |
| 80 | + // passthrough for now; colorization could parse JSON and add ANSI later |
| 81 | + stdout.write_all(&bytes).await?; |
| 82 | + } else { |
| 83 | + stdout.write_all(&bytes).await?; |
| 84 | + } |
| 85 | + } else { |
| 86 | + stdout.write_all(&bytes).await?; |
| 87 | + } |
| 88 | + stdout.flush().await.ok(); |
| 89 | + } |
| 90 | + Err(e) => { |
| 91 | + warn!(error=%e, "logs.stream.chunk_error"); |
| 92 | + break; // trigger reconnect if follow |
| 93 | + } |
| 94 | + } |
| 95 | + } |
| 96 | + if !follow { break; } |
| 97 | + attempt = attempt.saturating_add(1); |
| 98 | + if let Some(max) = max_reconnects { if attempt >= max { break; } } |
| 99 | + let backoff_ms = (100u64).saturating_mul((attempt.min(50) + 1) as u64); |
| 100 | + tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await; |
| 101 | + debug!(attempt, backoff_ms, "logs.stream.reconnect"); |
| 102 | + continue; |
64 | 103 | } |
65 | 104 | info!(app=%appn, "logs.stream.end"); |
66 | 105 | Ok(()) |
@@ -90,7 +129,16 @@ mod tests { |
90 | 129 |
|
91 | 130 | std::env::set_var("AETHER_API_BASE", format!("http://{}:{}", addr.ip(), addr.port())); |
92 | 131 | std::env::set_var("AETHER_LOGS_FOLLOW", "0"); |
93 | | - let res = handle(Some("demo".into())).await; |
| 132 | + let res = handle_opts(LogsOptions{ app: Some("demo".into()), ..Default::default() }).await; |
| 133 | + assert!(res.is_ok()); |
| 134 | + } |
| 135 | + |
| 136 | + #[tokio::test] |
| 137 | + async fn mock_mode_respects_format_and_env() { |
| 138 | + std::env::set_var("AETHER_API_BASE", "http://127.0.0.1:0"); |
| 139 | + std::env::set_var("AETHER_LOGS_MOCK", "1"); |
| 140 | + std::env::set_var("AETHER_LOGS_FORMAT", "json"); |
| 141 | + let res = handle_opts(LogsOptions{ app: Some("demo".into()), ..Default::default() }).await; |
94 | 142 | assert!(res.is_ok()); |
95 | 143 | } |
96 | 144 | } |
0 commit comments