Skip to content

Commit 4fb9237

Browse files
committed
Add Kafka Supervisor spec
1 parent 58ef815 commit 4fb9237

9 files changed

Lines changed: 797 additions & 12 deletions

File tree

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,9 @@
1+
/// Specification of the appendable in-memory index Druid builds rows into during ingestion.
///
/// Used by Druid, but not documented in the official tuning-config reference.
public struct AppendableIndexSpec: Codable, Hashable, Equatable, Sendable {
    /// The appendable index type identifier.
    public let type: String

    /// Whether existing metric columns are preserved when appending.
    /// NOTE(review): semantics inferred from the property name — not covered by Druid's docs; confirm against Druid source.
    public let preserveExistingMetrics: Bool?

    public init(type: String, preserveExistingMetrics: Bool? = nil) {
        self.type = type
        self.preserveExistingMetrics = preserveExistingMetrics
    }
}
Lines changed: 166 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,166 @@
1+
/// Tuning configuration for a Kafka ingestion supervisor.
///
/// https://druid.apache.org/docs/latest/ingestion/supervisor/#tuning-configuration
/// https://druid.apache.org/docs/latest/ingestion/kafka-ingestion#tuning-configuration
public struct KafkaTuningConfig: Codable, Hashable, Equatable, Sendable {
    public init(
        skipSequenceNumberAvailabilityCheck: Bool? = nil,
        recordBufferSizeBytes: Int? = nil,
        recordBufferOfferTimeout: Int? = nil,
        recordBufferFullWait: Int? = nil,
        fetchThreads: Int? = nil,
        maxBytesPerPoll: Int? = nil,
        repartitionTransitionDuration: String? = nil,
        useListShards: Bool? = nil,
        maxRowsInMemory: Int? = nil,
        maxBytesInMemory: Int? = nil,
        skipBytesInMemoryOverheadCheck: Bool? = nil,
        maxRowsPerSegment: Int? = nil,
        maxTotalRows: Int? = nil,
        intermediateHandoffPeriod: String? = nil,
        intermediatePersistPeriod: String? = nil,
        maxPendingPersists: Int? = nil,
        indexSpec: IndexSpec? = nil,
        indexSpecForIntermediatePersists: IndexSpec? = nil,
        reportParseExceptions: Bool? = nil,
        handoffConditionTimeout: Int? = nil,
        resetOffsetAutomatically: Bool? = nil,
        workerThreads: Int? = nil,
        chatRetries: Int? = nil,
        httpTimeout: String? = nil,
        shutdownTimeout: String? = nil,
        offsetFetchPeriod: String? = nil,
        logParseExceptions: Bool? = nil,
        maxParseExceptions: Int? = nil,
        maxSavedParseExceptions: Int? = nil,
        numPersistThreads: Int? = nil,
        appendableIndexSpec: AppendableIndexSpec? = nil
    ) {
        self.skipSequenceNumberAvailabilityCheck = skipSequenceNumberAvailabilityCheck
        self.recordBufferSizeBytes = recordBufferSizeBytes
        self.recordBufferOfferTimeout = recordBufferOfferTimeout
        self.recordBufferFullWait = recordBufferFullWait
        self.fetchThreads = fetchThreads
        self.maxBytesPerPoll = maxBytesPerPoll
        self.repartitionTransitionDuration = repartitionTransitionDuration
        self.useListShards = useListShards
        self.maxRowsInMemory = maxRowsInMemory
        self.maxBytesInMemory = maxBytesInMemory
        self.skipBytesInMemoryOverheadCheck = skipBytesInMemoryOverheadCheck
        self.maxRowsPerSegment = maxRowsPerSegment
        self.maxTotalRows = maxTotalRows
        self.intermediateHandoffPeriod = intermediateHandoffPeriod
        self.intermediatePersistPeriod = intermediatePersistPeriod
        self.maxPendingPersists = maxPendingPersists
        self.indexSpec = indexSpec
        self.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists
        self.reportParseExceptions = reportParseExceptions
        self.handoffConditionTimeout = handoffConditionTimeout
        self.resetOffsetAutomatically = resetOffsetAutomatically
        self.workerThreads = workerThreads
        self.chatRetries = chatRetries
        self.httpTimeout = httpTimeout
        self.shutdownTimeout = shutdownTimeout
        self.offsetFetchPeriod = offsetFetchPeriod
        self.logParseExceptions = logParseExceptions
        self.maxParseExceptions = maxParseExceptions
        self.maxSavedParseExceptions = maxSavedParseExceptions
        self.numPersistThreads = numPersistThreads
        self.appendableIndexSpec = appendableIndexSpec
    }

    // MARK: - Kinesis Related Properties
    //
    // NOTE(review): the properties in this section (skipSequenceNumberAvailabilityCheck,
    // recordBufferSizeBytes, recordBufferOfferTimeout, recordBufferFullWait, fetchThreads,
    // repartitionTransitionDuration, useListShards) are documented in Druid's *Kinesis*
    // tuning reference, not the Kafka one — confirm Druid accepts them for Kafka
    // supervisors before relying on them.

