Skip to content

feat : vocab Kafka Producer 워커 추가 (#9)#10

Merged
zzuhannn merged 7 commits into
mainfrom
feat/#9
May 10, 2026
Merged

feat : vocab Kafka Producer 워커 추가 (#9)#10
zzuhannn merged 7 commits into
mainfrom
feat/#9

Conversation

@zzuhannn

Copy link
Copy Markdown
Member

Summary

  • fairytale_paragraph 컨슈밍 → extract_vocab LLM 호출 → vocab_extracted 발행 워커 추가
  • 페이지당 최소 1건 발행 (Spring UNIQUE이 dedup)
  • 빈 텍스트/OpenAI 실패/poison-pill 모두 빈 vocab으로 fail-open

주요 변경

  • app/services/kafka_worker.py 신규
  • app/schemas/kafka_messages.py 신규 (Spring DTO와 camelCase 정합)
  • FastAPI lifespan에 결선 + /health/ready
  • Dockerfile에 librdkafka1 사전 설치

합의 사항

  • 토픽: vocab_extracted / 컨슈머 그룹: vocab-extractor
  • 메시지: camelCase JSON, ADD_TYPE_INFO_HEADERS=false

관련

zzuhannn added 7 commits May 10, 2026 16:46
- requirements.txt: confluent-kafka>=2.6.1,<3.0 (cp313 manylinux wheel 보장)
- config.py: kafka_url, fairytale_paragraph, vocab_extracted, vocab_consumer_group, openai_request_timeout 설정 추가
- llm_service.extract_vocab: OpenAI 클라이언트에 timeout=30s 적용 (단일 호출 무한 hang 방지)
- FairytaleParagraphMessage: 컨슈밍 메시지 (snake_case fairytale_id/page/text)
- VocabExtractedMessage: 발행 메시지 (camelCase, Spring DTO와 1:1 정합)
- model_dump_json() 결과가 Spring JsonDeserializer가 기대하는 camelCase byte string으로 직렬화
- _produce_with_ack: delivery callback gated commit (broker ack 받은 메시지만 offset 진행)
- _handle_poison_pill: ValidationError 시에도 raw JSON에서 fid/page 추출 가능하면 빈 vocab 발행
- OpenAI 호출 실패/빈 텍스트도 빈 vocab 발행 (fail-open SSE)
- Consumer max.poll.interval.ms=120000 (OpenAI 30s timeout보다 충분히 김, rebalance loop 방지)
- Producer acks=all + idempotence=true
- 페이지당 최소 1건 발행 보장 (Spring UNIQUE이 dedup)
- lifespan startup non-blocking: Kafka broker 죽어도 FastAPI는 계속 부팅 (Phase 1 HTTP 폴백 보존)
- vocab_worker_enabled=false 또는 kafka_url 빈 값일 때 워커 미기동 (로컬 dev 폴백)
- /health: process liveness
- /health/ready: producer.list_topics(2s)로 broker 도달 가능 여부 확인 (503 on failure)
- shutdown 시 consumer 닫고 producer flush
- test_kafka_worker.py 6건: 정상/빈텍스트/OpenAI 예외/poison-pill 복구/poison-pill skip/delivery 실패 시 commit 안 됨
- test_wire_format.py 2건: VocabExtractedMessage가 Spring DTO와 byte-level 정합 (정상/빈 word)
- 모든 종착 분기에서 페이지당 최소 1건 발행 보장 검증
- delivery callback 실패 시 consumer.commit이 호출되지 않음을 검증 (at-least-once 보장)
- Dockerfile: confluent-kafka cp313 manylinux wheel 사용 가정이지만 librdkafka1 런타임 라이브러리를 사전 설치해 폴백 안전망 확보
- .env.example: Kafka/OpenAI 환경변수 템플릿 (Spring Back과 동일 키 사용 — KAFKA_URL/FAIRYTALE_PARAGRAPH/VOCAB_EXTRACTED)
- VOCAB_WORKER_ENABLED=false로 로컬에서 워커 비활성화 가능 (Phase 1 HTTP 폴백만 사용)
- FairytaleParagraphMessage: model_config = ConfigDict(extra="ignore") 명시 (Pydantic v2 default와 동일하지만 의도 명시)
- VocabExtractedMessage.messageId: Optional → 필수 str (워커가 항상 UUID 설정)
- extract_graph도 OpenAI(timeout=30s) 적용 (extract_vocab과 일관성)
- FairytaleParagraphMessage 미래 필드 추가 호환성 검증 테스트 추가
@zzuhannn zzuhannn changed the title [Phase 2] vocab Kafka Producer 워커 추가 (#9) feat : vocab Kafka Producer 워커 추가 (#9) May 10, 2026
@zzuhannn zzuhannn merged commit 925e04d into main May 10, 2026
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant