Data Lakehouse Time Travel
It’s Tuesday afternoon, you’re sitting at your cubicle, and you’re typing away at your keyboard. Earlier in the day, you volunteered to pick up the ticket to modify the ingestion pipeline, but now, you regret everything. You ran the Spark job only to realize you were writing to the production tables. You tried to stop the job, but the damage was already done. You tell yourself it’s only a matter of time before the business analysts and data scientists become aware of the duplicate records. Finally, you muster up the courage to go tell your supervisor.
He responds in a voice reminiscent of a boomer who grew up in the '70s:
Dude… Haven’t you heard about time travel?
He then proceeds to bring you up to speed on Apache Iceberg…
Apache Iceberg
Apache Iceberg is an open source project created at Netflix. For those of you who are familiar with the Hadoop ecosystem: in the same way that HBase provides Bigtable-like capabilities on top of HDFS, Iceberg provides Bigtable-like capabilities on top of any distributed file system or object store (e.g. S3, GCS, ABS, HDFS). In addition to the open table format specification, Apache Iceberg provides atomic commits, concurrent writes, schema evolution, hidden partitioning, and, yes, time travel. At the time of this writing, Spark, Flink, Dremio, Amazon Athena, Amazon EMR, and Trino all support Iceberg.
Whenever we execute a query against a table, under the hood the engine calls the Iceberg APIs, which write to or read from the metadata files that track the data files. Whenever data in an Iceberg table is overwritten, Iceberg actually keeps the old version of the data and uses the metadata files to give the appearance that it's no longer there. This is what enables Iceberg to "travel back in time", that is, roll back to a previous snapshot. As we highlighted above, time travel is particularly useful in the event of a mistake such as deleting, inserting, or overwriting rows in a production table.
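As a quick taste of that last feature, here's a minimal sketch of a time-travel read using Iceberg's Spark read options (the table name, snapshot ID, and timestamp below are hypothetical; real snapshot IDs come from the table's history, which we'll query later in this post):
# Read the table as it existed at a specific snapshot (hypothetical ID).
old_taxis_df = (spark
    .read
    .option("snapshot-id", 1234567890123456789)
    .format("iceberg")
    .load("taxis"))
# Or read it as of a point in time (milliseconds since the epoch).
taxis_then_df = (spark
    .read
    .option("as-of-timestamp", 1650000000000)
    .format("iceberg")
    .load("taxis"))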
If you’re interested in learning more about Iceberg, Jason Hughes gives an amazing talk on the subject.
Time Travel Example
If you visit the official Apache Iceberg blog, you’ll find a post on how to get up and running with Iceberg quickly. Go ahead and copy the Docker Compose file into an editor of your choice. Then open the command line and start the containers in the background:
docker-compose up -d
Start a Jupyter notebook server by running the following command:
docker exec -it spark-iceberg pyspark-notebook
Navigate to http://localhost:8888 and create a new notebook by clicking the New drop down menu in the top right corner.
There’s no need to start a new Spark session as one has already been created for us. We make use of the Spark session (accessible via the spark variable) to read the yellow taxi dataset that is already present in the Docker image.
taxis_2020_04_df = (spark
    .read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/home/iceberg/data/yellow_tripdata_2020-04.csv"))
We then create an Iceberg table with the contents of the DataFrame.
taxis_2020_04_df.write.saveAsTable("taxis")
We verify it worked by executing a query against the table.
%%sql
SELECT COUNT(*) as count FROM taxis
count
237993
Next, we simulate new data being written to the data warehouse. We retrieve the taxi data for the next month and copy it to the Docker container.
docker cp yellow_tripdata_2020-05.csv spark-iceberg:/home/iceberg/data/yellow_tripdata_2020-05.csv
Then, we read the CSV and write its contents to the Iceberg table we created previously.
taxis_2020_05_df = (spark
    .read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/home/iceberg/data/yellow_tripdata_2020-05.csv"))
(taxis_2020_05_df.write
    .format("iceberg")
    .mode("append")
    .save("taxis"))
We confirm that the count increased.
%%sql
SELECT COUNT(*) as count FROM taxis
count
586364
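Under the hood, each of those two writes committed a snapshot. If you'd like to see them, Iceberg exposes metadata tables alongside the data; a minimal sketch using the snapshots metadata table:
# Each committed write produces a snapshot; the snapshots metadata
# table lists one row per commit.
spark.sql("""
    SELECT committed_at, snapshot_id, operation
    FROM taxis.snapshots
    ORDER BY committed_at
""").show(truncate=False)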
In another notebook, we import the libraries necessary for training a linear regression model.
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
Then, we create a DataFrame with the contents of the taxis table.
taxis_df = spark.table("taxis")
For simplicity, we make use of a single feature: trip_distance.
vector_assembler = VectorAssembler(inputCols = ['trip_distance'], outputCol = 'features')
In Spark, prior to training the model, we must first vectorize the features.
df = vector_assembler.transform(taxis_df)
df = df.select(['features', 'fare_amount'])
df.show(3)
+--------+-----------+
|features|fare_amount|
+--------+-----------+
| [1.2]| 5.5|
| [3.4]| 12.5|
| [2.8]| 10.0|
+--------+-----------+
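One caveat: VectorAssembler raises an error on null values by default, and real taxi trip records often contain them. If your copy of the CSVs has incomplete rows, a hedged fix is to drop them before assembling:
# Drop rows missing the feature or the label before assembling;
# whether this is needed depends on your copy of the dataset.
taxis_df = taxis_df.na.drop(subset=["trip_distance", "fare_amount"])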
We set a portion of the data aside for testing.
train, test = df.randomSplit([0.75, 0.25], seed=12345)
We create an instance of the LinearRegression class, specifying the features column and the label column.
lr = LinearRegression(
    maxIter=10,
    featuresCol='features',
    labelCol='fare_amount'
)
We train the model.
lr_model = lr.fit(train)
Finally, we print the coefficient and intercept.
lr_model.coefficients
0.00019652929570484413
lr_model.intercept
13.845050892077083
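The held-out test set isn't used in the walkthrough above, but evaluating on it is a natural next step; a short sketch using the model's evaluate method:
# Evaluate the trained model on the held-out test split.
test_summary = lr_model.evaluate(test)
print("RMSE:", test_summary.rootMeanSquaredError)
print("R2:", test_summary.r2)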
In a separate notebook, we simulate accidentally writing duplicate data to the data warehouse.
taxis_2020_05_df = (spark
    .read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/home/iceberg/data/yellow_tripdata_2020-05.csv"))
(taxis_2020_05_df.write
    .format("iceberg")
    .mode("append")
    .save("taxis"))
If we re-run the notebook responsible for training the linear regression model, we obtain a completely different coefficient and intercept.
lr_model.coefficients
0.0001802168835686988
lr_model.intercept
14.410187910434347
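Before reaching for time travel, it can help to confirm the duplication directly; a quick sketch comparing the total row count to the distinct row count:
# If the May data was appended twice, the total row count will
# exceed the distinct row count.
taxis_df = spark.table("taxis")
total_rows = taxis_df.count()
distinct_rows = taxis_df.distinct().count()
print(f"{total_rows - distinct_rows} duplicate rows")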
Let’s see how we could go about rolling back. We can view the table’s history as follows:
%%sql --var history_df
SELECT * FROM taxis.history
As we can see, there are three snapshots: one for the month of April, one for the month of May, and a third for the duplicate data.
We can rollback to the previous snapshot by executing the following:
spark.sql(f"CALL demo.system.rollback_to_snapshot('taxis', 5258479407128618250)")
If we again view the history, we can see that the rollback adds a new entry, and the is_current_ancestor
column now shows false for the snapshots created after the one specified in the rollback call, since they are no longer ancestors of the current snapshot.
%%sql
SELECT * FROM taxis.history
If we re-run our machine learning notebook, we get the same result as before.
lr_model.coefficients
0.00019652929570484413
lr_model.intercept
13.845050892077083
Conclusion
When operating a data warehouse where multiple parties create their own table aggregations and data pipelines, you’re bound to encounter mistakes. For example, someone could accidentally write data to the wrong table, affecting downstream analytics. Fortunately, Apache Iceberg allows you to roll back to a previous snapshot of the data.