
CloudWatch Logs to Elasticsearch Through Firehose

I recently needed to get CloudWatch Logs to an AWS-hosted Elasticsearch cluster via Firehose, and I came across a few sticking points that were not as well documented as I would have hoped. There are quite a few AWS resources involved in getting all of this done. Logs were originally generated by a Lambda function, which wrote to a log group in CloudWatch Logs. I put a subscription filter on that log group with a CloudWatch Logs destination in a different account. That destination forwarded the logs to a Firehose delivery stream, which used a Lambda function acting as a transformer before the logs reached their final destination of Elasticsearch. Like I said, quite a few resources. The “Lambda function acting as a transformer” is called a “processor” in CloudFormation, but AWS’s documentation discusses it under data transformation. I will be referring to this Lambda processor as a transformer since that is its main purpose in this post.

Getting the CloudWatch Subscription Filter and Destination set up was not too difficult, and there is decent documentation, which you can find here. The one caveat I will note is that in this tutorial AWS builds the destination for a Kinesis Data Stream, not a Kinesis Data Firehose (sometimes referred to as a Delivery Stream), so in step 5, where they create the permissions policy with "Action": "kinesis:PutRecord", change kinesis to firehose like this: "Action": "firehose:PutRecord". (After looking back through that tutorial, it appears that they have added an example for Firehose on the same page. The goal for right now is just to get CloudWatch Logs to Firehose however you would like.)
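
For reference, here is a rough sketch of what that permissions policy could look like after the change. The region, account ID, and delivery stream name are placeholders of my own, not values from the tutorial:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "firehose:PutRecord",
      "Resource": "arn:aws:firehose:us-east-1:123456789012:deliverystream/my-delivery-stream"
    }
  ]
}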

By the time I got to Firehose, all of the infrastructure had already been set up by someone else. There were still a few remaining problems with the Lambda transformer that I had to stumble through, though. The first was that I missed how the payload from the transformer should be returned. I originally converted out of Base64, transformed the logs, converted back to Base64, and returned. I thought that Firehose would want the records formatted the same way they went in. After scouring Elasticsearch for my logs, I determined that I was wrong. I learned that there is a specific format that all transformed records need to be in. Each record in the returned payload needs to be a flat JSON object that looks like this:

{
  "recordId": "<recordId from original record>",
  "result": "<Ok | Dropped | ProcessingFailed>",
  "data": "<base64 transformed record to send>"
}
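
To make that shape concrete, here is a minimal sketch of a transformer handler (my own illustration, not this project’s code) written as a Node.js Lambda handler. It simply echoes every incoming record back inside the response envelope Firehose expects:

// Minimal sketch: return every incoming record unchanged, wrapped in the
// { records: [...] } envelope Firehose expects from a transformer Lambda.
exports.handler = async (event) => {
  const records = event.records.map((record) => ({
    recordId: record.recordId, // must echo the recordId Firehose sent in
    result: 'Ok',              // 'Ok', 'Dropped', or 'ProcessingFailed'
    data: record.data,         // Base64 payload; passed through untransformed here
  }));
  return { records };
};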

I looked a little deeper into my mistake and how I should format the data field. CloudWatch Logs sent from a Subscription Filter come in batches, but there is only one data field and one recordId for each batch. After a record from a CloudWatch Subscription Filter comes in and is Base64 decoded and unzipped, it looks like this:

{
    "owner": "123456789012",
    "logGroup": "CloudTrail",
    "logStream": "123456789012_CloudTrail_us-east-1",
    "subscriptionFilters": [
        "Destination"
    ],
    "messageType": "DATA_MESSAGE",
    "logEvents": [
        {
            "id": "31953106606966983378809025079804211143289615424298221568",
            "timestamp": 1432826855000,
            "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
        },
        {
            "id": "31953106606966983378809025079804211143289615424298221569",
            "timestamp": 1432826855000,
            "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
        },
        {
            "id": "31953106606966983378809025079804211143289615424298221570",
            "timestamp": 1432826855000,
            "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
        }
    ]
}
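
Producing that payload from the raw Firehose record is a Base64 decode followed by a gunzip, since CloudWatch Logs compresses subscription data with gzip. A short Node.js sketch (my own code, with an assumed helper name):

const zlib = require('zlib');

// Turn one Firehose record that originated from a CloudWatch Logs
// subscription filter into the JSON payload shown above.
function decodeCloudWatchRecord(record) {
  const compressed = Buffer.from(record.data, 'base64');
  return JSON.parse(zlib.gunzipSync(compressed).toString('utf8'));
}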

That’s when I found a page mentioning combining multiple JSON documents into the same record. This gave me an answer for how I would convert multiple log events into a single Base64 string for the returned data field. After I rewrote the transformer to output the transformed logEvents as single-line JSON, I tested it out. This worked when logEvents contained only one object, but it did not work when larger batches were passed in. I went back to the drawing board and read something about Elasticsearch bulk inserts needing to be newline (\n) delimited. I added a newline after every JSON object instead of leaving them as single-line JSON just to test it out. This time I got errors back from Elasticsearch saying Malformed content, found extra data after parsing: START_OBJECT. At least this was something concrete to go off of.

I started Googling and found a Reddit thread and an AWS support ticket that were tied together. It turns out you need to reingest the individual log events as their own records so that Elasticsearch only receives one log at a time, and I found an example in a deep, dark rabbit hole. A solution is mentioned that can be found through a lot of digging on AWS’s site, and I found the GitHub repo for that code. There is an example of reingestion near the bottom of that application. I ended up using putRecordBatch from the AWS SDK to put the logEvents back into Firehose as individual records.
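
Here is a hedged sketch of that reingestion using the AWS SDK for JavaScript (v2). The environment variable holding the delivery stream name and the helper name are my own assumptions, and real code would also have to handle the putRecordBatch limits:

const AWS = require('aws-sdk');
const firehose = new AWS.Firehose();

// Put each log event back into the same delivery stream as its own record so
// that Elasticsearch eventually receives one document per log event.
// DELIVERY_STREAM_NAME is an assumed environment variable, not from this post.
async function reingestLogEvents(decoded) {
  const records = decoded.logEvents.map((logEvent) => ({
    Data: JSON.stringify(logEvent),
  }));
  // putRecordBatch accepts at most 500 records (and 4 MiB) per call;
  // chunking larger batches is omitted from this sketch.
  await firehose
    .putRecordBatch({
      DeliveryStreamName: process.env.DELIVERY_STREAM_NAME,
      Records: records,
    })
    .promise();
}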

When I converted over to reingesting logEvents as individual records, I finally saw logs in Elasticsearch. But now I saw too many logs. Strange. The logs were being duplicated. When I set result to Dropped in the response for the original batched CloudWatch Logs record, I had also set data to a hardcoded string. What a previously mentioned link (https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html#data-transformation-status-model) fails to mention is that you cannot include data if result is Dropped, or Firehose will reprocess the record. Once I got rid of the hardcoded string in data in the transformation Lambda’s response, everything clicked, and I started seeing my logs in Elasticsearch without duplication.
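
In other words, once its log events have been reingested, the original batched record should be acknowledged back to Firehose with no data field at all:

{
  "recordId": "<recordId from original record>",
  "result": "Dropped"
}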

The code in the aforementioned GitHub repo does something that I wanted to highlight. After Base64 decoding and uncompressing a record, check for record.messageType === 'DATA_MESSAGE' (JavaScript). This condition signifies that the record is coming straight from CloudWatch Logs, while a record without messageType means that the record is being reingested. Making this check also allows log producers to PUT records directly to Firehose, just like the transformer does during reingestion. I would suggest making all of your transformations on the reingested logs only and not adding any logic along the lines of checking how many logEvents came from CloudWatch Logs. Just automatically reingest and handle the transformation on the reingested logs; it will make the logic much easier to debug. A sketch of this branching follows.
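
Putting the pieces together, the branching could look something like this. It assumes the reingestLogEvents helper sketched above; the gunzip fallback for uncompressed reingested records and the identity "transformation" are my own assumptions rather than details from the repo:

const zlib = require('zlib');

// Process one Firehose record: fresh CloudWatch Logs batches get split and
// reingested, while reingested records get transformed and passed on.
async function processRecord(record) {
  const buffer = Buffer.from(record.data, 'base64');
  let decoded;
  try {
    // Gzipped payload: a batch coming straight from CloudWatch Logs.
    decoded = JSON.parse(zlib.gunzipSync(buffer).toString('utf8'));
  } catch (err) {
    // Plain JSON payload: a record we reingested ourselves (assumption).
    decoded = JSON.parse(buffer.toString('utf8'));
  }

  if (decoded.messageType === 'DATA_MESSAGE') {
    // Reingest each log event individually and drop this record,
    // returning no data field alongside the Dropped result.
    await reingestLogEvents(decoded);
    return { recordId: record.recordId, result: 'Dropped' };
  }

  // Reingested (or directly PUT) record: apply whatever transformation your
  // use case needs; here it is just passed through unchanged.
  const transformed = decoded;
  return {
    recordId: record.recordId,
    result: 'Ok',
    data: Buffer.from(JSON.stringify(transformed)).toString('base64'),
  };
}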

There were numerous errors that I had to debug while getting this set up, and the answers were vague and difficult to find. I hope this can be your last stop if you are running into the same issues I was.

Categories: dev | ops | aws