Context
Imagine that you have AWS DynamoDB tables called books and orders. The books table is relatively small but frequently updated and read. The orders table, on the other hand, is huge, but its documents are immutable.
We want to be able to query documents from these DynamoDB tables in AWS Athena. For books it is important that changes are reflected in Athena in near real time; a delay of minutes is OK. The orders table, however, only needs to be synchronized once a day.
The very basic setup for books looks like this:
---
title: Books
theme: forest
darkMode: true
---
flowchart TD
cron(EventBridge Scheduler) -- runs every minute --> lambda(Lambda Function)
lambda -- full scan --> ddb_books("DynamoDB books")
lambda -- save as a new file --> S3
lambda -- remove previous file --> S3
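A minimal sketch of what that Lambda could look like in Python with boto3. The bucket name athena-data and the books/ prefix are assumptions for illustration, not names from a real setup:

```python
import json
import time

import boto3

dynamodb = boto3.resource("dynamodb")
s3 = boto3.client("s3")

TABLE_NAME = "books"
BUCKET = "athena-data"  # assumption: target bucket name
PREFIX = "books/"       # assumption: prefix the Athena table points at


def handler(event, context):
    table = dynamodb.Table(TABLE_NAME)

    # Full scan -- fine for a small table, but paginate to be safe.
    items = []
    response = table.scan()
    items.extend(response["Items"])
    while "LastEvaluatedKey" in response:
        response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"])
        items.extend(response["Items"])

    # Remember the files that are currently there so we can delete them later.
    old_keys = [
        obj["Key"]
        for obj in s3.list_objects_v2(Bucket=BUCKET, Prefix=PREFIX).get("Contents", [])
    ]

    # Save the snapshot as a single new file (JSON Lines).
    new_key = f"{PREFIX}{int(time.time())}.json"
    body = "\n".join(json.dumps(item, default=str) for item in items)
    s3.put_object(Bucket=BUCKET, Key=new_key, Body=body.encode("utf-8"))

    # Remove the previous file(s) -- exactly the step that can break a query
    # that happens to be running at the same time (see "The problem" below).
    for key in old_keys:
        s3.delete_object(Bucket=BUCKET, Key=key)
```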
For orders it looks like this:
---
title: Orders
theme: dark
---
flowchart TD
cron(EventBridge Scheduler) -- runs every 24h --> lambda(Lambda Function)
lambda -- trigger full export --> ddb_orders("DynamoDB orders")
ddb_orders -- wait for export to complete and copy files --> S3
lambda -- remove old files --> S3
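For orders the Lambda only has to kick off a native DynamoDB export. A rough sketch, assuming point-in-time recovery is enabled on the table (the export API requires it) and using placeholder ARNs and names:

```python
import boto3

dynamodb = boto3.client("dynamodb")

TABLE_ARN = "arn:aws:dynamodb:eu-west-1:123456789012:table/orders"  # assumption
BUCKET = "athena-data"                                              # assumption
PREFIX = "orders-export/"                                           # assumption


def handler(event, context):
    # Kick off a full export; DynamoDB writes the files to S3 asynchronously,
    # so a follow-up step (another Lambda, Step Functions, or an S3 event) has
    # to wait for completion, copy/clean the files and clean up the old ones.
    response = dynamodb.export_table_to_point_in_time(
        TableArn=TABLE_ARN,
        S3Bucket=BUCKET,
        S3Prefix=PREFIX,
        ExportFormat="DYNAMODB_JSON",
    )
    return response["ExportDescription"]["ExportArn"]
```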
The problem
Due to the nature of AWS Athena, at the beginning of query execution it lists all the files it will read data from. While Athena is reading that data we can't manipulate the files (in our case, remove them). If we do, the query fails with an error:
HIVE_CANNOT_OPEN_SPLIT: Error opening Hive split
Requirements
Near realtime
It is important to keep the books table up to date in Athena as quickly as possible, with a delay of at most a minute or two.
Cost effective
Sure, we could utilize Kinesis to stream changes to S3 and save each record as a separate file, but that would cost a fortune. The orders table doesn't have to be near real time; synchronizing it every 24h is fine.
Solutions to maintain
We would like to keep the number of solutions to a minimum. Ideally we would maintain only one type of solution.
Options
I did some research and narrowed it down to a few options to solve the problem.
Locks
To prevent reading while files are being modified, we can implement a locking mechanism and wait until writes finish. It doesn't solve the entire problem, though: if a write starts while a read is already in progress, the read will still fail.
Costs
We need to keep the lock in one place. It can be Redis, if you already use one; I don't recommend spinning up a new Redis instance just for this. There are other options, such as DynamoDB.
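For completeness, here is roughly what a DynamoDB-based lock could look like. The locks table, its pk key and the lock name are assumptions for illustration; the conditional put succeeds only when nobody else holds the lock, and the TTL attribute lets DynamoDB expire stale locks on its own:

```python
import time
import uuid

import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.client("dynamodb")

LOCK_TABLE = "locks"      # assumption: table with string partition key "pk"
LOCK_KEY = "books-files"  # assumption: one lock item per protected resource
LOCK_TTL_SECONDS = 60


def acquire_lock() -> str | None:
    token = str(uuid.uuid4())
    try:
        dynamodb.put_item(
            TableName=LOCK_TABLE,
            Item={
                "pk": {"S": LOCK_KEY},
                "token": {"S": token},
                "ttl": {"N": str(int(time.time()) + LOCK_TTL_SECONDS)},
            },
            # Only succeed if no lock item exists yet.
            ConditionExpression="attribute_not_exists(pk)",
        )
        return token
    except ClientError as err:
        if err.response["Error"]["Code"] == "ConditionalCheckFailedException":
            return None  # somebody else is reading/writing right now
        raise


def release_lock(token: str) -> None:
    # Delete only if we still own the lock.
    dynamodb.delete_item(
        TableName=LOCK_TABLE,
        Key={"pk": {"S": LOCK_KEY}},
        ConditionExpression="#t = :token",
        ExpressionAttributeNames={"#t": "token"},
        ExpressionAttributeValues={":token": {"S": token}},
    )
```

Note that this only serializes our own writers and readers; as said above, it does not protect a query that is already running when a write begins.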
Firehose
AWS has a dedicated service to handle this: Amazon Data Firehose.
Amazon Data Firehose provides the easiest way to acquire, transform, and deliver data streams within seconds to data lakes, data warehouses, and analytics services. To use Amazon Data Firehose, you set up a stream with a source, destination, and required transformations. Amazon Data Firehose continuously processes the stream, automatically scales based on the amount of data available, and delivers it within seconds.
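One possible way to wire this up for orders is a small Lambda on the table's DynamoDB stream that forwards every change to a Firehose delivery stream. The delivery stream name below is a placeholder, and the delivery stream itself (buffering, S3 destination, optional transformation) would be configured separately; batching limits are ignored for brevity:

```python
import json

import boto3

firehose = boto3.client("firehose")

DELIVERY_STREAM = "orders-to-s3"  # assumption: pre-configured delivery stream


def handler(event, context):
    # Each DynamoDB stream record carries the change (keys, new/old image).
    records = [
        {"Data": (json.dumps(record["dynamodb"]) + "\n").encode("utf-8")}
        for record in event["Records"]
    ]
    if records:
        # Firehose buffers these and flushes them to S3 in larger files.
        firehose.put_record_batch(
            DeliveryStreamName=DELIVERY_STREAM,
            Records=records,
        )
```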
Costs
Ingesting 20 GB of data a day costs 33.46 USD per month. The cost of data transformation is not included.
Athena merge into
AWS Athena has support for Apache Iceberg™, which includes the powerful merge into feature. It can upsert records without blocking any concurrent reads.
Spark 3 added support for MERGE INTO queries that can express row-level updates.
Iceberg supports MERGE INTO by rewriting data files that contain rows that need to be updated in an overwrite commit.
More about merge into: https://iceberg.apache.org/docs/latest/spark-writes/#merge-into
Example
merge into library.books as b
using (
select m.name
, m.id
, m.author
, m.summary
, from_unixtime_nanos(cast(m.date as bigint)) published_at
from library.books_source m
) as m
on (m.id = b.id)
when matched
then update set name = m.name
, author = m.author
, summary = m.summary
, published_at = m.published_at
when not matched
then insert (
id
, name
, author
, summary
, published_at
)
values (
m.id
, m.name
, m.author
, m.summary
, m.published_at
)
Costs
There are no additional upfront costs of using merge into. Keep in mind, though, that we lose control over the file structure: we can't predict how many files Apache Iceberg will create under the hood. We can influence this to some degree with bucketing and partitioning, but that control is limited.
Solution
The solution I chose is a hybrid: for the orders table I'm going to test Amazon Data Firehose, because it doesn't sound overly expensive, while for books I will use the Athena merge into approach.
Merge into flow for books table
First we need an AWS S3 bucket for the two tables, books and books_source, each under a different prefix (a DDL sketch follows the list below).
- books will be where Athena manages the files for records in the Apache Iceberg table
- books_source will be a permanent table with temporary data, serving as the source of records to be merged into books
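For reference, a sketch of how both tables could be created through Athena. The bucket name, database name and columns (taken from the merge example above) are assumptions:

```python
import boto3

athena = boto3.client("athena")

BUCKET = "athena-data"                     # assumption
WORKGROUP = "primary"
OUTPUT = f"s3://{BUCKET}/athena-results/"  # assumption: query result location

# Iceberg table managed by Athena under the books/ prefix.
CREATE_BOOKS = f"""
CREATE TABLE library.books (
  id string,
  name string,
  author string,
  summary string,
  published_at timestamp
)
LOCATION 's3://{BUCKET}/books/'
TBLPROPERTIES ('table_type' = 'ICEBERG')
"""

# Plain external table over the raw JSON files under books_source/.
CREATE_BOOKS_SOURCE = f"""
CREATE EXTERNAL TABLE library.books_source (
  id string,
  name string,
  author string,
  summary string,
  date string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://{BUCKET}/books_source/'
"""

for ddl in (CREATE_BOOKS, CREATE_BOOKS_SOURCE):
    athena.start_query_execution(
        QueryString=ddl,
        WorkGroup=WORKGROUP,
        ResultConfiguration={"OutputLocation": OUTPUT},
    )
```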
Flow
- An AWS Lambda function listens for every insert/update/delete on the DynamoDB books table and saves it to S3 under the books_source prefix, with the current timestamp as the file name
- AWS EventBridge Scheduler is set to run every minute and triggers the merge into Athena prepared statement
- Store the current timestamp in memory
- The Lambda function runs the merge into query only for records from BEFORE the current timestamp
- The Lambda function cleans up all files under the books_source prefix from BEFORE the current timestamp (see the sketch after this list)
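A sketch of that scheduled merge-and-cleanup Lambda, assuming an Athena prepared statement named merge_books that takes a single epoch-timestamp parameter, source file names that start with an epoch timestamp, and the bucket/prefix names used in the earlier sketches; all of these are assumptions:

```python
import datetime
import time

import boto3

athena = boto3.client("athena")
s3 = boto3.client("s3")

BUCKET = "athena-data"                     # assumption
SOURCE_PREFIX = "books_source/"            # assumption
WORKGROUP = "primary"
OUTPUT = f"s3://{BUCKET}/athena-results/"  # assumption


def handler(event, context):
    # 1. Remember "now" before touching anything.
    cutoff = datetime.datetime.now(datetime.timezone.utc)
    cutoff_epoch = int(cutoff.timestamp())

    # 2. Run the prepared merge statement only for records before the cutoff.
    #    "merge_books" is a hypothetical prepared statement name.
    start = athena.start_query_execution(
        QueryString=f"EXECUTE merge_books USING {cutoff_epoch}",
        WorkGroup=WORKGROUP,
        ResultConfiguration={"OutputLocation": OUTPUT},
    )
    query_id = start["QueryExecutionId"]

    # 3. Wait for the merge to finish before deleting anything.
    while True:
        state = athena.get_query_execution(QueryExecutionId=query_id)[
            "QueryExecution"
        ]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(1)
    if state != "SUCCEEDED":
        raise RuntimeError(f"merge query ended in state {state}")

    # 4. Clean up source files written before the cutoff
    #    (assumes file names like "<epoch>.json").
    objects = s3.list_objects_v2(Bucket=BUCKET, Prefix=SOURCE_PREFIX)
    for obj in objects.get("Contents", []):
        file_epoch = int(obj["Key"].rsplit("/", 1)[-1].split(".")[0])
        if file_epoch < cutoff_epoch:
            s3.delete_object(Bucket=BUCKET, Key=obj["Key"])
```

Because the merge commits through Iceberg and only the temporary books_source files are deleted, queries against books are never reading files that disappear underneath them.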
How successful is the solution?
In 6 months I’m going to revisit this post and tell you more about how the solution worked for me.