AWS 노트

AWS Lambda 와 AWS API Gateway 로 Slack에 csv 파일이 upload 되면 알아서 데이터 체크해서 AWS S3에 업로드 하는 Bot 만들기

Jonchann 2020. 6. 28. 20:59

목적

Google Spreadsheets 에서 수동으로 관리하는 데이터를 AWS Redshift의 테이블로 만들어 다른 데이터를 분석하는데 사용할 수 있도록 하자.

 

기각: Google Spreadsheets에서 바로 파일을 업로드 하면 안될까?

로컬 혹은 다른 서비스에서 Spreadsheets 속 데이터를 바로 읽어오려면 인증 ( Google CloudPlatform ) 을 해야 한다. 현재 올라가 있는 데이터는 회사 메일 주소를 갖고 있지 않는 사람 혹은 API에 대해 공유를 거부해 놓은 상태이니 service account 를 사용하려면 Google suits 에서 admin 이 그 권한을 부여해 줄 필요가 있다.

서비스 계정 외에는 본인 인증하는 수 밖에 없기 때문에 ( API key, OAuth Client IDs ) 기각되었다.

 

기각: s3 uploader(임의의 사이트) 에 업로드하면 안될까?

s3 uploader(임의의 이름) 라고 회사에서 개발자가 아닌 사람들이 데이터를 S3 버킷에 업로드 할 수 있도록 만든 사이트가 있다. 이번에도 여기에 파일을 올리면 안되는 것일까?
CloudFront 의 설정 변경 ( 구체적인 내용은 모른다 ) 으로 인해 외부 사람들에게도 접근 권한이 주어질 수 있기 때문에 기각되었다.
그리고 누가 어떤걸 언제 올렸는지 로그가 남지 않는다는 이유도 한 몫 했다.

AWS CloudFront 란

간단히 개발자 친화적 환경에서 짧은 지연 시간과 빠른 전송 속도로 데이터, 동영상, 애플리케이션 및 API를 전 세계 고객에게 안전하게 전송하는 고속 콘텐츠 전송 네트워크(CDN; Contents Delivery Network; 지리적 물리적으로 떨어져 있는 사용자에게 컨텐츠 제공자의 컨텐츠를 더 빠르게 제공할 수 있는 기술)이다.

API, AWS Management Console, AWS CloudFormation, CLI 및 SDK와 같이 이미 익숙한 AWS 도구를 사용하여 몇 분 만에 콘텐츠 전송 네트워크를 시작해 사용할 수 있다.

 

승인: Slack 에 csv 파일을 업로드하면 AWS S3 버킷에 알아서 uload 하는 봇을 만들자

이미 회사 메신저로 활발히 사용하고 있는 Slack 에 파일을 upload 하면 누가 언제 무엇을 upload 했는지에 대한 기록이 남으면서 회사 워크 스페이스에 join 하지 않은 외부 사람은 볼 수 없으니 ( 채널별 권한 설정도 가능 ) CSV 파일만 직접 올려주면 이 방법이 최적안이었다.

개발 내용

CSV 파일로 export 하기 전에 Google Spreadsheets 에서 커스텀 함수를 사용해 데이터 체크하기

필요하다면 CSV 파일로 export 하기 전에 테이블에 들어갈 형식으로 다른 worksheet 에 변형시킨다.
이번에는 unpivot 함수를 만드는 js 스크립트를 차용했다.

 

AWS S3 버킷에 upload 하고 싶은 데이터는 여러 사원이 편집하는 것이기 때문에 데이터 형식의 통일성이 보장되어있지 않다.

  • 공란 표기: ' ' / '-' / 'ー'
  • 한 칸에 여러 내용 표기: ',' / 일본어 전각 컴마1 / 일본어 전각 컴마2

때문에 통일해주지 않으면 CSV로 export 할 수 없도록 하는 장치를 위 unpivot 함수에 추가한다.
js 로는 처음 스크립트를 적어봤기 때문에 동작만 할 수 있는 수준이라 생각한다 ( 이번엔 체크해야 하는 정보가 3번째 열에 격납되어 있었다 ) .

