From 97ba0ac4b3e4ea5ad56ee4fc86c8b5b38e1cc608 Mon Sep 17 00:00:00 2001
From: Julian P Samaroo
Date: Tue, 14 Apr 2026 11:44:01 -0700
Subject: [PATCH 1/3] Add LinuxPerf metrics for logging

---
 Project.toml        |   3 ++
 ext/LinuxPerfExt.jl | 115 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 118 insertions(+)
 create mode 100644 ext/LinuxPerfExt.jl

diff --git a/Project.toml b/Project.toml
index ce49bf6d7..16ee99b8c 100644
--- a/Project.toml
+++ b/Project.toml
@@ -40,6 +40,7 @@ DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
 Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f"
 GraphViz = "f526b714-d49f-11e8-06ff-31ed36ee7ee0"
 JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1"
+LinuxPerf = "b4c46c6c-4fb0-484d-a11a-41bc3392d094"
 Metal = "dde4c033-4e86-420c-a63e-0dd931031962"
 OpenCL = "08131aa3-fb12-5dee-8b74-c09406e224a2"
 Plots = "91a5bcdd-55d7-5caf-9e0b-520d859cae80"
@@ -54,6 +55,7 @@ GraphVizExt = "GraphViz"
 GraphVizSimpleExt = "Colors"
 IntelExt = "oneAPI"
 JSON3Ext = "JSON3"
+LinuxPerfExt = "LinuxPerf"
 MetalExt = "Metal"
 OpenCLExt = "OpenCL"
 PlotsExt = ["DataFrames", "Plots"]
@@ -76,6 +78,7 @@ GraphViz = "0.2"
 Graphs = "1"
 JSON3 = "1"
 KernelAbstractions = "0.9"
+LinuxPerf = "0.4.2"
 MacroTools = "0.5"
 MemPool = "0.4.12"
 Metal = "1.1"
diff --git a/ext/LinuxPerfExt.jl b/ext/LinuxPerfExt.jl
new file mode 100644
index 000000000..91db8837d
--- /dev/null
+++ b/ext/LinuxPerfExt.jl
@@ -0,0 +1,115 @@
+module LinuxPerfExt
+
+import Dagger
+import LinuxPerf
+
+import TimespanLogging
+import TimespanLogging: Event, init_similar
+
+const DEFAULT_METRIC_SPEC =
+    "cpu-clock, page-faults, context-switches, cpu-migrations, minor-faults, major-faults"
+
+# Callable event-metric object that records Linux perf counter deltas per event.
+mutable struct LinuxPerfMetrics
+    # Perf-style grouping string, e.g. "(cpu-cycles,instructions), page-faults"
+    metric_spec::String
+    # Lazily constructed bench; metric names are read from bench.groups[*].event_types
+    bench::Union{Nothing, LinuxPerf.PerfBench}
+    # One atomic refcount per EventGroup: enable on 0->1, disable on 1->0
+    refcounts::Vector{Threads.Atomic{Int}}
+    # Baseline counter snapshots per active event, keyed by (category, id)
+    active_baselines::Dict{Any, Vector{UInt64}}
+end
+
+LinuxPerfMetrics(spec::String) =
+    LinuxPerfMetrics(spec, nothing, Threads.Atomic{Int}[], Dict{Any,Vector{UInt64}}())
+LinuxPerfMetrics() = LinuxPerfMetrics(DEFAULT_METRIC_SPEC)
+init_similar(m::LinuxPerfMetrics) = LinuxPerfMetrics(m.metric_spec)
+
+# Build the PerfBench from the metric spec string on first use.
+function _ensure_bench!(metrics::LinuxPerfMetrics)
+    if metrics.bench === nothing
+        groups_spec = LinuxPerf.parse_groups(metrics.metric_spec)
+        event_groups = LinuxPerf.EventGroup[LinuxPerf.EventGroup(g) for g in groups_spec]
+        metrics.bench = LinuxPerf.PerfBench(0, event_groups)
+        metrics.refcounts = [Threads.Atomic{Int}(0) for _ in event_groups]
+    end
+    return metrics.bench
+end
+
+# Read raw counter values from every group, ordered group-by-group.
+# Read format per group: [nr, time_enabled, time_running, value_1, ..., value_nr]
+function _read_raw_counters(bench::LinuxPerf.PerfBench)
+    vals = UInt64[]
+    for g in bench.groups
+        g.leader_fd == -1 && continue
+        buf = Vector{UInt64}(undef, length(g) + 3)
+        read!(g.leader_io, buf)
+        for i in 1:length(g)
+            push!(vals, buf[3 + i])
+        end
+    end
+    return vals
+end
+
+# Subtract baseline from current snapshot; build name->delta dict using
+# metric names sourced from the bench's EventGroup event_types.
+function _delta_to_dict(bench::LinuxPerf.PerfBench, baseline::Vector{UInt64})
+    current = _read_raw_counters(bench)
+    result = Dict{String, Int64}()
+    idx = 1
+    for g in bench.groups
+        g.leader_fd == -1 && continue
+        for et in g.event_types
+            name = get(LinuxPerf.EVENT_TO_NAME, et, string(et))
+            # Counters are unsigned, so the subtraction wraps when a counter is
+            # smaller than its baseline (e.g. after a reset). `% Int64` turns the
+            # wrapped value into a negative delta instead of an InexactError.
+            result[name] = (current[idx] - baseline[idx]) % Int64
+            idx += 1
+        end
+    end
+    return result
+end
+
+# Event start: enable groups that were idle, then store a baseline snapshot.
+function (metrics::LinuxPerfMetrics)(ev::Event{:start})
+    bench = _ensure_bench!(metrics)
+    isempty(bench.groups) && return nothing
+
+    # Enable each group on the 0->1 transition only
+    for (group, rc) in zip(bench.groups, metrics.refcounts)
+        old = Threads.atomic_add!(rc, 1)
+        if old == 0
+            LinuxPerf.reset!(group)
+            LinuxPerf.enable!(group)
+        end
+    end
+
+    metrics.active_baselines[(ev.category, ev.id)] = _read_raw_counters(bench)
+    return nothing
+end
+
+# Event finish: return a name->delta Dict, or nothing if no baseline was stored.
+function (metrics::LinuxPerfMetrics)(ev::Event{:finish})
+    bench = metrics.bench
+    bench === nothing && return nothing
+
+    baseline = pop!(metrics.active_baselines, (ev.category, ev.id), nothing)
+    baseline === nothing && return nothing
+
+    # Read delta before potentially disabling any group
+    result = _delta_to_dict(bench, baseline)
+
+    # Disable each group on the 1->0 transition only
+    for (group, rc) in zip(bench.groups, metrics.refcounts)
+        old = Threads.atomic_sub!(rc, 1)
+        if old == 1
+            LinuxPerf.disable!(group)
+        end
+    end
+
+    return result
+end
+
+end # module LinuxPerfExt
From aec427248462eea0299bb8d29a5c4350ff84308d Mon Sep 17 00:00:00 2001
From: Julian P Samaroo
Date: Tue, 14 Apr 2026 11:44:54 -0700
Subject: [PATCH 2/3] logging: Add GC, compile time, LinuxPerf metrics to enable_logging

