chalog

i love 玄米パン あんなし

CloudWatch LogsからElasticsearch ServiceにLogs Streamする場合にIndex名を変えたい

概要

  • CloudWatch Logsは、Subscription設定時に作成されたLambda Functionを介してElasticsearch Serviceにデータを格納する。
  • Function名は LogsToElasticsearch_{Elasticsearch domain_name} という形式で作成される。
  • Function内では、CloudWatch LogsからLambda FunctionをInvokeする際にPayloadからLogGroupを取得し、IndexのType指定に使用している。
  • transform関数内でElasticsearchドメインへのリクエスト情報を生成しており、その際にIndex名は cwl-YYYY.mm.dd 固定で生成している。
  • このIndex名固定をどうにかしたい。

対応

function transform(payload) {
    if (payload.messageType === 'CONTROL_MESSAGE') {
        return null;
    }

    var bulkRequestBody = '';

    payload.logEvents.forEach(function(logEvent) {
        var timestamp = new Date(1 * logEvent.timestamp);

        var indexName = [
            'cwl-' + timestamp.getUTCFullYear(),
            ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),
            ('0' + timestamp.getUTCDate()).slice(-2)
        ].join('.');

        var source = buildSource(logEvent.message, logEvent.extractedFields);
        source['@id'] = logEvent.id;
        source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString();
        source['@message'] = logEvent.message;
        source['@owner'] = payload.owner;
        source['@log_group'] = payload.logGroup;
        source['@log_stream'] = payload.logStream;

        var action = { "index": {} };
        action.index._index = indexName;
        action.index._type = payload.logGroup;
        action.index._id = logEvent.id;

        bulkRequestBody += [ 
            JSON.stringify(action), 
            JSON.stringify(source),
        ].join('\n') + '\n';
    });
    return bulkRequestBody;
}

cwl 部分をpayloadから取得したLogGroup名で置換し、CloudWatch LogGroup名をIndex名に利用する。

以下の例では /aws/lambda/staging/xxxxxx に変えている。

        var logGroup = payload.logGroup;
        logGroup = logGroup.replace(/\/aws\/lambda\//g, '').replace(/staging/g, '');
        var indexName = [
            logGroup + timestamp.getUTCFullYear(),
            ('0' + (timestamp.getUTCMonth() + 1)).slice(-2)
        ].join('.');

注意

Index名には使用できない文字が存在する為、その場合は別途置き換える必要がある。