Reduce copies in Arrow IPC writer#10044
Conversation
gabotechs
left a comment
There was a problem hiding this comment.
This is looking pretty good. Good job! left some comments mainly directed towards exploring more reuse and bringing a bit more clarity to this file, let me know if you have other ideas.
| } | ||
|
|
||
| let (encoded_dictionaries, encoded_message) = self.data_gen.encode( | ||
| let (dict_sizes, (meta, data)) = self.data_gen.write_batch_direct( |
There was a problem hiding this comment.
The two last (meta, data) fields returned by write_batch_direct are named (aligned_size, body_len) in that function.
Is this correct? not sure if this is just a naming thing, but it's hard to know if this is correct given the different naming. Is meta == aligned_size and data == body_len? they sound like completely different things.
There was a problem hiding this comment.
I updated the struct & variable names to hopefully make this clearer.
| /// each buffer is compressed into a per-buffer scratch `Vec<u8>` and written from | ||
| /// there, eliminating the extra copy that `write_buffer` -> `arrow_data` -> | ||
| /// `write_body_buffers` would otherwise incur. | ||
| fn write_batch_direct<W: Write>( |
There was a problem hiding this comment.
I see most contents of this function are essentially copy-pastes from record_batch_to_bytes, duplication seems too much here. Is there any chance to:
- Completely replace
record_batch_to_bytesand keep just a single function for writing batches in IPC format - Factoring out some ergonomic helpers that could be reused in both functions?
Also, it seems like the .encode() method and the new .write_batch_direct() are both doing the same thing with slightly different ergonomics. Do you see any opportunity to collapse them into just 1 method?
This file is overall pretty bloated with complex logic and a relatively arbitrary separation of concerns between methods, the more we can do for debloating it the better it will be for future maintainers.
There was a problem hiding this comment.
This makes sense to me. the main issue is that FileWriter needs metadata while both StreamWriter and arrow-flight do not. Its better to not compute metadata the caller will not use but the slowdown should be negligible.
Benchmark results from #10031 |
|
going to look into why |
fc1dbb8 to
ecb37f2
Compare
|
I think its worth mentioning that no dictionary optimizations were made in the PR, could make to make that a follow up ticket. |
|
Ran benchmarks again and the results still look good. Test are passing for arrow-flight & arrow-ipc. |
|
run benchmark flight |
|
🤖 Arrow criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing rich-T-kid/optimize-arrow-ipc-copies (04e7992) to 97f4b14 (merge-base) diff File an issue against this benchmark runner |
|
🤖 Arrow criterion benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagebase (merge-base)
branch
File an issue against this benchmark runner |
|
run benchmarks ipc_writer |
|
🤖 Arrow criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing rich-T-kid/optimize-arrow-ipc-copies (04e7992) to 97f4b14 (merge-base) diff File an issue against this benchmark runner |
|
🤖 Arrow criterion benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagebase (merge-base)
branch
File an issue against this benchmark runner |
|
🤔 results look good, Im curious as to why two of the roundtrip benchmarks were slightly slower even thought |
|
run benchmark flight |
|
🤖 Arrow criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing rich-T-kid/optimize-arrow-ipc-copies (04e7992) to 97f4b14 (merge-base) diff File an issue against this benchmark runner |
|
🤖 Arrow criterion benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagebase (merge-base)
branch
File an issue against this benchmark runner |
Looks like the results are reproducable -- next step would be to profile it to see if you can find the answer I wonder if we are missing a |
|
i'm suspecting the issue has to do with The two regression cases both involve large variable-length data where the encoded payload can be huge: This also shows up in the regression cases, taking a look at the other benchmark results this seems consistant, Even in the event where this isn't the reason for the slow down I think 1MB is still to small for realistic max throughput. |
I think this is a strong possibility after looking at the profile, from my understanding this is mostly in arrow-flight itself and not arrow-ipc. Since arrow-flight is very dependent on arrow-ipc it make sense to start from the ground up with these optimizations. |
# Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. --> - Closes #10029. # Rationale for this change Increase the duplex buffer from 1 MB to 64 MB to eliminate artificial back-pressure in the roundtrip benchmarks. See rational in this [comment](#10044 (comment)) <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> # What changes are included in this PR? bumps `max_buf_size` to 64**MB** <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> # Are these changes tested? n/a <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? If this PR claims a performance improvement, please include evidence such as benchmark results. --> # Are there any user-facing changes? n/a <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. If there are any breaking changes to public APIs, please call them out. -->
| ipc_message: finished_data.to_vec(), | ||
| arrow_data, | ||
| }) | ||
| if let Some(w) = writer { |
There was a problem hiding this comment.
We need to find another way of modeling this without falling into "if-driven-development" patterns.
Whenever you find yourself coding something switching execution branches with completely different bodies over booleans, it's an indicator that you are falling into an "if-driven-development" pattern, and this is will greatly hurt maintenance in the future.
One way of trying to improve this situation can be:
- Refactor things preferring code duplication, and split into different functions with different responsibilities, even if that implies copy-pasting big quantities of code.
- Once you have the different functions with a fair amount of copy-pasted code, try to see what are the common bits, and factor them out little by little, trying to progressively reduce the LOC count in each one.
- If you still see functions that need to accept
boolorOptionparameters that have the capability of switching how the function behaves, that means the function was the wrong abstraction on the first place, so don't be afraid of tearing existing functions apart if that allows you to reuse smaller bits in different places without switching on if statements.
| arrow_data: &mut Vec<u8>, | ||
| encoded_buffers: &mut Vec<EncodedBuffer>, | ||
| nodes: &mut Vec<crate::FieldNode>, | ||
| offset: i64, | ||
| num_rows: usize, | ||
| null_count: usize, | ||
| compression_codec: Option<CompressionCodec>, | ||
| compression_context: &mut CompressionContext, | ||
| write_options: &IpcWriteOptions, | ||
| is_direct: bool, | ||
| ) -> Result<i64, ArrowError> { | ||
| let mut offset = offset; |
There was a problem hiding this comment.
This is another example of my comment above, but this time reinforced by a big Clippy bypass at the top.
Ideally, we should not be contributing towards stronger Clippy violations. If you try to follow the steps above, there are good chances that we can either maintain the width of this function's signature, or even reduce it.
9ec873d to
83102e2
Compare
|
Replaced the ( |
83102e2 to
70a7567
Compare
|
run benchmarks flight |
|
🤖 Arrow criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing rich-T-kid/optimize-arrow-ipc-copies (70a7567) to d7ef673 (merge-base) diff File an issue against this benchmark runner |
|
🤖 Arrow criterion benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagebase (merge-base)
branch
File an issue against this benchmark runner |
|
I am excited by this one -- I am starting to check it out |
|
run benchmarks ipc_writer |
|
🤖 Arrow criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing rich-T-kid/optimize-arrow-ipc-copies (70a7567) to d7ef673 (merge-base) diff File an issue against this benchmark runner |
|
🤖 Arrow criterion benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagebase (merge-base)
branch
File an issue against this benchmark runner |
| buffers: &mut Vec<crate::Buffer>, | ||
| arrow_data: &mut Vec<u8>, | ||
| sink: &mut BufferSink<'_>, | ||
| nodes: &mut Vec<crate::FieldNode>, |
There was a problem hiding this comment.
This is looking very good!
There's one thing that might confuse readers though:
Here, we are passing these two bits:
buffers: &mut Vec<crate::Buffer>,
sink: &mut BufferSink<'_>,Which at first sight, it raises some questions:
- Why do we need to pass two "buffer sinks"? isn't
buffersalready a sink forcrate::Bufferobjects? - Why do we need to dump the buffers in two places, one for
bufferand other forsink, are we unnecessarily copying data because of that?
After reading a bit more carefully, one notices that there are two kinds of buffers:
crate::Buffer: an autogenerated struct from the Arrow IPC specarrow_buffer::Buffer: The classical in-memory Arrow buffer
Maybe there's a way of making this distinction a bit clearer with some more descriptive names for BufferSink? like naming it ArrowDataSink maybe?
There was a problem hiding this comment.
this makes sense to me, I've renamed it to IpcBodySink to help new readers understand the distinction better.
gabotechs
left a comment
There was a problem hiding this comment.
Just left some comments mostly about naming and some other nits, but overall, it looks good to me!
It was pretty hard navigating through this file, and you did a very good job at shipping this performance optimization without making it worst, nice job @Rich-T-kid!
ca764ea to
ae03023
Compare
alamb
left a comment
There was a problem hiding this comment.
Thank you so much @Rich-T-kid -- this looks great
It was pretty hard navigating through this file, and you did a very good job at shipping this performance optimization without making it worst, nice job @Rich-T-kid!
I actually think it is slightly better now (though I have ideas on how to make it even better :) -- thank you @gabotechs for the reviews
I think it would be good to try and avoid the extra copy (comments inline) but we could also do that as a follow on if you prefer
While reviewing this, claude and I found some missing coverage, which I have proposed adding here:
| buffers: &mut Vec<crate::Buffer>, // output buffer descriptors | ||
| arrow_data: &mut Vec<u8>, // output stream | ||
| offset: i64, // current output stream offset | ||
| fn encode_sink_buffer( |
There was a problem hiding this comment.
similarly to @gabotechs above I found the relationship between buffer, buffers and sink confusing.
Could you perhaps (can be a follow on PR) to add documentation here explaining what this is doing -- specifically that the IpcBodySink is used for the actual arrow data and the buffers is an in-progresss list what will eventually become the IPC metadata
There was a problem hiding this comment.
As a follow on, we might even be able to make this clearer by encapsulating buffers in some sort of other struct
struct IpcMetadataBuilder {
buffers: Vec<crate::Buffer>,
nodes: Vec<crate::FieldNode>,
}And then pass that through 🤔
There was a problem hiding this comment.
We could probably also make the code less verbose by making buffers, sink, and field, fields on the IPC writer 🤔
There was a problem hiding this comment.
Added some doc comments above both write_array_data() & encode_sink_buffer(). I think encapsulating buffers into a struct would be worth placing in a separate small PR. that PR would/should also include some other minor tweaks like removing the allow(clippy::too_many_arguments) on write_array_data()
3bfccfd to
a674bed
Compare
|
@alamb introduced the changes you mentioned. I'll open up a small PR tomorrow that refactors this file to be easier to understand. Thank you! |
|
run benchmarks ipc_writer |
|
🤖 Arrow criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing rich-T-kid/optimize-arrow-ipc-copies (70d5802) to d7ef673 (merge-base) diff File an issue against this benchmark runner |
|
🤖 Arrow criterion benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagebase (merge-base)
branch
File an issue against this benchmark runner |



Which issue does this PR close?
A document that provides a bit of context
Rationale for this change
Compression is the most compute and memory intensive part of the arrow-ipc encoding pipeline. It runs per buffer, not per record batch. For a Flight stream of 10 batches with 5 primitive arrays each, that is 100 compression calls minimum, more for string and struct arrays. Each of those calls produced an owned compressed Vec that was then copied a second time into a flat arrow_data accumulator before being written to the output. For the uncompressed path the situation was the same: Arc-backed buffer slices that required no compression were still copied into that accumulator unnecessarily.
Separately, the original write_message() function flushed after every dictionary and every record batch, causing repeated small OS write calls per batch. ( for non vector backed writer implementations )
The goal was to eliminate both problems: stop copying buffers that do not need to be copied, and stop flushing on every message.
What changes are included in this PR?
Are these changes tested?
These changes are intended to be completely seamless. I didn't write new unit test for the code as nothing externally changed. all test still pass
benchmarks
[main ->

cargo bench --bench ipc_writer -- "StreamWriter/write_10$" --sample-size 100][my branch ->
cargo bench --bench ipc_writer -- "StreamWriter/write_10$" --sample-size 100][main ->

cargo bench --bench ipc_writer -- --sample-size 1000][my branch ->
cargo bench --bench ipc_writer -- --sample-size 1000]Are there any user-facing changes?
no