Bu proje, Kafka üzerinde stream işleme ve join mantığını Node.js (KafkaJS) ile gösteren bir örnektir. Önce Kafka Streams kavramlarına kısa bir giriş, sonra bu repodaki örnek ve data/ klasörü anlatılmaktadır.
Kafka Streams, Apache Kafka’daki event stream’leri (olay akışlarını) işlemek için kullanılan bir kütüphane/yaklaşımdır. Veriler topic’lerde sürekli akar; Streams ile bu akışları filtreleyebilir, dönüştürebilir, birleştirebilir (join) veya toplayabilirsiniz.
- Topic: Mesajların yazıldığı kanal (örn.
orders.created,customers). Partition’lara bölünerek ölçeklenir. - Producer: Topic’lere mesaj gönderen uygulama.
- Consumer: Topic’lerden mesaj okuyan uygulama. Consumer group ile paralel ve tekrar işleme (at-least-once) yönetilir.
- Stream: Zaman sıralı, sınırsız kayıt akışı (Kafka topic’indeki mesajlar).
- Stream–Table join: Bir stream’i bir “tablo” (güncel değerler) ile zenginleştirme (örn. sipariş + müşteri bilgisi).
- Stream–Stream join: İki stream’i zaman penceresi içinde eşleştirme (örn. sipariş + ödeme aynı orderId için).
Stream işlemede iki temel yaklaşım vardır:
Stateless (durumsuz) işlemler
Her mesaj tek başına işlenir; filter, map, flatMap gibi. Önceki kayıtları “hatırlamaya” gerek yoktur. Girdi mesajı yeterlidir.
Stateful (durumlu) işlemler
Join, groupBy + aggregate, windowing gibi işlemler state (durum) ister: önceki veya diğer stream’deki kayıtları tutup eşleştirmek gerekir. Kafka Streams bu işi yerel state store kavramıyla çözer: Her task, ihtiyacı olan state’i state store içinde tutar ve API ile erişilebilir.
State store türleri (Kafka Streams tarafında):
- Persistent key-value store – Disk’e yazılır; restart sonrası korunur.
- In-memory store – Sadece RAM’de; hızlıdır, restart’ta sıfırlanır.
- İhtiyaca göre window store, session store gibi diğer yapılar da kullanılabilir.
Bu projede join-service.js stateful davranır: customers için in-memory bir “tablo” (KTable benzeri), sipariş–ödeme eşleştirmesi için ise pendingOrders / pendingPayments gibi state tutar. Kafka’nın resmi Java kütüphanesi yerine KafkaJS ile aynı mantık Node.js’te uygulanmaktadır.
Örnek senaryo: Sipariş ve Ödeme event’leri geliyor; bunları birleştirip müşteri bilgisiyle zenginleştirilmiş çıktılar üretiyoruz.
| Topic | Açıklama |
|---|---|
customers |
Müşteri kayıtları (KTable benzeri; compact) |
orders.created |
Yeni sipariş event’leri |
payments.completed |
Tamamlanan ödeme event’leri |
orders.paid |
Sipariş + ödeme join sonucu (stream–stream) |
orders.enriched |
Sipariş + müşteri join sonucu (stream–table) |
- create-topics.sh – Gerekli topic’leri Docker’daki Kafka’da oluşturur (
--if-not-existsile tekrar çalıştırılabilir). - produce-customers.js – Örnek müşteri kayıtlarını
customerstopic’ine yazar. - produce-orders.js – Örnek sipariş event’ini
orders.createdtopic’ine yazar. - produce-payments.js – Örnek ödeme event’ini
payments.completedtopic’ine yazar. - join-service.js – Stream işleme ve join mantığı:
customers→ in-memory “tablo” (KTable benzeri).orders.created+customers→orders.enriched(sipariş + müşteri).orders.created+payments.completed(orderId + zaman penceresi) →orders.paid(sipariş + ödeme).
- consume-output.js –
orders.paidveorders.enrichedtopic’lerini okur; mesajları hem konsola hemdata/klasörüne yazar.
customers orders.created payments.completed
| | |
v v v
+---------------------+---------------------+
|
join-service
(stream–table + stream–stream)
|
+-------------+-------------+
v v
orders.enriched orders.paid
| |
+-------------+-------------+
v
consume-output
|
data/ + konsol
Tüm üretilen (produce) ve tüketilen (consume) mesajlar proje kökündeki data/ klasörüne JSONL (satır başına bir JSON) olarak yazılır. Böylece hem Kafka’ya giden hem Kafka’dan okunan veriler yerel olarak takip edilebilir.
| Dosya | İçerik |
|---|---|
data/produced-customers.jsonl |
customers topic’ine gönderilen müşteri kayıtları (topic, key, value). |
data/produced-orders.jsonl |
orders.created topic’ine gönderilen sipariş event’leri. |
data/produced-payments.jsonl |
payments.completed topic’ine gönderilen ödeme event’leri. |
Her satır bir JSON nesnesidir; topic, key ve value alanlarını içerir.
| Dosya | İçerik |
|---|---|
data/consumed-orders-paid.jsonl |
orders.paid topic’inden okunan mesajlar (sipariş + ödeme join sonucu). |
data/consumed-orders-enriched.jsonl |
orders.enriched topic’inden okunan mesajlar (sipariş + müşteri zenginleştirmesi). |
Her satırda topic, key, value (string) ve timestamp bulunur. Join sonuçları value içinde JSON string olarak gelir.
data/ klasörü .gitignore ile versiyon kontrolü dışında tutulabilir; örnek verileri yerel çalıştırmalarla üretirsiniz.
- Node.js (ES modules kullanılıyor;
package.jsoniçinde"type": "module"). - Docker ile çalışan bir Kafka (örn.
kafkaadlı container,localhost:9092).
-
Bağımlılıkları yükleyin:
npm install
-
Topic’leri oluşturun:
npm run create-topics
-
Veri üretin (sırayla):
npm run produce-customers npm run produce-orders npm run produce-payments
-
Join servisini bir terminalde çalıştırın (açık kalsın):
npm run join-service
-
Consume’u başka bir terminalde çalıştırın (açık kalsın):
npm run consume-output
Join-service ve consume-output çalışırken produce script’lerini tekrar çalıştırarak yeni event’ler ekleyebilirsiniz. Consume edilen her mesaj hem konsola basılır hem data/consumed-*.jsonl dosyalarına eklenir.
- Kafka Streams: Topic’lerdeki event akışlarını işleme ve join etme yaklaşımı; bu projede Node.js (KafkaJS) ile uygulanıyor.
- Bu örnek: Sipariş, ödeme ve müşteri topic’leri; join-service ile stream–table ve stream–stream join; consume-output ile sonuçların okunması.
data/: Produce ve consume edilen tüm veriler bu klasörde JSONL dosyalarına yazılıyor; böylece akış yerel olarak izlenebilir ve örnek veriler saklanabilir.
- Kafka Streams Nedir? — Mehmet Cem Yücel, Medium
- Kafka Streams – Introduction — Apache Kafka resmi dokümantasyonu