KafkaEx is an Elixir client for Apache Kafka. KafkaEx requires Elixir 1.14+ and Erlang OTP 24+. Supported Kafka versions: 0.11.0 and newer (see Supported Kafka Versions).
KafkaEx v1.0 is the Kayrock-based release. Scope:
- ✅ Producer, consumer groups, admin APIs
- ✅ SASL (PLAIN, SCRAM-256/512, OAUTHBEARER, AWS MSK IAM)
- ✅ Compression (gzip, snappy, lz4, zstd)
- ✅ Automatic API version negotiation with per-request / per-app overrides
- ✅ First-class telemetry (27+ events)
- ❌ Idempotent / transactional producer (planned, post-1.0)
- ❌ KIP-848 new rebalance protocol (planned, post-1.0)
- ❌ KIP-368 proactive SASL re-authentication on token expiry (see AUTH.md § Token expiry behaviour for the current reconnect-driven behaviour)
⚠️ Kafka 4.0 compatibility tracked in #497
- HexDocs: https://hexdocs.pm/kafka_ex/
- GitHub: https://github.com/kafkaex/kafka_ex/
- Authentication: AUTH.md
- Contributing: CONTRIBUTING.md
- Features
- Quick Start
- Configuration
- Usage
- Authentication (SASL)
- Telemetry & Observability
- Error Handling & Resilience
- Testing
- Contributing
KafkaEx v1.0 uses Kayrock for Kafka protocol serialization with automatic API version negotiation—no manual version configuration needed.
- ✅ Producer - Single and batch message production with timestamps and headers
- ✅ Consumer - Message fetching with offset management
- ✅ Consumer Groups - Coordinated consumption with automatic partition assignment
- ✅ Compression - Gzip, Snappy, LZ4, and Zstd
- ✅ Authentication - SASL/PLAIN, SASL/SCRAM, OAUTHBEARER, AWS MSK IAM
- ✅ SSL/TLS - Secure connections with certificate-based authentication
- ✅ Topic Management - Create and delete topics programmatically
- ✅ Metadata API - Discover brokers, topics, and partitions
- ✅ Offset Management - Commit, fetch, and reset offsets
- ✅ Telemetry - Built-in observability with telemetry events
- ✅ Automatic Retries - Smart retry logic with exponential backoff
- Minimum: Kafka 0.11.0+ required (for RecordBatch format, headers, timestamps)
- Tested against: Kafka 2.1.0 through 3.8.x
- Kafka 4.0+: partial compatibility — tracked in #497
Add KafkaEx to your mix.exs dependencies:
```elixir
def deps do
  [
    {:kafka_ex, "~> 1.0"}
  ]
end
```

Then run:
```shell
mix deps.get
```

```elixir
# Start a client
{:ok, client} = KafkaEx.API.start_client(brokers: [{"localhost", 9092}])

# Produce a message
{:ok, _metadata} = KafkaEx.API.produce_one(client, "my-topic", 0, "hello")

# Fetch messages from offset 0
{:ok, result} = KafkaEx.API.fetch(client, "my-topic", 0, 0)
```

For production applications, define a module with the KafkaEx.API behaviour:
```elixir
defmodule MyApp.Kafka do
  use KafkaEx.API, client: MyApp.KafkaClient
end

# In your application.ex supervision tree:
children = [
  {KafkaEx.Client, name: MyApp.KafkaClient, brokers: [{"localhost", 9092}]}
]

# Now call without passing the client:
MyApp.Kafka.produce_one("my-topic", 0, "hello")
{:ok, messages} = MyApp.Kafka.fetch("my-topic", 0, 0)
```

See the KafkaEx.API documentation for the complete API reference.
KafkaEx can be configured via config.exs or by passing options directly to KafkaEx.API.start_client/1.
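The same settings can also be passed inline when starting a client; the option names mirror the config keys below (`brokers` is shown in Quick Start; passing `client_id` the same way is an assumption based on that pattern):

```elixir
# Inline client options mirroring the config.exs keys
{:ok, client} =
  KafkaEx.API.start_client(
    brokers: [{"localhost", 9092}],
    client_id: "my-app"
  )
```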
```elixir
# config/config.exs
config :kafka_ex,
  # List of Kafka brokers
  brokers: [{"localhost", 9092}, {"localhost", 9093}],
  # Client identifier
  client_id: "my-app",
  # Default consumer group
  default_consumer_group: "my-consumer-group",
  # Request timeout (milliseconds)
  sync_timeout: 10_000
```

```elixir
config :kafka_ex,
  brokers: [{"kafka.example.com", 9093}],
  use_ssl: true,
  ssl_options: [
    cacertfile: "/path/to/ca-cert.pem",
    certfile: "/path/to/client-cert.pem",
    keyfile: "/path/to/client-key.pem",
    verify: :verify_peer
  ]
```

```elixir
config :kafka_ex,
  default_consumer_group: "my-group",

  # Auto-commit settings
  commit_interval: 5_000,  # Commit every 5 seconds
  commit_threshold: 100,   # Or every 100 messages

  # What to do when no committed offset exists, or the requested offset is
  # out of range. Allowed values:
  #   :none     - raise (the library default; strict, good for production,
  #               where an unexpected out-of-range offset indicates a real problem)
  #   :earliest - reset to the oldest available offset (common for new
  #               consumer groups that want to read existing data)
  #   :latest   - reset to the newest offset (skip the backlog, read only new messages)
  auto_offset_reset: :none
```

Most users do not need to change the following settings; they are shown here with their defaults so the knobs are discoverable.
```elixir
config :kafka_ex,
  # Declare the compression algorithms your app uses so Client.init
  # crashes loudly at boot if the backing optional dep isn't loaded.
  # Default: [] (no validation). Any of :gzip (needs no dep), :snappy
  # (needs :snappyer), :lz4 (needs :lz4b), :zstd (needs :ezstd).
  required_compression: [],

  # Delay before a broker-reconnect retry (ms). Lower values reconnect
  # faster but hammer brokers that are already struggling; higher values
  # smooth out flapping at the cost of longer error windows.
  sleep_for_reconnect: 400,

  # Periodic metadata refresh cadence (ms). The client issues a full
  # Metadata request this often to pick up leader elections, new
  # topics, and broker membership changes.
  metadata_update_interval: 30_000,

  # Top-level application supervisor restart intensity: if more than
  # max_restarts children exit in any max_seconds window, the
  # supervisor shuts down. Tuning these trades "restart spam on flaps"
  # against "tolerance for broker brownouts".
  max_restarts: 10,
  max_seconds: 60
```

Compression is set per-request, not globally:

```elixir
# Produce with gzip compression
KafkaEx.API.produce(client, "topic", 0, messages, compression: :gzip)

# Supported: :none (default), :gzip, :snappy, :lz4, :zstd
```

For Snappy compression, add to mix.exs:
{:snappyer, "~> 1.2"}You can use MFA or anonymous functions for dynamic broker resolution:
# Using MFA tuple
config :kafka_ex,
brokers: {MyApp.Config, :get_kafka_brokers, []}
# Using anonymous function
config :kafka_ex,
brokers: fn -> Application.get_env(:my_app, :kafka_brokers) endSee KafkaEx.Config for all available options.
```elixir
# Single message
{:ok, metadata} = KafkaEx.API.produce_one(
  client,
  "my-topic",
  0,             # partition
  "hello world"  # message value
)

# With a message key (for partition routing)
{:ok, metadata} = KafkaEx.API.produce_one(
  client,
  "my-topic",
  0,
  "message value",
  key: "user-123"
)

# Batch produce
messages = [
  %{value: "message 1", key: "key1"},
  %{value: "message 2", key: "key2"},
  %{value: "message 3", key: "key3"}
]

{:ok, metadata} = KafkaEx.API.produce(client, "my-topic", 0, messages)
```

```elixir
# Fetch from a specific offset
{:ok, result} = KafkaEx.API.fetch(client, "my-topic", 0, 100)

result.records
|> Enum.each(fn record ->
  IO.puts("Offset: #{record.offset}, Value: #{record.value}")
end)

# Fetch all messages (earliest to high watermark)
{:ok, result} = KafkaEx.API.fetch_all(client, "my-topic", 0)
```

Consumer groups provide coordinated consumption with automatic partition assignment and offset management.
```elixir
defmodule MyApp.MessageConsumer do
  use KafkaEx.Consumer.GenConsumer

  require Logger

  # Messages are delivered in batches
  def handle_message_set(message_set, state) do
    Enum.each(message_set, fn record ->
      Logger.info("Processing: #{inspect(record.value)}")
      # Process your message here
    end)

    # Commit offsets asynchronously
    {:async_commit, state}
  end
end
```

Available commit strategies:

- `{:async_commit, state}` - commit in the background (recommended)
- `{:sync_commit, state}` - wait for the commit to complete
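For stronger delivery guarantees at the cost of throughput, the same callback can commit synchronously; a minimal sketch (the record-processing body is yours to fill in):

```elixir
defmodule MyApp.SyncConsumer do
  use KafkaEx.Consumer.GenConsumer

  def handle_message_set(message_set, state) do
    Enum.each(message_set, fn record ->
      # Replace with your own processing logic
      IO.inspect(record.value)
    end)

    # Wait for the offset commit to complete before taking the next batch
    {:sync_commit, state}
  end
end
```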
```elixir
# In your application.ex
def start(_type, _args) do
  children = [
    # Start the consumer group
    %{
      id: MyApp.MessageConsumer,
      start: {
        KafkaEx.Consumer.ConsumerGroup,
        :start_link,
        [
          MyApp.MessageConsumer,  # Your consumer module
          "my-consumer-group",    # Consumer group ID
          ["topic1", "topic2"],   # Topics to consume
          [
            # Optional configuration
            commit_interval: 5_000,
            commit_threshold: 100,
            auto_offset_reset: :earliest
          ]
        ]
      }
    }
  ]

  Supervisor.start_link(children, strategy: :one_for_one)
end
```

See KafkaEx.Consumer.GenConsumer for details.
```elixir
# Get all topics
{:ok, metadata} = KafkaEx.API.metadata(client)

# Get specific topics
{:ok, metadata} = KafkaEx.API.metadata(client, ["topic1", "topic2"])

# Inspect partitions
metadata.topics
|> Enum.each(fn topic ->
  IO.puts("Topic: #{topic.name}, Partitions: #{length(topic.partitions)}")
end)
```

```elixir
# Get the latest offset for a partition
{:ok, offset} = KafkaEx.API.latest_offset(client, "my-topic", 0)

# Get the earliest offset
{:ok, offset} = KafkaEx.API.earliest_offset(client, "my-topic", 0)

# List offsets by timestamp
timestamp =
  DateTime.utc_now()
  |> DateTime.add(-3600, :second)
  |> DateTime.to_unix(:millisecond)

partition_request = %{partition_num: 0, timestamp: timestamp}
{:ok, offsets} = KafkaEx.API.list_offsets(client, [{"my-topic", [partition_request]}])

# Fetch the committed offset for a consumer group
partitions = [%{partition_num: 0}]

{:ok, offsets} = KafkaEx.API.fetch_committed_offset(
  client,
  "my-consumer-group",
  "my-topic",
  partitions
)

# Commit an offset for a consumer group
partitions = [%{partition_num: 0, offset: 100}]

{:ok, result} = KafkaEx.API.commit_offset(
  client,
  "my-consumer-group",
  "my-topic",
  partitions
)
```

```elixir
# Create a topic
{:ok, result} = KafkaEx.API.create_topic(
  client,
  "new-topic",
  num_partitions: 3,
  replication_factor: 2,
  config_entries: %{
    "retention.ms" => "86400000",
    "compression.type" => "gzip"
  }
)

# Delete a topic
{:ok, result} = KafkaEx.API.delete_topic(client, "old-topic")
```

KafkaEx supports multiple compression formats. Compression is applied per-request:
```elixir
# Gzip compression (built-in, no extra dependency)
{:ok, _} = KafkaEx.API.produce(client, "my-topic", 0, messages, compression: :gzip)

# Snappy compression (requires the :snappyer package)
{:ok, _} = KafkaEx.API.produce(client, "my-topic", 0, messages, compression: :snappy)

# LZ4 compression (requires the :lz4b package; Kafka 0.9.0+)
{:ok, _} = KafkaEx.API.produce(client, "my-topic", 0, messages, compression: :lz4)

# Zstd compression (requires the :ezstd package; Kafka 2.1.0+)
{:ok, _} = KafkaEx.API.produce(client, "my-topic", 0, messages, compression: :zstd)
```

Supported Formats:

| Format | Kafka Version | Dependency Required |
|---|---|---|
| `:gzip` | 0.7.0+ | None (built-in) |
| `:snappy` | 0.8.0+ | `{:snappyer, "~> 1.2"}` |
| `:lz4` | 0.9.0+ | `:lz4b` (optional dependency) |
| `:zstd` | 2.1.0+ | `:ezstd` (optional dependency) |

Decompression is handled automatically when consuming messages.
KafkaEx supports multiple SASL authentication mechanisms for secure connections to Kafka clusters.
Simple username/password authentication. Always use with SSL/TLS to protect credentials.
```elixir
config :kafka_ex,
  brokers: [{"kafka.example.com", 9092}],
  use_ssl: true,
  ssl_options: [verify: :verify_peer, cacertfile: "/path/to/ca.pem"],
  sasl: %{
    mechanism: :plain,
    username: "alice",
    password: "secret123"
  }
```

Challenge-response authentication (more secure than PLAIN).
```elixir
config :kafka_ex,
  brokers: [{"kafka.example.com", 9092}],
  use_ssl: true,
  sasl: %{
    mechanism: :scram,
    username: "alice",
    password: "secret123",
    mechanism_opts: %{algo: :sha256}  # or :sha512
  }
```

OAuth 2.0 token-based authentication.
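The `token_provider` callback in the configuration below supplies the token used during the SASL handshake. A hypothetical provider sketch (the return shape is an assumption, and `fetch_token_from_idp/0` stands in for your own identity-provider call; see AUTH.md for the exact contract):

```elixir
defmodule MyApp do
  # Hypothetical token provider for the token_provider option below.
  # A bare token binary is assumed here; check AUTH.md for the exact
  # return shape KafkaEx expects.
  def get_oauth_token do
    fetch_token_from_idp()
  end

  # Placeholder for your identity-provider call (e.g. an OAuth 2.0
  # client-credentials grant); returns the access token as a binary.
  defp fetch_token_from_idp do
    "token-from-your-idp"
  end
end
```

Then reference it from the SASL configuration: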
```elixir
config :kafka_ex,
  brokers: [{"kafka.example.com", 9092}],
  use_ssl: true,
  sasl: %{
    mechanism: :oauthbearer,
    mechanism_opts: %{
      token_provider: &MyApp.get_oauth_token/0,
      extensions: %{"traceId" => "optional-data"}
    }
  }
```

AWS IAM authentication for Amazon Managed Streaming for Kafka (MSK).
```elixir
config :kafka_ex,
  brokers: [{"msk-cluster.region.amazonaws.com", 9098}],
  use_ssl: true,
  sasl: %{
    mechanism: :msk_iam,
    mechanism_opts: %{
      region: "us-east-1"
      # Credentials are automatically resolved from the environment
    }
  }
```

Authentication Requirements:
| Mechanism | Minimum Kafka | SSL Required | Notes |
|---|---|---|---|
| PLAIN | 0.9.0+ | ✅ Yes | Never use without SSL/TLS |
| SCRAM | 0.10.2+ | Recommended | Challenge-response, more secure |
| OAUTHBEARER | 2.0+ | Recommended | Requires a token provider |
| MSK_IAM | MSK 2.7.1+ | ✅ Yes | AWS-specific |
See AUTH.md for detailed authentication setup and troubleshooting.
KafkaEx emits telemetry events for monitoring connections, requests, and consumer operations.
| Category | Events | Description |
|---|---|---|
| Connection | 4 | Connect, disconnect, reconnect, close |
| Request | 4 | Request start/stop/exception, retry |
| Produce | 4 | Produce start/stop/exception, batch metrics |
| Fetch | 4 | Fetch start/stop/exception, messages received |
| Offset | 4 | Commit/fetch offset operations |
| Consumer | 8 | Group join, sync, heartbeat, rebalance, message processing |
| Metadata | 4 | Cluster metadata updates |
| SASL Auth | 6 | PLAIN/SCRAM authentication spans |
```elixir
defmodule MyApp.KafkaTelemetry do
  require Logger

  def attach do
    :telemetry.attach_many(
      "my-kafka-handler",
      [
        [:kafka_ex, :connection, :stop],
        [:kafka_ex, :request, :stop],
        [:kafka_ex, :produce, :stop],
        [:kafka_ex, :fetch, :stop]
      ],
      &handle_event/4,
      nil
    )
  end

  def handle_event([:kafka_ex, :connection, :stop], measurements, metadata, _config) do
    Logger.info("Connected to #{metadata.host}:#{metadata.port} in #{measurements.duration / 1_000_000}ms")
  end

  def handle_event([:kafka_ex, :request, :stop], measurements, metadata, _config) do
    Logger.debug("Request #{metadata.api_key} took #{measurements.duration / 1_000_000}ms")
  end

  def handle_event([:kafka_ex, :produce, :stop], measurements, metadata, _config) do
    Logger.info("Produced #{measurements.message_count} messages to #{metadata.topic}")
  end

  def handle_event([:kafka_ex, :fetch, :stop], measurements, metadata, _config) do
    Logger.info("Fetched #{measurements.message_count} messages from #{metadata.topic}")
  end
end
```

Then in your application startup:
```elixir
# application.ex
def start(_type, _args) do
  MyApp.KafkaTelemetry.attach()
  # ...
end
```

See KafkaEx.Telemetry for the complete event reference.
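If KafkaEx follows the usual `:telemetry` span convention, `measurements.duration` is reported in native time units; the division by 1_000_000 in the handlers above assumes those units are nanoseconds. A unit-safe conversion (standard library, shown as a fragment for use inside a handler):

```elixir
# Convert a native-unit duration to milliseconds regardless of platform
duration_ms = System.convert_time_unit(measurements.duration, :native, :millisecond)
```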
KafkaEx v1.0 includes smart error handling and retry logic for production resilience.
- Producer requests - automatically retried on leadership-related errors (`not_leader_for_partition`, `leader_not_available`) after a metadata refresh
- Offset commits - transient errors (timeout, coordinator not available) are retried with exponential backoff
- API version negotiation - parse errors during the initial connection are retried

Consumer groups handle transient errors gracefully, following the Java client pattern (KAFKA-6829):

- `unknown_topic_or_partition` triggers a retry instead of a crash
- Heartbeat errors trigger a rejoin for recoverable errors
- Exponential backoff for join retries (1s→10s, up to 6 attempts; see the sketch after this list)
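For intuition, a doubling backoff capped at 10 seconds reproduces that schedule. This is a generic illustration of the pattern, not KafkaEx's internal code:

```elixir
defmodule BackoffSketch do
  # Delay before join attempt n (1-based): doubles each time, capped at 10s
  def delay_ms(attempt) when attempt >= 1 do
    min(1_000 * Integer.pow(2, attempt - 1), 10_000)
  end
end

Enum.map(1..6, &BackoffSketch.delay_ms/1)
#=> [1000, 2000, 4000, 8000, 10000, 10000]
```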
Important: Produce requests only retry on leadership errors where we know the message wasn't written. Timeout errors are NOT retried to prevent potential duplicate messages.
Truly idempotent produces require Kafka's idempotent producer (`enable.idempotence=true`, Kafka 0.11+), which KafkaEx does not yet implement (see Project Status).
When using certain versions of OTP, random timeouts can occur with SSL.
Impacted versions:
- OTP 21.3.8.1 → 21.3.8.14
- OTP 22.1 → 22.3.1
Solution: Upgrade to OTP 21.3.8.15 or 22.3.2+.
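To check which OTP patch level you are running, read the OTP_VERSION file that ships with the release (a standard Erlang/Elixir idiom, unrelated to KafkaEx itself):

```elixir
# Full OTP version, e.g. "22.3.2" (System.otp_release/0 returns only the major version)
otp_root = List.to_string(:code.root_dir())
otp_release = List.to_string(:erlang.system_info(:otp_release))

otp_version =
  [otp_root, "releases", otp_release, "OTP_VERSION"]
  |> Path.join()
  |> File.read!()
  |> String.trim()
```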
Run tests that don't require a live Kafka cluster:
```shell
mix test.unit
```

KafkaEx includes a Dockerized test cluster with 3 Kafka brokers configured with different authentication mechanisms:
Ports:
- 9092-9094: No authentication (SSL)
- 9192-9194: SASL/PLAIN (SSL)
- 9292-9294: SASL/SCRAM (SSL)
- 9392-9394: SASL/OAUTHBEARER (SSL)
Start the test cluster:
```shell
./scripts/docker_up.sh
```

Run all tests:
```shell
# Unit tests
mix test.unit

# Integration tests
mix test.integration

# All tests together
mix test
```

Run specific test categories:
```shell
mix test --only consumer_group
mix test --only produce
mix test --only consume
mix test --only auth
```

Run SASL tests:
```shell
MIX_ENV=test mix test --include sasl
```

Before submitting a PR, run the standard code-quality checks:

```shell
mix format                    # Format code
mix format --check-formatted
mix credo --strict            # Linting
mix dialyzer                  # Type checking
```

All contributions are managed through the KafkaEx GitHub repo.
- Issues: github.com/kafkaex/kafka_ex/issues
- Pull Requests: See CONTRIBUTING.md for our contribution process
- Slack: #kafkaex on elixir-lang.slack.com (request invite)
- Slack Archive: slack.elixirhq.com/kafkaex
kafka_ex has no tiered maintainer structure — every user of kafka_ex is a maintainer. If you run it in production, triage issues when you hit them, send PRs when you find bugs, and help review others' PRs. The project survives on that.
Historically active contributors you may see around the issues and PRs:
- Piotr Rybarczyk (@Argonus, Fresha)
- Dan Swain (@dantswain) — Kayrock author
- bjhaid (@bjhaid)
- Joshua Scott (@joshuawscott)
- Jack Lund (@jacklund)
v1.0 is the Kayrock-based release. Expect prioritisation rather than blanket coverage.
In scope:
- Critical bugs (data loss, crashes, protocol violations)
- Kafka compatibility issues against supported broker versions (0.11.0–3.8.x)
- Security vulnerabilities (see below)
Accepted with best-effort review (PRs welcome):
- Feature requests that fit the v1.x roadmap (see Project Status)
- Performance improvements with benchmarks
- Documentation improvements
Explicitly out of scope for v1.x:
- Idempotent / transactional producer (planned post-1.0)
- KIP-848 new rebalance protocol (planned post-1.0)
- KIP-368 proactive SASL re-authentication (planned post-1.0 — see AUTH.md § Token expiry behaviour)
- Back-porting fixes to 0.15.x beyond security patches
- Supporting Kafka < 0.11 (use an earlier kafka_ex tag)
- Critical bugs / security issues: 2 weeks
- Other PRs: best-effort; bump the thread if no reply after 3 weeks
- Questions / discussion: GitHub issues or #kafkaex on elixir-lang.slack.com; no SLA
Do not file public GitHub issues for security vulnerabilities. Use either:
- GitHub private vulnerability reporting: https://github.com/kafkaex/kafka_ex/security/advisories/new
- Reach out via the channels listed under Contributing.
Response within ~2 weeks; disclosure coordination typically 30–90 days. Security fixes land on 1.0.x; 0.15.x receives security-only patches through 2027-04-17; earlier versions are unsupported.
KafkaEx is released under the MIT License. See LICENSE for details.