Automatic Creation of Athena partitions for Firehose delivery streams

AWS Firehose lets you create delivery streams that collect data and store it in S3 as plain files. This is very convenient if you want to save some data from your live system for offline processing. Firehose automatically puts your data into a hierarchy of S3 folders organized as year/month/day/hour, based on the UTC time of delivery.
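To make the folder layout concrete, here is a minimal sketch that computes the prefix for a given point in time. It assumes the default layout described above with no custom prefix configured on the delivery stream; firehosePrefix is a hypothetical helper name, not part of any AWS SDK.

```javascript
// Hypothetical helper: builds the year/month/day/hour prefix that the
// default Firehose layout would use for data delivered at `date`.
// All parts are taken in UTC and zero-padded to two digits.
function firehosePrefix(date) {
  const pad = (n) => String(n).padStart(2, '0');
  return [
    date.getUTCFullYear(),
    pad(date.getUTCMonth() + 1), // getUTCMonth() is zero-based
    pad(date.getUTCDate()),
    pad(date.getUTCHours()),
  ].join('/') + '/';
}

console.log(firehosePrefix(new Date(Date.UTC(2019, 0, 5, 7))));
// prints 2019/01/05/07/
```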

AWS Athena can query the data stored by Firehose delivery streams. The problem is that, by default, Athena scans the data for all dates, which can get quite expensive. To reduce the amount of scanned data, Athena lets you define partitions, for example one per day, so that a query only reads the partitions it asks for. Unfortunately, the automatic partition discovery that Athena offers (MSCK REPAIR TABLE) expects Hive-style key=value folder names such as year=2019/, so it is not compatible with the folder structure produced by Firehose. But there is a way to automate the creation of partitions using AWS Lambda.
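As a rough illustration of how partitions limit a scan, the sketch below builds a query that filters on the partition columns. It assumes a table partitioned by the string columns year, month and day (the same columns used in the ALTER TABLE statement in this article); buildDailyCountQuery is a hypothetical helper, and the values are interpolated without escaping, so it is only suitable for trusted input.

```javascript
// Hypothetical helper: returns a query that reads a single day's partition.
// Assumes the table is partitioned by string columns year, month and day.
function buildDailyCountQuery(table, { year, month, day }) {
  return [
    `SELECT count(*) AS records FROM ${table}`,
    `WHERE year = '${year}' AND month = '${month}' AND day = '${day}'`,
  ].join('\n');
}

console.log(
  buildDailyCountQuery('default.YOUR_TABLE_NAME', {
    year: '2019',
    month: '01',
    day: '05',
  }),
);
```

Because the WHERE clause only references partition columns, Athena can skip every folder that is not registered under the matching partition.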

The following code can be used in a Lambda function to create a partition for the current day automatically:

const AWS = require('aws-sdk');

const athena = new AWS.Athena({
  apiVersion: '2017-05-18',
  region: 'us-east-1',
});

async function runQuery({ QueryString, UniqueRequestId }) {
  console.log('running query', QueryString);
  const params = {
    QueryString,
    ResultConfiguration: {
      OutputLocation: 's3://YOUR_RESULTS_BUCKET',
    },
    ClientRequestToken: UniqueRequestId,
  };
  return await athena.startQueryExecution(params).promise();
}

async function getQueryExecution(QueryExecutionId) {
  const params = {
    QueryExecutionId,
  };
  return await athena.getQueryExecution(params).promise();
}

async function delay(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

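// Creates the partition for the given date. Any part that is not passed in
// defaults to today's date in UTC (toISOString() always returns UTC).
async function createPartition(year, month, day) {
  const date = new Date()
    .toISOString()
    .split('T')[0]
    .split('-');
  year = year || date[0];
  month = month || date[1];
  day = day || date[2];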
  const { QueryExecutionId } = await runQuery({
    QueryString: `ALTER TABLE default.YOUR_TABLE_NAME
    ADD PARTITION (year='${year}',month='${month}',day='${day}')
    location 's3://YOUR_S3_DATA_BUCKET/${year}/${month}/${day}/'`,
    UniqueRequestId: `automatic-firehose-partitioning-${year}-${month}-${day}`,
  });
  for (let attempt = 0; attempt < 10; attempt++) {
    const result = await getQueryExecution(QueryExecutionId);
    const state = result.QueryExecution.Status.State;
    switch (state) {
      case 'RUNNING':
      case 'QUEUED':
        console.log(
          'query is queued or running, retrying in ',
          Math.pow(2, attempt + 1) * 100,
          'ms',
        );
        await delay(Math.pow(2, attempt + 1) * 100);
        break;
      case 'SUCCEEDED':
        return true;
      case 'FAILED':
        console.log('query failed');
        throw new Error(result.QueryExecution.Status.StateChangeReason);
      case 'CANCELLED':
        console.log('query is cancelled');
        return;
    }
  }
}

exports.createPartition = createPartition;

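Query execution in Athena is asynchronous and requires an S3 bucket to store results. Replace YOUR_RESULTS_BUCKET with the name of the S3 bucket where you want to store query results, YOUR_S3_DATA_BUCKET with the name of the bucket where Firehose puts your data, and YOUR_TABLE_NAME with the name of your Athena table. By default, the createPartition function runs the query that creates a partition for the current day (in UTC). It then polls the query state up to ten times, waiting twice as long after each attempt. It returns true when the query succeeds, throws an error when the query fails, and returns undefined when the query is cancelled or has still not finished after the last attempt.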
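To get a feel for how long the polling loop can take, the sketch below reproduces its delay formula and sums the waits. It only covers the sleep time between polls, not the duration of the Athena API calls themselves; backoffDelays is a hypothetical helper name.

```javascript
// Reproduces the delay formula from the polling loop:
// 2^(attempt + 1) * 100 ms for attempt = 0 .. attempts - 1.
function backoffDelays(attempts) {
  const delays = [];
  for (let attempt = 0; attempt < attempts; attempt++) {
    delays.push(Math.pow(2, attempt + 1) * 100);
  }
  return delays;
}

const delays = backoffDelays(10);
const totalMs = delays.reduce((sum, ms) => sum + ms, 0);
console.log(delays[0], delays[delays.length - 1], totalMs);
// prints 200 102400 204600
```

In the worst case the function sleeps for roughly 205 seconds in total. The loop exits as soon as the query reaches a terminal state, but since the default Lambda timeout is 3 seconds, it is probably worth raising the timeout so that a slow query is not cut off mid-poll.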

For this to work, the Lambda function's execution role must have the correct IAM permissions:

- Effect: Allow
  Action:
    - athena:*
  Resource:
    - "*"
- Effect: Allow
  Action:
    - glue:CreateDatabase
    - glue:DeleteDatabase
    - glue:GetDatabase
    - glue:GetDatabases
    - glue:UpdateDatabase
    - glue:CreateTable
    - glue:DeleteTable
    - glue:BatchDeleteTable
    - glue:UpdateTable
    - glue:GetTable
    - glue:GetTables
    - glue:BatchCreatePartition
    - glue:CreatePartition
    - glue:DeletePartition
    - glue:BatchDeletePartition
    - glue:UpdatePartition
    - glue:GetPartition
    - glue:GetPartitions
    - glue:BatchGetPartition
  Resource:
    - "*"
- Effect: Allow
  Action:
    - s3:GetBucketLocation
    - s3:GetObject
    - s3:ListBucket
    - s3:ListBucketMultipartUploads
    - s3:ListMultipartUploadParts
    - s3:AbortMultipartUpload
    - s3:CreateBucket
    - s3:PutObject
  Resource:
    - arn:aws:s3:::aws-athena-query-results-*
    - arn:aws:s3:::YOUR_RESULTS_BUCKET*
    - arn:aws:s3:::YOUR_S3_DATA_BUCKET*

You can set up the Lambda function to run every night (at 1 am) using CloudWatch Events. To do so, create a rule in the AWS Console with the cron expression 0 01 * * ? *. CloudWatch Events evaluates cron expressions in UTC, so the rule fires at 1 am UTC.
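One detail to watch when wiring the rule to the function: Lambda invokes a handler with (event, context), so if createPartition were registered directly as the handler, the event object would be taken for the year. A minimal sketch of a wrapper that avoids this follows; makeHandler is a hypothetical name, and the optional year, month and day fields on the event are an assumption added here to allow manual backfills.

```javascript
// Hypothetical factory: wraps a createPartition(year, month, day) function
// so that Lambda's (event, context) arguments are not mistaken for date parts.
function makeHandler(createPartition) {
  return async function handler(event) {
    // Scheduled events carry no year/month/day fields, so all three stay
    // undefined and createPartition falls back to the current day.
    // A manual invocation may pass them explicitly (assumed convention).
    const { year, month, day } = event || {};
    return createPartition(year, month, day);
  };
}

// In the Lambda module this could be wired up as:
// exports.handler = makeHandler(createPartition);
```

Invoking the function manually with a payload such as {"year": "2019", "month": "01", "day": "04"} would then create the partition for that specific day.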
