3 minutes
Stream Cloudwatch logs to Elastic search
AWS cloudwatch is a neat feature that is well integrated with the rest of AWS services. However it is not the most userfriendly interface to read and process logs. Many AWS customers use Elasticsearch to store logs from various Systems. AWS Lambda logs are automatically sent to cloudwatch under respective log group of each Lambda function. While there is a well documented and packaged solution to stream logs from cloudwatch to AWS’s managed elastic search service, There is not enough documentation on how to stream logs to a customer managed elastic search system.In this post I am going to elaborate on how this can be achieved along with some working code that is deploayable.
We are going to create a lambda pythin function which is going to hook on to some chosen log groups. These log groups contain logs from lambda functions that we are interested in. Our python function would be triggered on each event of a log writen to these log groups. AWS lambda logs coming to log groups have a predefined structure and format. We are going to extract the key log information by filtering all the metadata we are not interested in. Later we are going to call Elastic search’s API to inser log data into our chosen index. This process can be summarised into these steps
- Read payload from each event of Cloud watch’s log groups
- Filter transform data into required format
- write to Elastic search’s index.
The reason I chose Python is simple . Using Python i can build things faster than any other language. There are plenty of librarries to do almost anything software related. It also has great community support while code is easy to read and understand.I feel it is a more forgiving language compared to other stricter languages like Go. We are going to use boto3 package to talk to AWS.
Reading from cloudwatch events
import boto3
import gzip
import json
import base64
def process_events(event, context):
cw_data = event['awslogs']['data']
compressed_payload = base64.b64decode(cw_data)
uncompressed_payload = gzip.decompress(compressed_payload)
payload = json.loads(uncompressed_payload)
log_events = payload['logEvents']
Writing to Elastic search
import os
from elasticsearch import Elasticsearch
import certifi
es = Elasticsearch(
[os.environ['ELASTIC_HOST']],
http_auth=(os.environ['ELASTIC_USER'],[os.environ['ELASTIC_PASSWORD']]),
use_ssl=True,
ca_certs=certifi.where(),
port=9200,
)
for log_event in log_events:
msg = log_event['message']
#print (log_event)
#print(msg)
is_log = msg.startswith('{ \"logId\": ')
print(is_log)
if is_log:
print (msg)
res = es.index(index=[os.environ['INDEX_NAME']], doc_type='lambda-log', body=msg)
What is happening in the code block above is we are creating an elastic search client calling it ‘es’. We are using message from each log event from the array of log events. Check if it is an event with log or just some metadata . Ig it is a log event then write it to elastic search into an index.
Latest full code of this functionality can be found here
458 Words
2020-04-26 00:00 +0000