Context
Imagine that you have AWS DynamoDB tables called `books` and `orders`. The `books` table is relatively small but frequently updated and read. The `orders` table, on the other hand, is huge but its documents are immutable.
We want to be able to query documents from both DynamoDB tables in AWS Athena. For `books` it is important that changes are reflected in Athena in near real time; a delay of a few minutes is OK. However, `orders` only needs to be synchronized once a day.
The very basic setup for `books` looks like this:
```mermaid
---
title: Books
theme: forest
darkMode: true
---
flowchart TD
    cron(EventBridge Scheduler) -- runs every minute --> lambda(Lambda Function)
    lambda -- full scan --> ddb_books("DynamoDB books")
    lambda -- save as a new file --> S3
    lambda -- remove previous file --> S3
```
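For reference, a minimal sketch of what that Lambda function could look like in Python with boto3; the bucket name and prefix are placeholders of my own, not something defined by the setup above.

```python
import json
import time

import boto3

dynamodb = boto3.client("dynamodb")
s3 = boto3.client("s3")

BUCKET = "athena-data"   # hypothetical bucket backing the Athena table
PREFIX = "books/"        # hypothetical prefix the Athena table points at


def handler(event, context):
    # Full scan of the small, frequently changing table.
    items, kwargs = [], {"TableName": "books"}
    while True:
        page = dynamodb.scan(**kwargs)
        items.extend(page["Items"])
        if "LastEvaluatedKey" not in page:
            break
        kwargs["ExclusiveStartKey"] = page["LastEvaluatedKey"]

    # Remember the files that are currently there, write the new snapshot...
    old = s3.list_objects_v2(Bucket=BUCKET, Prefix=PREFIX).get("Contents", [])
    key = f"{PREFIX}{int(time.time())}.json"
    body = "\n".join(json.dumps(item) for item in items)
    s3.put_object(Bucket=BUCKET, Key=key, Body=body.encode("utf-8"))

    # ...and remove the previous snapshot. This delete is exactly the step
    # that races with queries already running in Athena.
    for obj in old:
        s3.delete_object(Bucket=BUCKET, Key=obj["Key"])
```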
For `orders`:
```mermaid
---
title: Orders
theme: dark
---
flowchart TD
    cron(EventBridge Scheduler) -- runs every 24h --> lambda(Lambda Function)
    lambda -- trigger full export --> ddb_orders("DynamoDB orders")
    ddb_orders -- wait for export to complete and copy files --> S3
    lambda -- remove old files --> S3
```
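And a sketch of the daily export trigger. It assumes point-in-time recovery is enabled on `orders`; the table ARN, bucket and prefix are placeholders.

```python
import boto3

dynamodb = boto3.client("dynamodb")

TABLE_ARN = "arn:aws:dynamodb:eu-west-1:123456789012:table/orders"  # placeholder
BUCKET = "athena-data"   # hypothetical bucket
PREFIX = "orders/"       # hypothetical prefix


def handler(event, context):
    # Kick off a full table export. DynamoDB writes the files to S3
    # asynchronously, so completion has to be checked later (for example by
    # polling describe_export) before old files are removed.
    export = dynamodb.export_table_to_point_in_time(
        TableArn=TABLE_ARN,
        S3Bucket=BUCKET,
        S3Prefix=PREFIX,
        ExportFormat="DYNAMODB_JSON",
    )
    return export["ExportDescription"]["ExportArn"]
```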
The problem
Due to the nature of AWS Athena, at the beginning of query execution it lists all the files it will read data from. While Athena is reading that data we can't manipulate the files (in our case, remove them). If we do, the query fails with an error:
HIVE_CANNOT_OPEN_SPLIT: Error opening Hive split
Requirements
Near realtime
It is important to keep the `books` table in Athena as up to date as possible, with a delay of at most a minute or two.
Cost effective
We could certainly use Kinesis to stream the data to S3 and save each record as a separate file, but that would cost a fortune. The `orders` table doesn't have to be near real time; synchronizing it every 24 hours is fine.
Solutions to maintain
We would like to keep the number of solutions to a minimum. Ideally there would be only one type of solution to maintain.
Options
I did some research and narrowed it down to a few options to solve the problem.
Locks
To prevent reading while files are being modified, we can implement a locking mechanism that waits until writes finish. It doesn't solve the entire problem, though: when a write starts while a read is already in progress, the query will still fail.
Costs
We need to keep the lock in one place. It could be Redis, if you already use one; I don't recommend spinning up a new Redis instance just for this. There are other options, like DynamoDB.
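As a sketch, a DynamoDB-based lock could look like this, assuming a hypothetical `locks` table with a string hash key `name` and an `expires_at` attribute; the names are mine, not part of any existing setup.

```python
import time

import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.client("dynamodb")

LOCK_TABLE = "locks"  # hypothetical table with hash key "name"


def acquire_lock(name: str, ttl_seconds: int = 60) -> bool:
    now = int(time.time())
    try:
        dynamodb.put_item(
            TableName=LOCK_TABLE,
            Item={
                "name": {"S": name},
                "expires_at": {"N": str(now + ttl_seconds)},
            },
            # Succeed only if nobody holds the lock or the old lock expired.
            ConditionExpression="attribute_not_exists(#n) OR expires_at < :now",
            ExpressionAttributeNames={"#n": "name"},
            ExpressionAttributeValues={":now": {"N": str(now)}},
        )
        return True
    except ClientError as err:
        if err.response["Error"]["Code"] == "ConditionalCheckFailedException":
            return False
        raise


def release_lock(name: str) -> None:
    dynamodb.delete_item(TableName=LOCK_TABLE, Key={"name": {"S": name}})
```

The writer would acquire the lock before swapping files and readers would wait on it, but as noted above it still can't protect a query that started before the write.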
Firehose
AWS has a dedicated service to handle this; it's called Amazon Data Firehose.
Amazon Data Firehose provides the easiest way to acquire, transform, and deliver data streams within seconds to data lakes, data warehouses, and analytics services. To use Amazon Data Firehose, you set up a stream with a source, destination, and required transformations. Amazon Data Firehose continuously processes the stream, automatically scales based on the amount of data available, and delivers it within seconds.
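As a rough sketch, the glue between DynamoDB and Firehose could be a small Lambda function attached to the table's stream that batches records into a delivery stream (the stream name below is a placeholder); Firehose then buffers the records and writes larger files to S3 on its own schedule.

```python
import json

import boto3

firehose = boto3.client("firehose")

DELIVERY_STREAM = "orders-to-s3"  # hypothetical Firehose delivery stream


def handler(event, context):
    # Forward every DynamoDB stream record as one newline-delimited JSON line.
    records = [
        {"Data": (json.dumps(record["dynamodb"]) + "\n").encode("utf-8")}
        for record in event["Records"]
    ]
    # put_record_batch accepts up to 500 records per call; real code would
    # chunk larger batches and retry failed entries.
    if records:
        firehose.put_record_batch(
            DeliveryStreamName=DELIVERY_STREAM,
            Records=records,
        )
```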
Costs
Ingesting 20 GB of data a day costs 33.46 USD per month. The cost of data transformation is not included.
Athena merge into
AWS Athena has support for Apache Iceberg™, which includes the powerful `merge into` feature. It can upsert records without blocking any concurrent reads.
Spark 3 added support for MERGE INTO queries that can express row-level updates.
Iceberg supports MERGE INTO by rewriting data files that contain rows that need to be updated in an overwrite commit.
More about `merge into`: https://iceberg.apache.org/docs/latest/spark-writes/#merge-into
Example
```sql
merge into library.books as b
using (
    select m.name
         , m.id
         , m.author
         , m.summary
         , from_unixtime_nanos(cast(m.date as bigint)) published_at
    from library.books_source m
) as m
on (m.id = b.id)
when matched
    then update set name = m.name
                  , author = m.author
                  , summary = m.summary
                  , published_at = m.published_at
when not matched
    then insert (
          id
        , name
        , author
        , summary
        , published_at
    )
    values (
          m.id
        , m.name
        , m.author
        , m.summary
        , m.published_at
    )
```
Costs
There are no additional upfront costs of using `merge into`. Keep in mind, though, that we lose control over the file structure: we can't predict how many files Apache Iceberg will create under the hood. We can influence it to some degree with bucketing and partitions, but that control is limited.
Solution
The solution I chose is a hybrid: for the `orders` table I'm going to test Amazon Data Firehose, because it doesn't sound overly expensive, while for `books` I will use the Athena `merge into` approach.
Merge into flow for the `books` table
First we need an AWS S3 bucket for the two tables, `books` and `books_source`. Each should have a different prefix.
- `books` will be the place where Athena manages the files for records in the Apache Iceberg table
- `books_source` will be a permanent table with temporary data; it will serve as the source table for records to be merged into `books`
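As a sketch, creating the two tables could look like this, submitted through the Athena API. The bucket, database, output location and column list follow the merge example above but are otherwise my own assumptions.

```python
import boto3

athena = boto3.client("athena")

# Hypothetical Iceberg table managed by Athena under the "books/" prefix.
CREATE_BOOKS = """
CREATE TABLE library.books (
  id           string,
  name         string,
  author       string,
  summary      string,
  published_at timestamp
)
PARTITIONED BY (bucket(16, id))
LOCATION 's3://athena-data/books/'
TBLPROPERTIES ('table_type' = 'ICEBERG')
"""

# Hypothetical source table over the raw JSON files under "books_source/".
CREATE_BOOKS_SOURCE = """
CREATE EXTERNAL TABLE library.books_source (
  id      string,
  name    string,
  author  string,
  summary string,
  `date`  string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://athena-data/books_source/'
"""

for ddl in (CREATE_BOOKS, CREATE_BOOKS_SOURCE):
    athena.start_query_execution(
        QueryString=ddl,
        QueryExecutionContext={"Database": "library"},
        ResultConfiguration={"OutputLocation": "s3://athena-data/query-results/"},
    )
```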
Flow
- An AWS Lambda function listens for every insert/update/delete on the DynamoDB `books` table and saves the change to S3 under the `books_source` prefix, with the current timestamp as the file name
- AWS EventBridge Scheduler is set to run every minute and runs the `merge into` Athena prepared statement (see the sketch below)
- The Lambda function stores the current timestamp in memory
- The Lambda function runs the `merge into` query only for records written BEFORE the stored timestamp
- The Lambda function cleans up all files under the `books_source` prefix written BEFORE the stored timestamp
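A minimal sketch of the scheduled part of that flow. It assumes a prepared statement named `merge_books` that takes the cutoff timestamp as a parameter, plus my own placeholder names for the bucket, prefix, database and workgroup.

```python
import time

import boto3

athena = boto3.client("athena")
s3 = boto3.client("s3")

BUCKET = "athena-data"            # hypothetical bucket
SOURCE_PREFIX = "books_source/"   # prefix the change files land in
WORKGROUP = "primary"


def handler(event, context):
    # Remember the cutoff: only files written before it get merged and
    # deleted, so anything arriving mid-run is left for the next minute.
    cutoff = int(time.time())

    # Run the merge through the (assumed) prepared statement; the statement
    # is expected to restrict library.books_source to records before the cutoff.
    query = athena.start_query_execution(
        QueryString=f"EXECUTE merge_books USING {cutoff}",
        QueryExecutionContext={"Database": "library"},
        WorkGroup=WORKGROUP,
    )
    wait_for(query["QueryExecutionId"])

    # Clean up source files older than the cutoff. The file name is the
    # write timestamp, so the comparison is a plain integer check.
    listing = s3.list_objects_v2(Bucket=BUCKET, Prefix=SOURCE_PREFIX)
    for obj in listing.get("Contents", []):
        timestamp = int(obj["Key"].removeprefix(SOURCE_PREFIX).split(".")[0])
        if timestamp < cutoff:
            s3.delete_object(Bucket=BUCKET, Key=obj["Key"])


def wait_for(execution_id: str) -> None:
    # Poll until Athena reports a terminal state for the merge query.
    while True:
        state = athena.get_query_execution(QueryExecutionId=execution_id)[
            "QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            if state != "SUCCEEDED":
                raise RuntimeError(f"merge ended in state {state}")
            return
        time.sleep(1)
```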
How successful is the solution?
In 6 months I’m going to revisit this post and tell you more about how the solution worked for me.