// 레코드 중 하나라도 공란이 있다면 해당 레코드는 삭제
  var emptyList = [];
  for (k=0;k<ret.length;k++) {
    if (!ret[k][2] || ret[k][2] == '' || ret[k][2] == '-' || ret[k][2] == 'ー') emptyList.push(ret[k]);
  }
  var diff = ret.filter(x => !emptyList.includes(x));

  // 한 칸에 여러 내용이 있다면 복수 레코드로 분할
  var split_tags = [], mid_tags = [], diff_k2; 
  for (k=0;k<diff.length;k++) {
    trimed_tag = diff[k][2].trim();
    if (diff[k][2].includes('、') || diff[k][2].includes(',') || diff[k][2].includes(', ')) {
      throw new Error(`${diff[k][0]} 번 레코드: 스페이스 없는 반각 컴마로 나열되어 있는 것만 분할 대상으로 인식합니다.`);
    }
    if (trimed_tag[trimed_tag.length - 1] === ',') {
      throw new Error(`컴마가 마지막 문자열이어서는 안됩니다:"${diff[k][0]}" 번 레코드의 "${trimed_tag}"`);
    }
    diff_k2 = diff[k][2].split(',');
    diff_k2.forEach(function(tag){
      split_tags.push([diff[k][0], diff[k][1], tag])
    })
  }

  // define fixed column's header by titles parameter
  if (titles.length === split_tags[0].length) {
    split_tags[0] = titles;
  } else throw new Error(`column 과 인수로 받은 header 수가 일치하지 않습니다: column ${split_tags[0].length}개、header ${titles.length}개`);

  return split_tags;

Slack 의 Event API

Slack 에 파일을 업로드 하거나 메시지를 보냈을 때 등의 액션을 취했을 때 Slack API 에서 json 형태의 event 를 보낸다.
이번에는 Slack 에 파일을 upload 한 것에 대한 event 를 받아야 하니 filed_shared event 를 받도록 설정해야 하는데 이는 아래 순서대로 할 수 있다.

 

  1. Slack API 에서 새로운 app 을 만든다 ( Bot 선택 ).
  2. Event Subscriptions 에서 endpoint url 을 인증받는다 (verified). ← 다음 섹션에서 설명
  3. Subscribe to bot events 에서 file_shared event 를 추가한다 ( 여기서 files:read 라는 bot scope 이 추가된다 ) .
  4. OAuth & Permissions 에서 files:read scope 를 user scope 에 추가한다 ( 이 권한이 없으면 event json 에서 file.info 를 취득할 수 없다 ) .
  5. app 을 install ( Incoming Webhook 을 사용하고 싶다면 chat:write, char:write.customize, incoming-webhook을 bot scope 에 추가해야 한다. ) 해서 CSV 파일을 upload 할 채널에 add 한다.

결과적으로 아래 화면이 된다.

 

파일이 공유되었을 때 받게 되는 event json 은 아래와 같으며 file.info를 취득하기 위해서는 event.file_id가 필요하다.

{
    "token": "FILE_TOKEN",
    "team_id": "TEAM_ID",
    "api_app_id": "API_APP_ID",
    "event": {
        "type": "file_shared",
        "channel_id": "CHANNEL_ID",
        "file_id": "FILE_ID",
        "user_id": "USER_ID",
        "file": {
            "id": "FILE_ID"
        },
        "event_ts": "EVENT_ID"
    },
    "type": "event_callback",
    "event_id": "EVENT_ID",
    "event_time": 1592536871,
    "authed_users": [
        "AUTHED_USERS"
    ]
}

 

Event Subscriptions 의 request url(endpoint) 를 AWS ApiGateway 에서 발급 받아 verified 받기

AWS API Gateway 에서 발급 받을 수 있는 API 는 HTTP API, WEBSOCKET API, REST API, REST API private 4 종류이다.
이번에는 REST(Representational State Transfer) API를 발급받는다.
REST API 는 HTTP 의 POST, PUT, GET, DELETE 를 통해 자원 (url) 을 처리하는 네트워크 아키텍쳐이다.
이번에 필요한 메소드는 POST 와 GET 이다.

  • POST: verifying request url, chat.postMessage ( slack 에 메시지를 보낼 때 )

  • GET: files.info

