| 1 | +<!-- |
| 2 | +Licensed to the Apache Software Foundation (ASF) under one |
| 3 | +or more contributor license agreements. See the NOTICE file |
| 4 | +distributed with this work for additional information |
| 5 | +regarding copyright ownership. The ASF licenses this file |
| 6 | +to you under the Apache License, Version 2.0 (the |
| 7 | +"License"); you may not use this file except in compliance |
| 8 | +with the License. You may obtain a copy of the License at |
| 9 | + |
| 10 | + http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | + |
| 12 | +Unless required by applicable law or agreed to in writing, |
| 13 | +software distributed under the License is distributed on an |
| 14 | +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | +KIND, either express or implied. See the License for the |
| 16 | +specific language governing permissions and limitations |
| 17 | +under the License. |
| 18 | +--> |
| 19 | + |
| 20 | +# flink-table-planner |
| 21 | + |
| 22 | +Translates and optimizes SQL/Table API programs into executable plans using Apache Calcite. Bridges the Table/SQL API and the runtime by generating code and execution plans. The planner is loaded in a separate classloader (`flink-table-planner-loader`) to isolate Calcite dependencies. |
| 23 | + |
| 24 | +See also [README.md](README.md) for Immutables rule config conventions and JSON plan test regeneration. |
| 25 | + |
| 26 | +## Build Commands |
| 27 | + |
| 28 | +Full table modules rebuild: |
| 29 | + |
| 30 | +``` |
| 31 | +./mvnw clean install -T1C -DskipTests -Pskip-webui-build -pl flink-table/flink-table-common,flink-table/flink-sql-parser,flink-table/flink-table-planner-loader,flink-table/flink-table-planner,flink-table/flink-table-api-java -am |
| 32 | +``` |
| 33 | + |
| 34 | +After the first full build, drop `-am` (`--also-make`, which also builds the upstream modules the listed ones depend on) for faster rebuilds when you're only changing code within these modules. |
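
A follow-up rebuild could look like the sketch below: the same flags and module list as the full rebuild command, with `-am` removed, wrapped in a shell function. The function name `rebuild_table_modules` is hypothetical and exists only for illustration; it assumes you run it from the repository root, where `./mvnw` lives.

```shell
# Hypothetical helper: rebuilds only the listed table modules.
# Same flags and module list as the full rebuild, minus -am,
# so Maven does not rebuild the upstream modules again.
rebuild_table_modules() {
  ./mvnw clean install -T1C -DskipTests -Pskip-webui-build \
    -pl flink-table/flink-table-common,flink-table/flink-sql-parser,flink-table/flink-table-planner-loader,flink-table/flink-table-planner,flink-table/flink-table-api-java
}
```

Call `rebuild_table_modules` after each change. If the build complains about missing or stale upstream artifacts, fall back to the full command with `-am`.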
| 35 | + |
| 36 | +## Key Directory Structure |
| 37 | + |
| 38 | +- `plan/rules/physical/stream/` and `plan/rules/physical/batch/` — Physical planner rules |
| 39 | +- `plan/rules/logical/` — Logical optimization rules |
| 40 | +- `plan/nodes/exec/stream/` and `plan/nodes/exec/batch/` — ExecNodes (bridge between planner and runtime) |
| 41 | +- `plan/nodes/exec/spec/` — Serializable operator specifications (JoinSpec, WindowSpec, etc.) |
| 42 | +- `plan/nodes/physical/stream/` and `plan/nodes/physical/batch/` — Intermediate physical nodes (Calcite-based) |
| 43 | +- `plan/nodes/logical/` — Logical nodes (Calcite-based) |
| 44 | +- `codegen/` — Code generation |
| 45 | +- `codegen/calls/` — Custom code generators for specific functions (e.g., `JsonCallGen.scala`) |
| 46 | +- `functions/casting/` — Cast rules for code generation (e.g., `BinaryToBinaryCastRule`, `StringToTimeCastRule`) |
| 47 | +- `functions/` — Function management and inference |
| 48 | +- `catalog/` — Catalog integration |
| 49 | + |
| 50 | +## Key Abstractions |
| 51 | + |
| 52 | +- **ExecNode**: Bridge between planner and runtime. Annotated with `@ExecNodeMetadata(name, version, minPlanVersion, minStateVersion)` for versioning and backwards compatibility. Extends `ExecNodeBase<T>` and implements either `StreamExecNode<T>` (streaming) or `BatchExecNode<T>` (batch); `T` is typically `RowData`. |
| 53 | +- **Physical rules**: Transform logical nodes to physical nodes. Usually extend `ConverterRule`; same-convention rewrites extend `RelRule` with an Immutables `@Value.Immutable` config. Registered in `FlinkStreamRuleSets` and/or `FlinkBatchRuleSets`. |
| 54 | +- **Logical optimization rules**: Extend `RelRule` and often use `RexShuttle` for expression rewriting. Also registered in `FlinkStreamRuleSets` and/or `FlinkBatchRuleSets`. |
| 55 | +- **Specs**: Serializable specifications in `plan/nodes/exec/spec/` (JoinSpec, WindowSpec, etc.) that carry operator configuration. |
| 56 | + |
| 57 | +## Common Change Patterns |
| 58 | + |
| 59 | +### Adding a new table operator |
| 60 | + |
| 61 | +Components involved (can be developed top-down or bottom-up): |
| 62 | + |
| 63 | +1. **Runtime operator** in `flink-table-runtime` under `operators/` (extend `TableStreamOperator`, implement `OneInputStreamOperator` or `TwoInputStreamOperator`). Test with harness tests. See [flink-table-runtime AGENTS.md](../flink-table-runtime/AGENTS.md). |
| 64 | +2. **ExecNode** in `plan/nodes/exec/stream/` and/or `plan/nodes/exec/batch/` (extend `ExecNodeBase<T>`; implement `StreamExecNode<T>` for streaming or `BatchExecNode<T>` for batch; annotate with `@ExecNodeMetadata`; `T` is typically `RowData`) |
| 65 | +3. **Physical Node + Physical Rules** in `plan/rules/physical/stream/` and/or `plan/rules/physical/batch/` (physical rules usually extend `ConverterRule` via `Config.INSTANCE.withConversion(...)`; same-convention rewrites extend `RelRule` with an `@Value.Immutable` config) |
| 66 | +4. **Logical Node + Planner rule** |
| 67 | +5. Tests: semantic tests, plan tests, restore tests (if stateful) |
| 68 | + |
| 69 | +Both `stream/` and `batch/` directories exist for rules and ExecNodes. Consider whether your change applies to one or both. |
| 70 | + |
| 71 | +### Adding a planner optimization rule |
| 72 | + |
| 73 | +Pick the base class by what the rule does: |
| 74 | +- Converts a node from one calling convention to another (for example, logical → stream physical): extend `ConverterRule`. |
| 75 | +Call `ConverterRule.Config.INSTANCE.withConversion(...)` in the constructor; do not define your own config. |
| 76 | +- Rewrites nodes within the same convention (logical → logical, physical → physical): extend `RelRule` with an `@Value.Immutable` config. |
| 77 | +Some existing rules still use Calcite's older `RelOptRule`; prefer `RelRule` for new code. |
| 78 | + |
| 79 | +Then: |
| 80 | +1. Register in `FlinkStreamRuleSets.scala` and/or `FlinkBatchRuleSets.scala` |
| 81 | +2. Add plan tests with XML golden files. When a test fails, copy the framework's generated log file over the reference `.xml` (cases are ordered alphabetically by method name) |
| 82 | +3. A same-convention rewrite needs no runtime changes. A `ConverterRule` that produces a new physical node also needs the physical node, ExecNode, and runtime operator — see "Adding a new table operator" above. |
| 83 | + |
| 84 | +### Extending SQL syntax |
| 85 | + |
| 86 | +1. Modify parser grammar in `flink-sql-parser` (`parserImpls.ftl`) |
| 87 | +2. Add operation conversion logic in `SqlNodeToOperationConversion.java` |
| 88 | +3. Test with parser tests and SQL gateway integration tests (`.q` files) |
| 89 | + |
| 90 | +### Code generation changes |
| 91 | + |
| 92 | +- Cast rules live in `functions/casting/`. Each extends `AbstractExpressionCodeGeneratorCastRule` or similar. |
| 93 | +- Custom call generators for functions live in `codegen/calls/` (e.g., `JsonCallGen.scala`). Simple scalar functions typically don't need these; the planner handles them uniformly through the function definition. |
| 94 | +- The Immutables library is used for rule configs (`@Value.Immutable`, `@Value.Enclosing`). See [README.md](README.md). |
| 95 | + |
| 96 | +### Plan serialization changes |
| 97 | + |
| 98 | +- ExecNode specs use Jackson for JSON serialization. Source/sink specs should use `@JsonIgnoreProperties(ignoreUnknown = true)` for forward compatibility. |
| 99 | +- When adding new ExecNode features, update `RexNodeJsonDeserializer` or related serde classes if new function kinds or types are introduced. |
| 100 | + |
| 101 | +### ExecNode versioning |
| 102 | + |
| 103 | +When bumping an ExecNode version, update the `@ExecNodeMetadata` annotation's `version` and `minPlanVersion`/`minStateVersion` fields. Add restore test snapshots for the new version. |
| 104 | + |
| 105 | +### Configuration options |
| 106 | + |
| 107 | +New features often introduce `ExecutionConfigOptions` entries (in `flink-table-api-java`) for runtime tunability (e.g., cache sizes, timeouts, batch sizes). |
| 108 | + |
| 109 | +## Testing Patterns |
| 110 | + |
| 111 | +Choose test types based on what you're changing: |
| 112 | + |
| 113 | +- **Semantic tests** (for ExecNode/operator changes): Use `SemanticTestBase` (streaming) or `BatchSemanticTestBase` (batch) in `plan/nodes/exec/testutils/`. Both extend `CommonSemanticTestBase`, which implements `TableTestProgramRunner`. Prefer these over ITCase for operators and ExecNodes. |
| 114 | +- **Restore tests** (for stateful operators): Use `RestoreTestBase` or `BatchRestoreTestBase` in `plan/nodes/exec/testutils/`. These base classes implement `TableTestProgramRunner` and use `@ExtendWith(MiniClusterExtension.class)`. Required when your operator uses state. They test savepoint creation and job restart in two phases: (1) generate compiled plans + savepoints, (2) verify recovery. |
| 115 | +- **Plan tests** (for optimization rules): Verify the generated execution plan using XML golden files. Used for logical and physical optimization rules. |
| 116 | +- **ITCase** (for built-in functions): Function tests typically use ITCase with `TestSetSpec` for end-to-end verification (e.g., `JsonFunctionsITCase`, `TimeFunctionsITCase`). |
| 117 | +- **JSON plan test regeneration**: Set the `PLAN_TEST_FORCE_OVERWRITE=true` environment variable (documented in [README.md](README.md)). |
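
As a sketch of how the regeneration flag might be applied to a single test run, the function below sets `PLAN_TEST_FORCE_OVERWRITE=true` for one Maven invocation only. The function name `regen_json_plans` and the class name in the usage line are hypothetical. `-Dtest` is the standard Surefire test filter; whether this build needs additional flags is an assumption not confirmed here.

```shell
# Hypothetical helper: reruns one planner test class with JSON plan
# overwrite enabled. $1 is the test class name passed to Surefire's
# -Dtest filter. The variable is set only for this one command.
regen_json_plans() {
  PLAN_TEST_FORCE_OVERWRITE=true ./mvnw test \
    -pl flink-table/flink-table-planner -Dtest="$1"
}
```

For example, `regen_json_plans MyJsonPlanTest` (a placeholder class name), run from the repository root. Review the regenerated files with `git diff` before committing, since the overwrite replaces the reference plans without comparing them.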