Skip to content

Commit 826b65c

Browse files
authored
Merge pull request #6679 from Flowminder/delarative-partitioning
Delarative partitioning
2 parents 4b89707 + 693c4ab commit 826b65c

14 files changed

Lines changed: 56 additions & 193 deletions

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
1313
- FlowKit test and synthetic data now uses parquet foreign tables.
1414
> [!WARNING]
1515
> The location of the parquet files in the container is `/parquet_data`, if you are testing with larger amounts of data you may wish to add an additional bind mount for this location.
16+
- FlowDB now uses [declarative partitioning](https://www.postgresql.org/docs/current/ddl-partitioning.html#DDL-PARTITIONING-DECLARATIVE)
17+
- FlowETL now attached new data as partitions, rather than subtables
18+
> [!WARNING]
19+
> This change is not backwards compatible with earlier releases of FlowDB, and you will need to repopulate your deployment. We recommend combining this change with the new parquet support.
1620
1721
### Fixed
1822

flowdb/bin/build/0020_schema_events.sql

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ CREATE SCHEMA IF NOT EXISTS events;
6464
operator_code NUMERIC,
6565
country_code NUMERIC
6666

67-
);
67+
) PARTITION BY RANGE (datetime);
6868

6969
CREATE TABLE IF NOT EXISTS events.forwards(
7070

@@ -87,7 +87,7 @@ CREATE SCHEMA IF NOT EXISTS events;
8787
operator_code NUMERIC,
8888
country_code NUMERIC
8989

90-
);
90+
) PARTITION BY RANGE (datetime);
9191

9292
CREATE TABLE IF NOT EXISTS events.sms(
9393

@@ -108,7 +108,7 @@ CREATE SCHEMA IF NOT EXISTS events;
108108
operator_code NUMERIC,
109109
country_code NUMERIC
110110

111-
);
111+
) PARTITION BY RANGE (datetime);
112112

113113
CREATE TABLE IF NOT EXISTS events.mds(
114114

@@ -131,7 +131,7 @@ CREATE SCHEMA IF NOT EXISTS events;
131131
operator_code NUMERIC,
132132
country_code NUMERIC
133133

134-
);
134+
) PARTITION BY RANGE (datetime);
135135

136136
CREATE TABLE IF NOT EXISTS events.topups(
137137

@@ -156,7 +156,7 @@ CREATE SCHEMA IF NOT EXISTS events;
156156
operator_code NUMERIC,
157157
country_code NUMERIC
158158

159-
);
159+
) PARTITION BY RANGE (datetime);
160160

161161
CREATE TABLE IF NOT EXISTS events.location_ids (
162162
location_id TEXT,

flowdb/testdata/bin/generate_synthetic_data.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -224,11 +224,7 @@ def write_day_csv(subscribers, cells, date, num_calls, call_seed, output_root_di
224224
calls_df_twoline.to_csv(fpath, index=False)
225225

226226
ingest_sql = """
227-
CREATE TABLE IF NOT EXISTS events.calls_{table} (
228-
CHECK ( datetime >= '{table}'::TIMESTAMPTZ
229-
AND datetime < '{end_date}'::TIMESTAMPTZ)
230-
) INHERITS (events.calls);
231-
ALTER TABLE events.calls_{table} NO INHERIT events.calls;
227+
CREATE TABLE IF NOT EXISTS events.calls_{table} PARTITION OF events.calls FOR VALUES FROM ('{table}') TO ('{end_date}');
232228
233229
COPY events.calls_{table}( datetime,msisdn_counterpart,id,msisdn,location_id,outgoing,duration,tac )
234230
FROM '{output_root_dir}/data/records/calls/calls_{table}.csv'
@@ -241,8 +237,7 @@ def write_day_csv(subscribers, cells, date, num_calls, call_seed, output_root_di
241237
CREATE INDEX ON events.calls_{table} (location_id);
242238
CREATE INDEX ON events.calls_{table} (datetime);
243239
CLUSTER events.calls_{table} USING calls_{table}_msisdn_idx;
244-
ANALYZE events.calls_{table};
245-
ALTER TABLE events.calls_{table} INHERIT events.calls;""".format(
240+
ANALYZE events.calls_{table};""".format(
246241
output_root_dir=output_root_dir,
247242
table=date.strftime("%Y%m%d"),
248243
end_date=(date + datetime.timedelta(days=1)).strftime("%Y%m%d"),

flowdb/testdata/bin/generate_synthetic_data_sql.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -672,7 +672,7 @@ def log_duration(job: str, **kwargs):
672672
attach_sql.append(
673673
(
674674
f"Attaching events.{sub}_{table}",
675-
f"ALTER TABLE events.{sub}_{table} INHERIT events.{sub};",
675+
f"ALTER TABLE events.{sub} ATTACH PARTITION events.{sub}_{table} FOR VALUES FROM ('{table}') TO ('{(date + datetime.timedelta(days=1)).strftime('%Y%m%d')}');",
676676
)
677677
)
678678
if args.cluster:

flowdb/testdata/test_data/py/zz_convert_events_to_parquet.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,14 @@
3939

4040
MOUNT_PARQUET_SQL = """
4141
DROP TABLE events.{table_name};
42-
CREATE FOREIGN TABLE events.{table_name} ()
43-
INHERITS (events.{event_type})
42+
CREATE FOREIGN TABLE events.{table_name}
43+
PARTITION OF events.{event_type}
44+
FOR VALUES FROM ('{start_date}') TO ('{end_date}')
4445
SERVER parquet_srv
4546
OPTIONS(
4647
filename '{parquet_path}'
4748
);
48-
49-
--ALTER TABLE events.{event_type}
50-
--ATTACH PARTITION events.{table_name}
51-
--FOR VALUES FROM ('{start_date}') TO ('{end_date}');
52-
49+
ANALYZE events.{table_name};
5350
"""
5451

5552
PARQUET_COL_DTYPE_MAPPING = {

flowdb/testdata/test_data/sql/ingest_calls.sql

Lines changed: 8 additions & 29 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

flowdb/testdata/test_data/sql/ingest_mds.sql

Lines changed: 8 additions & 29 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

flowdb/testdata/test_data/sql/ingest_sms.sql

Lines changed: 7 additions & 28 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)