Skip to content

Commit 27af268

Browse files
committed
Cloud I/O enhancements and parallelism
1 parent 55c4e63 commit 27af268

2 files changed

Lines changed: 120 additions & 168 deletions

File tree

mdio/utils/segy_export.h

Lines changed: 116 additions & 168 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919
#include <string>
2020
#include <vector>
2121

22+
#include <algorithm>     // for std::remove_if (remove-erase idiom)
23+
#include <thread> // for sleep_for
24+
#include <chrono> // for milliseconds
25+
2226
#include "mdio/mdio.h"
2327
#include "tensorstore/util/result.h"
2428

@@ -310,8 +314,12 @@ Result<void> MdioToSegy(
310314
(void)trace_headers;
311315

312316
// ~5 GB cache (total_bytes_limit = 5,000,000,000 bytes)
313-
auto cacheJson = nlohmann::json::parse(
314-
R"({"cache_pool": {"total_bytes_limit": 5000000000}})");
317+
auto cacheJson = nlohmann::json::parse(R"({
318+
"cache_pool": { "total_bytes_limit": 5000000000 },
319+
"data_copy_concurrency": {"limit": 16},
320+
"gcs_request_concurrency": {"limit": 16},
321+
"s3_request_concurrency": {"limit": 16}
322+
})");
315323
auto ctxSpec = Context::Spec::FromJson(cacheJson);
316324
auto ctx = Context(ctxSpec.value());
317325

@@ -320,8 +328,6 @@ Result<void> MdioToSegy(
320328
auto ds,
321329
Dataset::Open(mdio_path, constants::kOpen, ctx).result());
322330

