4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

8 changes: 7 additions & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "tlparse"
-version = "0.4.8"
+version = "0.4.9"
Contributor

Given the backward-compatibility (BC) break in raw.log, we should probably bump the minor version, not just the patch ("0.5.0").

Contributor Author

Can you elaborate on what breaking change you are observing? I tried to make sure these changes do not need a minor version bump. I have many more improvements planned (streamed parallel processing, use of rayon, refactoring the code to make it more modular, etc.), but they would have broken some API contracts, so I left them out of this change. I would like to take them on, but they seem more like a major version change to me. I strongly believe we should be able to parse large files even faster.

Contributor

It was the ParseOutput change, but I see you pointed out that it's not really part of the public contract (even though it's in a public function). If this were not a 0.x.x release, or if there were more uses of this as a library, I'd push back, but fair enough.
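
For context on the 0.4.9 versus 0.5.0 question: under Cargo's default (caret) version requirements, a dependency on `^0.4.8` accepts 0.4.9 but not 0.5.0, so for a 0.x crate the minor number is the one that signals a break. The sketch below illustrates that rule only; `parse_version` and `caret_compatible` are hypothetical helpers written for this note, not part of tlparse or Cargo, and pre-release/build metadata are ignored.

```rust
/// Parse "MAJOR.MINOR.PATCH" into a tuple (hypothetical helper).
/// Pre-release and build metadata are not handled.
fn parse_version(v: &str) -> Option<(u64, u64, u64)> {
    let mut parts = v.split('.');
    let major = parts.next()?.parse().ok()?;
    let minor = parts.next()?.parse().ok()?;
    let patch = parts.next()?.parse().ok()?;
    if parts.next().is_some() {
        return None; // more than three components
    }
    Some((major, minor, patch))
}

/// Returns Some(true) if `candidate` satisfies the caret requirement `^base`.
/// Returns None if either string is not a plain MAJOR.MINOR.PATCH version.
fn caret_compatible(base: &str, candidate: &str) -> Option<bool> {
    let b = parse_version(base)?;
    let c = parse_version(candidate)?;
    if c < b {
        return Some(false); // older than the requested minimum
    }
    let ok = match b {
        // ^0.0.z pins the exact patch version
        (0, 0, _) => c == b,
        // ^0.y.z allows patch updates only (same minor)
        (0, minor, _) => c.0 == 0 && c.1 == minor,
        // ^x.y.z allows anything with the same major
        (major, _, _) => c.0 == major,
    };
    Some(ok)
}
```

With these helpers, `caret_compatible("0.4.8", "0.4.9")` is accepted while `caret_compatible("0.4.8", "0.5.0")` is rejected, which is why a patch bump is only appropriate if downstream callers are not expected to break.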

Contributor

It looks like raw.log will stop being produced after the current changes. Ideally, we still want raw.log to be produced for BC reasons (among other data-collection reasons).

Can we still produce raw.log?

If that would make perf go down significantly, I'd propose bumping the major version number, as @aorenste suggested. We can fix this internally by directly copying the input file to the output raw.log, as in D99742022.

Contributor Author

raw.log is generated when tlparse is run on the uncompressed raw.log file. It is not generated when the user supplied a compressed file to begin with. That should be the expectation, since tlparse currently does not support parsing compressed files. So anyone who uses compressed input is already integrated in a way that won't break in a later version, when we remove uncompressed file support.

https://github.com/meta-pytorch/tlparse/pull/179/changes#diff-b2812f19576dd53d0c35b107a322f58a00fce6977f4b5976e1961853982af3ccR173

Contributor

I think raw.log generation was moved to the CLI layer? But users could be calling tlparse through parse_path instead of the CLI, in which case raw.log is no longer produced.

Contributor Author

Thanks @yushangdi for catching that. Fixed: we now generate both files. I was testing primarily from the CLI, so I missed this. I also added tests to avoid this in the future.
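
To make the outcome of this thread concrete, here is a minimal sketch of which raw copies end up in the output directory for a given input, following the branch in the `src/cli.rs` hunk of this PR (a `.gz` input is copied as `raw.log.gz` only; any other input yields `raw.log` plus a compressed `raw.log.gz`). The function name `raw_output_names` is hypothetical and exists only for this illustration.

```rust
use std::path::Path;

/// Hypothetical helper: list the raw-log copies expected in the output
/// directory for a given input path, mirroring the extension check in the
/// cli.rs hunk of this PR.
fn raw_output_names(log_path: &Path) -> Vec<&'static str> {
    // Only the final extension is inspected, so "trace.log.gz" counts as gz.
    let is_gz = log_path.extension().map_or(false, |ext| ext == "gz");
    if is_gz {
        // Compressed input: copied through unchanged.
        vec!["raw.log.gz"]
    } else {
        // Uncompressed input: plain copy plus a gzip-compressed copy.
        vec!["raw.log", "raw.log.gz"]
    }
}
```

Keeping `raw.log` for uncompressed input is what preserves the backward-compatible behavior the reviewers asked about.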

edition = "2021"
authors = ["Edward Z. Yang <ezyang@mit.edu>"]
description = "Parse TORCH_LOG logs produced by PyTorch torch.compile"
@@ -33,8 +33,14 @@ serde = { version = "1.0.185", features = ["serde_derive"] }
serde_json = "1.0.100"
tinytemplate = "1.1.0"
tiny_http = "0.12"
flate2 = "1.0"

[dev-dependencies]
assert_cmd = "2.0"
libc = "0.2"
predicates = "3.1.0"
tempfile = "3.10.1"

[[bench]]
name = "parse_benchmark"
harness = false
180 changes: 180 additions & 0 deletions benches/parse_benchmark.rs
@@ -0,0 +1,180 @@
//! Benchmark for tlparse: measures wall time and peak memory (RSS).
//!
//! Usage:
//! TLPARSE_BENCH_INPUT=/path/to/file cargo bench --bench parse_benchmark
//! cargo bench --bench parse_benchmark -- /path/to/file # custom input via CLI arg

use std::io::BufRead;
use std::path::PathBuf;
use std::time::Instant;
use tempfile::tempdir;

const WARMUP_ITERS: u32 = 2;
const BENCH_ITERS: u32 = 5;

fn get_peak_rss_bytes() -> Option<u64> {
    #[cfg(target_os = "macos")]
    {
        use std::mem::MaybeUninit;
        unsafe {
            let mut usage = MaybeUninit::<libc::rusage>::zeroed();
            if libc::getrusage(libc::RUSAGE_SELF, usage.as_mut_ptr()) == 0 {
                // macOS reports ru_maxrss in bytes
                Some(usage.assume_init().ru_maxrss as u64)
            } else {
                None
            }
        }
    }
    #[cfg(target_os = "linux")]
    {
        use std::mem::MaybeUninit;
        unsafe {
            let mut usage = MaybeUninit::<libc::rusage>::zeroed();
            if libc::getrusage(libc::RUSAGE_SELF, usage.as_mut_ptr()) == 0 {
                // Linux reports ru_maxrss in kilobytes
                Some(usage.assume_init().ru_maxrss as u64 * 1024)
            } else {
                None
            }
        }
    }
    #[cfg(not(any(target_os = "macos", target_os = "linux")))]
    {
        None
    }
}

fn format_bytes(bytes: u64) -> String {
    if bytes >= 1024 * 1024 * 1024 {
        format!("{:.2} GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
    } else if bytes >= 1024 * 1024 {
        format!("{:.2} MB", bytes as f64 / (1024.0 * 1024.0))
    } else if bytes >= 1024 {
        format!("{:.2} KB", bytes as f64 / 1024.0)
    } else {
        format!("{} B", bytes)
    }
}

fn run_parse(input: &PathBuf) -> std::time::Duration {
    let config = tlparse::ParseConfig::default();
    let out_dir = tempdir().expect("failed to create temp dir");
    let start = Instant::now();
    let output = tlparse::parse_path(input, &config).expect("parse_path failed");
    let elapsed = start.elapsed();

    // Write output to exercise the full pipeline
    for (path, content) in &output {
        let full_path = out_dir.path().join(path);
        if let Some(parent) = full_path.parent() {
            std::fs::create_dir_all(parent).expect("failed to create output subdirectory");
        }
        std::fs::write(&full_path, content).expect("failed to write output file");
    }
    elapsed
}

fn main() {
    // Determine input path: CLI arg > env var (no default; must be explicit)
    let args: Vec<String> = std::env::args().collect();
    let input_path = if args.len() > 1 && !args[1].starts_with('-') {
        PathBuf::from(&args[1])
    } else if let Ok(env_path) = std::env::var("TLPARSE_BENCH_INPUT") {
        PathBuf::from(env_path)
    } else {
        eprintln!("Error: no input file specified.");
        eprintln!();
        eprintln!("Provide a TORCH_LOG file via one of:");
        eprintln!(" TLPARSE_BENCH_INPUT=/path/to/file cargo bench --bench parse_benchmark");
        eprintln!(" cargo bench --bench parse_benchmark -- /path/to/file");
        std::process::exit(1);
    };

    if !input_path.exists() {
        eprintln!("Error: input file not found: {}", input_path.display());
        std::process::exit(1);
    }

    let file_size = std::fs::metadata(&input_path).map(|m| m.len()).unwrap_or(0);
    let line_count = std::io::BufReader::new(
        std::fs::File::open(&input_path).expect("failed to open input file for line counting"),
    )
    .lines()
    .count();

    println!("=== tlparse benchmark ===");
    println!(
        "Input: {} ({}, {} lines)",
        input_path.display(),
        format_bytes(file_size),
        line_count
    );
    println!();

    // Cold-run RSS: measure peak RSS after a single parse before any warmup.
    // This captures the first-run memory footprint before caches are populated.
    let rss_cold_before = get_peak_rss_bytes();
    run_parse(&input_path);
    let rss_cold_after = get_peak_rss_bytes();

    // Warmup
    print!("Warming up ({WARMUP_ITERS} iterations)...");
    for _ in 0..WARMUP_ITERS {
        run_parse(&input_path);
    }
    println!(" done");

    // NOTE: ru_maxrss reports the *lifetime* peak RSS of the process, so the
    // value after warmup already includes the high-water mark from earlier
    // iterations. The "RSS delta (during bench)" below therefore only captures
    // *new* peaks that exceed the warmup maximum; it will be zero if the
    // warmup already reached the true peak. The cold-run measurement above
    // provides a more meaningful single-iteration memory figure.
    let rss_before = get_peak_rss_bytes();

    // Benchmark
    println!("Running {BENCH_ITERS} iterations...");
    let mut durations = Vec::with_capacity(BENCH_ITERS as usize);
    for i in 0..BENCH_ITERS {
        let elapsed = run_parse(&input_path);
        println!(" iter {}: {:.3}ms", i + 1, elapsed.as_secs_f64() * 1000.0);
        durations.push(elapsed);
    }

    let rss_after = get_peak_rss_bytes();

    // Stats
    durations.sort();
    let total: std::time::Duration = durations.iter().sum();
    let mean = total / BENCH_ITERS;
    let median = durations[durations.len() / 2];
    let min = durations[0];
    let max = durations[durations.len() - 1];

    println!();
    println!("--- Results ---");
    println!(" mean: {:.3}ms", mean.as_secs_f64() * 1000.0);
    println!(" median: {:.3}ms", median.as_secs_f64() * 1000.0);
    println!(" min: {:.3}ms", min.as_secs_f64() * 1000.0);
    println!(" max: {:.3}ms", max.as_secs_f64() * 1000.0);

    // Cold-run RSS (single iteration, no prior warmup)
    if let (Some(before), Some(after)) = (rss_cold_before, rss_cold_after) {
        println!(" cold-run peak RSS: {}", format_bytes(after));
        if after > before {
            println!(" cold-run RSS delta: {}", format_bytes(after - before));
        }
    }

    if let Some(rss) = rss_after {
        println!(" lifetime peak RSS: {}", format_bytes(rss));
        if let Some(before) = rss_before {
            if rss > before {
                println!(" RSS delta (during bench): {}", format_bytes(rss - before));
            }
        }
    } else {
        println!(" peak RSS: unavailable on this platform");
    }
}
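
A note on the statistics step in the benchmark above: `durations[durations.len() / 2]` is the exact median only when the iteration count is odd, which holds for `BENCH_ITERS = 5`. If the count were ever changed to an even number, a version along the following lines would average the two middle samples instead. This is a hedged sketch, not code from the PR; `Summary` and `summarize` are hypothetical names.

```rust
use std::time::Duration;

/// Hypothetical summary of a set of timing samples.
#[derive(Debug)]
struct Summary {
    mean: Duration,
    median: Duration,
    min: Duration,
    max: Duration,
}

/// Compute mean, median, min, and max. Returns None for an empty slice.
fn summarize(samples: &[Duration]) -> Option<Summary> {
    if samples.is_empty() {
        return None;
    }
    let mut sorted = samples.to_vec();
    sorted.sort();
    let n = sorted.len();
    let total: Duration = sorted.iter().sum();
    let mean = total / n as u32;
    let median = if n % 2 == 1 {
        // Odd count: the middle element is the median.
        sorted[n / 2]
    } else {
        // Even count: average the two middle elements.
        (sorted[n / 2 - 1] + sorted[n / 2]) / 2
    };
    Some(Summary {
        mean,
        median,
        min: sorted[0],
        max: sorted[n - 1],
    })
}
```

Returning `Option` also avoids the index panic that `durations[0]` would hit if the iteration count were set to zero.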
27 changes: 23 additions & 4 deletions src/cli.rs
@@ -1,8 +1,10 @@
use clap::Parser;

use anyhow::{bail, Context};
use flate2::write::GzEncoder;
use flate2::Compression;
use std::fs;
-use std::io::Read;
+use std::io::{self, Read};
use std::path::PathBuf;

use tlparse::{
@@ -162,6 +164,21 @@ fn parse_and_write_output(
        }
        fs::write(out_path, content)?;
    }

    // Copy the raw log file directly instead of reading it into memory.
    // This avoids holding the entire input file as a String in ParseOutput.
    if log_path.extension().map_or(false, |ext| ext == "gz") {
        fs::copy(log_path, output_dir.join("raw.log.gz"))?;
    } else {
        fs::copy(log_path, output_dir.join("raw.log"))?;
        // Also store a gzip-compressed copy alongside raw.log
        let mut in_file = fs::File::open(log_path)?;
        let gz_file = fs::File::create(output_dir.join("raw.log.gz"))?;
        let mut encoder = GzEncoder::new(gz_file, Compression::default());
        io::copy(&mut in_file, &mut encoder)?;
        encoder.finish()?;
    }

    Ok(output_dir.join("index.html"))
}
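
The hunk above relies on `std::io::copy` to move bytes from the input file into the encoder in chunks rather than loading the whole log into a `String`. The same pattern works for any `Read`/`Write` pair. The sketch below shows it with in-memory types so that it needs no external crate; `stream_copy` is a hypothetical helper, and in the PR the writer is a `GzEncoder` wrapping the output file.

```rust
use std::io::{self, Read, Write};

/// Hypothetical helper: stream all bytes from `reader` to `writer` without
/// buffering the whole input, returning the number of bytes copied.
fn stream_copy<R: Read, W: Write>(mut reader: R, mut writer: W) -> io::Result<u64> {
    // io::copy reads and writes in fixed-size chunks internally.
    let copied = io::copy(&mut reader, &mut writer)?;
    // Make sure buffered writers push everything out before returning.
    writer.flush()?;
    Ok(copied)
}
```

For a compressing writer, flushing is not enough on its own; the PR calls `encoder.finish()` so that the gzip trailer is written.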

@@ -226,9 +243,11 @@ fn handle_all_ranks(
         return None;
     }
     let filename = path.file_name()?.to_str()?;
-    filename
-        .strip_prefix("dedicated_log_torch_trace_rank_")?
-        .strip_suffix(".log")?
+    let after_prefix = filename.strip_prefix("dedicated_log_torch_trace_rank_")?;
+    let after_suffix = after_prefix
+        .strip_suffix(".log.gz")
+        .or_else(|| after_prefix.strip_suffix(".log"))?;
+    after_suffix
         .split('_')
         .next()?
         .parse::<u32>()
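
The rank-extraction change can be read in isolation as a small pure function. The sketch below restates it as a standalone helper so the accepted filename shapes are easy to see. The name `parse_rank` is hypothetical, and the trailing `.ok()` is an assumption, since the hunk is cut off right after `.parse::<u32>()`.

```rust
/// Hypothetical standalone version of the rank extraction in the hunk above.
/// Accepts "dedicated_log_torch_trace_rank_<N>[_suffix].log" and the same
/// name with ".log.gz"; returns None for anything else.
fn parse_rank(filename: &str) -> Option<u32> {
    let after_prefix = filename.strip_prefix("dedicated_log_torch_trace_rank_")?;
    // Try the longer suffix first, then fall back to the plain one.
    let stem = after_prefix
        .strip_suffix(".log.gz")
        .or_else(|| after_prefix.strip_suffix(".log"))?;
    // The rank is the first underscore-separated token of what remains.
    // Assumption: the elided continuation converts the parse error to None.
    stem.split('_').next()?.parse::<u32>().ok()
}
```

Checking `.log.gz` before `.log` keeps the intent explicit, although the two suffixes cannot both match the same string, so the order does not change the result here.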