diff --git a/Cargo.lock b/Cargo.lock index 91e977c..2a1deb2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1013,17 +1013,19 @@ dependencies = [ [[package]] name = "tlparse" -version = "0.4.8" +version = "0.4.9" dependencies = [ "anyhow", "assert_cmd", "base16ct", "chrono", "clap", + "flate2", "fxhash", "html-escape", "indexmap", "indicatif", + "libc", "md-5", "once_cell", "opener", diff --git a/Cargo.toml b/Cargo.toml index 44bb7eb..91b7c4a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tlparse" -version = "0.4.8" +version = "0.4.9" edition = "2021" authors = ["Edward Z. Yang "] description = "Parse TORCH_LOG logs produced by PyTorch torch.compile" @@ -33,8 +33,14 @@ serde = { version = "1.0.185", features = ["serde_derive"] } serde_json = "1.0.100" tinytemplate = "1.1.0" tiny_http = "0.12" +flate2 = "1.0" [dev-dependencies] assert_cmd = "2.0" +libc = "0.2" predicates = "3.1.0" tempfile = "3.10.1" + +[[bench]] +name = "parse_benchmark" +harness = false diff --git a/benches/parse_benchmark.rs b/benches/parse_benchmark.rs new file mode 100644 index 0000000..6f5e4f6 --- /dev/null +++ b/benches/parse_benchmark.rs @@ -0,0 +1,180 @@ +//! Benchmark for tlparse: measures wall time and peak memory (RSS). +//! +//! Usage: +//! TLPARSE_BENCH_INPUT=/path/to/file cargo bench --bench parse_benchmark +//! cargo bench --bench parse_benchmark -- /path/to/file # custom input via CLI arg + +use std::io::BufRead; +use std::path::PathBuf; +use std::time::Instant; +use tempfile::tempdir; + +const WARMUP_ITERS: u32 = 2; +const BENCH_ITERS: u32 = 5; + +fn get_peak_rss_bytes() -> Option { + #[cfg(target_os = "macos")] + { + use std::mem::MaybeUninit; + unsafe { + let mut usage = MaybeUninit::::zeroed(); + if libc::getrusage(libc::RUSAGE_SELF, usage.as_mut_ptr()) == 0 { + // macOS reports ru_maxrss in bytes + Some(usage.assume_init().ru_maxrss as u64) + } else { + None + } + } + } + #[cfg(target_os = "linux")] + { + use std::mem::MaybeUninit; + unsafe { + let mut usage = MaybeUninit::::zeroed(); + if libc::getrusage(libc::RUSAGE_SELF, usage.as_mut_ptr()) == 0 { + // Linux reports ru_maxrss in kilobytes + Some(usage.assume_init().ru_maxrss as u64 * 1024) + } else { + None + } + } + } + #[cfg(not(any(target_os = "macos", target_os = "linux")))] + { + None + } +} + +fn format_bytes(bytes: u64) -> String { + if bytes >= 1024 * 1024 * 1024 { + format!("{:.2} GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0)) + } else if bytes >= 1024 * 1024 { + format!("{:.2} MB", bytes as f64 / (1024.0 * 1024.0)) + } else if bytes >= 1024 { + format!("{:.2} KB", bytes as f64 / 1024.0) + } else { + format!("{} B", bytes) + } +} + +fn run_parse(input: &PathBuf) -> std::time::Duration { + let config = tlparse::ParseConfig::default(); + let out_dir = tempdir().expect("failed to create temp dir"); + let start = Instant::now(); + let output = tlparse::parse_path(input, &config).expect("parse_path failed"); + let elapsed = start.elapsed(); + + // Write output to exercise the full pipeline + for (path, content) in &output { + let full_path = out_dir.path().join(path); + if let Some(parent) = full_path.parent() { + std::fs::create_dir_all(parent).expect("failed to create output subdirectory"); + } + std::fs::write(&full_path, content).expect("failed to write output file"); + } + elapsed +} + +fn main() { + // Determine input path: CLI arg > env var (no default — must be explicit) + let args: Vec = std::env::args().collect(); + let input_path = if args.len() > 1 && !args[1].starts_with('-') { + PathBuf::from(&args[1]) + } else if let Ok(env_path) = std::env::var("TLPARSE_BENCH_INPUT") { + PathBuf::from(env_path) + } else { + eprintln!("Error: no input file specified."); + eprintln!(); + eprintln!("Provide a TORCH_LOG file via one of:"); + eprintln!(" TLPARSE_BENCH_INPUT=/path/to/file cargo bench --bench parse_benchmark"); + eprintln!(" cargo bench --bench parse_benchmark -- /path/to/file"); + std::process::exit(1); + }; + + if !input_path.exists() { + eprintln!("Error: input file not found: {}", input_path.display()); + std::process::exit(1); + } + + let file_size = std::fs::metadata(&input_path).map(|m| m.len()).unwrap_or(0); + let line_count = std::io::BufReader::new( + std::fs::File::open(&input_path).expect("failed to open input file for line counting"), + ) + .lines() + .count(); + + println!("=== tlparse benchmark ==="); + println!( + "Input: {} ({}, {} lines)", + input_path.display(), + format_bytes(file_size), + line_count + ); + println!(); + + // Cold-run RSS: measure peak RSS after a single parse before any warmup. + // This captures the first-run memory footprint before caches are populated. + let rss_cold_before = get_peak_rss_bytes(); + run_parse(&input_path); + let rss_cold_after = get_peak_rss_bytes(); + + // Warmup + print!("Warming up ({WARMUP_ITERS} iterations)..."); + for _ in 0..WARMUP_ITERS { + run_parse(&input_path); + } + println!(" done"); + + // NOTE: ru_maxrss reports the *lifetime* peak RSS of the process, so the + // value after warmup already includes the high-water mark from earlier + // iterations. The "RSS delta (during bench)" below therefore only captures + // *new* peaks that exceed the warmup maximum — it will be zero if the + // warmup already reached the true peak. The cold-run measurement above + // provides a more meaningful single-iteration memory figure. + let rss_before = get_peak_rss_bytes(); + + // Benchmark + println!("Running {BENCH_ITERS} iterations..."); + let mut durations = Vec::with_capacity(BENCH_ITERS as usize); + for i in 0..BENCH_ITERS { + let elapsed = run_parse(&input_path); + println!(" iter {}: {:.3}ms", i + 1, elapsed.as_secs_f64() * 1000.0); + durations.push(elapsed); + } + + let rss_after = get_peak_rss_bytes(); + + // Stats + durations.sort(); + let total: std::time::Duration = durations.iter().sum(); + let mean = total / BENCH_ITERS; + let median = durations[durations.len() / 2]; + let min = durations[0]; + let max = durations[durations.len() - 1]; + + println!(); + println!("--- Results ---"); + println!(" mean: {:.3}ms", mean.as_secs_f64() * 1000.0); + println!(" median: {:.3}ms", median.as_secs_f64() * 1000.0); + println!(" min: {:.3}ms", min.as_secs_f64() * 1000.0); + println!(" max: {:.3}ms", max.as_secs_f64() * 1000.0); + + // Cold-run RSS (single iteration, no prior warmup) + if let (Some(before), Some(after)) = (rss_cold_before, rss_cold_after) { + println!(" cold-run peak RSS: {}", format_bytes(after)); + if after > before { + println!(" cold-run RSS delta: {}", format_bytes(after - before)); + } + } + + if let Some(rss) = rss_after { + println!(" lifetime peak RSS: {}", format_bytes(rss)); + if let Some(before) = rss_before { + if rss > before { + println!(" RSS delta (during bench): {}", format_bytes(rss - before)); + } + } + } else { + println!(" peak RSS: unavailable on this platform"); + } +} diff --git a/src/cli.rs b/src/cli.rs index 587de96..0254d7b 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,8 +1,10 @@ use clap::Parser; use anyhow::{bail, Context}; +use flate2::write::GzEncoder; +use flate2::Compression; use std::fs; -use std::io::Read; +use std::io::{self, Read}; use std::path::PathBuf; use tlparse::{ @@ -162,6 +164,21 @@ fn parse_and_write_output( } fs::write(out_path, content)?; } + + // Copy the raw log file directly instead of reading it into memory. + // This avoids holding the entire input file as a String in ParseOutput. + if log_path.extension().map_or(false, |ext| ext == "gz") { + fs::copy(log_path, output_dir.join("raw.log.gz"))?; + } else { + fs::copy(log_path, output_dir.join("raw.log"))?; + // Also store a gzip-compressed copy alongside raw.log + let mut in_file = fs::File::open(log_path)?; + let gz_file = fs::File::create(output_dir.join("raw.log.gz"))?; + let mut encoder = GzEncoder::new(gz_file, Compression::default()); + io::copy(&mut in_file, &mut encoder)?; + encoder.finish()?; + } + Ok(output_dir.join("index.html")) } @@ -226,9 +243,11 @@ fn handle_all_ranks( return None; } let filename = path.file_name()?.to_str()?; - filename - .strip_prefix("dedicated_log_torch_trace_rank_")? - .strip_suffix(".log")? + let after_prefix = filename.strip_prefix("dedicated_log_torch_trace_rank_")?; + let after_suffix = after_prefix + .strip_suffix(".log.gz") + .or_else(|| after_prefix.strip_suffix(".log"))?; + after_suffix .split('_') .next()? .parse::() diff --git a/src/lib.rs b/src/lib.rs index f147837..a43b7ee 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,7 +11,7 @@ use serde_json::Value; use std::cell::RefCell; use std::fmt::Write as FmtWrite; use std::fs::{self, File}; -use std::io::{self, BufRead}; +use std::io::{self, BufRead, Read}; use std::path::{Path, PathBuf}; use std::time::Instant; use tinytemplate::TinyTemplate; @@ -29,7 +29,7 @@ pub mod vllm; pub use types::{ ArtifactFlags, CollectiveSchedule, CollectivesParityReport, Diagnostics, DivergenceFlags, DivergenceGroup, ExecOrderSummary, GraphAnalysis, GraphCollectivesParity, GraphRuntime, - MultiRankContext, RankMetaData, RuntimeAnalysis, RuntimeRankDetail, + MultiRankContext, OpRuntime, RankMetaData, RuntimeAnalysis, RuntimeRankDetail, }; pub use execution_order::{ @@ -408,6 +408,108 @@ fn handle_guard( }); } +/// Write a JSON line to shortraw (raw.jsonl) by parsing into serde_json::Value, +/// inserting glog metadata fields (timestamp, thread, pathname, lineno), and +/// re-serializing with BTreeMap-sorted keys for deterministic output. +fn write_to_shortraw( + content: &mut String, + json_line: &str, + payload_filename: Option<&str>, + timestamp: &str, + caps: ®ex::Captures, + multi: &MultiProgress, + stats: &mut Stats, +) { + let trimmed = json_line.trim_end(); + if !trimmed.ends_with('}') { + multi.suspend(|| { + eprintln!("JSON payload is not an object, dropping line from raw.jsonl"); + }); + stats.fail_json += 1; + return; + } + + // Parse as serde_json::Value (BTreeMap-backed) so keys are alphabetically sorted, + // matching the baseline output format. + let mut value: serde_json::Value = match serde_json::from_str(trimmed) { + Ok(v) => v, + Err(_) => { + multi.suspend(|| { + eprintln!("Failed to parse JSON for raw.jsonl, dropping line"); + }); + stats.fail_json += 1; + return; + } + }; + + let obj = value.as_object_mut().unwrap(); + + // Check for key conflicts after parsing, so we check real keys not string patterns in values. + let conflict_keys: &[&str] = if payload_filename.is_some() { + &[ + "timestamp", + "thread", + "pathname", + "lineno", + "payload_filename", + ] + } else { + &["timestamp", "thread", "pathname", "lineno"] + }; + for key in conflict_keys { + if obj.contains_key(*key) { + multi.suspend(|| { + eprintln!( + "Key conflict: \"{}\" already exists in JSON payload, skipping raw.jsonl JSONL conversion", + key + ); + }); + stats.fail_key_conflict += 1; + return; + } + } + + let thread = caps.name("thread").unwrap().as_str(); + let pathname = caps.name("pathname").unwrap().as_str(); + let lineno_str = caps.name("line").unwrap().as_str(); + + // Parse lineno as a number to match baseline (it was inserted as raw numeric in old code) + let lineno_value: serde_json::Value = if let Ok(n) = lineno_str.parse::() { + serde_json::Value::Number(n.into()) + } else { + serde_json::Value::String(lineno_str.to_string()) + }; + + // Parse thread as a number to match baseline + let thread_value: serde_json::Value = if let Ok(n) = thread.parse::() { + serde_json::Value::Number(n.into()) + } else { + serde_json::Value::String(thread.to_string()) + }; + + obj.insert( + "timestamp".to_string(), + serde_json::Value::String(timestamp.to_string()), + ); + obj.insert("thread".to_string(), thread_value); + obj.insert( + "pathname".to_string(), + serde_json::Value::String(pathname.to_string()), + ); + obj.insert("lineno".to_string(), lineno_value); + + if let Some(pf) = payload_filename { + obj.insert( + "payload_filename".to_string(), + serde_json::Value::String(pf.to_string()), + ); + } + + // Serialize with sorted keys (BTreeMap guarantees alphabetical order) + content.push_str(&serde_json::to_string(&value).unwrap()); + content.push('\n'); +} + pub fn parse_path(path: &PathBuf, config: &ParseConfig) -> anyhow::Result { let strict = config.strict; if !path.is_file() { @@ -426,7 +528,13 @@ pub fn parse_path(path: &PathBuf, config: &ParseConfig) -> anyhow::Result-")); let spinner = multi.add(ProgressBar::new_spinner()); - let reader = io::BufReader::new(file); + let is_gzipped = path.extension().map_or(false, |ext| ext == "gz"); + let reader: Box = if is_gzipped { + Box::new(flate2::read::GzDecoder::new(file)) + } else { + Box::new(file) + }; + let reader = io::BufReader::new(reader); let re_glog = Regex::new(concat!( r"(?[VIWEC])(?\d{2})(?\d{2}) ", @@ -436,15 +544,8 @@ pub fn parse_path(path: &PathBuf, config: &ParseConfig) -> anyhow::Result.)" ))?; - // Helper functions to reduce repetitive serde_json::Value creation - let make_string_value = |caps: ®ex::Captures, name: &str| -> serde_json::Value { - serde_json::Value::String(caps.name(name).unwrap().as_str().to_string()) - }; - - let make_number_value = |caps: ®ex::Captures, name: &str| -> serde_json::Value { - let parsed: u64 = caps.name(name).unwrap().as_str().parse().unwrap(); - serde_json::Value::Number(serde_json::Number::from(parsed)) - }; + // Compute year once instead of calling chrono::Utc::now().year() per line + let year = chrono::Utc::now().year(); // Helper function to format timestamp as ISO-8601 let format_timestamp = |caps: ®ex::Captures| -> String { @@ -455,9 +556,6 @@ pub fn parse_path(path: &PathBuf, config: &ParseConfig) -> anyhow::Result anyhow::Result anyhow::Result = Vec::new(); all_parsers.extend(config.custom_parsers.iter()); + // Reuse payload buffer across iterations to avoid repeated allocation + let mut payload_buf = String::new(); + while let Some((lineno, line)) = iter.next() { bytes_read += line.len() as u64; pb.set_position(bytes_read); @@ -587,117 +689,10 @@ pub fn parse_path(path: &PathBuf, config: &ParseConfig) -> anyhow::Result slowest_time { slowest_time = end; } - let payload = &line[caps.name("payload").unwrap().start()..]; - let original_json_envelope = payload; // Store the original JSON envelope - - // Helper function to safely insert keys and detect conflicts - let try_insert = |obj: &mut serde_json::Map, - key: &str, - value: serde_json::Value, - multi: &MultiProgress, - stats: &mut Stats| - -> bool { - if obj.contains_key(key) { - multi.suspend(|| { - eprintln!("Key conflict: '{}' already exists in JSON payload, skipping raw.jsonl JSONL conversion", key); - }); - stats.fail_key_conflict += 1; - false - } else { - obj.insert(key.to_string(), value); - true - } - }; - - // Create cleanup lambda to handle raw.jsonl writing as JSONL - let write_to_shortraw = |shortraw_content: &mut String, - payload_filename: Option, - multi: &MultiProgress, - stats: &mut Stats| { - match serde_json::from_str::(original_json_envelope) { - Ok(mut json_value) => { - if let Some(obj) = json_value.as_object_mut() { - // Try to add all log fields, abort on any conflict - let success = try_insert( - obj, - "timestamp", - serde_json::Value::String(format_timestamp(&caps)), - multi, - stats, - ) && try_insert( - obj, - "thread", - make_number_value(&caps, "thread"), - multi, - stats, - ) && try_insert( - obj, - "pathname", - make_string_value(&caps, "pathname"), - multi, - stats, - ) && try_insert( - obj, - "lineno", - make_number_value(&caps, "line"), - multi, - stats, - ); + let json_line = &line[caps.name("payload").unwrap().start()..]; - // Try to add payload filename if provided - let success = if let Some(payload_file) = payload_filename { - success - && try_insert( - obj, - "payload_filename", - serde_json::Value::String(payload_file), - multi, - stats, - ) - } else { - success - }; - - if !success { - // Drop line due to key conflict - don't write anything to maintain JSONL format - return; - } - - // Output as JSONL - match serde_json::to_string(&json_value) { - Ok(jsonl_line) => { - shortraw_content.push_str(&jsonl_line); - shortraw_content.push('\n'); - } - Err(e) => { - multi.suspend(|| { - eprintln!("Failed to serialize JSON for raw.jsonl: {}", e); - }); - stats.fail_json_serialization += 1; - // Drop line to maintain JSONL format - don't write anything - } - } - } else { - // Not a JSON object, drop line to maintain JSONL format - multi.suspend(|| { - eprintln!( - "JSON payload is not an object, dropping line from raw.jsonl" - ); - }); - stats.fail_json += 1; - } - } - Err(e) => { - // JSON parsing failed, drop line to maintain JSONL format - multi.suspend(|| { - eprintln!("Failed to parse JSON envelope for raw.jsonl: {}", e); - }); - stats.fail_json += 1; - } - } - }; - - let e = match serde_json::from_str::(payload) { + // Parse Envelope from JSON line (single parse -- no separate Value parse needed) + let e = match serde_json::from_str::(json_line) { Ok(r) => r, Err(err) => { multi.suspend(|| { @@ -707,7 +702,16 @@ pub fn parse_path(path: &PathBuf, config: &ParseConfig) -> anyhow::Result anyhow::Result anyhow::Result anyhow::Result { if rank != e.rank { stats.other_rank += 1; - write_to_shortraw(&mut shortraw_content, None, &multi, &mut stats); + write_to_shortraw( + &mut shortraw_content, + json_line, + None, + &format_timestamp(&caps), + &caps, + &multi, + &mut stats, + ); continue; } } @@ -780,9 +792,7 @@ pub fn parse_path(path: &PathBuf, config: &ParseConfig) -> anyhow::Result anyhow::Result anyhow::Result = - Box::new(crate::parsers::CompilationMetricsParser { + // Step 1: construct parser borrowing compile_directory immutably, call parse(). + // The parser + its borrow are dropped at the end of this block. + let metrics_parse_result = { + let parser = crate::parsers::CompilationMetricsParser { tt: &tt, stack_index: &stack_index, symbolic_shape_specialization_index: &symbolic_shape_specialization_index, guard_added_fast_index: &guard_added_fast_index, create_symbol_index: &create_symbol_index, unbacked_symbol_index: &unbacked_symbol_index, - output_files: &copied_directory, + output_files: compile_directory.as_slice(), compile_id_dir: &compile_id_dir, - }); - let result = run_parser( - lineno, - &parser, - &e, - &payload, - &mut output_count, - &mut output, - compile_directory, - &multi, - &mut stats, - &vllm_state, - ); - // Take the last PayloadFilename entry as per the requirement - if matches!(result, ParserResult::PayloadFilename(_)) { - parser_payload_filename = result; + }; + parser + .get_metadata(&e) + .map(|md| parser.parse(lineno, md, e.rank, &e.compile_id, &payload_buf)) + }; + // Step 2: parser is dropped, immutable borrow of compile_directory ends. + // Now we can mutate compile_directory to add results. + if let Some(result) = metrics_parse_result { + match result { + Ok(results) => { + for parser_result in results { + match parser_result { + ParserOutput::File(raw_filename, out) => { + let filename = add_unique_suffix(raw_filename, output_count); + add_file_output( + filename, + out, + &mut output, + compile_directory, + &mut output_count, + &vllm_state, + ); + } + ParserOutput::GlobalFile(filename, out) => { + add_file_output( + filename, + out, + &mut output, + compile_directory, + &mut output_count, + &vllm_state, + ); + } + ParserOutput::PayloadFile(raw_filename) => { + let filename = add_unique_suffix(raw_filename, output_count); + parser_payload_filename = ParserResult::PayloadFilename( + filename.to_string_lossy().to_string(), + ); + add_file_output( + filename, + payload_buf.to_string(), + &mut output, + compile_directory, + &mut output_count, + &vllm_state, + ); + } + ParserOutput::PayloadReformatFile(raw_filename, formatter) => { + let filename = add_unique_suffix(raw_filename, output_count); + match formatter(&payload_buf) { + Ok(formatted_content) => { + parser_payload_filename = ParserResult::PayloadFilename( + filename.to_string_lossy().to_string(), + ); + add_file_output( + filename, + formatted_content, + &mut output, + compile_directory, + &mut output_count, + &vllm_state, + ); + } + Err(err) => { + multi.suspend(|| { + eprintln!( + "Failed to format payload for {}: {}", + filename.to_string_lossy(), + err + ) + }); + stats.fail_parser += 1; + } + } + } + ParserOutput::Link(name, url) => { + compile_directory.push(OutputFile { + url, + name, + number: output_count, + suffix: "".to_string(), + readable_url: None, + }); + output_count += 1; + } + } + } + } + Err(err) => { + multi.suspend(|| eprintln!("Parser compilation_metrics failed: {err}")); + stats.fail_parser += 1; + } + } } // compilation metrics is always the last output, since it just ran @@ -887,10 +975,7 @@ pub fn parse_path(path: &PathBuf, config: &ParseConfig) -> anyhow::Result anyhow::Result anyhow::Result anyhow::Result anyhow::Result anyhow::Result anyhow::Result chromium_events.push(event), Err(_) => { // Continue processing instead of crashing @@ -1093,10 +1180,10 @@ pub fn parse_path(path: &PathBuf, config: &ParseConfig) -> anyhow::Result { if let Some(ref expect) = e.has_payload { // Only write payload file if no parser generated PayloadFile/PayloadReformatFile output and not a chromium event - if !payload.is_empty() && e.chromium_event.is_none() { + if !payload_buf.is_empty() && e.chromium_event.is_none() { let hash_str = expect; let payload_path = PathBuf::from(format!("payloads/{}.txt", hash_str)); - output.push((payload_path, payload.clone())); + output.push((payload_path, payload_buf.clone())); Some(format!("payloads/{}.txt", hash_str)) } else { None @@ -1111,7 +1198,10 @@ pub fn parse_path(path: &PathBuf, config: &ParseConfig) -> anyhow::Result anyhow::Result { pub guard_added_fast_index: &'t RefCell, pub create_symbol_index: &'t RefCell, pub unbacked_symbol_index: &'t RefCell, - pub output_files: &'t Vec, + pub output_files: &'t [OutputFile], pub compile_id_dir: &'t PathBuf, } impl StructuredLogParser for CompilationMetricsParser<'_> { @@ -451,10 +451,7 @@ impl StructuredLogParser for CompilationMetricsParser<'_> { .map_or("(unknown) ".to_string(), |c| format!("{cid} ", cid = c)); let mut cid = compile_id.clone(); if let Some(c) = cid.as_mut() { - if let Some(_frame_id) = c.frame_compile_id { - // data migration for old logs that don't have attempt - c.attempt = Some(0); - } + c.collapse_attempt(); } let stack_html = self .stack_index @@ -767,8 +764,9 @@ impl StructuredLogParser for DumpFileParser { } pub fn anchor_source(text: &str) -> String { - let lines: Vec<&str> = text.lines().collect(); - let mut html = String::from( + // Pre-allocate: HTML output is roughly 2x input size plus boilerplate + let mut html = String::with_capacity(text.len() * 2 + 500); + html.push_str( r#" @@ -799,7 +797,7 @@ pub fn anchor_source(text: &str) -> String {
"#,
     );
 