AWS API Gateway 에서 url(endpoint) 획득하기

REST API 를 작성할 때 설정해야 하는 점은 아래와 같다.

  • API name
  • Endpoint Type: EDGE ( EDGE Optimized )
  • method: POST, GET
  • Integration type: Lambda ( Lambda_Proxy: false )
  • Lambda function
  • Integration Response: method code: 200
  • Output passthrough: Yes
  • Method Response: HTTP status code: 200
    • Response Body for 200: application/json: Empty

하나라도 설정되어있지 않으면 event 가 제대로 전달되지 않는다.
POSTGET 둘 다 아래 스크린샷과 같은 모습이 되면 된다.

 

위 설정은 CDK 로 아래와 같이 구현할 수 있다.

import * as cdk from "@aws-cdk/core";
import {
    EmptyModel,
    EndpointType,
    Integration,
    LambdaIntegration,
    MockIntegration,
    PassthroughBehavior,
    RestApi
} from "@aws-cdk/aws-apigateway";
import {IFunction} from "@aws-cdk/aws-lambda";

export class EndpointGetter {
    private readonly handler: IFunction;
    private readonly lambdaIntegration: Integration;

    constructor(lambdaHandler: IFunction) {
        this.handler = lambdaHandler;
        this.lambdaIntegration = this.setLambdaIntegration();
    }

    private setLambdaIntegration(): Integration {
        return new LambdaIntegration(this.handler,
            {
                proxy: false,
                integrationResponses: [{
                    statusCode: "200"
                }]
            }
        );
    }

    private static getLambdaEndpoint(stack: cdk.Stack): RestApi {
        return new RestApi(
            stack, "subscribeSlackFileShared",
            {
                restApiName: "subscribeSlackFileShared",
                endpointTypes: [EndpointType.EDGE],
                deploy: true,
                deployOptions: {
                    stageName: "{stageName}",  // 설정하지 않으면 자동으로 이름이 정해진다
                },
            }
        );
    }

    private static getResponseTarget(): {} {
        return {
            methodResponses: [{
                statusCode: "200",
                responseModels: {
                    "application/json": new EmptyModel()
                },
            }]
        };
    }

    public addOptions(stack: cdk.Stack, httpMethods: string[]): void {
        const endpoint = EndpointGetter.getLambdaEndpoint(stack);
        httpMethods.forEach(method =>
            endpoint.root.addMethod(method, this.lambdaIntegration, EndpointGetter.getResponseTarget())
        );
    }
}

 

verify 상태 만들기

Lambda 함수가 challenge 값을 반환해야 verify 상태가 되는데, 이는 가장 처음 인증받을 때 한 번을 제외하고는 하지 않아도 된다.
따라서 어떤 처리를 하는 함수를 구현하든 challenge 만 반환하는 함수를 만들면 된다. 어떤 event json 으로 verify 하는지는 아래에 적어 놓았다.
안그러면 Your URL didn't respond with the value of the challenge parameter.라는 이유로 verify에 실패한다.

출처: https://qiita.com/mido_app/items/fcbbdb2bcce3edf0d3f5

# -*- coding: utf-8 -*-
import os
import json
import logging
import urllib.request

# ログ設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context) -> str:

    # 受け取ったイベント情報をCloud Watchログに出力
    logging.info(json.dumps(event))

    # Event APIの認証
    if "challenge" in event:
        return event.get("challenge")

 

위 형태를 갖춘 함수를 만들었는데도 불구하고 verify 상태가 되지 않는다면 아래 이유에 해당할 가능성이 높다.

  • challenge 는 반환할 수 있지만 다른 처리가 에러나 에러 json 이 반환되는 경우

  • REST API 가 위에서 첨부한 스크린샷과 다른 모양이 되어있을 경우

  • Lambda 함수 자체는 디버깅이 완료된 상태이지만 challenge 를 반환하기 전에 다른 처리를 해야만 하는 경우: 코멘트 아웃 해 놓으면 된다

  • 테스트 event 가 아래가 아닌 경우

    {
      "token": "abcdeftg",
      "challenge": "abcdefg",
      "type": "url_verification"
    }

     

