Synchronize DynamoDB to Athena

Context

Imagine that you have two AWS DynamoDB tables, books and orders. The books table is relatively small but frequently updated and read. The orders table, on the other hand, is huge, but its documents are immutable.

We want to be able to query documents from both DynamoDB tables in AWS Athena. For the books table it is important that changes are reflected in Athena in near real time; a delay of a few minutes is OK. The orders table, however, only needs to be synchronized once a day.

The very basic setup for books looks like this:

---
title: Books
theme: forest
darkMode: true
---
flowchart TD
    cron(EventBridge Scheduler) -- runs every minute --> lambda(Lambda Function)
    lambda -- full scan --> ddb_books("DynamoDB books")
    lambda -- save as a new file --> S3
    lambda -- remove previous file --> S3

For orders it looks like this:

---
title: Orders
theme: dark
---
flowchart TD
    cron(EventBridge Scheduler) -- runs every 24h --> lambda(Lambda Function)
    lambda -- trigger full export --> ddb_orders("DynamoDB orders")
    ddb_orders -- wait for export to complete and copy files --> S3
    lambda -- remove old files --> S3

The problem

Due to the nature of AWS Athena, at the beginning of query execution it lists all the files it will read data from. While Athena is reading that data we can't manipulate the files (in our case, remove them); otherwise the query fails with an error:

HIVE_CANNOT_OPEN_SPLIT: Error opening Hive split 

Requirements

Near realtime

It is important to keep the books table in Athena up to date as quickly as possible, with a delay of at most a minute or two.

Cost effective

We could certainly use Kinesis to stream changes to S3 and save each record as a separate file, but that would cost a fortune. The orders table doesn't have to be near real time; synchronizing it every 24 hours is fine.

Solutions to maintain

We would like to keep the number of different solutions to a minimum. Ideally, we would maintain only one type of solution.

Options

I did some research and narrowed it down to a few options.

Locks

To prevent reads while files are being modified, we can implement a locking mechanism that waits until writes finish. It doesn't solve the entire problem, though: if writes start while a query is already reading, the query will still fail.

Costs

We need to keep the lock in one place. Redis works if you already run one; I don't recommend spinning up a new Redis instance just for this. There are other options, such as DynamoDB.
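
A DynamoDB-based lock can be a single conditional write. Here is a rough sketch (the locks table, its name key, and the TTL handling are assumptions for illustration, not a ready-made library):

import time
import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.client("dynamodb")

LOCK_TABLE = "locks"  # hypothetical table with a string partition key "name"


def acquire_lock(name, ttl_seconds=60):
    # Try to take the lock by writing an item that must not already exist.
    # The conditional write fails atomically if someone else holds the lock;
    # expires_at lets a stale lock be taken over after ttl_seconds.
    now = int(time.time())
    try:
        dynamodb.put_item(
            TableName=LOCK_TABLE,
            Item={"name": {"S": name}, "expires_at": {"N": str(now + ttl_seconds)}},
            ConditionExpression="attribute_not_exists(#n) OR expires_at < :now",
            ExpressionAttributeNames={"#n": "name"},
            ExpressionAttributeValues={":now": {"N": str(now)}},
        )
        return True
    except ClientError as err:
        if err.response["Error"]["Code"] == "ConditionalCheckFailedException":
            return False
        raise


def release_lock(name):
    dynamodb.delete_item(TableName=LOCK_TABLE, Key={"name": {"S": name}})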

Firehose

AWS has a dedicated service for this, called Amazon Data Firehose.

Amazon Data Firehose provides the easiest way to acquire, transform, and deliver data streams within seconds to data lakes, data warehouses, and analytics services. To use Amazon Data Firehose, you set up a stream with a source, destination, and required transformations. Amazon Data Firehose continuously processes the stream, automatically scales based on the amount of data available, and delivers it within seconds.
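
One way to feed a delivery stream is a small Lambda function subscribed to the orders table's DynamoDB stream that forwards each change; Firehose then buffers records and writes them to S3 as larger files instead of one object per record. A rough sketch, assuming a hypothetical delivery stream named orders-to-s3:

import json
import boto3

firehose = boto3.client("firehose")

STREAM_NAME = "orders-to-s3"  # hypothetical Firehose stream delivering to S3


def handler(event, context):
    # Triggered by the DynamoDB stream of the orders table; forwards every
    # new item to Firehose, which buffers and batches the writes to S3.
    for record in event["Records"]:
        image = record["dynamodb"].get("NewImage")
        if image is None:
            continue  # orders are immutable, so removals are ignored here
        firehose.put_record(
            DeliveryStreamName=STREAM_NAME,
            Record={"Data": (json.dumps(image) + "\n").encode("utf-8")},
        )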

Costs

Ingesting 20 GB of data a day costs about 33.46 USD per month. The cost of data transformation is not included.

Athena merge into

AWS Athena has support for Apache Iceberg™, which includes the powerful MERGE INTO statement. It can upsert records without blocking any concurrent reads.

Spark 3 added support for MERGE INTO queries that can express row-level updates.

Iceberg supports MERGE INTO by rewriting data files that contain rows that need to be updated in an overwrite commit.

More about merge into: https://iceberg.apache.org/docs/latest/spark-writes/#merge-into

Example

merge into library.books as b 
using (
    select m.name
        , m.id
        , m.author
        , m.summary
        , from_unixtime_nanos(cast(m.date as bigint)) published_at
    from library.books_source m
) as m
on (m.id = b.id)
when matched
    then update set name = m.name
                , author = m.author
                , summary = m.summary
                , published_at = m.published_at
when not matched
    then insert (
        id
        , name
        , author
        , summary
        , published_at
    )
    values (
        m.id
        , m.name
        , m.author
        , m.summary
        , m.published_at
    )

Costs

There are no additional upfront costs for using MERGE INTO. Keep in mind, though, that we lose control over the file structure: we can't predict how many files Apache Iceberg will create under the hood. We can influence it to some degree with bucketing and partitioning, but that control is limited.
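
For illustration, this is roughly how the Iceberg table could be created with a bucketed partition layout; the table schema, bucket name, and workgroup are assumptions, and the partition transform only influences how rows are grouped, not how many files each commit produces:

import boto3

athena = boto3.client("athena")

# Hypothetical DDL: spread rows across 16 buckets of the id column.
ddl = """
CREATE TABLE library.books (
    id string,
    name string,
    author string,
    summary string,
    published_at timestamp
)
PARTITIONED BY (bucket(16, id))
LOCATION 's3://my-library-bucket/books/'
TBLPROPERTIES ('table_type' = 'ICEBERG')
"""

athena.start_query_execution(
    QueryString=ddl,
    WorkGroup="primary",  # the workgroup must have a query result location configured
)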

Solution

The solution I chose is a hybrid: for the orders table I'm going to test Amazon Data Firehose, because it doesn't sound overly expensive, while for books I will use the Athena MERGE INTO approach.

Merge into flow for the books table

First we need an AWS S3 bucket for two tables, books and books_source. Each should use a different prefix:

  1. books will be the place where Athena manages the files backing the Apache Iceberg table
  2. books_source will be a permanent table with temporary data; it serves as the source of records to be merged into books
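
As a rough sketch of how records could land under the books_source prefix, here is a Lambda function subscribed to the books table's DynamoDB stream (the bucket name my-library-bucket and the JSON Lines layout are assumptions for illustration):

import json
import time
import boto3

s3 = boto3.client("s3")

BUCKET = "my-library-bucket"      # hypothetical bucket name
SOURCE_PREFIX = "books_source/"   # prefix backing the books_source table


def handler(event, context):
    # Triggered by the DynamoDB stream of the books table. Every batch of
    # changes is written as one JSON Lines file named after the current
    # epoch-millisecond timestamp, so the merge job can later tell which
    # files were written before its cutoff.
    written_at = int(time.time() * 1000)
    lines = []
    for record in event["Records"]:
        image = record["dynamodb"].get("NewImage") or record["dynamodb"]["Keys"]
        lines.append(json.dumps({
            "event": record["eventName"],  # INSERT / MODIFY / REMOVE
            "written_at": written_at,
            "item": image,                 # still in DynamoDB attribute-value format
        }))

    s3.put_object(
        Bucket=BUCKET,
        Key=f"{SOURCE_PREFIX}{written_at}.json",
        Body="\n".join(lines).encode("utf-8"),
    )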

Flow

  1. An AWS Lambda function listens to every insert/update/delete on the DynamoDB books table and saves it to S3 under the books_source prefix, using the current timestamp as the file name (the stream handler sketched above)
  2. An AWS EventBridge Scheduler rule runs every minute and triggers a Lambda function that executes the MERGE INTO prepared statement in Athena (a sketch of this function follows the list)
  3. That Lambda function stores the current timestamp in memory
  4. It runs the merge query only for records written BEFORE that timestamp
  5. It then cleans up all files under the books_source prefix written BEFORE that timestamp
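
A minimal sketch of steps 2-5, assuming the same hypothetical bucket, a prepared statement named merge_books created beforehand with PREPARE that takes the cutoff timestamp as its only parameter, and a workgroup with a query result location configured:

import time
import boto3

athena = boto3.client("athena")
s3 = boto3.client("s3")

BUCKET = "my-library-bucket"      # hypothetical bucket name
SOURCE_PREFIX = "books_source/"   # prefix holding the raw change files


def handler(event, context):
    # Step 3: remember when this run started; only files and records written
    # before this cutoff are merged and cleaned up, so changes arriving
    # during the run are safely picked up by the next one.
    cutoff = int(time.time() * 1000)

    # Step 4: run the MERGE INTO prepared statement for records before the
    # cutoff. "merge_books" is assumed to filter books_source on the
    # written_at field produced by the stream handler above.
    execution = athena.start_query_execution(
        QueryString=f"EXECUTE merge_books USING {cutoff}",
        WorkGroup="primary",  # must have a query result location configured
    )
    if not wait_for_query(execution["QueryExecutionId"]):
        # Keep the source files so nothing is lost; the next run retries.
        raise RuntimeError("MERGE INTO did not succeed")

    # Step 5: delete every source file older than the cutoff. File names are
    # the epoch-millisecond timestamps assigned by the stream handler.
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=BUCKET, Prefix=SOURCE_PREFIX):
        for obj in page.get("Contents", []):
            written_at = int(obj["Key"].rsplit("/", 1)[-1].split(".")[0])
            if written_at < cutoff:
                s3.delete_object(Bucket=BUCKET, Key=obj["Key"])


def wait_for_query(execution_id):
    # Poll Athena until the query reaches a terminal state.
    while True:
        status = athena.get_query_execution(QueryExecutionId=execution_id)
        state = status["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return state == "SUCCEEDED"
        time.sleep(1)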

How successful is the solution?

In 6 months I’m going to revisit this post and tell you more about how the solution worked for me.