diff --git a/Cargo.lock b/Cargo.lock index 3a187bad76ba6..135a85466c7df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -310,104 +310,53 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "arrow" -version = "56.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e833808ff2d94ed40d9379848a950d995043c7fb3e81a30b383f4c6033821cc" +checksum = "378530e55cd479eda3c14eb345310799717e6f76d0c332041e8487022166b471" dependencies = [ - "arrow-arith 56.2.0", - "arrow-array 56.2.0", - "arrow-buffer 56.2.0", - "arrow-cast 56.2.0", - "arrow-data 56.2.0", - "arrow-ipc 56.2.0", - "arrow-json 56.2.0", - "arrow-ord 56.2.0", - "arrow-row 56.2.0", - "arrow-schema 56.2.0", - "arrow-select 56.2.0", - "arrow-string 56.2.0", -] - -[[package]] -name = "arrow" -version = "58.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d441fdda254b65f3e9025910eb2c2066b6295d9c8ed409522b8d2ace1ff8574c" -dependencies = [ - "arrow-arith 58.1.0", - "arrow-array 58.1.0", - "arrow-buffer 58.1.0", - "arrow-cast 58.1.0", + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", "arrow-csv", - "arrow-data 58.1.0", - "arrow-ipc 58.1.0", - "arrow-json 58.1.0", - "arrow-ord 58.1.0", - "arrow-row 58.1.0", - "arrow-schema 58.1.0", - "arrow-select 58.1.0", - "arrow-string 58.1.0", + "arrow-data", + "arrow-ipc", + "arrow-json", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "arrow-string", ] [[package]] name = "arrow-arith" -version = "56.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad08897b81588f60ba983e3ca39bda2b179bdd84dced378e7df81a5313802ef8" +checksum = "a0ab212d2c1886e802f51c5212d78ebbcbb0bec980fff9dadc1eb8d45cd0b738" dependencies = [ - "arrow-array 56.2.0", - "arrow-buffer 56.2.0", - "arrow-data 56.2.0", - "arrow-schema 56.2.0", - "chrono", - "num", -] - -[[package]] -name = "arrow-arith" -version = "58.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ced5406f8b720cc0bc3aa9cf5758f93e8593cda5490677aa194e4b4b383f9a59" -dependencies = [ - "arrow-array 58.1.0", - "arrow-buffer 58.1.0", - "arrow-data 58.1.0", - "arrow-schema 58.1.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "chrono", "num-traits", ] [[package]] name = "arrow-array" -version = "56.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8548ca7c070d8db9ce7aa43f37393e4bfcf3f2d3681df278490772fd1673d08d" +checksum = "cfd33d3e92f207444098c75b42de99d329562be0cf686b307b097cc52b4e999e" dependencies = [ "ahash 0.8.11", - "arrow-buffer 56.2.0", - "arrow-data 56.2.0", - "arrow-schema 56.2.0", + "arrow-buffer", + "arrow-data", + "arrow-schema", "chrono", "chrono-tz", "half", - "hashbrown 0.16.0", - "num", -] - -[[package]] -name = "arrow-array" -version = "58.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "772bd34cacdda8baec9418d80d23d0fb4d50ef0735685bd45158b83dfeb6e62d" -dependencies = [ - "ahash 0.8.11", - "arrow-buffer 58.1.0", - "arrow-data 58.1.0", - "arrow-schema 58.1.0", - "chrono", - "chrono-tz", - "half", - "hashbrown 0.16.0", + "hashbrown 0.17.1", "num-complex", "num-integer", "num-traits", @@ -415,20 +364,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e003216336f70446457e280807a73899dd822feaf02087d31febca1363e2fccc" -dependencies = [ - "bytes", - "half", - "num", -] - -[[package]] -name = "arrow-buffer" -version = "58.1.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "898f4cf1e9598fdb77f356fdf2134feedfd0ee8d5a4e0a5f573e7d0aec16baa4" +checksum = "0c6cd424c2693bcdbc150d843dc9d4d137dd2de4782ce6df491ad11a3a0416c0" dependencies = [ "bytes", "half", @@ -438,36 +376,16 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "56.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "919418a0681298d3a77d1a315f625916cb5678ad0d74b9c60108eb15fd083023" +checksum = "4c5aefb56a2c02e9e2b30746241058b85f8983f0fcff2ba0c6d09006e1cded7f" dependencies = [ - "arrow-array 56.2.0", - "arrow-buffer 56.2.0", - "arrow-data 56.2.0", - "arrow-schema 56.2.0", - "arrow-select 56.2.0", - "atoi", - "base64 0.22.1", - "chrono", - "half", - "lexical-core", - "num", - "ryu", -] - -[[package]] -name = "arrow-cast" -version = "58.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0127816c96533d20fc938729f48c52d3e48f99717e7a0b5ade77d742510736d" -dependencies = [ - "arrow-array 58.1.0", - "arrow-buffer 58.1.0", - "arrow-data 58.1.0", - "arrow-ord 58.1.0", - "arrow-schema 58.1.0", - "arrow-select 58.1.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-ord", + "arrow-schema", + "arrow-select", "atoi", "base64 0.22.1", "chrono", @@ -480,13 +398,13 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "58.1.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca025bd0f38eeecb57c2153c0123b960494138e6a957bbda10da2b25415209fe" +checksum = "e94e8cf7e517657a52b91ea1263acf38c4ca62a84655d72458a3359b12ab97de" dependencies = [ - "arrow-array 58.1.0", - "arrow-cast 58.1.0", - "arrow-schema 58.1.0", + "arrow-array", + "arrow-cast", + "arrow-schema", "chrono", "csv", "csv-core", @@ -495,24 +413,12 @@ dependencies = [ [[package]] name = "arrow-data" -version = "56.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5c64fff1d142f833d78897a772f2e5b55b36cb3e6320376f0961ab0db7bd6d0" +checksum = "3c88210023a2bfee1896af366309a3028fc3bcbd6515fa29a7990ee1baa08ee0" dependencies = [ - "arrow-buffer 56.2.0", - "arrow-schema 56.2.0", - "half", - "num", -] - -[[package]] -name = "arrow-data" -version = "58.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42d10beeab2b1c3bb0b53a00f7c944a178b622173a5c7bcabc3cb45d90238df4" -dependencies = [ - "arrow-buffer 58.1.0", - "arrow-schema 58.1.0", + "arrow-buffer", + "arrow-schema", "half", "num-integer", "num-traits", @@ -520,15 +426,15 @@ dependencies = [ [[package]] name = "arrow-flight" -version = "58.1.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "302b2e036335f3f04d65dad3f74ff1f2aae6dc671d6aa04dc6b61193761e16fb" +checksum = "28abfe8bf9f124e5fc83b334af4fa58f8d0323ad25312ccb2d1da50178415704" dependencies = [ - "arrow-array 58.1.0", - "arrow-buffer 58.1.0", - "arrow-cast 58.1.0", - "arrow-ipc 58.1.0", - "arrow-schema 58.1.0", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-ipc", + "arrow-schema", "base64 0.22.1", "bytes", "futures", @@ -540,29 +446,15 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d3594dcddccc7f20fd069bc8e9828ce37220372680ff638c5e00dea427d88f5" -dependencies = [ - "arrow-array 56.2.0", - "arrow-buffer 56.2.0", - "arrow-data 56.2.0", - "arrow-schema 56.2.0", - "arrow-select 56.2.0", - "flatbuffers 25.9.23", -] - -[[package]] -name = "arrow-ipc" -version = "58.1.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "609a441080e338147a84e8e6904b6da482cefb957c5cdc0f3398872f69a315d0" +checksum = "238438f0834483703d88896db6fe5a7138b2230debc31b34c0336c2996e3c64f" dependencies = [ - "arrow-array 58.1.0", - "arrow-buffer 58.1.0", - "arrow-data 58.1.0", - "arrow-schema 58.1.0", - "arrow-select 58.1.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", "flatbuffers 25.9.23", "lz4_flex 0.13.0", "zstd 0.13.2", @@ -570,37 +462,16 @@ dependencies = [ [[package]] name = "arrow-json" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88cf36502b64a127dc659e3b305f1d993a544eab0d48cce704424e62074dc04b" -dependencies = [ - "arrow-array 56.2.0", - "arrow-buffer 56.2.0", - "arrow-cast 56.2.0", - "arrow-data 56.2.0", - "arrow-schema 56.2.0", - "chrono", - "half", - "indexmap 2.12.0", - "lexical-core", - "memchr", - "num", - "serde", - "serde_json", - "simdutf8", -] - -[[package]] -name = "arrow-json" -version = "58.1.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ead0914e4861a531be48fe05858265cf854a4880b9ed12618b1d08cba9bebc8" +checksum = "205ca2119e6d679d5c133c6f30e68f027738d95ed948cf77677ea69c7800036b" dependencies = [ - "arrow-array 58.1.0", - "arrow-buffer 58.1.0", - "arrow-cast 58.1.0", - "arrow-data 58.1.0", - "arrow-schema 58.1.0", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-ord", + "arrow-schema", + "arrow-select", "chrono", "half", "indexmap 2.12.0", @@ -616,67 +487,35 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "56.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c8f82583eb4f8d84d4ee55fd1cb306720cddead7596edce95b50ee418edf66f" +checksum = "1bffd8fd2579286a5d63bac898159873e5094a79009940bcb42bbfce4f19f1d0" dependencies = [ - "arrow-array 56.2.0", - "arrow-buffer 56.2.0", - "arrow-data 56.2.0", - "arrow-schema 56.2.0", - "arrow-select 56.2.0", -] - -[[package]] -name = "arrow-ord" -version = "58.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "763a7ba279b20b52dad300e68cfc37c17efa65e68623169076855b3a9e941ca5" -dependencies = [ - "arrow-array 58.1.0", - "arrow-buffer 58.1.0", - "arrow-data 58.1.0", - "arrow-schema 58.1.0", - "arrow-select 58.1.0", -] - -[[package]] -name = "arrow-row" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d07ba24522229d9085031df6b94605e0f4b26e099fb7cdeec37abd941a73753" -dependencies = [ - "arrow-array 56.2.0", - "arrow-buffer 56.2.0", - "arrow-data 56.2.0", - "arrow-schema 56.2.0", - "half", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", ] [[package]] name = "arrow-row" -version = "58.1.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e14fe367802f16d7668163ff647830258e6e0aeea9a4d79aaedf273af3bdcd3e" +checksum = "bab5994731204603c73ba69267616c50f80780774c6bb0476f1f830625115e0c" dependencies = [ - "arrow-array 58.1.0", - "arrow-buffer 58.1.0", - "arrow-data 58.1.0", - "arrow-schema 58.1.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "half", ] [[package]] name = "arrow-schema" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3aa9e59c611ebc291c28582077ef25c97f1975383f1479b12f3b9ffee2ffabe" - -[[package]] -name = "arrow-schema" -version = "58.1.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c30a1365d7a7dc50cc847e54154e6af49e4c4b0fddc9f607b687f29212082743" +checksum = "f633dbfdf39c039ada1bf9e34c694816eb71fbb7dc78f613993b7245e078a1ed" dependencies = [ "serde", "serde_core", @@ -684,60 +523,29 @@ dependencies = [ [[package]] name = "arrow-select" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c41dbbd1e97bfcaee4fcb30e29105fb2c75e4d82ae4de70b792a5d3f66b2e7a" -dependencies = [ - "ahash 0.8.11", - "arrow-array 56.2.0", - "arrow-buffer 56.2.0", - "arrow-data 56.2.0", - "arrow-schema 56.2.0", - "num", -] - -[[package]] -name = "arrow-select" -version = "58.1.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78694888660a9e8ac949853db393af2a8b8fc82c19ce333132dfa2e72cc1a7fe" +checksum = "8cd065c54172ac787cf3f2f8d4107e0d3fdc26edba76fdf4f4cc170258942222" dependencies = [ "ahash 0.8.11", - "arrow-array 58.1.0", - "arrow-buffer 58.1.0", - "arrow-data 58.1.0", - "arrow-schema 58.1.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "num-traits", ] [[package]] name = "arrow-string" -version = "56.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53f5183c150fbc619eede22b861ea7c0eebed8eaac0333eaa7f6da5205fd504d" +checksum = "29dd7cda3ab9692f43a2e4acc444d760cc17b12bb6d8232ddf64e9bab7c06b42" dependencies = [ - "arrow-array 56.2.0", - "arrow-buffer 56.2.0", - "arrow-data 56.2.0", - "arrow-schema 56.2.0", - "arrow-select 56.2.0", - "memchr", - "num", - "regex", - "regex-syntax", -] - -[[package]] -name = "arrow-string" -version = "58.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61e04a01f8bb73ce54437514c5fd3ee2aa3e8abe4c777ee5cc55853b1652f79e" -dependencies = [ - "arrow-array 58.1.0", - "arrow-buffer 58.1.0", - "arrow-data 58.1.0", - "arrow-schema 58.1.0", - "arrow-select 58.1.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", "memchr", "num-traits", "regex", @@ -2694,7 +2502,7 @@ name = "codecs" version = "0.1.0" dependencies = [ "apache-avro", - "arrow 56.2.0", + "arrow", "async-trait", "bytes", "chrono", @@ -3483,8 +3291,13 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90f87c99484cd20e2c9c361e2fe8884f614f35d8cde7d630cfaacbc09c4c21e9" dependencies = [ + "arrow-array", + "arrow-flight", + "arrow-ipc", + "arrow-schema", "async-trait", "bytes", + "futures", "hyper-http-proxy", "hyper-util", "prost 0.14.3", @@ -4744,11 +4557,11 @@ version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a5ee8f8f967c2b8183136920bdb0a36d06618cc9402f901495c0da66cb2cc74b" dependencies = [ - "arrow 58.1.0", - "arrow-array 58.1.0", + "arrow", + "arrow-array", "arrow-flight", - "arrow-ipc 58.1.0", - "arrow-schema 58.1.0", + "arrow-ipc", + "arrow-schema", "async-stream", "async-trait", "base64 0.22.1", @@ -4898,6 +4711,12 @@ dependencies = [ "foldhash 0.2.0", ] +[[package]] +name = "hashbrown" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a" + [[package]] name = "hashlink" version = "0.10.0" @@ -7591,7 +7410,7 @@ version = "5.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51e219e79014df21a225b1860a479e2dcd7cbd9130f4defd4bd0e191ea31d67d" dependencies = [ - "base64 0.21.7", + "base64 0.22.1", "chrono", "getrandom 0.2.15", "http 1.3.1", @@ -7949,27 +7768,27 @@ dependencies = [ [[package]] name = "parquet" -version = "56.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0dbd48ad52d7dccf8ea1b90a3ddbfaea4f69878dd7683e51c507d4bc52b5b27" +checksum = "5dafa7d01085b62a47dd0c1829550a0a36710ea9c4fe358a05a85477cec8a908" dependencies = [ "ahash 0.8.11", - "arrow-array 56.2.0", - "arrow-buffer 56.2.0", - "arrow-cast 56.2.0", - "arrow-data 56.2.0", - "arrow-ipc 56.2.0", - "arrow-schema 56.2.0", - "arrow-select 56.2.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-ipc", + "arrow-schema", + "arrow-select", "base64 0.22.1", "bytes", "chrono", "flate2", "half", - "hashbrown 0.16.0", - "lz4_flex 0.11.6", - "num", + "hashbrown 0.17.1", + "lz4_flex 0.13.0", "num-bigint", + "num-integer", + "num-traits", "paste", "seq-macro", "snap", @@ -8645,7 +8464,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7" dependencies = [ "heck 0.5.0", - "itertools 0.10.5", + "itertools 0.14.0", "log", "multimap", "petgraph 0.8.3", @@ -12878,8 +12697,8 @@ dependencies = [ "approx", "arc-swap", "arr_macro", - "arrow 56.2.0", - "arrow-schema 56.2.0", + "arrow", + "arrow-schema", "assert_cmd", "async-compression", "async-nats", @@ -13000,11 +12819,9 @@ dependencies = [ "proptest", "proptest-derive", "prost 0.12.6", - "prost 0.14.3", "prost-build 0.12.6", "prost-reflect", "prost-types 0.12.6", - "prost-types 0.14.3", "pulsar", "quick-junit", "quick-xml 0.31.0", diff --git a/Cargo.toml b/Cargo.toml index c389001ed9276..e219c1698b093 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -336,11 +336,7 @@ prost-reflect = { workspace = true, optional = true } prost-types = { workspace = true, optional = true } # Databricks Zerobus -databricks-zerobus-ingest-sdk = { version = "2.0.1", optional = true } -# The SDK returns prost-types 0.14 DescriptorProto values; prost-reflect (used by the rest -# of the sink) is on prost-types 0.13. We bridge the two via wire-format re-encoding. -prost-014 = { package = "prost", version = "0.14", optional = true } -prost-types-014 = { package = "prost-types", version = "0.14", optional = true } +databricks-zerobus-ingest-sdk = { version = "2.0.1", optional = true, features = ["arrow-flight"] } # GCP goauth = { version = "0.16.0", optional = true } @@ -361,9 +357,9 @@ greptimedb-ingester = { version = "0.17.0", default-features = false, optional = # External libs arc-swap = { workspace = true, default-features = false, optional = true } async-compression = { version = "0.4.27", default-features = false, features = ["tokio", "gzip", "zstd"], optional = true } -arrow = { version = "56.2.0", default-features = false, features = ["ipc"], optional = true } -arrow-schema = { version = "56.2.0", default-features = false, optional = true } -parquet = { version = "56.2.0", default-features = false, features = [ +arrow = { version = "58.2.0", default-features = false, features = ["ipc"], optional = true } +arrow-schema = { version = "58.2.0", default-features = false, optional = true } +parquet = { version = "58.2.0", default-features = false, features = [ "arrow", "snap", "zstd", @@ -934,7 +930,7 @@ sinks-chronicle = [] sinks-clickhouse = ["dep:nom", "dep:rust_decimal", "codecs-arrow"] sinks-console = [] sinks-databend = ["dep:databend-client"] -sinks-databricks-zerobus = ["dep:databricks-zerobus-ingest-sdk", "dep:prost-reflect", "dep:prost-014", "dep:prost-types-014", "dep:base64"] +sinks-databricks-zerobus = ["dep:databricks-zerobus-ingest-sdk", "codecs-arrow"] sinks-datadog_events = [] sinks-datadog_logs = [] sinks-datadog_metrics = ["protobuf-build", "dep:prost", "dep:prost-reflect"] diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 2d00edc12d4dd..03f1af95a638f 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -27,7 +27,7 @@ arr_macro_impl,https://github.com/JoshMcguigan/arr_macro,MIT OR Apache-2.0,Josh arrayvec,https://github.com/bluss/arrayvec,MIT OR Apache-2.0,bluss arrow,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow arrow-arith,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow -arrow-array,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow +arrow-array,https://github.com/apache/arrow-rs,Apache-2.0 AND MIT,Apache Arrow arrow-buffer,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow arrow-cast,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow arrow-csv,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow @@ -316,6 +316,7 @@ half,https://github.com/starkat99/half-rs,MIT OR Apache-2.0,Kathryn Long hashbag,https://github.com/jonhoo/hashbag,MIT OR Apache-2.0,Jon Gjengset hashbrown,https://github.com/rust-lang/hashbrown,MIT OR Apache-2.0,Amanieu d'Antras +hashbrown,https://github.com/rust-lang/hashbrown,MIT OR Apache-2.0,The hashbrown Authors hashlink,https://github.com/kyren/hashlink,MIT OR Apache-2.0,kyren hdrhistogram,https://github.com/HdrHistogram/HdrHistogram_rust,MIT OR Apache-2.0,"Jon Gjengset , Marshall Pierce " headers,https://github.com/hyperium/headers,MIT,Sean McArthur diff --git a/changelog.d/24840_databricks_zerobus_sink.feature.md b/changelog.d/24840_databricks_zerobus_sink.feature.md index 34a8ecb1c9224..29e51adcc2ef2 100644 --- a/changelog.d/24840_databricks_zerobus_sink.feature.md +++ b/changelog.d/24840_databricks_zerobus_sink.feature.md @@ -1,3 +1,3 @@ -Add a new `databricks_zerobus` sink that streams log data to Databricks Unity Catalog tables via the Zerobus ingestion service. Supports OAuth 2.0 authentication, automatic schema fetching from Unity Catalog, and protobuf batch encoding. +Add a new `databricks_zerobus` sink that streams log data to Databricks Unity Catalog tables via the Zerobus ingestion service. Supports OAuth 2.0 authentication, automatic schema fetching from Unity Catalog, and Arrow batch encoding. authors: flaviocruz diff --git a/changelog.d/arrow_58.enhancement.md b/changelog.d/arrow_58.enhancement.md new file mode 100644 index 0000000000000..13e64e076c21c --- /dev/null +++ b/changelog.d/arrow_58.enhancement.md @@ -0,0 +1,3 @@ +Updated the bundled Apache Arrow and Parquet libraries from 56.2 to 58. This affects components that emit Arrow or Parquet data, such as the `clickhouse` sink and the `aws_s3` sink's Parquet encoding. + +authors: flaviocruz diff --git a/lib/codecs/Cargo.toml b/lib/codecs/Cargo.toml index 6a5f8424a61fd..c2bd34dc81d3d 100644 --- a/lib/codecs/Cargo.toml +++ b/lib/codecs/Cargo.toml @@ -14,8 +14,8 @@ path = "tests/bin/generate-avro-fixtures.rs" [dependencies] apache-avro = { version = "0.20.0", default-features = false } -arrow = { version = "56.2.0", default-features = false, features = ["ipc", "json"], optional = true } -parquet = { version = "56.2.0", default-features = false, features = [ +arrow = { version = "58.2.0", default-features = false, features = ["ipc", "json"], optional = true } +parquet = { version = "58.2.0", default-features = false, features = [ "arrow", "snap", "zstd", diff --git a/lib/codecs/src/encoding/encoder.rs b/lib/codecs/src/encoding/encoder.rs index 7aac89ed1e100..2bbfba6cdbb55 100644 --- a/lib/codecs/src/encoding/encoder.rs +++ b/lib/codecs/src/encoding/encoder.rs @@ -8,43 +8,42 @@ use crate::encoding::ArrowStreamSerializer; #[cfg(feature = "parquet")] use crate::encoding::ParquetSerializer; use crate::{ - encoding::{Error, Framer, ProtoBatchSerializer, Serializer}, + encoding::{Error, Framer, Serializer}, internal_events::{EncoderFramingError, EncoderSerializeError}, }; /// The output of a batch encoding operation. /// -/// Different batch serializers produce different output types: -/// - Arrow serializer produces a `RecordBatch` -/// - Proto serializer produces individual byte buffers per event +/// Only available when the `arrow` feature is enabled. +#[cfg(feature = "arrow")] #[derive(Debug)] pub enum BatchOutput { /// An Arrow RecordBatch containing all events encoded as columnar data. - #[cfg(feature = "arrow")] Arrow(arrow::record_batch::RecordBatch), - /// A list of individually-serialized records (one per event). - Records(Vec>), } /// Serializers that support batch encoding (encoding all events at once). +/// +/// Only available when the `arrow` feature is enabled (the `parquet` feature +/// implies `arrow`). +#[cfg(feature = "arrow")] #[derive(Debug, Clone)] pub enum BatchSerializer { /// Arrow IPC stream format serializer. - #[cfg(feature = "arrow")] Arrow(ArrowStreamSerializer), /// Parquet format serializer. #[cfg(feature = "parquet")] Parquet(Box), - /// Protobuf batch serializer that encodes each event individually. - ProtoBatch(ProtoBatchSerializer), } /// An encoder that encodes batches of events. +#[cfg(feature = "arrow")] #[derive(Debug, Clone)] pub struct BatchEncoder { serializer: BatchSerializer, } +#[cfg(feature = "arrow")] impl BatchEncoder { /// Creates a new `BatchEncoder` with the specified batch serializer. pub const fn new(serializer: BatchSerializer) -> Self { @@ -57,25 +56,17 @@ impl BatchEncoder { } /// Get the HTTP content type. - /// - /// Returns `None` for serializers that do not produce a single HTTP body - /// (e.g. `ProtoBatch`, which emits one record per event for an out-of-band - /// transport rather than an HTTP payload). - #[cfg(any(feature = "arrow", feature = "parquet"))] pub const fn content_type(&self) -> Option<&'static str> { match &self.serializer { - #[cfg(feature = "arrow")] BatchSerializer::Arrow(_) => Some("application/vnd.apache.arrow.stream"), #[cfg(feature = "parquet")] BatchSerializer::Parquet(_) => Some("application/vnd.apache.parquet"), - BatchSerializer::ProtoBatch(_) => None, } } /// Encode a batch of events into a `BatchOutput`. pub fn encode_batch(&self, events: &[Event]) -> Result { match &self.serializer { - #[cfg(feature = "arrow")] BatchSerializer::Arrow(serializer) => { let record_batch = serializer.encode_to_record_batch(events).map_err(|err| { use crate::encoding::ArrowEncodingError; @@ -88,12 +79,6 @@ impl BatchEncoder { })?; Ok(BatchOutput::Arrow(record_batch)) } - BatchSerializer::ProtoBatch(serializer) => { - let records = serializer - .encode_batch(events) - .map_err(|err| Error::SerializingError(Box::new(err)))?; - Ok(BatchOutput::Records(records)) - } #[cfg(feature = "parquet")] BatchSerializer::Parquet(_) => Err(Error::SerializingError(Box::from( "Parquet serializer does not support encode_batch; use the tokio Encoder interface instead", @@ -102,13 +87,12 @@ impl BatchEncoder { } } +#[cfg(feature = "arrow")] impl tokio_util::codec::Encoder> for BatchEncoder { type Error = Error; - #[allow(unused_variables)] fn encode(&mut self, events: Vec, buffer: &mut BytesMut) -> Result<(), Self::Error> { match &mut self.serializer { - #[cfg(feature = "arrow")] BatchSerializer::Arrow(serializer) => { serializer.encode(events, buffer).map_err(|err| { use crate::encoding::ArrowEncodingError; @@ -124,9 +108,6 @@ impl tokio_util::codec::Encoder> for BatchEncoder { BatchSerializer::Parquet(serializer) => serializer .encode(events, buffer) .map_err(Error::SerializingError), - BatchSerializer::ProtoBatch(_) => Err(Error::SerializingError(Box::from( - "ProtoBatch serializer does not support the tokio Encoder interface; use BatchEncoder::encode_batch() instead", - ))), } } } @@ -137,6 +118,7 @@ pub enum EncoderKind { /// Uses framing to encode individual events Framed(Box>), /// Encodes events in batches without framing + #[cfg(feature = "arrow")] Batch(BatchEncoder), } diff --git a/lib/codecs/src/encoding/format/mod.rs b/lib/codecs/src/encoding/format/mod.rs index 33030fd41ff2e..f760ea5c8dd83 100644 --- a/lib/codecs/src/encoding/format/mod.rs +++ b/lib/codecs/src/encoding/format/mod.rs @@ -18,7 +18,6 @@ mod native_json; mod otlp; #[cfg(feature = "parquet")] mod parquet; -mod proto_batch; mod protobuf; mod raw_message; #[cfg(feature = "syslog")] @@ -46,7 +45,6 @@ pub use native::{NativeSerializer, NativeSerializerConfig}; pub use native_json::{NativeJsonSerializer, NativeJsonSerializerConfig}; #[cfg(feature = "opentelemetry")] pub use otlp::{OtlpSerializer, OtlpSerializerConfig}; -pub use proto_batch::{ProtoBatchEncodingError, ProtoBatchSerializer, ProtoBatchSerializerConfig}; pub use protobuf::{ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions}; pub use raw_message::{RawMessageSerializer, RawMessageSerializerConfig}; #[cfg(feature = "syslog")] diff --git a/lib/codecs/src/encoding/format/proto_batch.rs b/lib/codecs/src/encoding/format/proto_batch.rs deleted file mode 100644 index 3bdce405f6b70..0000000000000 --- a/lib/codecs/src/encoding/format/proto_batch.rs +++ /dev/null @@ -1,285 +0,0 @@ -//! Protobuf batch serializer for encoding events as individual protobuf records. -//! -//! Encodes each event in a batch independently into protobuf bytes, producing -//! a `Vec>` where each element is a single serialized protobuf message. - -use prost_reflect::{MessageDescriptor, prost::Message as _}; -use snafu::Snafu; -use std::sync::Arc; -use vector_config::configurable_component; -use vector_core::{config::DataType, event::Event, schema}; -use vrl::protobuf::encode::{Options, encode_message}; - -/// Errors that can occur during protobuf batch encoding -#[derive(Debug, Snafu)] -pub enum ProtoBatchEncodingError { - /// No events provided - #[snafu(display("Cannot encode an empty batch"))] - NoEvents, - - /// Unsupported event type - #[snafu(display("Unsupported event type: only Log events are supported"))] - UnsupportedEventType, - - /// Protobuf encoding failed - #[snafu(display("Protobuf encoding failed: {}", source))] - EncodingFailed { - /// The underlying encoding error - source: vector_common::Error, - }, - - /// Protobuf prost encoding failed - #[snafu(display("Protobuf prost encoding failed: {}", source))] - ProstEncodingFailed { - /// The underlying prost error - source: prost_reflect::prost::EncodeError, - }, -} - -/// Configuration for protobuf batch serialization -#[configurable_component] -#[derive(Clone, Default)] -pub struct ProtoBatchSerializerConfig { - /// The protobuf message descriptor to use for encoding. - #[serde(skip)] - #[configurable(derived)] - pub descriptor: Option, -} - -impl std::fmt::Debug for ProtoBatchSerializerConfig { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ProtoBatchSerializerConfig") - .field( - "descriptor", - &self.descriptor.as_ref().map(|d| d.full_name().to_string()), - ) - .finish() - } -} - -impl ProtoBatchSerializerConfig { - /// Create a new ProtoBatchSerializerConfig with a message descriptor - pub fn new(descriptor: MessageDescriptor) -> Self { - Self { - descriptor: Some(descriptor), - } - } - - /// The data type of events that are accepted by this serializer. - pub fn input_type(&self) -> DataType { - DataType::Log - } - - /// The schema required by the serializer. - pub fn schema_requirement(&self) -> schema::Requirement { - schema::Requirement::empty() - } -} - -/// Protobuf batch serializer that encodes each event into individual protobuf bytes. -#[derive(Clone, Debug)] -pub struct ProtoBatchSerializer { - descriptor: Arc, - options: Options, -} - -impl ProtoBatchSerializer { - /// Create a new ProtoBatchSerializer with the given configuration. - pub fn new(config: ProtoBatchSerializerConfig) -> Result { - let descriptor = config.descriptor.ok_or_else(|| { - vector_common::Error::from("Proto batch serializer requires a message descriptor.") - })?; - - Ok(Self { - descriptor: Arc::new(descriptor), - options: Options { - use_json_names: false, - allow_lossy_string_coercion: true, - }, - }) - } - - /// Encode a batch of events into individual protobuf byte buffers. - pub fn encode_batch(&self, events: &[Event]) -> Result>, ProtoBatchEncodingError> { - if events.is_empty() { - return Err(ProtoBatchEncodingError::NoEvents); - } - - let mut records = Vec::with_capacity(events.len()); - - for event in events { - let dynamic_message = match event { - Event::Log(log) => { - encode_message(&self.descriptor, log.value().clone(), &self.options) - } - Event::Trace(_) | Event::Metric(_) => { - return Err(ProtoBatchEncodingError::UnsupportedEventType); - } - } - .map_err(|source| ProtoBatchEncodingError::EncodingFailed { - source: source.into(), - })?; - - records.push(dynamic_message.encode_to_vec()); - } - - Ok(records) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use prost_reflect::{ - DescriptorPool, DynamicMessage, Value as ProstValue, - prost_types::{ - DescriptorProto, FieldDescriptorProto, FileDescriptorProto, FileDescriptorSet, - field_descriptor_proto::{Label, Type}, - }, - }; - use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, TraceEvent, Value}; - use vrl::btreemap; - - fn build_descriptor() -> MessageDescriptor { - // message Inner { string label = 1; } - let inner = DescriptorProto { - name: Some("Inner".to_string()), - field: vec![FieldDescriptorProto { - name: Some("label".to_string()), - number: Some(1), - label: Some(Label::Optional as i32), - r#type: Some(Type::String as i32), - ..Default::default() - }], - ..Default::default() - }; - - // message Outer { string name = 1; int64 count = 2; Inner inner = 3; } - let outer = DescriptorProto { - name: Some("Outer".to_string()), - field: vec![ - FieldDescriptorProto { - name: Some("name".to_string()), - number: Some(1), - label: Some(Label::Optional as i32), - r#type: Some(Type::String as i32), - ..Default::default() - }, - FieldDescriptorProto { - name: Some("count".to_string()), - number: Some(2), - label: Some(Label::Optional as i32), - r#type: Some(Type::Int64 as i32), - ..Default::default() - }, - FieldDescriptorProto { - name: Some("inner".to_string()), - number: Some(3), - label: Some(Label::Optional as i32), - r#type: Some(Type::Message as i32), - type_name: Some(".test.Inner".to_string()), - ..Default::default() - }, - ], - nested_type: vec![], - ..Default::default() - }; - - let file = FileDescriptorProto { - name: Some("test.proto".to_string()), - package: Some("test".to_string()), - message_type: vec![outer, inner], - syntax: Some("proto3".to_string()), - ..Default::default() - }; - - let pool = DescriptorPool::from_file_descriptor_set(FileDescriptorSet { file: vec![file] }) - .expect("descriptor pool builds"); - pool.get_message_by_name("test.Outer") - .expect("Outer message exists") - } - - fn make_serializer() -> ProtoBatchSerializer { - ProtoBatchSerializer::new(ProtoBatchSerializerConfig::new(build_descriptor())) - .expect("serializer builds") - } - - #[test] - fn empty_batch_returns_no_events_error() { - let serializer = make_serializer(); - let err = serializer - .encode_batch(&[]) - .expect_err("empty batch errors"); - assert!(matches!(err, ProtoBatchEncodingError::NoEvents)); - } - - #[test] - fn metric_event_is_rejected() { - let serializer = make_serializer(); - let metric = Event::Metric(Metric::new( - "test", - MetricKind::Absolute, - MetricValue::Counter { value: 1.0 }, - )); - let err = serializer - .encode_batch(&[metric]) - .expect_err("metric event errors"); - assert!(matches!(err, ProtoBatchEncodingError::UnsupportedEventType)); - } - - #[test] - fn trace_event_is_rejected() { - let serializer = make_serializer(); - let trace = Event::Trace(TraceEvent::default()); - let err = serializer - .encode_batch(&[trace]) - .expect_err("trace event errors"); - assert!(matches!(err, ProtoBatchEncodingError::UnsupportedEventType)); - } - - #[test] - fn round_trip_decode_preserves_field_mapping() { - let descriptor = build_descriptor(); - let serializer = - ProtoBatchSerializer::new(ProtoBatchSerializerConfig::new(descriptor.clone())) - .expect("serializer builds"); - - let event = Event::Log(LogEvent::from(btreemap! { - "name" => Value::from("hello"), - "count" => Value::from(42_i64), - "inner" => Value::from(btreemap! { - "label" => Value::from("nested"), - }), - })); - - let records = serializer - .encode_batch(&[event]) - .expect("encoding succeeds"); - assert_eq!(records.len(), 1); - - let decoded = - DynamicMessage::decode(descriptor, records[0].as_slice()).expect("decode succeeds"); - - let name_field = decoded - .get_field_by_name("name") - .expect("name field present"); - assert_eq!(name_field.as_str(), Some("hello")); - - let count_field = decoded - .get_field_by_name("count") - .expect("count field present"); - assert_eq!(count_field.as_i64(), Some(42)); - - let inner_field = decoded - .get_field_by_name("inner") - .expect("inner field present"); - let inner_msg = match &*inner_field { - ProstValue::Message(m) => m, - other => panic!("expected nested message, got {:?}", other), - }; - let label = inner_msg - .get_field_by_name("label") - .expect("label field present"); - assert_eq!(label.as_str(), Some("nested")); - } -} diff --git a/lib/codecs/src/encoding/mod.rs b/lib/codecs/src/encoding/mod.rs index 2c8b485a49d8f..361a62a301533 100644 --- a/lib/codecs/src/encoding/mod.rs +++ b/lib/codecs/src/encoding/mod.rs @@ -10,7 +10,9 @@ pub mod serializer; mod transformer; pub use chunking::{Chunker, Chunking, GelfChunker}; pub use config::{EncodingConfig, EncodingConfigWithFraming, SinkType}; -pub use encoder::{BatchEncoder, BatchOutput, BatchSerializer, Encoder, EncoderKind}; +#[cfg(feature = "arrow")] +pub use encoder::{BatchEncoder, BatchOutput, BatchSerializer}; +pub use encoder::{Encoder, EncoderKind}; #[cfg(feature = "arrow")] pub use format::{ ArrowEncodingError, ArrowStreamSerializer, ArrowStreamSerializerConfig, SchemaProvider, @@ -21,8 +23,7 @@ pub use format::{ CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig, JsonSerializer, JsonSerializerConfig, JsonSerializerOptions, LogfmtSerializer, LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer, - NativeSerializerConfig, ProtoBatchEncodingError, ProtoBatchSerializer, - ProtoBatchSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig, + NativeSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions, RawMessageSerializer, RawMessageSerializerConfig, TextSerializer, TextSerializerConfig, }; @@ -41,7 +42,9 @@ pub use framing::{ NewlineDelimitedEncoderConfig, VarintLengthDelimitedEncoder, VarintLengthDelimitedEncoderConfig, }; -pub use serializer::{BatchSerializerConfig, Serializer, SerializerConfig}; +#[cfg(feature = "arrow")] +pub use serializer::BatchSerializerConfig; +pub use serializer::{Serializer, SerializerConfig}; pub use transformer::{TimestampFormat, Transformer}; /// An error that occurred while building an encoder. diff --git a/lib/codecs/src/encoding/serializer.rs b/lib/codecs/src/encoding/serializer.rs index 0a7b07a00fd04..431f0356c3aee 100644 --- a/lib/codecs/src/encoding/serializer.rs +++ b/lib/codecs/src/encoding/serializer.rs @@ -10,7 +10,6 @@ use super::format::{ArrowStreamSerializer, ArrowStreamSerializerConfig}; use super::format::{OtlpSerializer, OtlpSerializerConfig}; #[cfg(feature = "parquet")] use super::format::{ParquetSerializer, ParquetSerializerConfig}; -use super::format::{ProtoBatchSerializer, ProtoBatchSerializerConfig}; #[cfg(feature = "syslog")] use super::format::{SyslogSerializer, SyslogSerializerConfig}; use super::{ @@ -147,6 +146,10 @@ impl Default for SerializerConfig { } /// Batch serializer configuration. +/// +/// Only available when the `arrow` feature is enabled (the `parquet` feature +/// implies `arrow`); all batch serializers produce columnar Arrow/Parquet output. +#[cfg(feature = "arrow")] #[configurable_component] #[derive(Clone, Debug)] #[serde(tag = "codec", rename_all = "snake_case")] @@ -160,7 +163,6 @@ pub enum BatchSerializerConfig { /// a continuous stream of record batches. /// /// [apache_arrow]: https://arrow.apache.org/ - #[cfg(feature = "arrow")] #[serde(rename = "arrow_stream")] ArrowStream(ArrowStreamSerializerConfig), /// Encodes events in [Apache Parquet][apache_parquet] columnar format. @@ -169,24 +171,15 @@ pub enum BatchSerializerConfig { #[cfg(feature = "parquet")] #[serde(rename = "parquet")] Parquet(ParquetSerializerConfig), - - /// Encodes each event individually as a [Protocol Buffers][protobuf] message. - /// - /// Each event in the batch is serialized to protobuf bytes independently, - /// producing a list of byte buffers (one per event). - /// - /// [protobuf]: https://protobuf.dev/ - #[serde(rename = "proto_batch")] - ProtoBatch(ProtoBatchSerializerConfig), } +#[cfg(feature = "arrow")] impl BatchSerializerConfig { /// Build the batch serializer from this configuration. pub fn build_batch_serializer( &self, ) -> Result> { match self { - #[cfg(feature = "arrow")] BatchSerializerConfig::ArrowStream(arrow_config) => { let serializer = ArrowStreamSerializer::new(arrow_config.clone())?; Ok(super::BatchSerializer::Arrow(serializer)) @@ -196,32 +189,24 @@ impl BatchSerializerConfig { let serializer = ParquetSerializer::new(parquet_config.clone())?; Ok(super::BatchSerializer::Parquet(Box::new(serializer))) } - BatchSerializerConfig::ProtoBatch(proto_config) => { - let serializer = ProtoBatchSerializer::new(proto_config.clone())?; - Ok(super::BatchSerializer::ProtoBatch(serializer)) - } } } /// The data type of events that are accepted by this batch serializer. pub fn input_type(&self) -> DataType { match self { - #[cfg(feature = "arrow")] BatchSerializerConfig::ArrowStream(arrow_config) => arrow_config.input_type(), #[cfg(feature = "parquet")] BatchSerializerConfig::Parquet(parquet_config) => parquet_config.input_type(), - BatchSerializerConfig::ProtoBatch(proto_config) => proto_config.input_type(), } } /// The schema required by the batch serializer. pub fn schema_requirement(&self) -> schema::Requirement { match self { - #[cfg(feature = "arrow")] BatchSerializerConfig::ArrowStream(arrow_config) => arrow_config.schema_requirement(), #[cfg(feature = "parquet")] BatchSerializerConfig::Parquet(parquet_config) => parquet_config.schema_requirement(), - BatchSerializerConfig::ProtoBatch(proto_config) => proto_config.schema_requirement(), } } } diff --git a/lib/codecs/src/lib.rs b/lib/codecs/src/lib.rs index 57f8af766ab71..b1614a75139a9 100644 --- a/lib/codecs/src/lib.rs +++ b/lib/codecs/src/lib.rs @@ -25,15 +25,17 @@ pub use decoding::{ }; #[cfg(feature = "syslog")] pub use decoding::{SyslogDeserializer, SyslogDeserializerConfig}; +#[cfg(feature = "arrow")] +pub use encoding::{BatchEncoder, BatchSerializer}; pub use encoding::{ - BatchEncoder, BatchSerializer, BytesEncoder, BytesEncoderConfig, CharacterDelimitedEncoder, - CharacterDelimitedEncoderConfig, CsvSerializer, CsvSerializerConfig, Encoder, EncoderKind, - EncodingConfig, EncodingConfigWithFraming, GelfSerializer, GelfSerializerConfig, - JsonSerializer, JsonSerializerConfig, LengthDelimitedEncoder, LengthDelimitedEncoderConfig, - LogfmtSerializer, LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, - NativeSerializer, NativeSerializerConfig, NewlineDelimitedEncoder, - NewlineDelimitedEncoderConfig, RawMessageSerializer, RawMessageSerializerConfig, SinkType, - TextSerializer, TextSerializerConfig, TimestampFormat, Transformer, + BytesEncoder, BytesEncoderConfig, CharacterDelimitedEncoder, CharacterDelimitedEncoderConfig, + CsvSerializer, CsvSerializerConfig, Encoder, EncoderKind, EncodingConfig, + EncodingConfigWithFraming, GelfSerializer, GelfSerializerConfig, JsonSerializer, + JsonSerializerConfig, LengthDelimitedEncoder, LengthDelimitedEncoderConfig, LogfmtSerializer, + LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer, + NativeSerializerConfig, NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig, + RawMessageSerializer, RawMessageSerializerConfig, SinkType, TextSerializer, + TextSerializerConfig, TimestampFormat, Transformer, }; pub use gelf::{VALID_FIELD_REGEX, gelf_fields}; pub use ready_frames::ReadyFrames; diff --git a/src/sinks/databricks_zerobus/config.rs b/src/sinks/databricks_zerobus/config.rs index 1df2f62a6f1c3..b62c668f69c44 100644 --- a/src/sinks/databricks_zerobus/config.rs +++ b/src/sinks/databricks_zerobus/config.rs @@ -232,11 +232,13 @@ impl ZerobusSinkConfig { } if let Some(max_bytes) = self.batch.max_bytes { - // Zerobus SDK limits max bytes to 10MB. This cap is a conservative safety limit: - // it's measured against Vector's pre-serialization sizing, not the protobuf bytes - // the SDK actually sends. Vector's pre-serialization size is generally larger than - // the SDK's protobuf-encoded size, so enforcing the 10MB cap here ensures the SDK's - // 10MB limit cannot be exceeded at runtime. + // Zerobus SDK limits max bytes to 10MB. This cap is a coarse safety + // limit: it's measured against Vector's pre-serialization (estimated + // JSON) sizing, not the encoded Arrow bytes the SDK actually sends. + // The two differ — for numeric-heavy schemas the encoded Arrow batch + // can be larger than the source events — so a batch configured right + // at the boundary may still exceed the SDK's limit; lower max_bytes to + // leave headroom if you see SDK-side size errors. if max_bytes > 10_000_000 { return Err(ZerobusSinkError::ConfigError { message: "max_bytes must be less than or equal to 10MB".to_string(), diff --git a/src/sinks/databricks_zerobus/error.rs b/src/sinks/databricks_zerobus/error.rs index 1e9a630415724..d79ca970449bc 100644 --- a/src/sinks/databricks_zerobus/error.rs +++ b/src/sinks/databricks_zerobus/error.rs @@ -28,11 +28,6 @@ pub enum ZerobusSinkError { #[snafu(display("Record ingestion failed: {}", source))] IngestionError { source: ZerobusError }, - /// The SDK returned no offset to wait on, so server acceptance of the - /// batch cannot be confirmed. Treated as non-retryable. - #[snafu(display("Zerobus ingest returned no offset; cannot confirm server acceptance"))] - MissingAckOffset, - /// The shared stream was closed concurrently (by shutdown or retry-driven /// replacement) before this ingest could run. Retryable: the next attempt /// will create a fresh stream via `get_or_create_stream`. @@ -55,7 +50,7 @@ impl ZerobusSinkError { | Self::IngestionError { source } => source.is_retryable(), Self::StreamClosed => true, Self::SchemaError { retryable, .. } => *retryable, - Self::ConfigError { .. } | Self::EncodingError { .. } | Self::MissingAckOffset => false, + Self::ConfigError { .. } | Self::EncodingError { .. } => false, } } diff --git a/src/sinks/databricks_zerobus/service.rs b/src/sinks/databricks_zerobus/service.rs index 076488e55bf49..8a4c2f62e7597 100644 --- a/src/sinks/databricks_zerobus/service.rs +++ b/src/sinks/databricks_zerobus/service.rs @@ -5,14 +5,16 @@ use crate::event::Event; use crate::http::HttpClient; use crate::sinks::util::retries::RetryLogic; use crate::tls::TlsSettings; -use databricks_zerobus_ingest_sdk::{ConnectorFactory, ProxyConnector, ZerobusSdk, ZerobusStream}; +use databricks_zerobus_ingest_sdk::{ + ConnectorFactory, ProxyConnector, ZerobusArrowStream, ZerobusSdk, +}; use futures::future::BoxFuture; use std::sync::Arc; use tokio::sync::{Mutex, OnceCell, RwLock}; use tower::{Layer, Service}; use tracing::warn; use vector_lib::codecs::encoding::{ - BatchEncoder, BatchOutput, BatchSerializerConfig, ProtoBatchSerializerConfig, + ArrowStreamSerializerConfig, BatchEncoder, BatchOutput, BatchSerializerConfig, }; use vector_lib::finalization::{EventFinalizers, Finalizable}; use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; @@ -130,23 +132,23 @@ impl MetaDescriptive for ZerobusRequest { /// The active stream. /// -/// The SDK's `ZerobusStream::close()` requires `&mut self`, but ingests need +/// The SDK's `ZerobusArrowStream::close()` requires `&mut self`, but ingests need /// shared access to call `&self` methods concurrently. We resolve this with an -/// `RwLock`: ingests hold a read guard across `ingest_records_offset`, and +/// `RwLock`: ingests hold a read guard across `ingest_batch`, and /// `close()` takes the write guard, pulls the stream out of the `Option`, and /// awaits its SDK-level close on the owned value. Any holder of an `Arc` can /// invoke `close()`, so the graceful path always runs — there is no /// `try_unwrap`/`get_mut` race. enum ActiveStream { - Proto(RwLock>>), + Arrow(RwLock>>), /// Test-only variant that returns a pre-configured error on ingest. #[cfg(test)] Mock(MockStream), } impl ActiveStream { - fn proto(stream: ZerobusStream) -> Self { - ActiveStream::Proto(RwLock::new(Some(Box::new(stream)))) + fn arrow(stream: ZerobusArrowStream) -> Self { + ActiveStream::Arrow(RwLock::new(Some(Box::new(stream)))) } /// Gracefully flush and close the underlying SDK stream. @@ -160,7 +162,7 @@ impl ActiveStream { /// The SDK's own `Drop` is also a no-op once close has run. async fn close(&self) { let result = match self { - ActiveStream::Proto(lock) => { + ActiveStream::Arrow(lock) => { let taken = lock.write().await.take(); match taken { Some(mut stream) => stream.close().await, @@ -277,9 +279,27 @@ impl MockStream { /// Schema and encoding state derived from the Unity Catalog table. pub(super) struct ResolvedSchema { encoder: BatchEncoder, - /// SDK-typed (prost-types 0.14) descriptor — held in this form so each - /// stream rebuild avoids re-encoding from the prost-reflect 0.13 form. - descriptor_proto: Arc, + /// Arrow schema used to declare the Zerobus stream. Held behind an `Arc` so + /// each stream rebuild after a retryable failure clones it cheaply, and so it + /// matches the schema the Arrow batch encoder produces. + arrow_schema: Arc, +} + +#[cfg(test)] +impl ResolvedSchema { + /// Build a `ResolvedSchema` directly from an Arrow schema, mirroring what + /// `ensure_schema` does after a Unity Catalog fetch. Lets encoding tests + /// exercise the real `BatchEncoder` without a network round-trip. + fn for_test(schema: arrow::datatypes::Schema) -> Self { + let batch_serializer = + BatchSerializerConfig::ArrowStream(ArrowStreamSerializerConfig::new(schema.clone())) + .build_batch_serializer() + .expect("arrow batch serializer should build"); + Self { + encoder: BatchEncoder::new(batch_serializer), + arrow_schema: Arc::new(schema), + } + } } /// Service for handling Zerobus requests. @@ -319,22 +339,15 @@ impl ZerobusService { }) } - /// Resolve the protobuf message descriptor from Unity Catalog. + /// Resolve the Arrow schema for the target table from Unity Catalog. /// - /// Returns both the prost-reflect `MessageDescriptor` (used by the proto - /// batch encoder) and the SDK-typed `DescriptorProto` (used to construct - /// Zerobus streams). Returning both avoids re-encoding the descriptor - /// every time a stream is rebuilt after a retryable failure. - async fn resolve_descriptor( + /// The returned schema is used both to declare the Zerobus Arrow stream and + /// to drive the Arrow batch encoder, keeping the encoded `RecordBatch` schema + /// in lock-step with the stream's declared schema. + async fn resolve_arrow_schema( config: &ZerobusSinkConfig, http_client: &HttpClient, - ) -> Result< - ( - prost_reflect::MessageDescriptor, - prost_types_014::DescriptorProto, - ), - ZerobusSinkError, - > { + ) -> Result { let (client_id, client_secret) = config.auth.credentials(); let table_schema = unity_catalog_schema::fetch_table_schema( @@ -346,50 +359,53 @@ impl ZerobusService { ) .await?; - unity_catalog_schema::generate_descriptor_from_schema(&table_schema) + unity_catalog_schema::generate_arrow_schema_from_schema(&table_schema) } /// Resolve the schema on first use; cache the result. pub(super) async fn ensure_schema(&self) -> Result<&ResolvedSchema, ZerobusSinkError> { self.schema .get_or_try_init(|| async { - let (descriptor, sdk_descriptor_proto) = - Self::resolve_descriptor(&self.config, &self.http_client).await?; - let descriptor_proto = Arc::new(sdk_descriptor_proto); - - let batch_serializer = - BatchSerializerConfig::ProtoBatch(ProtoBatchSerializerConfig { - descriptor: Some(descriptor), - }) - .build_batch_serializer() - .map_err(|e| ZerobusSinkError::ConfigError { - message: format!("Failed to build batch serializer: {}", e), - })?; + let arrow_schema = + Self::resolve_arrow_schema(&self.config, &self.http_client).await?; + + let batch_serializer = BatchSerializerConfig::ArrowStream( + ArrowStreamSerializerConfig::new(arrow_schema.clone()), + ) + .build_batch_serializer() + .map_err(|e| ZerobusSinkError::ConfigError { + message: format!("Failed to build batch serializer: {}", e), + })?; Ok(ResolvedSchema { encoder: BatchEncoder::new(batch_serializer), - descriptor_proto, + arrow_schema: Arc::new(arrow_schema), }) }) .await } - pub(super) fn encode_records( + /// Encode the whole batch into a single Arrow `RecordBatch`. + /// + /// Encoding is all-or-nothing: if any event fails to encode against the + /// table's Arrow schema — most commonly an event missing (or null on) a + /// column the Unity Catalog table declares `NOT NULL` — the entire batch + /// fails with a non-retryable `EncodingError` and is dropped. UC columns are + /// nullable by default, so this only affects tables with explicit `NOT NULL` + /// columns. The underlying codec emits `EncoderNullConstraintError` naming + /// the offending field(s). + pub(super) fn encode_batch( schema: &ResolvedSchema, events: &[Event], - ) -> Result>, ZerobusSinkError> { - match schema - .encoder - .encode_batch(events) - .map_err(|e| ZerobusSinkError::EncodingError { - message: format!("Failed to encode batch: {}", e), - })? { - BatchOutput::Records(records) => Ok(records), - #[cfg(feature = "codecs-arrow")] - BatchOutput::Arrow(_) => Err(ZerobusSinkError::EncodingError { - message: "The Databricks Zerobus sink only supports proto-batch output.".into(), - }), - } + ) -> Result { + let BatchOutput::Arrow(batch) = + schema + .encoder + .encode_batch(events) + .map_err(|e| ZerobusSinkError::EncodingError { + message: format!("Failed to encode batch: {}", e), + })?; + Ok(batch) } /// Ensure we have an active stream, creating one if necessary. @@ -416,20 +432,28 @@ impl ZerobusService { let (client_id, client_secret) = self.config.auth.credentials(); let (client_id, client_secret) = (client_id.to_string(), client_secret.to_string()); + // We override only the two timeouts that `stream_options` exposes and + // otherwise accept the SDK's Arrow-stream defaults — notably + // `recovery = true`, so the SDK transparently reconnects and replays + // in-flight batches on transient stream errors. That layers under + // Vector's own retry: the SDK absorbs brief blips, and only surfaces a + // retryable error (triggering a fresh stream via Tower retry) once its + // own recovery budget is exhausted. Both layers are at-least-once, so + // a reconnect may re-send unacknowledged batches. let stream_options = &self.config.stream_options; let stream = self .sdk .stream_builder() .table(self.config.table_name.clone()) .oauth(client_id, client_secret) - .compiled_proto((*schema.descriptor_proto).clone()) + .arrow(Arc::clone(&schema.arrow_schema)) .server_lack_of_ack_timeout_ms(stream_options.server_lack_of_ack_timeout_ms) .flush_timeout_ms(stream_options.flush_timeout_ms) - .build() + .build_arrow() .await .map_err(|e| ZerobusSinkError::StreamInitError { source: e })?; - *stream_guard = Some(Arc::new(ActiveStream::proto(stream))); + *stream_guard = Some(Arc::new(ActiveStream::arrow(stream))); } Ok(Arc::clone(stream_guard.as_ref().unwrap())) @@ -457,22 +481,19 @@ impl ZerobusService { async fn ingest( &self, stream: Arc, - records: Vec>, + batch: arrow::record_batch::RecordBatch, events_byte_size: GroupedCountByteSize, ) -> Result { // Slot lock is not held here — concurrent ingests acquire read guards // on the inner `RwLock` and run truly in parallel. let result = match stream.as_ref() { - ActiveStream::Proto(lock) => { + ActiveStream::Arrow(lock) => { let guard = lock.read().await; let Some(s) = guard.as_ref() else { return Err(ZerobusSinkError::StreamClosed); }; - match s.ingest_records_offset(records).await { - Ok(Some(offset)) => s.wait_for_offset(offset).await.map(|_| ()), - Ok(None) => { - return Err(ZerobusSinkError::MissingAckOffset); - } + match s.ingest_batch(batch).await { + Ok(offset) => s.wait_for_offset(offset).await.map(|_| ()), Err(e) => Err(e), } } @@ -524,9 +545,9 @@ impl Service for ZerobusService { Box::pin(async move { let schema = service.ensure_schema().await?; - let records = Self::encode_records(schema, &request.events)?; + let batch = Self::encode_batch(schema, &request.events)?; let stream = service.get_or_create_stream(schema).await?; - service.ingest(stream, records, events_byte_size).await + service.ingest(stream, batch, events_byte_size).await }) } } @@ -698,8 +719,10 @@ mod tests { } } - fn dummy_records() -> Vec> { - vec![vec![1, 2, 3]] + fn dummy_batch() -> arrow::record_batch::RecordBatch { + // The mock stream ignores the batch contents, so an empty batch with an + // empty schema is sufficient for the ingest-path tests. + arrow::record_batch::RecordBatch::new_empty(Arc::new(arrow::datatypes::Schema::empty())) } async fn current_stream(service: &ZerobusService) -> Arc { @@ -714,11 +737,7 @@ mod tests { let stream = current_stream(&service).await; let result = service - .ingest( - stream, - dummy_records(), - GroupedCountByteSize::new_untagged(), - ) + .ingest(stream, dummy_batch(), GroupedCountByteSize::new_untagged()) .await; assert!(result.is_ok()); @@ -738,11 +757,7 @@ mod tests { let stream = current_stream(&service).await; let err = service - .ingest( - stream, - dummy_records(), - GroupedCountByteSize::new_untagged(), - ) + .ingest(stream, dummy_batch(), GroupedCountByteSize::new_untagged()) .await .unwrap_err(); @@ -763,11 +778,7 @@ mod tests { let stream = current_stream(&service).await; let err = service - .ingest( - stream, - dummy_records(), - GroupedCountByteSize::new_untagged(), - ) + .ingest(stream, dummy_batch(), GroupedCountByteSize::new_untagged()) .await .unwrap_err(); @@ -789,11 +800,7 @@ mod tests { let stream = current_stream(&service).await; assert!( service - .ingest( - stream, - dummy_records(), - GroupedCountByteSize::new_untagged() - ) + .ingest(stream, dummy_batch(), GroupedCountByteSize::new_untagged()) .await .is_ok() ); @@ -812,11 +819,7 @@ mod tests { // Second ingest fails and clears the stream. let stream = current_stream(&service).await; let err = service - .ingest( - stream, - dummy_records(), - GroupedCountByteSize::new_untagged(), - ) + .ingest(stream, dummy_batch(), GroupedCountByteSize::new_untagged()) .await .unwrap_err(); assert!(ZerobusRetryLogic.is_retriable_error(&err)); @@ -830,11 +833,7 @@ mod tests { let stream = current_stream(&service).await; assert!( service - .ingest( - stream, - dummy_records(), - GroupedCountByteSize::new_untagged() - ) + .ingest(stream, dummy_batch(), GroupedCountByteSize::new_untagged()) .await .is_ok() ); @@ -882,22 +881,14 @@ mod tests { let s1 = service.clone(); let t1 = tokio::spawn(async move { let stream = current_stream(&s1).await; - s1.ingest( - stream, - dummy_records(), - GroupedCountByteSize::new_untagged(), - ) - .await + s1.ingest(stream, dummy_batch(), GroupedCountByteSize::new_untagged()) + .await }); let s2 = service.clone(); let t2 = tokio::spawn(async move { let stream = current_stream(&s2).await; - s2.ingest( - stream, - dummy_records(), - GroupedCountByteSize::new_untagged(), - ) - .await + s2.ingest(stream, dummy_batch(), GroupedCountByteSize::new_untagged()) + .await }); // Wait until both ingests are inside the gate (both `Arc`s alive). @@ -1021,4 +1012,80 @@ mod tests { .unwrap(); assert_eq!(resp.status, vector_lib::event::EventStatus::Delivered); } + + /// Encode real log events against a schema covering the common UC→Arrow + /// types and assert the resulting `RecordBatch` columns, types, and a null + /// in a nullable column. Exercises the production `encode_batch` path + /// (`ArrowStreamSerializer` → `RecordBatch`), which the mock stream tests + /// bypass. + #[test] + fn encode_batch_maps_events_to_record_batch() { + use crate::event::LogEvent; + use arrow::array::{Array, AsArray}; + use arrow::datatypes::{ + DataType, Field, Int64Type, Schema, TimeUnit, TimestampMicrosecondType, + }; + use chrono::Utc; + + let schema = Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("body", DataType::LargeUtf8, true), + Field::new( + "ts", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + ), + ]); + let resolved = ResolvedSchema::for_test(schema); + + let mut e1 = LogEvent::default(); + e1.insert("id", 1i64); + e1.insert("body", "hello"); + e1.insert("ts", Utc::now()); + + let mut e2 = LogEvent::default(); + e2.insert("id", 2i64); + // `body` and `ts` omitted — both nullable, so they encode as null. + + let batch = + ZerobusService::encode_batch(&resolved, &[Event::Log(e1), Event::Log(e2)]).unwrap(); + + assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_columns(), 3); + + let ids = batch.column(0).as_primitive::(); + assert_eq!(ids.value(0), 1); + assert_eq!(ids.value(1), 2); + + // LargeUtf8 -> LargeStringArray (i64 offsets). + let body = batch.column(1).as_string::(); + assert_eq!(body.value(0), "hello"); + assert!(body.is_null(1)); + + let ts = batch.column(2).as_primitive::(); + assert!(!ts.is_null(0)); + assert!(ts.is_null(1)); + } + + /// An event missing a column the table declares `NOT NULL` fails the whole + /// batch with a non-retryable `EncodingError` (the batch is dropped, not + /// replayed). Locks in the documented strict-null behavior. + #[test] + fn encode_batch_rejects_event_missing_non_nullable_field() { + use crate::event::LogEvent; + use arrow::datatypes::{DataType, Field, Schema}; + + let schema = Schema::new(vec![ + Field::new("id", DataType::Int64, false), // NOT NULL + Field::new("body", DataType::LargeUtf8, true), + ]); + let resolved = ResolvedSchema::for_test(schema); + + let mut e = LogEvent::default(); + e.insert("body", "no id here"); // `id` omitted + + let err = ZerobusService::encode_batch(&resolved, &[Event::Log(e)]).unwrap_err(); + assert!(matches!(err, ZerobusSinkError::EncodingError { .. })); + assert!(!err.is_retryable()); + } } diff --git a/src/sinks/databricks_zerobus/unity_catalog_schema.rs b/src/sinks/databricks_zerobus/unity_catalog_schema.rs index 343604512242d..c4637cb58e9c1 100644 --- a/src/sinks/databricks_zerobus/unity_catalog_schema.rs +++ b/src/sinks/databricks_zerobus/unity_catalog_schema.rs @@ -1,16 +1,15 @@ -//! Unity Catalog schema fetching and protobuf descriptor generation. +//! Unity Catalog schema fetching and Arrow schema generation. //! -//! The UC-to-protobuf conversion is delegated to +//! The UC-to-Arrow conversion is delegated to //! [`databricks_zerobus_ingest_sdk::schema`]; this module only wraps it with -//! the HTTP fetching + descriptor-pool assembly that the sink needs. +//! the HTTP fetching that the sink needs. use bytes::Buf; -use databricks_zerobus_ingest_sdk::schema::descriptor_from_uc_schema; +use databricks_zerobus_ingest_sdk::schema::arrow_schema_from_uc_schema; use http::{Request, StatusCode, Uri}; use http_body::Body as HttpBody; use hyper::Body; use percent_encoding::{NON_ALPHANUMERIC, percent_encode}; -use prost_reflect::prost_types; use serde::Deserialize; use super::error::ZerobusSinkError; @@ -206,208 +205,32 @@ async fn get_oauth_token( Ok(token_response.access_token) } -/// Format a protobuf MessageDescriptor as a .proto file string for logging -fn format_descriptor_as_proto(descriptor: &prost_reflect::MessageDescriptor) -> String { - let mut output = String::new(); - format_message_as_proto(descriptor, &mut output, 0); - output -} - -/// Recursively format a message and its nested types -fn format_message_as_proto( - descriptor: &prost_reflect::MessageDescriptor, - output: &mut String, - indent_level: usize, -) { - let indent = " ".repeat(indent_level); - - // Write message header - output.push_str(&format!("{}message {} {{\n", indent, descriptor.name())); - - // Write fields - for field in descriptor.fields() { - let field_indent = " ".repeat(indent_level + 1); - let field_type = format_field_type(&field); - let field_number = field.number(); - output.push_str(&format!( - "{}{}{} = {};\n", - field_indent, - field_type, - field.name(), - field_number - )); - } - - output.push_str(&format!("{}}}\n", indent)); - - // Write nested message types - for nested in descriptor.child_messages() { - output.push('\n'); - format_message_as_proto(&nested, output, indent_level); - } -} - -/// Format a field's type declaration -fn format_field_type(field: &prost_reflect::FieldDescriptor) -> String { - use prost_reflect::Kind; - - if field.is_map() { - // Map fields: map field_name - if let Kind::Message(map_entry) = field.kind() { - let key_field = map_entry.fields().find(|f| f.name() == "key").unwrap(); - let value_field = map_entry.fields().find(|f| f.name() == "value").unwrap(); - let key_type = format_scalar_type(&key_field); - let value_type = format_scalar_type(&value_field); - return format!("map<{}, {}> ", key_type, value_type); - } - } - - let base_type = match field.kind() { - Kind::Message(msg) => msg.name().to_string(), - kind => format_kind_type(&kind), - }; - - if field.is_list() { - format!("repeated {} ", base_type) - } else { - format!("{} ", base_type) - } -} - -/// Format a scalar field type (for map keys/values) -fn format_scalar_type(field: &prost_reflect::FieldDescriptor) -> String { - match field.kind() { - prost_reflect::Kind::Message(msg) => msg.name().to_string(), - kind => format_kind_type(&kind), - } -} - -/// Map Kind enum to proto type string -fn format_kind_type(kind: &prost_reflect::Kind) -> String { - use prost_reflect::Kind; - match kind { - Kind::Double => "double".into(), - Kind::Float => "float".into(), - Kind::Int32 => "int32".into(), - Kind::Int64 => "int64".into(), - Kind::Uint32 => "uint32".into(), - Kind::Uint64 => "uint64".into(), - Kind::Sint32 => "sint32".into(), - Kind::Sint64 => "sint64".into(), - Kind::Fixed32 => "fixed32".into(), - Kind::Fixed64 => "fixed64".into(), - Kind::Sfixed32 => "sfixed32".into(), - Kind::Sfixed64 => "sfixed64".into(), - Kind::Bool => "bool".into(), - Kind::String => "string".into(), - Kind::Bytes => "bytes".into(), - Kind::Message(msg) => msg.name().to_string(), - Kind::Enum(e) => e.name().to_string(), - } -} - -/// Generate a protobuf message descriptor from a Unity Catalog table schema. +/// Generate an Arrow schema from a Unity Catalog table schema. /// -/// The core UC-type → protobuf conversion lives in -/// [`databricks_zerobus_ingest_sdk::schema::descriptor_from_uc_schema`]; this -/// wrapper adds the `FileDescriptorProto` / `DescriptorPool` plumbing that -/// Vector needs to get a `prost_reflect::MessageDescriptor` usable for -/// dynamic message encoding. +/// The core UC-type → Arrow conversion lives in +/// [`databricks_zerobus_ingest_sdk::schema::arrow_schema_from_uc_schema`], which +/// produces the canonical Arrow schema the Databricks Arrow Flight server expects +/// (`STRING` → `LargeUtf8`, `TIMESTAMP` → `Timestamp(Microsecond, UTC)`, etc.). /// -/// Returns both the `MessageDescriptor` (for dynamic encoding via prost-reflect) -/// and the SDK-typed `DescriptorProto` (prost-types 0.14, used when constructing -/// the Zerobus stream). Returning both lets us avoid re-encoding the descriptor -/// on every stream rebuild. -pub fn generate_descriptor_from_schema( +/// The returned schema is used both to declare the Zerobus Arrow stream and to +/// drive the Arrow batch encoder, so a single source of truth keeps the encoded +/// `RecordBatch` schema in lock-step with the stream's declared schema. +pub fn generate_arrow_schema_from_schema( schema: &UnityCatalogTableSchema, -) -> Result< - ( - prost_reflect::MessageDescriptor, - prost_types_014::DescriptorProto, - ), - ZerobusSinkError, -> { - let sdk_message_proto = - descriptor_from_uc_schema(schema).map_err(|e| ZerobusSinkError::ConfigError { - message: format!("Failed to convert Unity Catalog schema to protobuf: {}", e), - })?; - - // The SDK returns a prost-types 0.14 DescriptorProto, but prost-reflect (used - // below to build the descriptor pool) is on prost-types 0.13. Re-encode through - // the protobuf wire format to bridge the two versions. - let message_name = sdk_message_proto.name().to_string(); - let encoded = prost_014::Message::encode_to_vec(&sdk_message_proto); - let message_proto = - ::decode( - encoded.as_slice(), - ) - .map_err(|e| ZerobusSinkError::ConfigError { - message: format!("Failed to re-decode SDK DescriptorProto: {}", e), - })?; - let package_name = sanitize_package_name(&schema.catalog_name); - - let file_proto = prost_types::FileDescriptorProto { - name: Some(format!("{}.proto", message_name)), - package: Some(package_name.clone()), - message_type: vec![message_proto], - ..Default::default() - }; - - let file_descriptor_set = prost_types::FileDescriptorSet { - file: vec![file_proto], - }; - - let pool = prost_reflect::DescriptorPool::from_file_descriptor_set(file_descriptor_set) - .map_err(|e| ZerobusSinkError::ConfigError { - message: format!("Failed to build descriptor pool: {}", e), - })?; - - let full_message_name = format!("{}.{}", package_name, message_name); - let message_descriptor = pool - .get_message_by_name(&full_message_name) - .ok_or_else(|| ZerobusSinkError::ConfigError { - message: format!("Failed to get message descriptor for {}", full_message_name), +) -> Result { + let arrow_schema = + arrow_schema_from_uc_schema(schema).map_err(|e| ZerobusSinkError::ConfigError { + message: format!("Failed to convert Unity Catalog schema to Arrow: {}", e), })?; if tracing::enabled!(tracing::Level::INFO) { - let proto_schema = format_descriptor_as_proto(&message_descriptor); info!( - "Inferred protobuf schema from Unity Catalog table {}.{}.{}:\n{}", - schema.catalog_name, schema.schema_name, schema.name, proto_schema + "Inferred Arrow schema from Unity Catalog table {}.{}.{}:\n{:#?}", + schema.catalog_name, schema.schema_name, schema.name, arrow_schema ); } - Ok((message_descriptor, sdk_message_proto)) -} - -/// Default prefix for package name segments that start with a non-letter. -const PACKAGE_SEGMENT_PREFIX: char = 'p'; - -/// Sanitize a string for use as a protobuf package name. -/// -/// Package identifiers allow `[a-zA-Z][a-zA-Z0-9_]*` segments separated by `.`. -/// Invalid characters are replaced with `_` and each segment is ensured to start -/// with a letter. -fn sanitize_package_name(name: &str) -> String { - name.split('.') - .map(|segment| { - let mut s: String = segment - .chars() - .map(|c| { - if c.is_ascii_alphanumeric() || c == '_' { - c - } else { - '_' - } - }) - .collect(); - if s.is_empty() || !s.starts_with(|c: char| c.is_ascii_alphabetic()) { - s.insert(0, PACKAGE_SEGMENT_PREFIX); - } - s - }) - .collect::>() - .join(".") + Ok(arrow_schema) } #[cfg(test)] @@ -432,10 +255,13 @@ mod tests { assert!(!status_is_retryable(StatusCode::BAD_REQUEST)); } - /// Smoke test: the wrapper calls into the SDK and builds a usable - /// `MessageDescriptor` via the descriptor pool. + /// Smoke test: the wrapper calls into the SDK and produces an Arrow schema + /// with the expected fields and the Databricks-canonical type mapping + /// (`STRING` → `LargeUtf8`, `BIGINT` → `Int64`). #[test] - fn test_generate_descriptor_simple_schema() { + fn test_generate_arrow_schema_simple_schema() { + use arrow::datatypes::DataType; + let schema = UnityCatalogTableSchema { name: "test_table".to_string(), catalog_name: "test_catalog".to_string(), @@ -460,71 +286,70 @@ mod tests { ], }; - let (descriptor, _sdk_proto) = - generate_descriptor_from_schema(&schema).expect("descriptor should be generated"); - assert_eq!(descriptor.fields().len(), 2); - assert!(descriptor.get_field_by_name("id").is_some()); - assert!(descriptor.get_field_by_name("body").is_some()); + let arrow_schema = + generate_arrow_schema_from_schema(&schema).expect("arrow schema should be generated"); + assert_eq!(arrow_schema.fields().len(), 2); + + let id = arrow_schema.field_with_name("id").expect("id field"); + assert_eq!(id.data_type(), &DataType::Int64); + assert!(!id.is_nullable()); + + let body = arrow_schema.field_with_name("body").expect("body field"); + assert_eq!(body.data_type(), &DataType::LargeUtf8); + assert!(body.is_nullable()); } - /// Snapshot test for the proto-text formatter used in info logging. - /// The UC→proto conversion itself is covered by the SDK's own tests; - /// this guards the local `format_descriptor_as_proto` rendering. + /// Exercises the UC→Arrow conversion on a schema with nested structs and + /// arrays. The conversion itself is owned by the SDK; this guards that the + /// wrapper feeds the fixture through and yields the expected complex fields. #[test] - fn test_proto_schema_snapshot() { + fn test_arrow_schema_nested_structs() { + use arrow::datatypes::DataType; + let json = include_str!("tests/fixtures/nested_structs_complete_schema.json"); let schema: UnityCatalogTableSchema = serde_json::from_str(json).expect("Failed to parse nested_structs_complete schema"); - let (descriptor, _sdk_proto) = - generate_descriptor_from_schema(&schema).expect("Failed to generate descriptor"); - - let proto_text = format_descriptor_as_proto(&descriptor); + let arrow_schema = + generate_arrow_schema_from_schema(&schema).expect("Failed to generate arrow schema"); - assert!( - proto_text.contains("message TestSchemaNestedStructsTable"), - "Proto should have main message definition" - ); - assert!( - proto_text.contains("string field_003"), - "Proto should have field_003 (string)" - ); - assert!( - proto_text.contains("int64 field_007"), - "Proto should have field_007 (int64)" - ); - assert!( - proto_text.contains("repeated int64 field_027"), - "Proto should have field_027 as repeated int64" - ); - assert!( - proto_text.contains("message Field018"), - "Proto should have Field018 nested message" - ); - assert!( - proto_text.contains("message Field021"), - "Proto should have Field021 nested message" + // Primitive fields map to their canonical Arrow types. + assert_eq!( + arrow_schema + .field_with_name("field_003") + .unwrap() + .data_type(), + &DataType::LargeUtf8, ); - assert!( - proto_text.contains("message Field008"), - "Proto should have Field008 nested message" + assert_eq!( + arrow_schema + .field_with_name("field_007") + .unwrap() + .data_type(), + &DataType::Int64, ); - } - #[test] - fn test_sanitize_package_name_non_ascii() { - // Non-ASCII alphanumeric characters (e.g. accented letters, CJK) are - // valid for `char::is_alphanumeric` but not for protobuf identifiers, - // so they must be replaced with `_`. - assert_eq!(sanitize_package_name("café"), "caf_"); - assert_eq!(sanitize_package_name("日本.tbl"), "p__.tbl"); - assert_eq!(sanitize_package_name("naïve.schema"), "na_ve.schema"); - } + // ARRAY becomes a List field. + assert!(matches!( + arrow_schema + .field_with_name("field_027") + .unwrap() + .data_type(), + DataType::List(_), + )); - #[test] - fn test_sanitize_package_name_ascii_preserved() { - assert_eq!(sanitize_package_name("main.default_v2"), "main.default_v2"); - assert_eq!(sanitize_package_name("1abc"), "p1abc"); - assert_eq!(sanitize_package_name("_x"), "p_x"); + // STRUCT columns become Struct fields. + for struct_field in ["field_018", "field_021", "field_008"] { + assert!( + matches!( + arrow_schema + .field_with_name(struct_field) + .unwrap() + .data_type(), + DataType::Struct(_), + ), + "{struct_field} should be a Struct" + ); + } } } diff --git a/src/sinks/util/encoding.rs b/src/sinks/util/encoding.rs index b07e439bd0c88..63f8407cef53b 100644 --- a/src/sinks/util/encoding.rs +++ b/src/sinks/util/encoding.rs @@ -99,6 +99,7 @@ impl Encoder for (Transformer, vector_lib::codecs::Encoder<()>) { } } +#[cfg(feature = "codecs-arrow")] impl Encoder> for (Transformer, vector_lib::codecs::BatchEncoder) { fn encode_input( &self, @@ -155,6 +156,7 @@ impl Encoder> for (Transformer, vector_lib::codecs::EncoderKind) { vector_lib::codecs::EncoderKind::Framed(encoder) => { (self.0.clone(), *encoder.clone()).encode_input(events, writer) } + #[cfg(feature = "codecs-arrow")] vector_lib::codecs::EncoderKind::Batch(encoder) => { (self.0.clone(), encoder.clone()).encode_input(events, writer) } diff --git a/website/cue/reference/components/sinks/databricks_zerobus.cue b/website/cue/reference/components/sinks/databricks_zerobus.cue index f9510774dfe16..e666124a17101 100644 --- a/website/cue/reference/components/sinks/databricks_zerobus.cue +++ b/website/cue/reference/components/sinks/databricks_zerobus.cue @@ -83,25 +83,31 @@ components: sinks: databricks_zerobus: { schema: { title: "Schema" body: """ - The sink requires a schema to encode events into protobuf format. + The sink requires a schema to encode events into Arrow format. The sink automatically fetches the table schema from the Unity Catalog API at startup using the configured `table_name` and `unity_catalog_endpoint`, ensuring the schema always matches the target table. No additional schema configuration is required. + + Columns are nullable by default. If the target table declares a column + `NOT NULL`, then every event in a batch must provide a non-null value for + that column; otherwise the entire batch fails encoding and is dropped + (an error is logged naming the offending column). Prefer nullable columns + for fields that are not always present. """ } batching: { title: "Batching" body: """ - Events are batched before being sent to Zerobus. Each event is individually - serialized as a protobuf message, and the batch is sent as a single request. - The maximum batch size is 10MB, enforced by the Zerobus SDK. + Events are batched before being sent to Zerobus. Each batch is encoded as a + single Arrow `RecordBatch` and sent over Arrow Flight as one request. The + maximum batch size is 10MB, enforced by the Zerobus SDK. Vector sizes batches against `batch.max_bytes` using the *uncompressed, pre-serialization* event size, while the SDK's 10MB cap applies to the - *encoded protobuf* size. For most schemas the protobuf encoding is smaller + *encoded Arrow* size. For most schemas the Arrow encoding is smaller than (or comparable to) the source event, but for numeric-heavy schemas (many integer or float fields) the encoded form can be larger — so a batch configured right at the 10MB boundary may exceed the SDK limit and the