file_shared event 에서 file 정보를 취득해 CSV 파일 속 데이터 얻기

slacker 에서는 Lambda 함수에 보내지는 event 의 file_id 를 Slacker().files.info(file_id)로 전달해주면 파일 정보를 얻을 수 있다. raw는 데이터 전체 ( HTML, CSS 등 메타데이터 전부 ) 를 문자열로 반환하기 때문에 dict 형으로 반환하는 body를 사용해야 한다.


SlackCSVInfoType은 파일 정보가 들어있는 dict 를 collections.namedtuple을 이용해 object 로 만든 것인데 이에 관해서는 다음 섹션에서 함께 설명할 것이다.

아래 코드에 적힌 error handling 에 대한 이야기도 다음 섹션에서 할 것이다.

def _get_file_info(self) -> SlackCSVInfoType:
    # raw => str, body => dict
    try:
        return get_file_info_as_obj(self.slack.files.info(self.event['event']['file_id']).body)
    except Exception as err:
        self._process_checker.send_err_msg_if_slacker_err(err)

 

download 에 사용되는 url 문자열은 _get_file_info(self).file.['url_private_download']에 들어있다.
url 을 그냥 열어버리면 js 스크립트 밖에 얻질 못하니 slacker 를 초기화 할 때 사용한 토큰을 이용한다.

import io
import tempfile

import pandas as pd


def _get_request_from_url(self):
    # slack 에 upload 된 csv 파일을 다운로드 하는 url 취득
    return requests.get(
        url=self._file_info.file['url_private_download'],
        headers={f'Authorization': f'Bearer {self._get_credentials()["token"]}'}  # Bearer {Token} 이어야 됨
    )

def _chunk_request(self, tmp: io.BufferedRandom):
    for chunk in self._get_request_from_url().iter_content(chunk_size=1024):
        if chunk:
           tmp.write(chunk)

def _req2df(self) -> pd.DataFrame:
    # request 정보를 pandas 의 DataFrame 으로 변환
    with tempfile.TemporaryFile() as tmp:
        self._chunk_request(tmp)
        tmp.seek(0)  # tmp 속을 볼 수 있게 함
        df = pd.read_csv(tmp)
    return df

Python 의 빌트인 모듈인 tempfile을 사용하면 따로 여러 번 재실행되는 Lambda 환경에 고정 패스로 다운로드하는 일 없이 파일을 다룰 수 있다.

 

데이터 체크하기

데이터와 데이터를 copy 할 테이블이 일치하는지 알 수 있도록 CSV 파일 이름은 테이블 이름과 일치되도록 해야 한다.
때문에 테이블에 대한 메타데이터를 json 으로 설정파일을 만들어 별도 디렉토리에 저장해두고 upload 한 CSV 파일 이름과 일치하는 설정파일이 있는지 가장 먼저 체크한다.

Lambda 함수와 Glue job 을 연속으로 실행시키기 때문에 어떤 테이블을 만들지에 대한 설정 파일 등은 lambda_function.py가 있는 곳과 다른 디렉토리 ( 제 3 디렉토리 ) 에서 작성했다. 때문에 S3에 있는 코드 버킷 ( 소스 코드만을 격납해두는 버킷이 있다면 ) 에서 설정 파일 속 내용을 읽어온다.


Lambda 함수는 파일을 S3 에서 다운로드 하게 되면 /tmp/에 해야 하는데 이는 메모리를 차지하게 되고 메모리를 할당하는 만큼 요금이 발생한다.
이번에는 이를 방지하기 위해 직접 데이터를 byte 형태로 읽어오게 구현했다.

 

먼저 설정 파일은 값만 다를 뿐 같은 형태를 갖는 json 데이터이므로 collections.namedtuple을 사용해 TableConfigType(dict) 형을 정의한다.
사정이 있어 Python3.6 을 사용했지만 사실 Python3.7 이상이라면 TypedDict를 사용해 데이터형을 지정해주는 것도 가능하다 하니 참고하면 좋다.

