From 2da9768277a892db55dff6d7af641707a3553a8e Mon Sep 17 00:00:00 2001 From: KimJS0328 Date: Mon, 25 May 2026 14:23:46 +0900 Subject: [PATCH] enhancement(amqp source): expose message headers and properties in events (#23368) * enhancement(amqp source): expose AMQP message headers and properties as part of emitted events. * fix(amqp source): message timestamp handling by interpreting timestamps as Unix seconds instead of milliseconds, resolving an incorrect timestamp parsing bug discovered during this work. --- ..._amqp_source_headers_properties.feature.md | 3 + .../23368_amqp_source_timestamp.fix.md | 3 + src/sources/amqp.rs | 406 +++++++++++++++++- .../components/sources/generated/amqp.cue | 10 + 4 files changed, 410 insertions(+), 12 deletions(-) create mode 100644 changelog.d/23368_amqp_source_headers_properties.feature.md create mode 100644 changelog.d/23368_amqp_source_timestamp.fix.md diff --git a/changelog.d/23368_amqp_source_headers_properties.feature.md b/changelog.d/23368_amqp_source_headers_properties.feature.md new file mode 100644 index 0000000000000..22dee4ea0b72f --- /dev/null +++ b/changelog.d/23368_amqp_source_headers_properties.feature.md @@ -0,0 +1,3 @@ +Expose AMQP source message headers and properties on events. + +authors: KimJS0328 diff --git a/changelog.d/23368_amqp_source_timestamp.fix.md b/changelog.d/23368_amqp_source_timestamp.fix.md new file mode 100644 index 0000000000000..5aeab989773f5 --- /dev/null +++ b/changelog.d/23368_amqp_source_timestamp.fix.md @@ -0,0 +1,3 @@ +Correct AMQP source message timestamps to be interpreted as Unix seconds. + +authors: KimJS0328 diff --git a/src/sources/amqp.rs b/src/sources/amqp.rs index 425b982a316b1..187793d2ea3ff 100644 --- a/src/sources/amqp.rs +++ b/src/sources/amqp.rs @@ -1,13 +1,18 @@ //! `AMQP` source. //! Handles version AMQP 0.9.1 which is used by RabbitMQ. -use std::{io::Cursor, pin::Pin}; +use std::{collections::BTreeMap, io::Cursor, pin::Pin}; use async_stream::stream; use bytes::Bytes; use chrono::{TimeZone, Utc}; use futures::{FutureExt, StreamExt}; use futures_util::Stream; -use lapin::{Acker, Channel, message::Delivery, options::BasicQosOptions}; +use lapin::{ + Acker, BasicProperties, Channel, + message::Delivery, + options::BasicQosOptions, + types::{AMQPValue, LongString, ShortString}, +}; use snafu::Snafu; use vector_lib::{ EstimatedJsonEncodedSizeOf, @@ -22,7 +27,8 @@ use vector_lib::{ internal_event::{CountByteSize, EventsReceived, InternalEventHandle as _}, lookup::{lookup_v2::OptionalValuePath, metadata_path, owned_value_path, path}, }; -use vrl::value::Kind; +use vrl::value::{Kind, kind::Collection}; +use vrl::value::{ObjectMap, Value}; use crate::{ SourceSender, @@ -84,6 +90,16 @@ pub struct AmqpSourceConfig { #[derivative(Default(value = "default_offset_key()"))] pub(crate) offset_key: OptionalValuePath, + /// The key where `AMQP` message headers are added to events. + #[serde(default = "default_headers_key")] + #[derivative(Default(value = "default_headers_key()"))] + pub(crate) headers_key: OptionalValuePath, + + /// The key where `AMQP` message properties are added to events. + #[serde(default = "default_properties_key")] + #[derivative(Default(value = "default_properties_key()"))] + pub(crate) properties_key: OptionalValuePath, + /// The namespace to use for logs. This overrides the global setting. #[configurable(metadata(docs::hidden))] #[serde(default)] @@ -135,6 +151,14 @@ fn default_offset_key() -> OptionalValuePath { OptionalValuePath::from(owned_value_path!("offset")) } +fn default_headers_key() -> OptionalValuePath { + OptionalValuePath::from(owned_value_path!("headers")) +} + +fn default_properties_key() -> OptionalValuePath { + OptionalValuePath::from(owned_value_path!("properties")) +} + impl_generate_config_from_default!(AmqpSourceConfig); impl AmqpSourceConfig { @@ -189,6 +213,23 @@ impl SourceConfig for AmqpSourceConfig { &owned_value_path!("offset"), Kind::integer(), None, + ) + .with_source_metadata( + AmqpSourceConfig::NAME, + self.headers_key.path.clone().map(LegacyKey::InsertIfEmpty), + &owned_value_path!("headers"), + Kind::object(Collection::empty().with_unknown(Kind::any())), + None, + ) + .with_source_metadata( + AmqpSourceConfig::NAME, + self.properties_key + .path + .clone() + .map(LegacyKey::InsertIfEmpty), + &owned_value_path!("properties"), + Kind::object(Collection::empty().with_unknown(Kind::any())), + None, ); vec![SourceOutput::new_maybe_logs( @@ -246,6 +287,103 @@ struct Keys<'a> { exchange: &'a str, offset_key: &'a OptionalValuePath, delivery_tag: i64, + headers_key: &'a OptionalValuePath, + headers: Value, + properties_key: &'a OptionalValuePath, + properties: Value, +} + +fn long_string_to_value(value: &LongString) -> Value { + match std::str::from_utf8(value.as_bytes()) { + Ok(value) => value.to_owned().into(), + Err(_) => Bytes::copy_from_slice(value.as_bytes()).into(), + } +} + +fn amqp_timestamp_to_datetime(value: u64) -> Option> { + Utc.timestamp_opt(value as i64, 0).latest() +} + +fn amqp_value_to_value(value: &AMQPValue) -> Value { + match value { + AMQPValue::Boolean(value) => (*value).into(), + AMQPValue::ShortShortInt(value) => (*value as i64).into(), + AMQPValue::ShortShortUInt(value) => (*value as i64).into(), + AMQPValue::ShortInt(value) => (*value as i64).into(), + AMQPValue::ShortUInt(value) => (*value as i64).into(), + AMQPValue::LongInt(value) => (*value as i64).into(), + AMQPValue::LongUInt(value) => (*value as i64).into(), + AMQPValue::LongLongInt(value) => (*value).into(), + AMQPValue::Float(value) => (*value as f64).into(), + AMQPValue::Double(value) => (*value).into(), + AMQPValue::DecimalValue(value) => Value::Object(BTreeMap::from([ + ("scale".into(), (value.scale as i64).into()), + ("value".into(), (value.value as i64).into()), + ])), + AMQPValue::ShortString(value) => value.to_string().into(), + AMQPValue::LongString(value) => long_string_to_value(value), + AMQPValue::FieldArray(value) => { + Value::Array(value.as_slice().iter().map(amqp_value_to_value).collect()) + } + AMQPValue::Timestamp(value) => amqp_timestamp_to_datetime(*value) + .map(Value::from) + .unwrap_or_else(|| (*value as i64).into()), + AMQPValue::FieldTable(value) => field_table_to_value(value), + AMQPValue::ByteArray(value) => Bytes::copy_from_slice(value.as_slice()).into(), + AMQPValue::Void => Value::Null, + } +} + +fn field_table_to_value<'a>( + table: impl IntoIterator, +) -> Value { + Value::Object( + table + .into_iter() + .map(|(key, value)| (key.to_string().into(), amqp_value_to_value(value))) + .collect(), + ) +} + +fn insert_short_string(properties: &mut ObjectMap, key: &str, value: &Option) { + if let Some(value) = value { + properties.insert(key.into(), value.to_string().into()); + } +} + +fn basic_properties_to_value(properties: &BasicProperties) -> Value { + let mut values = BTreeMap::new(); + + insert_short_string(&mut values, "content_type", properties.content_type()); + insert_short_string( + &mut values, + "content_encoding", + properties.content_encoding(), + ); + if let Some(value) = properties.delivery_mode() { + values.insert("delivery_mode".into(), (*value as i64).into()); + } + if let Some(value) = properties.priority() { + values.insert("priority".into(), (*value as i64).into()); + } + insert_short_string(&mut values, "correlation_id", properties.correlation_id()); + insert_short_string(&mut values, "reply_to", properties.reply_to()); + insert_short_string(&mut values, "expiration", properties.expiration()); + insert_short_string(&mut values, "message_id", properties.message_id()); + if let Some(value) = properties.timestamp() { + values.insert( + "timestamp".into(), + amqp_timestamp_to_datetime(*value) + .map(Value::from) + .unwrap_or_else(|| (*value as i64).into()), + ); + } + insert_short_string(&mut values, "type", properties.kind()); + insert_short_string(&mut values, "user_id", properties.user_id()); + insert_short_string(&mut values, "app_id", properties.app_id()); + insert_short_string(&mut values, "cluster_id", properties.cluster_id()); + + Value::Object(values) } /// Populates the decoded event with extra metadata. @@ -285,6 +423,25 @@ fn populate_log_event( keys.delivery_tag, ); + log_namespace.insert_source_metadata( + AmqpSourceConfig::NAME, + log, + keys.headers_key.path.as_ref().map(LegacyKey::InsertIfEmpty), + path!("headers"), + keys.headers.clone(), + ); + + log_namespace.insert_source_metadata( + AmqpSourceConfig::NAME, + log, + keys.properties_key + .path + .as_ref() + .map(LegacyKey::InsertIfEmpty), + path!("properties"), + keys.properties.clone(), + ); + log_namespace.insert_vector_metadata( log, log_schema().source_type_key(), @@ -326,21 +483,32 @@ async fn receive_event( let decoder = config.decoder(log_namespace).map_err(|_e| ())?; let mut stream = DecoderFramedRead::new(payload, decoder); - // Extract timestamp from AMQP message + // Extract AMQP message timestamp. AMQP timestamps are Unix timestamps in seconds. let timestamp = msg .properties .timestamp() - .and_then(|millis| Utc.timestamp_millis_opt(millis as _).latest()); + .and_then(amqp_timestamp_to_datetime); let routing = msg.routing_key.to_string(); let exchange = msg.exchange.to_string(); + let headers = msg + .properties + .headers() + .as_ref() + .map(field_table_to_value) + .unwrap_or_else(|| Value::Object(BTreeMap::new())); + let properties = basic_properties_to_value(&msg.properties); let keys = Keys { routing_key_field: &config.routing_key_field, exchange_key: &config.exchange_key, offset_key: &config.offset_key, + headers_key: &config.headers_key, + properties_key: &config.properties_key, routing: &routing, exchange: &exchange, delivery_tag: msg.delivery_tag as i64, + headers, + properties, }; let events_received = register!(EventsReceived); @@ -514,8 +682,15 @@ async fn handle_ack(status: BatchStatus, entry: FinalizerEntry) { #[cfg(test)] pub mod test { + use std::collections::BTreeMap; + + use chrono::TimeZone; + use lapin::{ + BasicProperties, + types::{AMQPValue, FieldArray, FieldTable, ShortString}, + }; use vector_lib::{lookup::OwnedTargetPath, schema::Definition, tls::TlsConfig}; - use vrl::value::kind::Collection; + use vrl::value::Value; use super::*; @@ -590,7 +765,17 @@ pub mod test { ) .with_metadata_field(&owned_value_path!("amqp", "routing"), Kind::bytes(), None) .with_metadata_field(&owned_value_path!("amqp", "exchange"), Kind::bytes(), None) - .with_metadata_field(&owned_value_path!("amqp", "offset"), Kind::integer(), None); + .with_metadata_field(&owned_value_path!("amqp", "offset"), Kind::integer(), None) + .with_metadata_field( + &owned_value_path!("amqp", "headers"), + Kind::object(Collection::empty().with_unknown(Kind::any())), + None, + ) + .with_metadata_field( + &owned_value_path!("amqp", "properties"), + Kind::object(Collection::empty().with_unknown(Kind::any())), + None, + ); assert_eq!(definition, Some(expected_definition)); } @@ -617,10 +802,112 @@ pub mod test { .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None) .with_event_field(&owned_value_path!("routing"), Kind::bytes(), None) .with_event_field(&owned_value_path!("exchange"), Kind::bytes(), None) - .with_event_field(&owned_value_path!("offset"), Kind::integer(), None); + .with_event_field(&owned_value_path!("offset"), Kind::integer(), None) + .with_event_field( + &owned_value_path!("headers"), + Kind::object(Collection::empty().with_unknown(Kind::any())), + None, + ) + .with_event_field( + &owned_value_path!("properties"), + Kind::object(Collection::empty().with_unknown(Kind::any())), + None, + ); assert_eq!(definition, Some(expected_definition)); } + + #[test] + fn amqp_field_table_to_value_preserves_supported_types() { + let timestamp = Utc.timestamp_opt(1_700_000_000, 0).unwrap(); + + let mut nested = FieldTable::default(); + nested.insert("nested-key".into(), AMQPValue::Boolean(true)); + + let mut table = FieldTable::default(); + table.insert( + "string".into(), + AMQPValue::LongString(String::from("value").into()), + ); + table.insert("bytes".into(), AMQPValue::LongString(vec![0, 159].into())); + table.insert("bool".into(), AMQPValue::Boolean(true)); + table.insert("int".into(), AMQPValue::LongLongInt(42)); + table.insert("uint".into(), AMQPValue::LongUInt(7)); + table.insert("timestamp".into(), AMQPValue::Timestamp(1_700_000_000)); + table.insert("nested".into(), AMQPValue::FieldTable(nested)); + table.insert( + "array".into(), + AMQPValue::FieldArray(FieldArray::from(vec![ + AMQPValue::ShortString(ShortString::from("one")), + AMQPValue::Boolean(false), + ])), + ); + + assert_eq!( + field_table_to_value(&table), + Value::Object(BTreeMap::from([ + ( + "array".into(), + Value::Array(vec!["one".into(), false.into()]) + ), + ("bool".into(), true.into()), + ("bytes".into(), Value::Bytes(vec![0, 159].into())), + ("int".into(), 42.into()), + ( + "nested".into(), + Value::Object(BTreeMap::from([("nested-key".into(), true.into())])), + ), + ("string".into(), "value".into()), + ("timestamp".into(), timestamp.into()), + ("uint".into(), 7.into()), + ])) + ); + } + + #[test] + fn basic_properties_to_value_includes_scalar_properties() { + let timestamp = Utc.timestamp_opt(1_700_000_000, 0).unwrap(); + + let properties = BasicProperties::default() + .with_content_type("application/json".into()) + .with_content_encoding("gzip".into()) + .with_delivery_mode(2) + .with_priority(5) + .with_correlation_id("correlation".into()) + .with_reply_to("reply".into()) + .with_expiration("60000".into()) + .with_message_id("message-id".into()) + .with_timestamp(1_700_000_000) + .with_type("type".into()) + .with_user_id("user".into()) + .with_app_id("app".into()); + + assert_eq!( + basic_properties_to_value(&properties), + Value::Object(BTreeMap::from([ + ("app_id".into(), "app".into()), + ("content_encoding".into(), "gzip".into()), + ("content_type".into(), "application/json".into()), + ("correlation_id".into(), "correlation".into()), + ("delivery_mode".into(), 2.into()), + ("expiration".into(), "60000".into()), + ("message_id".into(), "message-id".into()), + ("priority".into(), 5.into()), + ("reply_to".into(), "reply".into()), + ("timestamp".into(), timestamp.into()), + ("type".into(), "type".into()), + ("user_id".into(), "user".into()), + ])) + ); + } + + #[test] + fn amqp_timestamp_to_datetime_uses_unix_seconds() { + assert_eq!( + amqp_timestamp_to_datetime(1_700_000_000), + Utc.timestamp_opt(1_700_000_000, 0).latest() + ); + } } /// Integration tests use the docker compose files in `tests/integration/docker-compose.amqp.yml`. @@ -628,7 +915,7 @@ pub mod test { #[cfg(test)] mod integration_test { use chrono::Utc; - use lapin::types::ShortString; + use lapin::types::{AMQPValue, FieldTable, ShortString}; use lapin::{BasicProperties, options::*}; use tokio::time::Duration; use vector_lib::config::log_schema; @@ -684,7 +971,7 @@ mod integration_test { exchange: &str, routing_key: &str, text: &str, - _timestamp: i64, + properties: BasicProperties, ) { let payload = text.as_bytes(); let payload_len = payload.len(); @@ -696,7 +983,7 @@ mod integration_test { routing_key.into(), BasicPublishOptions::default(), payload.as_ref(), - BasicProperties::default(), + properties, ) .await .unwrap() @@ -763,7 +1050,7 @@ mod integration_test { exchange.as_str(), routing_key, "my message", - now.timestamp_millis(), + BasicProperties::default(), ) .await; @@ -784,6 +1071,94 @@ mod integration_test { assert_eq!(log["exchange"], exchange.as_str().into()); } + async fn source_consume_event_with_headers_and_properties(mut config: AmqpSourceConfig) { + let exchange = format!("test-{}-exchange", random_string(10)); + let queue = format!("test-{}-queue", random_string(10)); + let routing_key = "my_key"; + let exchange: ShortString = exchange.into(); + + config.consumer = format!("test-consumer-{}", random_string(10)); + config.queue = queue; + + let (_conn, channel) = config.connection.connect().await.unwrap(); + channel + .exchange_declare( + exchange.clone(), + lapin::ExchangeKind::Fanout, + ExchangeDeclareOptions { + auto_delete: true, + ..Default::default() + }, + FieldTable::default(), + ) + .await + .unwrap(); + + let queue: ShortString = config.queue.clone().into(); + channel + .queue_declare( + queue.clone(), + QueueDeclareOptions { + auto_delete: true, + ..Default::default() + }, + FieldTable::default(), + ) + .await + .unwrap(); + + channel + .queue_bind( + queue, + exchange.clone(), + "".into(), + QueueBindOptions::default(), + FieldTable::default(), + ) + .await + .unwrap(); + + let mut headers = FieldTable::default(); + headers.insert( + "x-request-id".into(), + AMQPValue::LongString(String::from("abc123").into()), + ); + headers.insert("retry".into(), AMQPValue::Boolean(true)); + + let timestamp = Utc.timestamp_opt(1_700_000_000, 0).unwrap(); + + let properties = BasicProperties::default() + .with_headers(headers) + .with_content_type("application/json".into()) + .with_priority(7) + .with_timestamp(1_700_000_000); + + send_event( + &channel, + exchange.as_str(), + routing_key, + "my message", + properties, + ) + .await; + + let events = + run_and_assert_source_compliance(config, Duration::from_secs(1), &SOURCE_TAGS).await; + let log = events[0].as_log(); + + assert_eq!(log["headers.x-request-id"], "abc123".into()); + assert_eq!(log["headers.retry"], true.into()); + assert_eq!(log["properties.content_type"], "application/json".into()); + assert_eq!(log["properties.priority"], 7.into()); + assert_eq!( + log[log_schema().timestamp_key().unwrap().to_string()] + .as_timestamp() + .copied(), + Some(timestamp) + ); + assert_eq!(log["properties.timestamp"], timestamp.into()); + } + #[tokio::test] async fn amqp_source_consume_event() { let config = make_config(); @@ -797,4 +1172,11 @@ mod integration_test { await_connection(&config.connection).await; source_consume_event(config).await; } + + #[tokio::test] + async fn amqp_source_consume_event_with_headers_and_properties() { + let config = make_config(); + await_connection(&config.connection).await; + source_consume_event_with_headers_and_properties(config).await; + } } diff --git a/website/cue/reference/components/sources/generated/amqp.cue b/website/cue/reference/components/sources/generated/amqp.cue index 2862dcf64a87c..45399ba0f1a06 100644 --- a/website/cue/reference/components/sources/generated/amqp.cue +++ b/website/cue/reference/components/sources/generated/amqp.cue @@ -551,6 +551,11 @@ generated: components: sources: amqp: configuration: { } } } + headers_key: { + description: "The key where `AMQP` message headers are added to events." + required: false + type: string: default: "headers" + } offset_key: { description: "The `AMQP` offset key." required: false @@ -571,6 +576,11 @@ generated: components: sources: amqp: configuration: { 100, ] } + properties_key: { + description: "The key where `AMQP` message properties are added to events." + required: false + type: string: default: "properties" + } queue: { description: "The name of the queue to consume." required: false