---
 src/utils/logging-events.jl | 92 +++++++++++++++++++++++++++++++++++++
 src/utils/logging.jl        | 37 ++++++++++++++-
 2 files changed, 128 insertions(+), 1 deletion(-)

diff --git a/src/utils/logging-events.jl b/src/utils/logging-events.jl
index 344a42da5..913a6e1dd 100644
--- a/src/utils/logging-events.jl
+++ b/src/utils/logging-events.jl
@@ -276,4 +276,96 @@ function (::TaskToChunk)(ev::Event{:finish})
     return
 end
 
+"""
+    GCStats
+
+Tracks GC allocations (`Base.GC_Diff.allocd`) between the start and finish of
+each event, using the `gc_num` field already captured in each `Event`.
+"""
+mutable struct GCStats
+    active_starts::Dict{Any, Base.GC_Num}
+end
+GCStats() = GCStats(Dict{Any, Base.GC_Num}())
+init_similar(::GCStats) = GCStats()
+
+function (gs::GCStats)(ev::Event{:start})
+    gs.active_starts[(ev.category, ev.id)] = ev.gc_num
+    return nothing
+end
+function (gs::GCStats)(ev::Event{:finish})
+    start_gc = pop!(gs.active_starts, (ev.category, ev.id), nothing)
+    start_gc === nothing && return nothing
+    return Base.GC_Diff(ev.gc_num, start_gc).allocd
+end
+
+# Declared above the docstring so the docstring attaches to the struct, not this const.
+const _lock_contention_refcount = Threads.Atomic{Int}(0)
+
+"""
+    LockContentionMetrics
+
+Tracks the number of lock contention events between the start and finish of
+each event. Uses a global atomic refcount to enable/disable
+`Base.Threads.lock_profiling` only while at least one event is in-flight.
+"""
+mutable struct LockContentionMetrics
+    active_starts::Dict{Any, Int64}
+end
+LockContentionMetrics() = LockContentionMetrics(Dict{Any, Int64}())
+init_similar(::LockContentionMetrics) = LockContentionMetrics()
+
+function (lc::LockContentionMetrics)(ev::Event{:start})
+    old = Threads.atomic_add!(_lock_contention_refcount, 1)
+    if old == 0
+        Base.Threads.lock_profiling(true)
+    end
+    lc.active_starts[(ev.category, ev.id)] = Int64(Base.Threads.LOCK_CONFLICT_COUNT[])
+    return nothing
+end
+function (lc::LockContentionMetrics)(ev::Event{:finish})
+    baseline = pop!(lc.active_starts, (ev.category, ev.id), nothing)
+    baseline === nothing && return nothing
+    current = Int64(Base.Threads.LOCK_CONFLICT_COUNT[])
+    old = Threads.atomic_sub!(_lock_contention_refcount, 1)
+    if old == 1
+        Base.Threads.lock_profiling(false)
+    end
+    return current - baseline
+end
+
+# Declared above the docstring so the docstring attaches to the struct, not this const.
+const _compile_timing_refcount = Threads.Atomic{Int}(0)
+
+"""
+    CompileTimeMetrics
+
+Tracks cumulative Julia compile time (via `Base.cumulative_compile_time_ns`)
+between the start and finish of each event. Uses a global atomic refcount to
+enable/disable timing only while at least one event is in-flight.
+"""
+mutable struct CompileTimeMetrics
+    active_starts::Dict{Any, Int64}
+end
+CompileTimeMetrics() = CompileTimeMetrics(Dict{Any, Int64}())
+init_similar(::CompileTimeMetrics) = CompileTimeMetrics()
+
+function (ct::CompileTimeMetrics)(ev::Event{:start})
+    old = Threads.atomic_add!(_compile_timing_refcount, 1)
+    if old == 0
+        Base.cumulative_compile_timing(true)
+    end
+    ct.active_starts[(ev.category, ev.id)] = Int64(Base.cumulative_compile_time_ns()[1])
+    return nothing
+end
+function (ct::CompileTimeMetrics)(ev::Event{:finish})
+    baseline = pop!(ct.active_starts, (ev.category, ev.id), nothing)
+    baseline === nothing && return nothing
+    current = Int64(Base.cumulative_compile_time_ns()[1])
+    old = Threads.atomic_sub!(_compile_timing_refcount, 1)
+    if old == 1
+        Base.cumulative_compile_timing(false)
+    end
+    return current - baseline
+end
+
 end # module Events