-    for (i, line) in lines.iter().enumerate() {
+    for (i, line) in text.lines().enumerate() {
         let line_number = i + 1;
         html.push_str(&format!(
             r#"{}"#,
diff --git a/src/types.rs b/src/types.rs
index dad361c..9cb6cef 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -107,9 +107,12 @@ pub struct RuntimeAnalysis {
     pub has_mismatched_graph_counts: bool,
 }
 
+static RE_EVAL_WITH_KEY: Lazy =
+    Lazy::new(|| Regex::new(r"\.([0-9]+)").unwrap());
+
 pub fn extract_eval_with_key_id(filename: &str) -> Option {
-    let re = Regex::new(r"\.([0-9]+)").unwrap();
-    re.captures(filename)
+    RE_EVAL_WITH_KEY
+        .captures(filename)
         .and_then(|caps| caps.get(1))
         .and_then(|m| m.as_str().parse::().ok())
 }
@@ -249,6 +252,23 @@ impl fmt::Display for CompileId {
 }
 
 impl CompileId {
+    /// Normalize attempt field: if frame_compile_id is set but attempt is None, default to 0.
+    /// This handles old logs that don't have the attempt field.
+    pub fn normalize_attempt(&mut self) {
+        if self.frame_compile_id.is_some() && self.attempt.is_none() {
+            self.attempt = Some(0);
+        }
+    }
+
+    /// Collapse attempt to 0 for index lookups.
+    /// Stack traces come from dynamo_start (always attempt 0), so all attempts
+    /// must map to the same key when looking up stacks, metrics, etc.
+    pub fn collapse_attempt(&mut self) {
+        if self.frame_compile_id.is_some() {
+            self.attempt = Some(0);
+        }
+    }
+
     pub fn as_directory_name(&self) -> String {
         let compiled_autograd_id_str = self
             .compiled_autograd_id
@@ -335,13 +355,14 @@ pub struct FrameSummary {
     pub uninterned_filename: Option,
 }
 
+static RE_SEED_NSPID: Lazy = Lazy::new(|| Regex::new(r"[^/]+-seed-nspid[^/]+/").unwrap());
+
 pub fn simplify_filename<'a>(filename: &'a str) -> &'a str {
     let parts: Vec<&'a str> = filename.split("#link-tree/").collect();
     if parts.len() > 1 {
         return parts[1];
     }
-    static RE: Lazy = Lazy::new(|| Regex::new(r"[^/]+-seed-nspid[^/]+/").unwrap());
-    if let Some(captures) = RE.captures(filename) {
+    if let Some(captures) = RE_SEED_NSPID.captures(filename) {
         if let Some(capture) = captures.get(0) {
             return &filename[capture.end()..];
         }
diff --git a/tests/integration_test.rs b/tests/integration_test.rs
index 03f58cb..a33dba9 100644
--- a/tests/integration_test.rs
+++ b/tests/integration_test.rs
@@ -6,7 +6,11 @@ use std::fs;
 use std::path::Path;
 use std::path::PathBuf;
 use tempfile::tempdir;
-use tlparse::{self, parsers, CollectivesParityReport};
+use tlparse::{
+    self, analyze_graph_runtime_deltas, generate_multi_rank_landing, parsers,
+    read_chromium_events_with_pid, CollectivesParityReport, GraphRuntime, MultiRankContext,
+    OpRuntime, ParseConfig,
+};
 
 fn prefix_exists(map: &HashMap, prefix: &str) -> bool {
     map.keys()
@@ -2731,3 +2735,613 @@ fn test_parse_vllm_sample() {
     assert!(index_html.contains("submod_0"),);
     assert!(index_html.contains("submod_2"),);
 }
+
+#[test]
+fn test_parse_gzip_input() {
+    // Compress simple.log into a temp .gz file and parse it
+    use flate2::write::GzEncoder;
+    use flate2::Compression;
+    use std::io::Write;
+
+    let original = fs::read("tests/inputs/simple.log").unwrap();
+
+    let temp_dir = tempdir().unwrap();
+    let gz_path = temp_dir.path().join("simple.log.gz");
+    let mut encoder = GzEncoder::new(fs::File::create(&gz_path).unwrap(), Compression::fast());
+    encoder.write_all(&original).unwrap();
+    encoder.finish().unwrap();
+
+    let config = tlparse::ParseConfig {
+        strict: true,
+        ..Default::default()
+    };
+    let output = tlparse::parse_path(&gz_path, &config);
+    assert!(output.is_ok(), "parse_path should succeed on .gz input");
+    let map: HashMap = output.unwrap().into_iter().collect();
+
+    // Same expected files as test_parse_simple
+    let expected_files = [
+        "-_0_0_0/aot_forward_graph",
+        "-_0_0_0/dynamo_output_graph",
+        "index.html",
+        "compile_directory.json",
+        "failures_and_restarts.html",
+        "-_0_0_0/inductor_post_grad_graph",
+        "-_0_0_0/inductor_output_code",
+    ];
+    for prefix in expected_files {
+        assert!(
+            prefix_exists(&map, prefix),
+            "{} not found in gzip output",
+            prefix
+        );
+    }
+}
+
+#[test]
+fn test_gzip_cli_raw_log_copy() -> Result<(), Box> {
+    use flate2::write::GzEncoder;
+    use flate2::Compression;
+    use std::io::Write;
+
+    let original = fs::read("tests/inputs/simple.log").unwrap();
+
+    let temp_dir = tempdir().unwrap();
+    let gz_path = temp_dir.path().join("simple.log.gz");
+    let mut encoder = GzEncoder::new(fs::File::create(&gz_path).unwrap(), Compression::fast());
+    encoder.write_all(&original).unwrap();
+    encoder.finish().unwrap();
+
+    let out_dir = temp_dir.path().join("out");
+
+    let mut cmd = Command::cargo_bin("tlparse")?;
+    cmd.arg(&gz_path)
+        .arg("--overwrite")
+        .arg("-o")
+        .arg(&out_dir)
+        .arg("--no-browser");
+    cmd.assert().success();
+
+    // Both raw.log.gz and raw.log (decompressed) should exist
+    assert!(
+        out_dir.join("raw.log.gz").exists(),
+        "raw.log.gz should exist for gzip input"
+    );
+    assert!(
+        out_dir.join("raw.log").exists(),
+        "raw.log should also exist for gzip input (decompressed for BC)"
+    );
+    Ok(())
+}
+
+#[test]
+fn test_all_ranks_gzip_input() -> Result<(), Box> {
+    use flate2::write::GzEncoder;
+    use flate2::Compression;
+    use std::io::Write;
+
+    let temp_dir = tempdir().unwrap();
+    let input_dir = temp_dir.path().join("gz_ranks");
+    fs::create_dir_all(&input_dir)?;
+
+    // Compress the multi-rank log files into .log.gz
+    for rank in 0..2 {
+        let src = PathBuf::from(format!(
+            "tests/inputs/multi_rank_logs/dedicated_log_torch_trace_rank_{rank}.log"
+        ));
+        let original = fs::read(&src)?;
+        let gz_path = input_dir.join(format!("dedicated_log_torch_trace_rank_{rank}.log.gz"));
+        let mut encoder = GzEncoder::new(fs::File::create(&gz_path)?, Compression::fast());
+        encoder.write_all(&original)?;
+        encoder.finish()?;
+    }
+
+    let out_dir = temp_dir.path().join("out");
+
+    let mut cmd = Command::cargo_bin("tlparse")?;
+    cmd.arg(&input_dir)
+        .arg("--all-ranks-html")
+        .arg("--overwrite")
+        .arg("-o")
+        .arg(&out_dir)
+        .arg("--no-browser");
+    cmd.assert().success();
+
+    assert!(out_dir.join("rank_0/index.html").exists());
+    assert!(out_dir.join("rank_1/index.html").exists());
+    assert!(out_dir.join("index.html").exists());
+
+    // Each rank should have raw.log.gz
+    assert!(out_dir.join("rank_0/raw.log.gz").exists());
+    assert!(out_dir.join("rank_1/raw.log.gz").exists());
+
+    let landing = fs::read_to_string(out_dir.join("index.html"))?;
+    assert!(landing.contains(r#""#));
+    assert!(landing.contains(r#""#));
+    Ok(())
+}
+
+// ============================================================================
+// Library API tests for features previously only tested via CLI
+// ============================================================================
+
+/// Verify that parse_path includes raw.log in ParseOutput for library callers
+#[test]
+fn test_parse_output_contains_raw_log() {
+    let path = Path::new("tests/inputs/simple.log").to_path_buf();
+    let config = ParseConfig {
+        strict: true,
+        ..Default::default()
+    };
+    let output = tlparse::parse_path(&path, &config).unwrap();
+    let map: HashMap = output.into_iter().collect();
+    assert!(
+        map.contains_key(&PathBuf::from("raw.log")),
+        "raw.log should be present in ParseOutput for library callers"
+    );
+    // Verify the content matches the original file
+    let original = fs::read_to_string(&path).unwrap();
+    assert_eq!(
+        map[&PathBuf::from("raw.log")],
+        original,
+        "raw.log content should match the original input file"
+    );
+}
+
+/// Verify that parse_path with gzip input includes raw.log in ParseOutput
+#[test]
+fn test_parse_gzip_output_contains_raw_log() {
+    use flate2::write::GzEncoder;
+    use flate2::Compression;
+    use std::io::Write;
+
+    let original = fs::read_to_string("tests/inputs/simple.log").unwrap();
+    let temp_dir = tempdir().unwrap();
+    let gz_path = temp_dir.path().join("simple.log.gz");
+    let mut encoder = GzEncoder::new(fs::File::create(&gz_path).unwrap(), Compression::fast());
+    encoder.write_all(original.as_bytes()).unwrap();
+    encoder.finish().unwrap();
+
+    let config = ParseConfig {
+        strict: true,
+        ..Default::default()
+    };
+    let output = tlparse::parse_path(&gz_path, &config).unwrap();
+    let map: HashMap = output.into_iter().collect();
+    assert!(
+        map.contains_key(&PathBuf::from("raw.log")),
+        "raw.log should be present in ParseOutput for gzip library callers"
+    );
+}
+
+/// Test analyze_graph_runtime_deltas directly as a library function
+#[test]
+fn test_analyze_graph_runtime_deltas_library() {
+    // Two ranks, same graph, different runtimes
+    let runtimes = vec![
+        GraphRuntime {
+            rank: 0,
+            graph: "-_0_0_0".to_string(),
+            ops: vec![
+                OpRuntime {
+                    name: "op_a".to_string(),
+                    estimated_runtime_ns: 1000.0,
+                },
+                OpRuntime {
+                    name: "op_b".to_string(),
+                    estimated_runtime_ns: 2000.0,
+                },
+            ],
+        },
+        GraphRuntime {
+            rank: 1,
+            graph: "-_0_0_0".to_string(),
+            ops: vec![
+                OpRuntime {
+                    name: "op_a".to_string(),
+                    estimated_runtime_ns: 1500.0,
+                },
+                OpRuntime {
+                    name: "op_b".to_string(),
+                    estimated_runtime_ns: 2500.0,
+                },
+            ],
+        },
+    ];
+
+    let analysis = analyze_graph_runtime_deltas(&runtimes);
+    assert!(analysis.is_some());
+    let analysis = analysis.unwrap();
+    assert!(!analysis.has_mismatched_graph_counts);
+    assert_eq!(analysis.graphs.len(), 1);
+    assert_eq!(analysis.graphs[0].graph_id, "-_0_0_0");
+    // delta_ms should be the max-min total runtime difference across ranks
+    // rank 0 total: 3000 ns = 0.003 ms, rank 1 total: 4000 ns = 0.004 ms
+    assert!(analysis.graphs[0].delta_ms > 0.0);
+}
+
+/// Test analyze_graph_runtime_deltas with mismatched graph counts
+#[test]
+fn test_analyze_graph_runtime_deltas_mismatched() {
+    let runtimes = vec![
+        GraphRuntime {
+            rank: 0,
+            graph: "-_0_0_0".to_string(),
+            ops: vec![OpRuntime {
+                name: "op_a".to_string(),
+                estimated_runtime_ns: 1000.0,
+            }],
+        },
+        GraphRuntime {
+            rank: 0,
+            graph: "-_0_0_1".to_string(),
+            ops: vec![OpRuntime {
+                name: "op_b".to_string(),
+                estimated_runtime_ns: 2000.0,
+            }],
+        },
+        GraphRuntime {
+            rank: 1,
+            graph: "-_0_0_0".to_string(),
+            ops: vec![OpRuntime {
+                name: "op_a".to_string(),
+                estimated_runtime_ns: 1500.0,
+            }],
+        },
+        // rank 1 is missing graph -_0_0_1
+    ];
+
+    let analysis = analyze_graph_runtime_deltas(&runtimes);
+    assert!(analysis.is_some());
+    let analysis = analysis.unwrap();
+    assert!(analysis.has_mismatched_graph_counts);
+}
+
+/// Test read_chromium_events_with_pid directly as a library function
+#[test]
+fn test_read_chromium_events_with_pid_library() {
+    // First, generate output that includes chromium_events.json
+    let path = Path::new("tests/inputs/simple.log").to_path_buf();
+    let config = ParseConfig {
+        strict: true,
+        ..Default::default()
+    };
+    let output = tlparse::parse_path(&path, &config).unwrap();
+    let map: HashMap = output.into_iter().collect();
+
+    // Write the chromium_events.json to a temp dir
+    let temp_dir = tempdir().unwrap();
+    if let Some(events_content) = map.get(&PathBuf::from("chromium_events.json")) {
+        let events_path = temp_dir.path().join("chromium_events.json");
+        fs::write(&events_path, events_content).unwrap();
+
+        let events = read_chromium_events_with_pid(&events_path, 42).unwrap();
+        // All events should have pid set to 42
+        for event in &events {
+            assert_eq!(
+                event.get("pid").and_then(|v| v.as_u64()),
+                Some(42),
+                "All events should have pid set to the provided rank_num"
+            );
+        }
+    }
+}
+
+/// Test generate_multi_rank_landing directly as a library function
+#[test]
+fn test_generate_multi_rank_landing_library() {
+    // Set up per-rank output directories with parsed results
+    let temp_dir = tempdir().unwrap();
+    let out_dir = temp_dir.path();
+
+    let path = Path::new("tests/inputs/simple.log").to_path_buf();
+    let config = ParseConfig::default();
+
+    // Parse for two "ranks"
+    for rank in 0..2 {
+        let rank_dir = out_dir.join(format!("rank_{}", rank));
+        fs::create_dir_all(&rank_dir).unwrap();
+        let output = tlparse::parse_path(&path, &config).unwrap();
+        for (filename, content) in output {
+            let file_path = rank_dir.join(&filename);
+            if let Some(dir) = file_path.parent() {
+                fs::create_dir_all(dir).unwrap();
+            }
+            fs::write(file_path, content).unwrap();
+        }
+    }
+
+    let ctx = MultiRankContext {
+        css: "",
+        custom_header_html: "",
+        num_ranks: 2,
+        ranks: vec!["0".to_string(), "1".to_string()],
+        qps: "",
+        has_chromium_events: false,
+        show_desync_warning: false,
+        compile_id_divergence: false,
+        diagnostics: Default::default(),
+    };
+
+    let landing_path = generate_multi_rank_landing(&config, &ctx, out_dir).unwrap();
+    assert!(landing_path.exists(), "Landing page should be generated");
+
+    let content = fs::read_to_string(&landing_path).unwrap();
+    assert!(content.contains(r#""#));
+    assert!(content.contains(r#""#));
+}
+
+// ============================================================================
+// CLI tests for features previously only tested via library API
+// ============================================================================
+
+/// Basic CLI smoke test for single-file parsing
+#[test]
+fn test_cli_single_file_basic() -> Result<(), Box> {
+    let temp_dir = tempdir()?;
+    let out_dir = temp_dir.path().join("out");
+
+    let mut cmd = Command::cargo_bin("tlparse")?;
+    cmd.arg("tests/inputs/simple.log")
+        .arg("--overwrite")
+        .arg("-o")
+        .arg(&out_dir)
+        .arg("--no-browser");
+    cmd.assert().success();
+
+    assert!(out_dir.join("index.html").exists());
+    assert!(out_dir.join("raw.log").exists());
+    assert!(out_dir.join("raw.log.gz").exists());
+    assert!(out_dir.join("raw.jsonl").exists());
+
+    Ok(())
+}
+
+/// Test --strict flag via CLI causes failure on bad logs
+#[test]
+fn test_cli_strict_flag() -> Result<(), Box> {
+    let temp_dir = tempdir()?;
+    let out_dir = temp_dir.path().join("out");
+
+    // simple.log should pass with --strict
+    let mut cmd = Command::cargo_bin("tlparse")?;
+    cmd.arg("tests/inputs/simple.log")
+        .arg("--strict")
+        .arg("--overwrite")
+        .arg("-o")
+        .arg(&out_dir)
+        .arg("--no-browser");
+    cmd.assert().success();
+
+    Ok(())
+}
+
+/// Test --export flag via CLI
+#[test]
+fn test_cli_export_flag() -> Result<(), Box> {
+    let temp_dir = tempdir()?;
+    let out_dir = temp_dir.path().join("out");
+
+    let mut cmd = Command::cargo_bin("tlparse")?;
+    cmd.arg("tests/inputs/export.log")
+        .arg("--export")
+        .arg("--overwrite")
+        .arg("-o")
+        .arg(&out_dir)
+        .arg("--no-browser");
+    cmd.assert().success();
+
+    assert!(out_dir.join("index.html").exists());
+    // Verify export-specific output exists on disk
+    let index_content = fs::read_to_string(out_dir.join("index.html"))?;
+    assert!(
+        index_content.contains("exported_program")
+            || index_content.contains("symbolic_guard_information"),
+        "Export mode should produce export-specific artifacts"
+    );
+
+    Ok(())
+}
+
+/// Test --inductor-provenance flag via CLI
+#[test]
+fn test_cli_inductor_provenance_flag() -> Result<(), Box> {
+    let temp_dir = tempdir()?;
+    let out_dir = temp_dir.path().join("out");
+
+    let mut cmd = Command::cargo_bin("tlparse")?;
+    cmd.arg("tests/inputs/inductor_provenance_aot_cuda_log.txt")
+        .arg("--inductor-provenance")
+        .arg("--overwrite")
+        .arg("-o")
+        .arg(&out_dir)
+        .arg("--no-browser");
+    cmd.assert().success();
+
+    // Check that provenance tracking HTML was generated on disk
+    let provenance_files: Vec<_> = fs::read_dir(&out_dir)?
+        .filter_map(|e| e.ok())
+        .filter(|e| {
+            e.file_name()
+                .to_str()
+                .map_or(false, |n| n.contains("provenance_tracking"))
+        })
+        .collect();
+    assert!(
+        !provenance_files.is_empty(),
+        "CLI --inductor-provenance should generate provenance tracking files"
+    );
+
+    Ok(())
+}
+
+/// Test --plain-text flag via CLI
+#[test]
+fn test_cli_plain_text_flag() -> Result<(), Box> {
+    let temp_dir = tempdir()?;
+    let out_dir = temp_dir.path().join("out");
+
+    let mut cmd = Command::cargo_bin("tlparse")?;
+    cmd.arg("tests/inputs/simple.log")
+        .arg("--plain-text")
+        .arg("--overwrite")
+        .arg("-o")
+        .arg(&out_dir)
+        .arg("--no-browser");
+    cmd.assert().success();
+
+    assert!(out_dir.join("index.html").exists());
+    Ok(())
+}
+
+/// Test --custom-header-html flag via CLI
+#[test]
+fn test_cli_custom_header_html() -> Result<(), Box> {
+    let temp_dir = tempdir()?;
+    let out_dir = temp_dir.path().join("out");
+
+    let custom_html = "
Test Banner
"; + let mut cmd = Command::cargo_bin("tlparse")?; + cmd.arg("tests/inputs/simple.log") + .arg("--custom-header-html") + .arg(custom_html) + .arg("--overwrite") + .arg("-o") + .arg(&out_dir) + .arg("--no-browser"); + cmd.assert().success(); + + let index_content = fs::read_to_string(out_dir.join("index.html"))?; + assert!( + index_content.contains(custom_html), + "Custom header HTML should appear in the generated index.html" + ); + Ok(()) +} + +/// Test library plain_text config option +#[test] +fn test_library_plain_text_config() { + let path = Path::new("tests/inputs/simple.log").to_path_buf(); + let config = ParseConfig { + plain_text: true, + ..Default::default() + }; + let output = tlparse::parse_path(&path, &config); + assert!(output.is_ok()); + let map: HashMap = output.unwrap().into_iter().collect(); + assert!(map.contains_key(&PathBuf::from("index.html"))); +} + +/// Test library custom_header_html config option +#[test] +fn test_library_custom_header_html() { + let path = Path::new("tests/inputs/simple.log").to_path_buf(); + let custom_html = "
My Custom Header
".to_string(); + let config = ParseConfig { + custom_header_html: custom_html.clone(), + ..Default::default() + }; + let output = tlparse::parse_path(&path, &config).unwrap(); + let map: HashMap = output.into_iter().collect(); + let index = &map[&PathBuf::from("index.html")]; + assert!( + index.contains(&custom_html), + "custom_header_html should appear in the library-generated index.html" + ); +} + +/// Test that CLI produces the same key outputs as library for the same input +#[test] +fn test_cli_and_library_output_parity() -> Result<(), Box> { + // Library + let path = Path::new("tests/inputs/simple.log").to_path_buf(); + let config = ParseConfig { + strict: true, + ..Default::default() + }; + let lib_output = tlparse::parse_path(&path, &config).unwrap(); + let lib_files: std::collections::HashSet = lib_output + .iter() + .map(|(p, _)| p.to_str().unwrap().to_string()) + .collect(); + + // CLI + let temp_dir = tempdir()?; + let out_dir = temp_dir.path().join("out"); + let mut cmd = Command::cargo_bin("tlparse")?; + cmd.arg("tests/inputs/simple.log") + .arg("--strict") + .arg("--overwrite") + .arg("-o") + .arg(&out_dir) + .arg("--no-browser"); + cmd.assert().success(); + + // All library output files should exist on disk after CLI run + for lib_file in &lib_files { + let on_disk = out_dir.join(lib_file); + assert!( + on_disk.exists(), + "Library output file '{}' should exist on disk after CLI run", + lib_file + ); + } + + // CLI should also produce raw.log and raw.log.gz (which are handled outside parse_path) + assert!( + out_dir.join("raw.log").exists(), + "CLI should produce raw.log on disk" + ); + assert!( + out_dir.join("raw.log.gz").exists(), + "CLI should produce raw.log.gz on disk" + ); + + Ok(()) +} + +/// Test that the CLI --overwrite flag works to replace an existing output directory +#[test] +fn test_cli_overwrite_flag() -> Result<(), Box> { + let temp_dir = tempdir()?; + let out_dir = temp_dir.path().join("out"); + fs::create_dir_all(&out_dir)?; + // Create a sentinel file that should be removed by --overwrite + fs::write(out_dir.join("sentinel.txt"), "should be removed")?; + + let mut cmd = Command::cargo_bin("tlparse")?; + cmd.arg("tests/inputs/simple.log") + .arg("--overwrite") + .arg("-o") + .arg(&out_dir) + .arg("--no-browser"); + cmd.assert().success(); + + assert!( + !out_dir.join("sentinel.txt").exists(), + "sentinel file should have been removed by --overwrite" + ); + assert!(out_dir.join("index.html").exists()); + + Ok(()) +} + +/// Test that the CLI fails without --overwrite when output dir already exists +#[test] +fn test_cli_no_overwrite_fails() -> Result<(), Box> { + let temp_dir = tempdir()?; + let out_dir = temp_dir.path().join("out"); + fs::create_dir_all(&out_dir)?; + + let mut cmd = Command::cargo_bin("tlparse")?; + cmd.arg("tests/inputs/simple.log") + .arg("-o") + .arg(&out_dir) + .arg("--no-browser"); + cmd.assert() + .failure() + .stderr(str::contains("already exists")); + + Ok(()) +}