사실 아래 코드에서 각 key 에 대한 value 의 형태를 지정하고 싶다면 setting2obj의 인수를 dict 형태로 통째로 받지 않고 데이터 형 힌트를 적은 args를 받게 하면 된다 ( 난 귀찮았다... ) .

from collections import namedtuple
from typing import Dict

# project: str
# name: str
# schema: str
# mappings: List[str]
# options: {}

TableConfigType = namedtuple('TableConfigType', [
    'project',
    'name',
    'schema',
    'mappings',
    'options',
])


def setting2obj(config_dict: Dict) -> TableConfigType:
    return TableConfigType(*config_dict.values())

 

아래와 같이 파일을 다운로드하지 않고 직접 데이터를 읽어와 TableConfigType형으로 변형한다.

def _open_config_file(self) -> TableConfigType:
    try:
        config_dict = boto3.resource('s3').Object(
            fetch_parameter('buckets/codes'),
            f'src/spreadsheets/ss_uploader/configs/{self.table_name}.json'
        )
        return setting2obj(json.loads(config_dict.get()['Body'].read()))
    except Exception as err:
        self._process_checker.send_err_msg_if_filenotfound(err, self.table_name)

 

파일 이름이 틀렸다면 {self.table_name}.json을 찾지 못할 테니 try, except 로 에러를 발생시키고 slack 에 메시지를 보낸다.

def post_msg(slack: Slacker, prefix: ('ERROR', 'PASS'), error_msg: str) -> None:
    # Lambda 에서 slack 에 메시지를 보낼 때에는 이 함수만 사용
    if prefix == 'ERROR':
        emoji = ':interrobang:'
    else:
        emoji = ':marioluigi:'

    slack.chat.post_message(
        text=f'{prefix}{emoji}: {error_msg}',
        channel='slack-uploader-test',
    )


def send_err_msg_if_filenotfound(self, err: Exception, table_name: str) -> None:
    # 코드 버킷에 {table_name}.json 가 없는 경우 에러 raise
    post_msg(
        self.slack,
        'ERROR',
        'error message'
    )
    raise err

파일이나 데이터에 아래와 같은 문제점을 발견한 경우 위와 같은 형식으로 에러를 내고 slack 에 메시지를 보낸다.
에러가 slack 에 통지되는 상황:

  • slack 에 upload 한 CSV 파일 이름과 table 설정 파일 이름이 일치하지 않는 경우
  • CSV 파일에 적혀 있는 header 와 table 설정 파일 속 header 정보가 일치하지 않는 경우
  • CSV 파일에 있는 column 수와 table 설정 파일 속 column 수가 일치하지 않는 경우
  • CSV 파일에 있는 데이터 형태와 table 설정 파일 속 데이터 형태가 일치하지 않는 경우 ( index 가 될 데이터는 가장 앞열이 되어야 함 )
  • 코드 버킷에 table 설정 파일이 upload 되어있지 않은 경우
  • Slacker 를 인증하는 정보가 틀렸을 경우
  • 봇이 모든 작업을 처리하기 전에 upload 한 파일을 slack 에서 삭제하는 경우
  • 설정파일이 TableConfigType 형태에 알맞지 않을 경우
  • S3 에 upload 하는 작업이 실패했을 경우
  • Lambda 함수가 실행된 뒤 연속해서 실행되어야 하는 ( boto3를 이용해 Lambda 함수에서 Glue job 을 실행시킴 ) job 이 없을 경우 ( job 이름이 명명규칙을 따르지 않았을 경우 )
  • upload 되어야 할 path 로 upload 되지 못했을 경우

에러를 raise 하고 slack 에 메시지를 보내는 프로세스는 uploader_checker.py에만 구현해 놓고 import 해서 사용한다.

 

S3 에 업로드하기

pandas.DataFrame에서 CSV 파일로 변환할 때에도 고정 파일을 작성하지 않도록 tempfile을 이용한다.