    /// Whether to check that the current sequence number is still available in a particular
    /// Kinesis shard. If `false`, the indexing task attempts to reset the current sequence
    /// number, depending on the value of `resetOffsetAutomatically`.
    public let skipSequenceNumberAvailabilityCheck: Bool?

    /// Size in bytes of the heap buffer Druid uses between the Kinesis fetch threads and the
    /// main ingestion thread.
    public let recordBufferSizeBytes: Int?

    /// Milliseconds to wait for space in the buffer before timing out.
    public let recordBufferOfferTimeout: Int?

    /// Milliseconds to wait for the buffer to drain before fetching from Kinesis again.
    public let recordBufferFullWait: Int?

    /// Size of the thread pool fetching data from Kinesis. More threads than Kinesis shards
    /// brings no benefit.
    public let fetchThreads: Int?

    /// Maximum number of bytes fetched from the buffer per poll. At least one record is
    /// polled regardless of this value.
    public let maxBytesPerPoll: Int?

    /// ISO 8601 period. When shards are split or merged, the supervisor recomputes
    /// shard-to-task-group mappings and signals tasks created under the old mappings to stop
    /// at current time + `repartitionTransitionDuration`, so Druid can start reading the new
    /// shards sooner. The extra wait gives the stream time to write records to the new shards
    /// after the split/merge, avoiding empty-shard handling issues.
    /// https://github.com/apache/druid/issues/7600
    public let repartitionTransitionDuration: String?

    /// Whether the AWS Kinesis SDK `listShards` API may be used to prevent
    /// `LimitExceededException` during ingestion. Requires the necessary IAM permissions.
    public let useListShards: Bool?

    // MARK: - Generic Properties

    /// Number of rows to accumulate before persisting. Counts post-aggregation rows, not
    /// input events.
    public let maxRowsInMemory: Int?

    /// Number of bytes to accumulate in heap memory before persisting. A rough estimate of
    /// memory usage, not actual usage; normally Druid computes this value itself.
    public let maxBytesInMemory: Int?

    /// The `maxBytesInMemory` calculation includes overhead objects created during ingestion
    /// and each intermediate persist. Set to `true` to exclude those overhead bytes from the
    /// check.
    public let skipBytesInMemoryOverheadCheck: Bool?

    /// Number of (post-aggregation) rows to store in a segment. Handoff occurs when
    /// `maxRowsPerSegment` or `maxTotalRows` is reached, or every
    /// `intermediateHandoffPeriod`, whichever happens first.
    public let maxRowsPerSegment: Int?

    /// Number of (post-aggregation) rows to aggregate across all segments. Handoff occurs
    /// when `maxRowsPerSegment` or `maxTotalRows` is reached, or every
    /// `intermediateHandoffPeriod`, whichever happens first.
    public let maxTotalRows: Int?

    /// ISO 8601 period controlling how often tasks hand off segments. Handoff occurs when
    /// `maxRowsPerSegment` or `maxTotalRows` is reached, or every
    /// `intermediateHandoffPeriod`, whichever happens first.
    public let intermediateHandoffPeriod: String?

    /// ISO 8601 period controlling the rate at which intermediate persists occur.
    public let intermediatePersistPeriod: String?

    /// Maximum number of persists that can be pending but not started; a new intermediate
    /// persist beyond this limit blocks ingestion until the running persist finishes. One
    /// persist can run concurrently with ingestion, and none can be queued up. Maximum heap
    /// usage for indexing scales as `maxRowsInMemory * (2 + maxPendingPersists)`.
    public let maxPendingPersists: Int?

    /// Segment storage format options to use at indexing time.
    public let indexSpec: IndexSpec?

    /// Segment storage format options for intermediate persisted temporary segments. Can be
    /// used to disable dimension/metric compression on intermediate segments to reduce the
    /// memory required for final merging, at the cost of possibly higher page-cache use
    /// before those segments are merged into the published segment.
    public let indexSpecForIntermediatePersists: IndexSpec?

    /// DEPRECATED. If `true`, parsing exceptions halt ingestion; if `false`, unparseable rows
    /// and fields are skipped. Setting this to `true` overrides `maxParseExceptions` and
    /// `maxSavedParseExceptions`, forcing `maxParseExceptions` to 0 and capping
    /// `maxSavedParseExceptions` at 1.
    public let reportParseExceptions: Bool?

    /// Milliseconds to wait for segment handoff. Must be >= 0; 0 means wait indefinitely.
    public let handoffConditionTimeout: Int?

