Skip to content

feat: add kafka sink in firehose#120

Closed
sumitaich1998 wants to merge 147 commits into
raystack:mainfrom
goto:feat/kafka-sink-cursor
Closed

feat: add kafka sink in firehose#120
sumitaich1998 wants to merge 147 commits into
raystack:mainfrom
goto:feat/kafka-sink-cursor

Conversation

@sumitaich1998
Copy link
Copy Markdown

No description provided.

spy16 and others added 30 commits March 2, 2023 19:27
fix: kube deployment name-limit handling
chore: change odpf references to goto
* fix: golanci config

* fix: gci lint issues

* fix: lint issues

* fix: remove nosnakecase lint from disable list
* fix: deployment id name
* chore: add better error description

* refactor: merge firehose module files
refactor: reduce firehose package
* feat: implement create, update, reset
* feat: implement upgrade
* feat: implement start, stop, scale
* test: add lots of tests
* feat: implement reset-sync
* refactor: simplify kafka reset flow
* feat: implement log
* refactor: separate client & server CLI
* feat: fix entropy client cli
* fix: use previous telegraf conf
* refactor: remove old firehose module
fix: change commit author to bot
* fix: strip trailing colon in image tag
* feat: add telegraf config templating
ishanarya0 and others added 28 commits April 18, 2025 13:01
* fix: change go struct to go map

* refactor: use mapstructure

---------

Co-authored-by: Ishan Arya <ishan.arya@gojek.com>
Co-authored-by: Ishan Arya <ishan.arya@gojek.com>
Co-authored-by: Ishan Arya <ishan.arya@gojek.com>
Co-authored-by: Ishan Arya <ishan.arya@gojek.com>
* feat: add burst and qps config

* chore: change defaults

* chore: change defaults

---------

Co-authored-by: Ishan Arya <ishan.arya@gojek.com>
* feat: log failure response

* fix: change log type

* feat: log time in UTC

* reorder

---------

Co-authored-by: Ishan Arya <ishan.arya@gojek.com>
feat: store default Influx DB name in Flink resource

Co-authored-by: Ayushi Sharma <ayushi.sharma@gojek.com>
feat: firehose toleration affinity based on autoscaler
* fix: toleration affinity mode kube

* fix: toleration affinity mode kube
* feat(firehose): sink based autoscaler config

* feat: update custom transformers

* feat: override triggers only

* feat: update trigger override

* feat: remove unused const

* feat: update triggers
Co-authored-by: Ayushi Sharma <ayushi.sharma@gojek.com>
Co-authored-by: Ayushi Sharma <ayushi.sharma@gojek.com>
* feat: otel integration

* fix: resolve non-constant format string issues

- Fix WithCausef calls in pkg/validator/validator.go to use constant format strings
- Fix WithCausef call in pkg/errors/errors.go to use constant format string
- Fix multiple WithCausef calls in pkg/helm/helm.go to use constant format strings
- All format functions now properly use '%s' as constant format with dynamic values as arguments
- Resolves security and linting issues related to format string usage

* fix: resolve all remaining non-constant format string issues

- Applied comprehensive fix across entire codebase
- Fixed WithCausef and WithMsgf calls to use constant format strings
- Used automated script to fix patterns like err.Error() and strings.Join()
- Manual fixes for edge cases in firehose and core modules
- All format functions now properly use '%s' as constant format with dynamic values as arguments
- Resolves all remaining security and linting issues related to format string usage

* chore: update lint option

* chore: update lint option

* feat: enable newrelic

* feat: enable grpc otel

---------

Co-authored-by: Femi Novia Lina <feminovi@gmail.com>
feat: add dagger fs oss endpoint helm values
* feat(dagger): add support for tolerations and node affinity for dagger

* feat: test dagger chart with existing autoscaler tolerations

* add debug logs

* feat: centralize node affinity interface conversion logic

---------

Co-authored-by: Ayushi Sharma <ayushi.sharma@gojek.com>
Add KAFKA as a valid SINK_TYPE in the Firehose JSON Schema config,
enabling Entropy to orchestrate Firehose deployments with Kafka sinks.
Add a KAFKA_Sink test case in driver_test.go to verify helm release
generation works correctly for the new sink type.

Co-authored-by: Cursor <cursoragent@cursor.com>
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 13, 2026

Important

Review skipped

Too many files!

This PR contains 198 files, which is 48 over the limit of 150.

To get a review, narrow the scope:
• coderabbit review --type committed # exclude uncommitted changes
• coderabbit review --dir # limit to a subdirectory
• coderabbit review --base # compare against a closer base

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 25e6db63-3eda-40a3-93da-f53972a9572b