diff --git a/src/utils/logging.jl b/src/utils/logging.jl
index 1b1f09abd..c1673d2db 100644
--- a/src/utils/logging.jl
+++ b/src/utils/logging.jl
@@ -18,6 +18,11 @@ Extra events:
 - `taskuidtotid::Bool`: Enables reporting of task UID-to-TID mappings
 - `tasktochunk::Bool`: Enables reporting of DTask-to-Chunk mappings
 - `profile::Bool`: Enables profiling of task execution; not currently recommended, as it adds significant overhead
+- `linuxperf::Bool`: Enables Linux perf event collection (requires LinuxPerf.jl to be loaded)
+- `linuxperf_metrics::String`: Perf-style metric spec used when `linuxperf=true`; an empty string selects the default metrics
+- `gc_stats::Bool`: Enables GC allocation tracking per event
+- `lock_contend::Bool`: Enables lock contention counting per event
+- `compile_time::Bool`: Enables Julia compile-time tracking per event
 """
 function enable_logging!(;metrics::Bool=false,
                          timeline::Bool=false,
@@ -30,7 +35,12 @@ function enable_logging!(;metrics::Bool=false,
                          taskresult::Bool=false,
                          taskuidtotid::Bool=false,
                          tasktochunk::Bool=false,
-                         profile::Bool=false)
+                         profile::Bool=false,
+                         linuxperf::Bool=false,
+                         linuxperf_metrics::String="",
+                         gc_stats::Bool=false,
+                         lock_contend::Bool=false,
+                         compile_time::Bool=false)
     ml = TimespanLogging.MultiEventLog()
     ml[:core] = TimespanLogging.Events.CoreMetrics()
     ml[:id] = TimespanLogging.Events.IDMetrics()
@@ -81,10 +91,35 @@ function enable_logging!(;metrics::Bool=false,
         ml[:esat] = TimespanLogging.Events.EventSaturation()
         ml[:psat] = Dagger.Events.ProcessorSaturation()
     end
+    if linuxperf
+        # Never yields `nothing`: the helper throws when LinuxPerf.jl is not loaded
+        ml[:linuxperf] = _make_linuxperf_metrics(linuxperf_metrics)
+    end
+    if gc_stats
+        ml[:gc_stats] = Dagger.Events.GCStats()
+    end
+    if lock_contend
+        ml[:lock_contend] = Dagger.Events.LockContentionMetrics()
+    end
+    if compile_time
+        ml[:compile_time] = Dagger.Events.CompileTimeMetrics()
+    end
     Dagger.Sch.eager_context().log_sink = ml
     return
 end
 
+# Construct the LinuxPerfExt metrics object; an empty spec selects the
+# extension's default metric set. Throws if the LinuxPerfExt extension is absent.
+function _make_linuxperf_metrics(linuxperf_metrics::String)
+    ext = Base.get_extension(Dagger, :LinuxPerfExt)
+    ext === nothing && throw(ErrorException("LinuxPerf.jl is not loaded"))
+    if !isempty(linuxperf_metrics)
+        return ext.LinuxPerfMetrics(linuxperf_metrics)
+    else
+        return ext.LinuxPerfMetrics()
+    end
+end
+
 """
     disable_logging!()
 