    /// Resets partitions when the sequence number is unavailable. If `true`, Druid resets
    /// partitions to the earliest or latest offset based on `useEarliestSequenceNumber` /
    /// `useEarliestOffset` (earliest if `true`, latest if `false`). If `false`, the exception
    /// surfaces, tasks fail, and ingestion halts until manual intervention (potentially a
    /// supervisor reset).
    public let resetOffsetAutomatically: Bool?

    /// Number of threads the supervisor uses for worker-task requests/responses and other
    /// internal asynchronous operations.
    public let workerThreads: Int?

    /// Number of times Druid retries HTTP requests to indexing tasks before considering them
    /// unresponsive.
    public let chatRetries: Int?

    /// ISO 8601 period to wait for an HTTP response from an indexing task.
    public let httpTimeout: String?

    /// ISO 8601 period to wait for the supervisor to attempt a graceful task shutdown before
    /// exiting.
    public let shutdownTimeout: String?

    /// ISO 8601 period controlling how often the supervisor queries the streaming source and
    /// indexing tasks to fetch current offsets and calculate lag. Values below the PT5S
    /// minimum are ignored in favor of the minimum.
    public let offsetFetchPeriod: String?

    // not implemented: segmentWriteOutMediumFactory

    /// If `true`, Druid logs an error message when a parsing exception occurs, containing
    /// information about the row where the error occurred.
    public let logParseExceptions: Bool?

    /// Maximum number of parse exceptions before the task halts ingestion and fails.
    /// Overridden by `reportParseExceptions`.
    public let maxParseExceptions: Int?

    /// Limits how many of the most recent parse exceptions are saved and made available in
    /// the task completion report. Overridden by `reportParseExceptions`.
    public let maxSavedParseExceptions: Int?

    /// Used by Druid, but not documented.
    public let numPersistThreads: Int?

    /// Used by Druid, but not documented.
    public let appendableIndexSpec: AppendableIndexSpec?
}

Sources/DataTransferObjects/Druid/configuration/TuningConfig/KinesisTuningConfig.swift

Lines changed: 0 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -165,12 +165,4 @@ public struct KinesisTuningConfig: Codable, Hashable, Equatable, Sendable {
165165
public let appendableIndexSpec: AppendableIndexSpec?
166166
}
167167

168-
public struct AppendableIndexSpec: Codable, Hashable, Equatable, Sendable {
169-
public init(type: String, preserveExistingMetrics: Bool? = nil) {
170-
self.type = type
171-
self.preserveExistingMetrics = preserveExistingMetrics
172-
}
173168

174-
public let type: String
175-
public let preserveExistingMetrics: Bool?
176-
}

Sources/DataTransferObjects/Druid/configuration/TuningConfig/TuningConfig.swift

Lines changed: 8 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,7 @@
11
public indirect enum TuningConfig: Codable, Hashable, Equatable, Sendable {
22
case kinesis(KinesisTuningConfig)
3+
case kafka(KafkaTuningConfig)
34
case indexParallel(IndexParallelTuningConfig)
4-
// case kafka not implemented
55

66
enum CodingKeys: String, CodingKey {
77
case type
@@ -14,6 +14,8 @@ public indirect enum TuningConfig: Codable, Hashable, Equatable, Sendable {
1414
switch type {
1515
case "kinesis":
1616
self = try .kinesis(KinesisTuningConfig(from: decoder))
17+
case "kafka":
18+
self = try .kafka(KafkaTuningConfig(from: decoder))
1719
case "index_parallel":
1820
self = try .indexParallel(IndexParallelTuningConfig(from: decoder))
1921

@@ -26,9 +28,12 @@ public indirect enum TuningConfig: Codable, Hashable, Equatable, Sendable {
2628
var container = encoder.container(keyedBy: CodingKeys.self)
2729

2830
switch self {
29-
case let .kinesis(ioConfig):
31+
case let .kinesis(tuningConfig):
3032
try container.encode("kinesis", forKey: .type)
31-
try ioConfig.encode(to: encoder)
33+
try tuningConfig.encode(to: encoder)
34+
case let .kafka(tuningConfig):
35+
try container.encode("kafka", forKey: .type)
36+
try tuningConfig.encode(to: encoder)
3237
case let .indexParallel(tuningConfig):
3338
try container.encode("index_parallel", forKey: .type)
3439
try tuningConfig.encode(to: encoder)
Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,13 @@
1+
/// Consumer properties handed to the Kafka consumer used by indexing tasks.
///
/// https://druid.apache.org/docs/latest/ingestion/kafka-ingestion/#consumer-properties
public struct KafkaIndexTaskConsumerProperties: Codable, Hashable, Equatable, Sendable {
    /// Kafka broker addresses in the form `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`
    public let bootstrapServers: String

    public init(bootstrapServers: String) {
        self.bootstrapServers = bootstrapServers
    }

    // Maps the Swift-friendly property name onto the dotted key Kafka/Druid expect.
    private enum CodingKeys: String, CodingKey {
        case bootstrapServers = "bootstrap.servers"
    }
}

0 commit comments

Comments
 (0)