323-
std::cout << ds << std::endl;
324-
325331
// Pick the highest-rank (float-prefer) seismic variable
326332
bool found = false;
327333
Variable<> seismic_var;
@@ -348,10 +354,7 @@ Result<void> MdioToSegy(
348354
// Get dims & shape from the seismic variable's ordered domain
349355
auto domain = seismic_var.dimensions();
350356
auto labels = domain.labels(); // e.g. {"inline","crossline","time"}
351-
for (const auto& l : labels) {
352-
std::cout << l << std::endl;
353-
}
354-
auto shape = domain.shape(); // e.g. { ni, nc, nt }
357+
auto shape = domain.shape(); // e.g. { ni, nc, nt }
355358
size_t rank = shape.size();
356359
if (rank < 2) {
357360
return absl::InvalidArgumentError(
@@ -367,216 +370,161 @@ Result<void> MdioToSegy(
367370
num_traces *= shape[i];
368371
}
369372

370-
// Use the domain size of the second-fastest growing dimension for chunking
371-
// This ensures we process complete "slices" along that dimension
372-
Index output_chunk_size = 1;
373-
if (rank >= 2) {
374-
// Use the full domain size of the second-fastest growing spatial dimension
375-
output_chunk_size = shape[rank - 2];
376-
} else if (rank >= 1) {
377-
// Fallback to first spatial dimension size
378-
output_chunk_size = shape[0];
379-
}
380-
381-
std::cout << "Using output chunk size: " << output_chunk_size << " (full domain of dimension "
382-
<< (rank >= 2 ? rank - 2 : 0) << ")" << std::endl;
373+
// Determine chunk size along second-fastest spatial dim
374+
Index output_chunk_size = (rank >= 2)
375+
? shape[rank - 2]
376+
: shape[0];
383377

384378
// Build Zarr spec for output SEG-Y
385379
nlohmann::json spec;
386380
spec["driver"] = "zarr";
387-
388-
// infer driver from URI
389381
std::string driver = "file";
390382
if (absl::StartsWith(segy_path, "gs://")) driver = "gcs";
391383
else if (absl::StartsWith(segy_path, "s3://")) driver = "s3";
392384
spec["kvstore"] = {{"driver", driver}, {"path", segy_path}};
393-
394385
if (driver != "file") {
395-
// strip "gs://" or "s3://"
396386
size_t pos = segy_path.find("://");
397387
std::string tail = segy_path.substr(pos + 3);
398-
// build a real vector<string>
399-
std::vector<std::string> file_parts;
400-
for (auto piece : absl::StrSplit(tail, '/')) {
401-
file_parts.emplace_back(piece);
402-
}
403-
if (file_parts.size() < 2) {
404-
return absl::InvalidArgumentError(
405-
"Cloud path requires [gs/s3]://bucket/path format");
406-
}
407-
spec["kvstore"]["bucket"] = file_parts[0];
408-
// rejoin remainder as object path
409-
spec["kvstore"]["path"] =
410-
absl::StrJoin(file_parts.begin() + 1, file_parts.end(), "/");
388+
std::vector<std::string> parts;
389+
for (auto& p : absl::StrSplit(tail, '/')) parts.emplace_back(p);
390+
spec["kvstore"]["bucket"] = parts[0];
391+
spec["kvstore"]["path"] = absl::StrJoin(parts.begin()+1, parts.end(), "/");
411392
}
412-
413393
spec["metadata"] = {
414394
{"dtype", std::string("|V") + std::to_string(trace_bytes)},
415395
{"shape", {num_traces}},
416-
{"chunks", {output_chunk_size}}, // Use the computed chunk size instead of 1
396+
{"chunks", {output_chunk_size}},
417397
{"dimension_separator", "."},
418398
{"compressor", nullptr},
419399
{"fill_value", nullptr},
420400
{"order", "C"},
421-
{"zarr_format", 2}};
401+
{"zarr_format", 2}
402+
};
422403
spec["attributes"] = {
423404
{"dimension_names", {"trace"}},
424-
{"long_name", ""}};
405+
{"long_name", ""}
406+
};
425407

426408
MDIO_ASSIGN_OR_RETURN(
427409
auto out_var,
428410
Variable<>::Open(spec, constants::kCreateClean, ctx).result());
429411

430-
// Process traces by chunking along the second-fastest growing dimension
431-
// This ensures we align with the underlying storage chunks
432-
size_t chunk_dim = (rank >= 2) ? rank - 2 : 0; // Index of second-fastest growing spatial dim
412+
// === Throttle helper ===
413+
constexpr size_t MAX_IN_FLIGHT = 10;
414+
std::vector<tensorstore::WriteFutures> futures;
415+
auto enqueue_write = [&](tensorstore::WriteFutures wf) {
416+
while (futures.size() >= MAX_IN_FLIGHT) {
417+
// Remove completed writes using remove-erase
418+
futures.erase(
419+
std::remove_if(
420+
futures.begin(), futures.end(),
421+
[](auto& wf){ return wf.commit_future.ready(); }),
422+
futures.end());
423+
if (futures.size() >= MAX_IN_FLIGHT) {
424+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
425+
}
426+
}
427+
futures.emplace_back(std::move(wf));
428+
};
429+
430+
// Iterate over chunks and submit writes
431+
size_t chunk_dim = (rank >= 2) ? rank - 2 : 0;
433432
Index chunk_dim_size = shape[chunk_dim];
434-
435-
// Calculate total number of "slices" in all other spatial dimensions
436433
Index num_outer_slices = 1;
437-
for (size_t d = 0; d < rank - 1; ++d) {
438-
if (d != chunk_dim) {
439-
num_outer_slices *= shape[d];
440-
}
441-
}
442-
443-
Index traces_processed = 0;
444-
445-
// Iterate through all combinations of the other spatial dimensions
434+
for (size_t d = 0; d < rank - 1; ++d) if (d != chunk_dim) num_outer_slices *= shape[d];
446435
std::vector<Index> outer_coords(rank - 1, 0);
447-
448-
for (Index outer_slice = 0; outer_slice < num_outer_slices; ++outer_slice) {
449-
// Convert outer_slice to coordinates for all dimensions except chunk_dim
450-
Index temp_slice = outer_slice;
451-
for (int d = static_cast<int>(rank) - 2; d >= 0; --d) {
452-
if (static_cast<size_t>(d) != chunk_dim) {
453-
outer_coords[d] = temp_slice % shape[d];
454-
temp_slice /= shape[d];
436+
437+
Index traces_processed = 0;
438+
for (Index outer = 0; outer < num_outer_slices; ++outer) {
439+
// compute outer_coords
440+
Index tmp = outer;
441+
for (int d = rank - 2; d >= 0; --d) {
442+
if ((size_t)d != chunk_dim) {
443+
outer_coords[d] = tmp % shape[d];
444+
tmp /= shape[d];
455445
}
456446
}
457-
458-
// Now chunk along the chunk_dim
459-
for (Index chunk_start = 0; chunk_start < chunk_dim_size; chunk_start += output_chunk_size) {
460-
Index chunk_end = std::min(chunk_start + output_chunk_size, chunk_dim_size);
461-
Index chunk_size = chunk_end - chunk_start;
462-
447+
448+
for (Index start = 0; start < chunk_dim_size; start += output_chunk_size) {
449+
Index end = std::min(start + output_chunk_size, chunk_dim_size);
450+
Index size = end - start;
451+
463452
// Progress bar (tqdm style)
464453
double progress = static_cast<double>(traces_processed) / static_cast<double>(num_traces);
465454
int bar_width = 50;
466455
int pos = static_cast<int>(bar_width * progress);
467-
468-
// std::cout << "\r[";
469-
// for (int i = 0; i < bar_width; ++i) {
470-
// if (i < pos) std::cout << "=";
471-
// else if (i == pos) std::cout << ">";
472-
// else std::cout << " ";
473-
// }
474-
// std::cout << "] " << static_cast<int>(progress * 100.0) << "% "
475-
// << "(" << traces_processed << "/" << num_traces << " traces)";
476-
// std::cout.flush();
477-
478-
// Build slice descriptors for this specific chunk
479-
std::vector<RangeDescriptor<Index>> chunk_descs;
480-
chunk_descs.reserve(rank);
481-
482-
// For each spatial dimension, either use the fixed coordinate or the chunk range
456+
std::cout << "\r[";
457+
for (int i = 0; i < bar_width; ++i) {
458+
if (i < pos) std::cout << "=";
459+
else if (i == pos) std::cout << ">";
460+
else std::cout << " ";
461+
}
462+
std::cout << "] " << static_cast<int>(progress * 100.0) << "% "
463+
<< "(" << traces_processed << "/" << num_traces << " traces)";
464+
std::cout.flush();
465+
466+
// Build slice descriptors
467+
std::vector<RangeDescriptor<Index>> descs;
468+
descs.reserve(rank);
483469
for (size_t d = 0; d < rank - 1; ++d) {
484470
if (d == chunk_dim) {
485-
// This is the dimension we're chunking along
486-
chunk_descs.push_back(RangeDescriptor<Index>{
487-
labels[d],
488-
chunk_start,
489-
chunk_end,
490-
1});
471+
descs.push_back({labels[d], start, end, 1});
491472
} else {
492-
// This is a fixed coordinate for this outer slice
493-
chunk_descs.push_back(RangeDescriptor<Index>{
494-
labels[d],
495-
outer_coords[d],
496-
outer_coords[d] + 1,
497-
1});
473+
descs.push_back({labels[d], outer_coords[d], outer_coords[d]+1, 1});
498474
}
499475
}
500-
501-
// Full slice on the last dimension (samples/time)
502-
chunk_descs.push_back(RangeDescriptor<Index>{
503-
labels[rank - 1],
504-
0,
505-
num_samples,
506-
1});
507-
508-
// Read only the seismic data we need for this chunk
509-
MDIO_ASSIGN_OR_RETURN(auto chunk_var, seismic_var.slice(chunk_descs));
510-
std::cout << chunk_var << std::endl;
511-
MDIO_ASSIGN_OR_RETURN(auto chunk_data, chunk_var.Read().result());
512-
std::cout << "Finished reading chunk_var..." << std::endl;
513-
514-
const char* chunk_ptr = reinterpret_cast<const char*>(
515-
chunk_data.get_data_accessor().data());
516-
ptrdiff_t chunk_off = chunk_data.get_flattened_offset();
517-
518-
// Allocate buffer for the output chunk
519-
std::vector<char> chunk_buffer(chunk_size * trace_bytes, 0);
520-
521-
// Process each trace in the chunk
522-
for (Index i = 0; i < chunk_size; ++i) {
523-
// Calculate the offset in the chunk data for this trace
524-
Index trace_offset_in_chunk = i * num_samples;
525-
526-
// Copy trace data to chunk buffer (240 bytes header + samples)
527-
char* trace_buffer = chunk_buffer.data() + (i * trace_bytes);
528-
std::memset(trace_buffer, 0, 240); // Zero the header
529-
std::memcpy(
530-
trace_buffer + 240,
531-
chunk_ptr + (chunk_off + trace_offset_in_chunk) * sample_bytes,
532-
static_cast<size_t>(num_samples) * sample_bytes);
476+
descs.push_back({labels[rank-1], 0, num_samples, 1});
477+
478+
MDIO_ASSIGN_OR_RETURN(auto var_slice, seismic_var.slice(descs));
479+
MDIO_ASSIGN_OR_RETURN(auto data, var_slice.Read().result());
480+
481+
const char* ptr = reinterpret_cast<const char*>(data.get_data_accessor().data());
482+
ptrdiff_t off = data.get_flattened_offset();
483+
std::vector<char> buffer(size * trace_bytes, 0);
484+
for (Index i = 0; i < size; ++i) {
485+
char* tb = buffer.data() + i * trace_bytes;
486+
std::memset(tb, 0, 240);
487+
std::memcpy(tb + 240, ptr + (off + i*num_samples)*sample_bytes,
488+
size_t(num_samples)*sample_bytes);
533489
}
534490

535-
// Calculate the output trace range for this chunk
536-
// Convert coordinates back to linear trace indices
537-
Index output_start = 0;
538-
Index multiplier = 1;
539-
540-
// Build the linear index from coordinates
541-
std::vector<Index> trace_coords = outer_coords;
542-
trace_coords[chunk_dim] = chunk_start;
543-
544-
for (int d = static_cast<int>(rank) - 2; d >= 0; --d) {
545-
output_start += trace_coords[d] * multiplier;
546-
multiplier *= shape[d];
491+
// Compute write range
492+
Index out_start = 0, mul = 1;
493+
std::vector<Index> coords = outer_coords;
494+
coords[chunk_dim] = start;
495+
for (int d = rank-2; d >= 0; --d) {
496+
out_start += coords[d] * mul;
497+
mul *= shape[d];
547498
}
548-
549-
Index output_end = output_start + chunk_size;
499+
RangeDescriptor<Index> write_desc{"trace", out_start, out_start+size, 1};
550500

551-
// Write the chunk to output
552-
RangeDescriptor<Index> write_desc{"trace", output_start, output_end, 1};
553501
MDIO_ASSIGN_OR_RETURN(auto out_slice, out_var.slice(write_desc));
554-
std::cout << out_slice << std::endl;
555-
MDIO_ASSIGN_OR_RETURN(auto out_data, out_slice.Read().result());
556-
std::cout << "Finished reading out_data..." << std::endl;
557-
char* out_ptr = reinterpret_cast<char*>(
558-
out_data.get_data_accessor().data());
502+
// MDIO_ASSIGN_OR_RETURN(auto out_data, out_slice.Read().result());
503+
MDIO_ASSIGN_OR_RETURN(auto out_data, from_variable(out_slice));
504+
505+
char* out_ptr = reinterpret_cast<char*>(out_data.get_data_accessor().data());
559506
ptrdiff_t out_off = out_data.get_flattened_offset();
560-
561-
// Copy the chunk buffer to output
562-
std::memcpy(out_ptr + out_off, chunk_buffer.data(), chunk_size * trace_bytes);
563-
564-
// Write it back
565-
std::cout << "Writing back to out_slice..." << std::endl;
566-
auto fut = out_slice.Write(out_data);
567-
if (!fut.status().ok()) return fut.status();
568-
std::cout << "Finished writing back to out_slice..." << std::endl;
569-
traces_processed += chunk_size;
507+
std::memcpy(out_ptr + out_off, buffer.data(), size * trace_bytes);
508+
509+
// Submit write with throttle
510+
auto wf = out_slice.Write(out_data);
511+
enqueue_write(std::move(wf));
512+
513+
traces_processed += size;
570514
}
571515
}
572516

573-
// Final progress bar showing 100% completion
574-
std::cout << "\r[";
575-
for (int i = 0; i < 50; ++i) {
576-
std::cout << "=";
517+
// Drain remaining writes and check errors
518+
for (auto& wf : futures) {
519+
if (!wf.commit_future.result().ok()) {
520+
return wf.commit_future.status();
521+
}
577522
}
578-
std::cout << "] 100% (" << num_traces << "/" << num_traces << " traces)" << std::endl;
579-
std::cout << "Conversion completed successfully!" << std::endl;
523+
524+
std::cout << "\r[";
525+
for (int i = 0; i < 50; ++i) std::cout << "=";
526+
std::cout << "] 100% (" << num_traces << "/" << num_traces << ")\n"
527+
<< "Conversion completed successfully!\n";
580528

581529
return absl::OkStatus();
582530
}

mdio/variable.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,11 +557,15 @@ template <typename T = void, DimensionIndex R = dynamic_rank,
557557
Future<Variable<T, R, M>> OpenVariable(const nlohmann::json& json_store,
558558
Option&&... options) {
559559
// Infer the name from the path
560+
561+
560562
std::string variable_name = json_store["kvstore"]["path"].get<std::string>();
561563
std::vector<std::string> pathComponents = absl::StrSplit(variable_name, "/");
562564
variable_name = pathComponents.back();
563565

564566
auto store_spec = json_store;
567+
store_spec["recheck_cached_data"] = "open";
568+
store_spec["recheck_cached_metadata"] = "open";
565569
// retain attributes if we want to check values ...
566570
::nlohmann::json suppliedAttributes;
567571
if (store_spec.contains("attributes")) {

0 commit comments

Comments
 (0)