-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path02_Flights_Project_Pipeline.py
More file actions
152 lines (119 loc) · 3.21 KB
/
02_Flights_Project_Pipeline.py
File metadata and controls
152 lines (119 loc) · 3.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
#
# Bookings Data
#
@dlt.table(name="stage_bookings")
def stage_bookings():
    """Raw bookings landed from the bronze volume, read as a Delta stream."""
    return (
        spark.readStream
        .format("delta")
        .load("/Volumes/flights/bronze/bronze_volume/bookings/data/")
    )
@dlt.view(name="transformations_bookings")
def transformations_bookings():
    """Cleaned bookings stream: typed amount/booking_date, audit timestamp,
    rescued-data column removed.

    NOTE(review): reads the staging table via spark.readStream.table rather
    than a dlt read helper — presumably fine for this pipeline's catalog
    setup; confirm it resolves inside the pipeline.
    """
    src = spark.readStream.table("stage_bookings")
    cleaned = (
        src
        .withColumn("amount", col("amount").cast(DoubleType()))
        .withColumn("modified_date", current_timestamp())
        .withColumn("booking_date", to_date(col("booking_date")))
    )
    return cleaned.drop("_rescued_data")
# Data-quality expectations for bookings; any row failing a rule is dropped.
rules = {
    "rule1": "booking_id IS NOT NULL",
    "rule2": "passenger_id IS NOT NULL",
}


@dlt.table(name="silver_bookings")
@dlt.expect_all_or_drop(rules)
def silver_bookings():
    """Validated bookings stream (rows passing all `rules` expectations)."""
    return spark.readStream.table("transformations_bookings")
#
# Flights Data
#
@dlt.view(name="transformations_flights")
def transformations_flights():
    """Flights stream from the bronze volume with typed flight_date,
    an audit timestamp, and the rescued-data column removed."""
    raw = (
        spark.readStream
        .format("delta")
        .load("/Volumes/flights/bronze/bronze_volume/flights/data/")
    )
    return (
        raw
        .withColumn("modified_date", current_timestamp())
        .withColumn("flight_date", to_date(col("flight_date")))
        .drop("_rescued_data")
    )
# Upsert changes from the flights view into silver_flights, keyed on
# flight_id and ordered by modified_date (SCD type 1: keep latest only).
# Reference: https://learn.microsoft.com/en-us/azure/databricks/dlt/cdc
dlt.create_streaming_table("silver_flights")
dlt.create_auto_cdc_flow(
    target="silver_flights",
    source="transformations_flights",
    keys=["flight_id"],
    sequence_by=col("modified_date"),
    stored_as_scd_type=1,
)
#
# Passengers Data
#
@dlt.view(name="transformations_pax")
def transformations_pax():
    """Passenger stream from the bronze customers volume with an audit
    timestamp and the rescued-data column removed."""
    raw = (
        spark.readStream
        .format("delta")
        .load("/Volumes/flights/bronze/bronze_volume/customers/data/")
    )
    return raw.withColumn("modified_date", current_timestamp()).drop("_rescued_data")
# Upsert passenger changes into silver_pax, keyed on passenger_id and
# ordered by modified_date (SCD type 1: keep latest only).
dlt.create_streaming_table("silver_pax")
dlt.create_auto_cdc_flow(
    target="silver_pax",
    source="transformations_pax",
    keys=["passenger_id"],
    sequence_by=col("modified_date"),
    stored_as_scd_type=1,
)
#
# Airports Data
#
@dlt.view(name="transformations_airports")
def transformations_airports():
    """Airport stream from the bronze volume with an audit timestamp and the
    rescued-data column removed."""
    raw = (
        spark.readStream
        .format("delta")
        .load("/Volumes/flights/bronze/bronze_volume/airports/data/")
    )
    return raw.withColumn("modified_date", current_timestamp()).drop("_rescued_data")
# Upsert airport changes into silver_airports, keyed on airport_id and
# ordered by modified_date (SCD type 1: keep latest only).
dlt.create_streaming_table("silver_airports")
dlt.create_auto_cdc_flow(
    target="silver_airports",
    source="transformations_airports",
    keys=["airport_id"],
    sequence_by=col("modified_date"),
    stored_as_scd_type=1,
)
#
# Silver Business View
#
@dlt.table(name="silver_business")
def silver_business():
    """Denormalized business view: bookings joined to passengers, flights,
    and airports on their respective keys, with audit columns dropped.

    NOTE(review): this joins four streaming sources; presumably the runtime
    handles the stream-stream join semantics here — confirm watermarking
    requirements for this pipeline's runtime version.
    """
    bookings = dlt.readStream("silver_bookings")
    pax = dlt.readStream("silver_pax")
    flights = dlt.readStream("silver_flights")
    airports = dlt.readStream("silver_airports")
    joined = (
        bookings
        .join(pax, ["passenger_id"])
        .join(flights, ["flight_id"])
        .join(airports, ["airport_id"])
    )
    return joined.drop("modified_date")
#
# Materialized View
#
@dlt.table(name="silver_business_mv")
def silver_business_mv():
    """Batch (materialized) snapshot of the silver_business table."""
    return dlt.read("silver_business")