From 79ae955b400b01c2e0cc4b07c777ae06f59adb69 Mon Sep 17 00:00:00 2001
From: Julian P Samaroo
Date: Tue, 14 Apr 2026 11:46:04 -0700
Subject: [PATCH 3/3] viz: Add :summary text visualizer to show_logs

---
 src/utils/viz.jl | 181 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 181 insertions(+)

diff --git a/src/utils/viz.jl b/src/utils/viz.jl
index 605d5655d..3fe06b5b2 100644
--- a/src/utils/viz.jl
+++ b/src/utils/viz.jl
@@ -419,4 +419,185 @@ function logs_to_dot(logs::Dict; disconnected=false, show_data::Bool=true,
     return str
 end
 
+
+# ---- :summary backend ----
+
+# Override the default string-returning dispatch so :summary prints to stdout
+function show_logs(logs::Dict, ::Val{:summary}; options...)
+    show_logs(stdout, logs, Val{:summary}(); options...)
+end
+
+# Format a byte count with binary units (B/KiB/MiB/GiB), rounded to `digits`.
+function pretty_bytes(n; digits=3)
+    r(x) = round(x; digits)
+    s = n < 0 ? "-" : ""
+    a = abs(n)
+    if a >= 1024^3
+        "$(s)$(r(a/1024^3)) GiB"
+    elseif a >= 1024^2
+        "$(s)$(r(a/1024^2)) MiB"
+    elseif a >= 1024
+        "$(s)$(r(a/1024)) KiB"
+    else
+        "$(s)$(r(a)) B"
+    end
+end
+
+"""
+    show_logs(io::IO, logs::Dict, ::Val{:summary}; options...)
+
+Print a text summary of event statistics to `io` (the `io`-less method uses `stdout`).
+
+For every event category, shows mean/min/max of:
+- Run Time (wall-clock duration of the event)
+- Each LinuxPerf metric (if `linuxperf=true` was passed to `enable_logging!`)
+- Bytes Allocated (if `gc_stats=true`)
+- Lock Conflicts (if `lock_contend=true`)
+- Compile Time (if `compile_time=true`)
+
+For `:compute` and `:move` categories, the summary is additionally broken
+down by task function name.
+
+Keyword `options` are accepted so forwarded options do not error; they are ignored.
+"""
+function show_logs(io::IO, logs::Dict, ::Val{:summary}; options...)
+    # Build thunk_id -> function name from :add_thunk/:start events
+    tid_to_fn = Dict{Int, String}()
+    for w in keys(logs)
+        haskey(logs[w], :taskfuncnames) || continue
+        for idx in eachindex(logs[w][:core])
+            core = logs[w][:core][idx]
+            core.category == :add_thunk && core.kind == :start || continue
+            fn = logs[w][:taskfuncnames][idx]
+            fn isa String || continue
+            id = logs[w][:id][idx]
+            id === nothing && continue
+            tid_to_fn[id.thunk_id::Int] = fn
+        end
+    end
+
+    # Channels that produce a scalar Int/Float value at the :finish event,
+    # mapped to their display name. Linuxperf is handled separately (it
+    # returns a Dict at :finish rather than a scalar).
+    scalar_channels = (
+        (:compile_time, "Compile Time"),
+        (:lock_contend, "Lock Conflicts"),
+        (:gc_stats, "Bytes Allocated"),
+    )
+
+    # Accumulate samples keyed by (category, fn_or_nothing).
+    # Each bucket holds runtimes plus a generic extras dict for all other metrics.
+    buckets = Dict{Tuple{Symbol,Union{Nothing,String}},
+                   @NamedTuple{runtimes::Vector{Float64},
+                               extras::Dict{String,Vector{Float64}}}}()
+
+    function get_bucket(cat, fn)
+        get!(buckets, (cat, fn)) do
+            (runtimes=Float64[], extras=Dict{String,Vector{Float64}}())
+        end
+    end
+
+    for w in keys(logs)
+        running = Dict{Any,Int}()  # event key -> start idx
+        for idx in eachindex(logs[w][:core])
+            core = logs[w][:core][idx]
+            id = logs[w][:id][idx]
+            id === nothing && continue
+            ekey = (core.category, id)
+            if core.kind == :start
+                running[ekey] = idx
+            elseif haskey(running, ekey)
+                start_idx = pop!(running, ekey)
+                runtime = Float64(core.timestamp - logs[w][:core][start_idx].timestamp)
+
+                # Collect all extra metric values at the finish index
+                extra = Dict{String,Float64}()
+
+                # LinuxPerf Dict
+                if haskey(logs[w], :linuxperf)
+                    lp = logs[w][:linuxperf][idx]
+                    if lp isa Dict
+                        for (k, v) in lp
+                            extra[k] = Float64(v)
+                        end
+                    end
+                end
+
+                # Scalar channels
+                for (chan, dname) in scalar_channels
+                    if haskey(logs[w], chan)
+                        v = logs[w][chan][idx]
+                        if v isa Number
+                            extra[dname] = Float64(v)
+                        end
+                    end
+                end
+
+                fn = nothing
+                if core.category in (:compute, :move) && hasproperty(id, :thunk_id)
+                    fn = get(tid_to_fn, id.thunk_id, nothing)
+                end
+
+                for bucket_fn in (fn === nothing ? (nothing,) : (nothing, fn))
+                    b = get_bucket(core.category, bucket_fn)
+                    push!(b.runtimes, runtime)
+                    for (k, v) in extra
+                        push!(get!(Vector{Float64}, b.extras, k), v)
+                    end
+                end
+            end
+        end
+    end
+
+    isempty(buckets) && (println(io, "(no events recorded)"); return)
+
+    # Formatting helpers
+    function fmt_stats(vals::Vector{Float64}, fmt_fn)
+        isempty(vals) && return "N/A"
+        μ = sum(vals) / length(vals)
+        "mean=$(fmt_fn(μ)), min=$(fmt_fn(minimum(vals))), max=$(fmt_fn(maximum(vals)))"
+    end
+    fmt_count(x) = string(round(Int, x))
+    # Metric name -> formatter
+    time_metric_names = Set(("cpu-clock", "task-clock", "Compile Time"))
+    bytes_metric_names = Set(("Bytes Allocated",))
+    metric_fmt(name) =
+        name in time_metric_names ? pretty_time :
+        name in bytes_metric_names ? pretty_bytes :
+        fmt_count
+
+    function print_bucket_stats(b, indent::String)
+        # Target total column = 20; always leave at least 2 spaces after the name
+        target_col = max(20 - length(indent), 2)
+        all_names = ["Run Time"; collect(keys(b.extras))]
+        col = max(target_col, maximum(length, all_names; init=0) + 2)
+        println(io, "$(indent)$(rpad("Run Time", col))$(fmt_stats(b.runtimes, pretty_time))")
+        for (mname, mvals) in sort!(collect(b.extras), by=first)
+            println(io, "$(indent)$(rpad(mname, col))$(fmt_stats(mvals, metric_fmt(mname)))")
+        end
+    end
+
+    println(io, "=== Dagger Execution Summary ===")
+
+    categories = sort!(unique(k[1] for k in keys(buckets)), by=string)
+    for cat in categories
+        b_all = get(buckets, (cat, nothing), nothing)
+        b_all === nothing && continue
+        n = length(b_all.runtimes)
+
+        fn_keys = sort!([k[2] for k in keys(buckets) if k[1] == cat && k[2] !== nothing])
+        fn_suffix = isempty(fn_keys) ? "" : ", $(length(fn_keys)) unique task(s)"
+
+        println(io, "\n[$cat] $n events$fn_suffix")
+        print_bucket_stats(b_all, "  ")
+
+        cat in (:compute, :move) || continue
+        for fn in fn_keys
+            b = buckets[(cat, fn)]
+            println(io, "\n  [$cat :: $fn] $(length(b.runtimes)) events")
+            print_bucket_stats(b, "    ")
+        end
+    end
+end
+
 end # module Viz