📥 Commits

Reviewing files that changed from the base of the PR and between 9e4c9be and d8575c2.

⛔ Files ignored due to path filters (10)
  • go.sum is excluded by !**/*.sum
  • proto/gotocompany/common/v1/service.pb.go is excluded by !**/*.pb.go
  • proto/gotocompany/common/v1/service.pb.gw.go is excluded by !**/*.pb.gw.go
  • proto/gotocompany/common/v1/service_grpc.pb.go is excluded by !**/*.pb.go
  • proto/gotocompany/entropy/v1beta1/module.pb.go is excluded by !**/*.pb.go
  • proto/gotocompany/entropy/v1beta1/module.pb.gw.go is excluded by !**/*.pb.gw.go
  • proto/gotocompany/entropy/v1beta1/module_grpc.pb.go is excluded by !**/*.pb.go
  • proto/gotocompany/entropy/v1beta1/resource.pb.go is excluded by !**/*.pb.go
  • proto/gotocompany/entropy/v1beta1/resource.pb.gw.go is excluded by !**/*.pb.gw.go
  • proto/gotocompany/entropy/v1beta1/resource_grpc.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (198)
  • .github/workflows/lint.yml
  • .github/workflows/release.yml
  • .github/workflows/test.yml
  • .gitignore
  • .golangci.yml
  • .goreleaser.yml
  • Makefile
  • README.md
  • buf.gen.yaml
  • cli/action.go
  • cli/cli.go
  • cli/client.go
  • cli/config.go
  • cli/display.go
  • cli/logs.go
  • cli/migrate.go
  • cli/module.go
  • cli/resource.go
  • cli/serve.go
  • cli/utils.go
  • cli/version.go
  • cli/worker.go
  • core/core.go
  • core/core_test.go
  • core/mocks/async_worker.go
  • core/mocks/driver.go
  • core/mocks/loggable_module.go
  • core/mocks/module_registry.go
  • core/mocks/module_service.go
  • core/mocks/module_store.go
  • core/mocks/resource_store.go
  • core/module/action.go
  • core/module/driver.go
  • core/module/module.go
  • core/module/service.go
  • core/read.go
  • core/read_test.go
  • core/resource/resource.go
  • core/resource/resource_test.go
  • core/resource/state.go
  • core/resource/state_test.go
  • core/sync.go
  • core/write.go
  • core/write_test.go
  • docker-compose.yaml
  • docs/concepts/resource-life-cycle.md
  • docs/installation.md
  • docs/modules/firehose.md
  • docs/modules/kubernetes.md
  • entropy.yaml
  • go.mod
  • internal/server/middlewares.go
  • internal/server/server.go
  • internal/server/serverutils/context.go
  • internal/server/serverutils/grpcerror.go
  • internal/server/v1/mocks/module_service.go
  • internal/server/v1/mocks/resource_service.go
  • internal/server/v1/modules/mappers.go
  • internal/server/v1/modules/server.go
  • internal/server/v1/modules/server_test.go
  • internal/server/v1/resources/logwrapper.go
  • internal/server/v1/resources/mappers.go
  • internal/server/v1/resources/server.go
  • internal/server/v1/resources/server_test.go
  • internal/store/postgres/module_model.go
  • internal/store/postgres/module_store.go
  • internal/store/postgres/postgres.go
  • internal/store/postgres/postgres_test.go
  • internal/store/postgres/resource_model.go
  • internal/store/postgres/resource_store.go
  • internal/store/postgres/resource_store_test.go
  • internal/store/postgres/revision_model.go
  • internal/store/postgres/revision_store.go
  • internal/store/postgres/schema.sql
  • internal/store/postgres/testdata/resources.json
  • internal/store/postgres/utils.go
  • main.go
  • modules/dagger/config.go
  • modules/dagger/driver.go
  • modules/dagger/driver_log.go
  • modules/dagger/driver_output.go
  • modules/dagger/driver_plan.go
  • modules/dagger/driver_sync.go
  • modules/dagger/module.go
  • modules/dagger/schema/config.json
  • modules/firehose/autoscaler.go
  • modules/firehose/autoscaler_test.go
  • modules/firehose/config.go
  • modules/firehose/config_test.go
  • modules/firehose/data.go
  • modules/firehose/driver.go
  • modules/firehose/driver_log.go
  • modules/firehose/driver_output.go
  • modules/firehose/driver_output_test.go
  • modules/firehose/driver_plan.go
  • modules/firehose/driver_plan_create_test.go
  • modules/firehose/driver_plan_test.go
  • modules/firehose/driver_plan_update_test.go
  • modules/firehose/driver_sync.go
  • modules/firehose/driver_sync_test.go
  • modules/firehose/driver_test.go
  • modules/firehose/kafka/consumer.go
  • modules/firehose/keda.go
  • modules/firehose/keda_test.go
  • modules/firehose/log.go
  • modules/firehose/module.go
  • modules/firehose/module_test.go
  • modules/firehose/output.go
  • modules/firehose/plan.go
  • modules/firehose/plan_test.go
  • modules/firehose/schema/config.json
  • modules/firehose/schema/reset.json
  • modules/firehose/schema/scale.json
  • modules/firehose/sync.go
  • modules/firehose/test/module-config.json
  • modules/flink/config.go
  • modules/flink/driver.go
  • modules/flink/driver_output.go
  • modules/flink/driver_plan.go
  • modules/flink/driver_sync.go
  • modules/flink/module.go
  • modules/flink/schema/config.json
  • modules/job/config/config.go
  • modules/job/config/schema/config.json
  • modules/job/driver/driver.go
  • modules/job/driver/driver_test.go
  • modules/job/driver/log.go
  • modules/job/driver/output.go
  • modules/job/driver/plan.go
  • modules/job/driver/sync.go
  • modules/job/module.go
  • modules/kafka/config.go
  • modules/kafka/driver.go
  • modules/kafka/module.go
  • modules/kafka/schema/config.json
  • modules/kubernetes/config_schema.json
  • modules/kubernetes/config_schema_test.go
  • modules/kubernetes/driver.go
  • modules/kubernetes/kubernetes.go
  • modules/kubernetes/module.go
  • modules/kubernetes/output.go
  • modules/registry.go
  • modules/registry_test.go
  • modules/utils.go
  • modules/utils_test.go
  • pkg/common/common.go
  • pkg/errors/errors.go
  • pkg/errors/errors_test.go
  • pkg/helm/client.go
  • pkg/helm/config.go
  • pkg/helm/helm.go
  • pkg/helm/release.go
  • pkg/helm/release_test.go
  • pkg/helm/status.go
  • pkg/kafka/consumer_reset.go
  • pkg/kube/client.go
  • pkg/kube/client_test.go
  • pkg/kube/config.go
  • pkg/kube/container/container.go
  • pkg/kube/container/container_test.go
  • pkg/kube/job/job.go
  • pkg/kube/job/job_test.go
  • pkg/kube/job/processor.go
  • pkg/kube/pod/pod.go
  • pkg/kube/volume/Volume.go
  • pkg/kube/volume/Volume_test.go
  • pkg/logger/logger.go
  • pkg/telemetry/opencensus.go
  • pkg/telemetry/opentelemetry.go
  • pkg/telemetry/telemetry.go
  • pkg/validator/validator.go
  • pkg/version/version.go
  • pkg/worker/example/main.go
  • pkg/worker/job_test.go
  • pkg/worker/mocks/job_queue.go
  • pkg/worker/pgq/pgq.go
  • pkg/worker/pgq/pgq_utils.go
  • pkg/worker/worker.go
  • pkg/worker/worker_test.go
  • proto/entropy.swagger.yaml
  • proto/gotocompany/common/v1/service.pb.validate.go
  • proto/gotocompany/entropy/v1beta1/module.pb.validate.go
  • proto/gotocompany/entropy/v1beta1/resource.pb.validate.go
  • test/e2e_test/firehose_helper_test.go
  • test/e2e_test/firehose_test.go
  • test/e2e_test/flink_test.go
  • test/e2e_test/kafka_test.go
  • test/e2e_test/worker_test.go
  • test/testbench/bootstrap.go
  • test/testbench/test_data/module/firehose_module.json
  • test/testbench/test_data/module/flink_module.json
  • test/testbench/test_data/module/kafka_module.json
  • test/testbench/test_data/module/kubernetes_module.json
  • test/testbench/test_data/resource/firehose_resource.json
  • test/testbench/test_data/resource/flink_resource.json
  • test/testbench/test_data/resource/kafka_resource.json
  • test/testbench/test_data/resource/kubernetes_resource.json
  • test/testbench/testbench.go

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review

Tip

💬 Introducing Slack Agent: The best way for teams to turn conversations into code.

Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.

  • Generate code and open pull requests
  • Plan features and break down work
  • Investigate incidents and troubleshoot customer tickets together
  • Automate recurring tasks and respond to alerts with triggers
  • Summarize progress and report instantly

Built for teams:

  • Shared memory across your entire org—no repeating context
  • Per-thread sandboxes to safely plan and execute work
  • Governance built-in—scoped access, auditability, and budget controls

One agent for your entire SDLC. Right inside Slack.

👉 Get started


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

9 participants