def upload(self) -> None:
    # ProcessChecker 로 upload 프로세스를 감시
    with tempfile.NamedTemporaryFile() as tmp:
        self._data_frame.to_csv(tmp.name, encoding='utf-8')
        try:
            self._bucket.upload_file(tmp.name, self._path)
        except botocore.exceptions.ClientError as client_err:
            self._process_checker.trace_uploading(client_err.response["Error"]["Code"], self.table_name)

 

Lambda 함수가 한 번 실행되었을 때 retry 하는 것 막기

Lambda 함수는 실패할 경우 대기열에 60분간 쟁여두고 2번 retry 하는 것이 기본 설정 값이다.
이대로 Lambda 함수를 실행시키면 아래와 같이 Slack 에 메시지 파티가 일어나므로 미연에 방지해야 한다.

에러 내용을 보면 Lambda 함수가 통째로 반복 실행되고 있다는 것을 알 수 있다.

 

먼저 비동기 호출 retry 의 default 설정을 최소로 변경한다.

그래도 5 번 메시지가 오던 것이 4 번으로 밖에 줄어들질 않았다.

아마도 Lambda 함수가 1회 실행될 때 4번을 돌고 있은 것이라 생각된다.

const LambdaUploader = new lambda.Function(
        stack,
        UploaderLambdaName,
        {
            code: lambda.Code.fromAsset(
                path.join(
                    __dirname,
                    "../../../src/spreadsheets/{uploader}/{csv_directory}/"
                )
            ),
            functionName: LambdaUploaderName,
            handler: "lambda_function.lambda_handler",
            timeout: cdk.Duration.seconds(180),  // 이번 작업은 10초 안에 끝나므로
            runtime: lambda.Runtime.PYTHON_3_6,
            role: LambdaUploaderRole,
            layers: [pandasLayer, slackerLayer],
            maxEventAge: cdk.Duration.seconds(60),  // default 는 6시간으로 대기열에 보존되는 시간을 최소로 설정
            retryAttempts: 0,  // default 는 2로 error 가 발생해도 retry 할 필요가 없으므로 ( 외부 요인으로 발생할 것이기 때문 ) 0으로 설정
        }
    );

 

구글링 결과 비동기적으로 Lambda 함수를 실행하면 API Gateway 로 한 번 AWS 측에서 한 번 대기열에 추가해 놓았다가 다시한 번 실행한다고 한다. 동기적으로 실행하면 되는 것일까 생각해 찾아보았다.

현재는 decrete 될 예정인 InvokeFunction()을 통해 동기적으로 호출했다고 하는데 아마 이것의 새로운 버전이 LambdaInvoke()인 것 같다.
LambdaInvoke()는 최신 버전에서만 사용할 수 있다.

// 1.39.0 だとしようできないので、moduleアップグレード
    const submitJob = new tasks.LambdaInvoke(
        stack,
        '{LambdaUnloader}',
        {
            lambdaFunction: {LambdaUnloader},
            payload: sfn.TaskInput.fromDataAt('$.Payload')
        }
    )

cdk deploy 해서 실험해 본 결과 메시지 파티 ( 4번 ) 가 되는 것은 똑같았다.

 

그러므로 아래 방법을 사용한다.
아래 방법은 Lambda 함수가 1회 실행할 때 lambda_handler가 2회 이상 실행되는 것을 막는다.

import json
import random

idempotent = None


def lambda_handler(event, context):
    global idempotent
    first_invoke_seed = random.seed
    local_idempotent = None
    if idempotent is None:
        idempotent = first_invoke_seed
        local_idempotent = first_invoke_seed

    if idempotent == local_idempotent:
        pass
    else:
        raise KeyboardInterrupt('lambda tried to retry.')

사실 어떤 글에는 tempfile.NamedTemporaryFile()을 사용할 때 temp.name으로 처리하면 된다고 했는데 첫 실행부터 KeyboardInterrupt가 raise 되었다.


사실 이렇게 hash 값으로 조절하는 것은 bad know-how 라고 하니 다른 방안을 찾을 때 까지만 유지하는 것이 좋겠다.