목적
Google Spreadsheets 에서 수동으로 관리하는 데이터를 AWS Redshift의 테이블로 만들어 다른 데이터를 분석하는데 사용할 수 있도록 하자.
기각: Google Spreadsheets에서 바로 파일을 업로드 하면 안될까?
로컬 혹은 다른 서비스에서 Spreadsheets 속 데이터를 바로 읽어오려면 인증 ( Google CloudPlatform ) 을 해야 한다. 현재 올라가 있는 데이터는 회사 메일 주소를 갖고 있지 않는 사람 혹은 API에 대해 공유를 거부해 놓은 상태이니 service account 를 사용하려면 Google suits 에서 admin 이 그 권한을 부여해 줄 필요가 있다.
서비스 계정 외에는 본인 인증하는 수 밖에 없기 때문에 ( API key, OAuth Client IDs ) 기각되었다.
기각: s3 uploader(임의의 사이트) 에 업로드하면 안될까?
s3 uploader(임의의 이름) 라고 회사에서 개발자가 아닌 사람들이 데이터를 S3 버킷에 업로드 할 수 있도록 만든 사이트가 있다. 이번에도 여기에 파일을 올리면 안되는 것일까?
CloudFront 의 설정 변경 ( 구체적인 내용은 모른다 ) 으로 인해 외부 사람들에게도 접근 권한이 주어질 수 있기 때문에 기각되었다.
그리고 누가 어떤걸 언제 올렸는지 로그가 남지 않는다는 이유도 한 몫 했다.
AWS CloudFront 란
간단히 개발자 친화적 환경에서 짧은 지연 시간과 빠른 전송 속도로 데이터, 동영상, 애플리케이션 및 API를 전 세계 고객에게 안전하게 전송하는 고속 콘텐츠 전송 네트워크(CDN; Contents Delivery Network; 지리적 물리적으로 떨어져 있는 사용자에게 컨텐츠 제공자의 컨텐츠를 더 빠르게 제공할 수 있는 기술)이다.
API, AWS Management Console, AWS CloudFormation, CLI 및 SDK와 같이 이미 익숙한 AWS 도구를 사용하여 몇 분 만에 콘텐츠 전송 네트워크를 시작해 사용할 수 있다.
승인: Slack 에 csv 파일을 업로드하면 AWS S3 버킷에 알아서 uload 하는 봇을 만들자
이미 회사 메신저로 활발히 사용하고 있는 Slack 에 파일을 upload 하면 누가 언제 무엇을 upload 했는지에 대한 기록이 남으면서 회사 워크 스페이스에 join 하지 않은 외부 사람은 볼 수 없으니 ( 채널별 권한 설정도 가능 ) CSV 파일만 직접 올려주면 이 방법이 최적안이었다.
개발 내용
CSV 파일로 export 하기 전에 Google Spreadsheets 에서 커스텀 함수를 사용해 데이터 체크하기
필요하다면 CSV 파일로 export 하기 전에 테이블에 들어갈 형식으로 다른 worksheet 에 변형시킨다.
이번에는 unpivot 함수를 만드는 js 스크립트를 차용했다.
AWS S3 버킷에 upload 하고 싶은 데이터는 여러 사원이 편집하는 것이기 때문에 데이터 형식의 통일성이 보장되어있지 않다.
- 공란 표기: ' ' / '-' / 'ー'
- 한 칸에 여러 내용 표기: ',' / 일본어 전각 컴마1 / 일본어 전각 컴마2
때문에 통일해주지 않으면 CSV로 export 할 수 없도록 하는 장치를 위 unpivot 함수에 추가한다.
js 로는 처음 스크립트를 적어봤기 때문에 동작만 할 수 있는 수준이라 생각한다 ( 이번엔 체크해야 하는 정보가 3번째 열에 격납되어 있었다 ) .
// 레코드 중 하나라도 공란이 있다면 해당 레코드는 삭제
var emptyList = [];
for (k=0;k<ret.length;k++) {
if (!ret[k][2] || ret[k][2] == '' || ret[k][2] == '-' || ret[k][2] == 'ー') emptyList.push(ret[k]);
}
var diff = ret.filter(x => !emptyList.includes(x));
// 한 칸에 여러 내용이 있다면 복수 레코드로 분할
var split_tags = [], mid_tags = [], diff_k2;
for (k=0;k<diff.length;k++) {
trimed_tag = diff[k][2].trim();
if (diff[k][2].includes('、') || diff[k][2].includes(',') || diff[k][2].includes(', ')) {
throw new Error(`${diff[k][0]} 번 레코드: 스페이스 없는 반각 컴마로 나열되어 있는 것만 분할 대상으로 인식합니다.`);
}
if (trimed_tag[trimed_tag.length - 1] === ',') {
throw new Error(`컴마가 마지막 문자열이어서는 안됩니다:"${diff[k][0]}" 번 레코드의 "${trimed_tag}"`);
}
diff_k2 = diff[k][2].split(',');
diff_k2.forEach(function(tag){
split_tags.push([diff[k][0], diff[k][1], tag])
})
}
// define fixed column's header by titles parameter
if (titles.length === split_tags[0].length) {
split_tags[0] = titles;
} else throw new Error(`column 과 인수로 받은 header 수가 일치하지 않습니다: column ${split_tags[0].length}개、header ${titles.length}개`);
return split_tags;
Slack 의 Event API
Slack 에 파일을 업로드 하거나 메시지를 보냈을 때 등의 액션을 취했을 때 Slack API 에서 json 형태의 event 를 보낸다.
이번에는 Slack 에 파일을 upload 한 것에 대한 event 를 받아야 하니 filed_shared event 를 받도록 설정해야 하는데 이는 아래 순서대로 할 수 있다.
- Slack API 에서 새로운 app 을 만든다 ( Bot 선택 ).
Event Subscriptions
에서 endpoint url 을 인증받는다 (verified). ← 다음 섹션에서 설명Subscribe to bot events
에서file_shared
event 를 추가한다 ( 여기서files:read
라는 bot scope 이 추가된다 ) .OAuth & Permissions
에서files:read
scope 를 user scope 에 추가한다 ( 이 권한이 없으면 event json 에서 file.info 를 취득할 수 없다 ) .- app 을 install ( Incoming Webhook 을 사용하고 싶다면
chat:write
,char:write.customize
,incoming-webhook
을 bot scope 에 추가해야 한다. ) 해서 CSV 파일을 upload 할 채널에 add 한다.
결과적으로 아래 화면이 된다.
파일이 공유되었을 때 받게 되는 event json 은 아래와 같으며 file.info
를 취득하기 위해서는 event.file_id
가 필요하다.
{
"token": "FILE_TOKEN",
"team_id": "TEAM_ID",
"api_app_id": "API_APP_ID",
"event": {
"type": "file_shared",
"channel_id": "CHANNEL_ID",
"file_id": "FILE_ID",
"user_id": "USER_ID",
"file": {
"id": "FILE_ID"
},
"event_ts": "EVENT_ID"
},
"type": "event_callback",
"event_id": "EVENT_ID",
"event_time": 1592536871,
"authed_users": [
"AUTHED_USERS"
]
}
Event Subscriptions 의 request url(endpoint) 를 AWS ApiGateway 에서 발급 받아 verified 받기
AWS API Gateway 에서 발급 받을 수 있는 API 는 HTTP API, WEBSOCKET API, REST API, REST API private 4 종류이다.
이번에는 REST(Representational State Transfer) API를 발급받는다.
REST API 는 HTTP 의 POST, PUT, GET, DELETE 를 통해 자원 (url) 을 처리하는 네트워크 아키텍쳐이다.
이번에 필요한 메소드는 POST 와 GET 이다.
-
POST: verifying request url, chat.postMessage ( slack 에 메시지를 보낼 때 )
-
GET: files.info
AWS API Gateway 에서 url(endpoint) 획득하기
REST API 를 작성할 때 설정해야 하는 점은 아래와 같다.
- API name
- Endpoint Type: EDGE ( EDGE Optimized )
- method: POST, GET
- Integration type: Lambda ( Lambda_Proxy: false )
- Lambda function
- Integration Response: method code: 200
- Output passthrough: Yes
- Method Response: HTTP status code: 200
- Response Body for 200: application/json: Empty
하나라도 설정되어있지 않으면 event 가 제대로 전달되지 않는다.POST
와 GET
둘 다 아래 스크린샷과 같은 모습이 되면 된다.
위 설정은 CDK 로 아래와 같이 구현할 수 있다.
import * as cdk from "@aws-cdk/core";
import {
EmptyModel,
EndpointType,
Integration,
LambdaIntegration,
MockIntegration,
PassthroughBehavior,
RestApi
} from "@aws-cdk/aws-apigateway";
import {IFunction} from "@aws-cdk/aws-lambda";
export class EndpointGetter {
private readonly handler: IFunction;
private readonly lambdaIntegration: Integration;
constructor(lambdaHandler: IFunction) {
this.handler = lambdaHandler;
this.lambdaIntegration = this.setLambdaIntegration();
}
private setLambdaIntegration(): Integration {
return new LambdaIntegration(this.handler,
{
proxy: false,
integrationResponses: [{
statusCode: "200"
}]
}
);
}
private static getLambdaEndpoint(stack: cdk.Stack): RestApi {
return new RestApi(
stack, "subscribeSlackFileShared",
{
restApiName: "subscribeSlackFileShared",
endpointTypes: [EndpointType.EDGE],
deploy: true,
deployOptions: {
stageName: "{stageName}", // 설정하지 않으면 자동으로 이름이 정해진다
},
}
);
}
private static getResponseTarget(): {} {
return {
methodResponses: [{
statusCode: "200",
responseModels: {
"application/json": new EmptyModel()
},
}]
};
}
public addOptions(stack: cdk.Stack, httpMethods: string[]): void {
const endpoint = EndpointGetter.getLambdaEndpoint(stack);
httpMethods.forEach(method =>
endpoint.root.addMethod(method, this.lambdaIntegration, EndpointGetter.getResponseTarget())
);
}
}
verify 상태 만들기
Lambda 함수가 challenge 값을 반환해야 verify 상태가 되는데, 이는 가장 처음 인증받을 때 한 번을 제외하고는 하지 않아도 된다.
따라서 어떤 처리를 하는 함수를 구현하든 challenge
만 반환하는 함수를 만들면 된다. 어떤 event json 으로 verify 하는지는 아래에 적어 놓았다.
안그러면 Your URL didn't respond with the value of the challenge parameter.
라는 이유로 verify에 실패한다.
출처: https://qiita.com/mido_app/items/fcbbdb2bcce3edf0d3f5
# -*- coding: utf-8 -*-
import os
import json
import logging
import urllib.request
# ログ設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context) -> str:
# 受け取ったイベント情報をCloud Watchログに出力
logging.info(json.dumps(event))
# Event APIの認証
if "challenge" in event:
return event.get("challenge")
위 형태를 갖춘 함수를 만들었는데도 불구하고 verify 상태가 되지 않는다면 아래 이유에 해당할 가능성이 높다.
-
challenge 는 반환할 수 있지만 다른 처리가 에러나 에러 json 이 반환되는 경우
-
REST API 가 위에서 첨부한 스크린샷과 다른 모양이 되어있을 경우
-
Lambda 함수 자체는 디버깅이 완료된 상태이지만 challenge 를 반환하기 전에 다른 처리를 해야만 하는 경우: 코멘트 아웃 해 놓으면 된다
-
테스트 event 가 아래가 아닌 경우
{ "token": "abcdeftg", "challenge": "abcdefg", "type": "url_verification" }
file_shared event 에서 file 정보를 취득해 CSV 파일 속 데이터 얻기
slacker 에서는 Lambda 함수에 보내지는 event 의 file_id 를 Slacker().files.info(file_id)
로 전달해주면 파일 정보를 얻을 수 있다. raw
는 데이터 전체 ( HTML, CSS 등 메타데이터 전부 ) 를 문자열로 반환하기 때문에 dict 형으로 반환하는 body
를 사용해야 한다.
SlackCSVInfoType
은 파일 정보가 들어있는 dict 를 collections.namedtuple
을 이용해 object 로 만든 것인데 이에 관해서는 다음 섹션에서 함께 설명할 것이다.
아래 코드에 적힌 error handling 에 대한 이야기도 다음 섹션에서 할 것이다.
def _get_file_info(self) -> SlackCSVInfoType:
# raw => str, body => dict
try:
return get_file_info_as_obj(self.slack.files.info(self.event['event']['file_id']).body)
except Exception as err:
self._process_checker.send_err_msg_if_slacker_err(err)
download 에 사용되는 url 문자열은 _get_file_info(self).file.['url_private_download']
에 들어있다.
url 을 그냥 열어버리면 js 스크립트 밖에 얻질 못하니 slacker 를 초기화 할 때 사용한 토큰을 이용한다.
import io
import tempfile
import pandas as pd
def _get_request_from_url(self):
# slack 에 upload 된 csv 파일을 다운로드 하는 url 취득
return requests.get(
url=self._file_info.file['url_private_download'],
headers={f'Authorization': f'Bearer {self._get_credentials()["token"]}'} # Bearer {Token} 이어야 됨
)
def _chunk_request(self, tmp: io.BufferedRandom):
for chunk in self._get_request_from_url().iter_content(chunk_size=1024):
if chunk:
tmp.write(chunk)
def _req2df(self) -> pd.DataFrame:
# request 정보를 pandas 의 DataFrame 으로 변환
with tempfile.TemporaryFile() as tmp:
self._chunk_request(tmp)
tmp.seek(0) # tmp 속을 볼 수 있게 함
df = pd.read_csv(tmp)
return df
Python 의 빌트인 모듈인 tempfile
을 사용하면 따로 여러 번 재실행되는 Lambda 환경에 고정 패스로 다운로드하는 일 없이 파일을 다룰 수 있다.
데이터 체크하기
데이터와 데이터를 copy 할 테이블이 일치하는지 알 수 있도록 CSV 파일 이름은 테이블 이름과 일치되도록 해야 한다.
때문에 테이블에 대한 메타데이터를 json 으로 설정파일을 만들어 별도 디렉토리에 저장해두고 upload 한 CSV 파일 이름과 일치하는 설정파일이 있는지 가장 먼저 체크한다.
Lambda 함수와 Glue job 을 연속으로 실행시키기 때문에 어떤 테이블을 만들지에 대한 설정 파일 등은 lambda_function.py
가 있는 곳과 다른 디렉토리 ( 제 3 디렉토리 ) 에서 작성했다. 때문에 S3에 있는 코드 버킷 ( 소스 코드만을 격납해두는 버킷이 있다면 ) 에서 설정 파일 속 내용을 읽어온다.
Lambda 함수는 파일을 S3 에서 다운로드 하게 되면 /tmp/
에 해야 하는데 이는 메모리를 차지하게 되고 메모리를 할당하는 만큼 요금이 발생한다.
이번에는 이를 방지하기 위해 직접 데이터를 byte 형태로 읽어오게 구현했다.
먼저 설정 파일은 값만 다를 뿐 같은 형태를 갖는 json 데이터이므로 collections.namedtuple
을 사용해 TableConfigType
(dict) 형을 정의한다.
사정이 있어 Python3.6 을 사용했지만 사실 Python3.7 이상이라면 TypedDict
를 사용해 데이터형을 지정해주는 것도 가능하다 하니 참고하면 좋다.
사실 아래 코드에서 각 key 에 대한 value 의 형태를 지정하고 싶다면 setting2obj
의 인수를 dict 형태로 통째로 받지 않고 데이터 형 힌트를 적은 args를 받게 하면 된다 ( 난 귀찮았다... ) .
from collections import namedtuple
from typing import Dict
# project: str
# name: str
# schema: str
# mappings: List[str]
# options: {}
TableConfigType = namedtuple('TableConfigType', [
'project',
'name',
'schema',
'mappings',
'options',
])
def setting2obj(config_dict: Dict) -> TableConfigType:
return TableConfigType(*config_dict.values())
아래와 같이 파일을 다운로드하지 않고 직접 데이터를 읽어와 TableConfigType
형으로 변형한다.
def _open_config_file(self) -> TableConfigType:
try:
config_dict = boto3.resource('s3').Object(
fetch_parameter('buckets/codes'),
f'src/spreadsheets/ss_uploader/configs/{self.table_name}.json'
)
return setting2obj(json.loads(config_dict.get()['Body'].read()))
except Exception as err:
self._process_checker.send_err_msg_if_filenotfound(err, self.table_name)
파일 이름이 틀렸다면 {self.table_name}.json
을 찾지 못할 테니 try, except 로 에러를 발생시키고 slack 에 메시지를 보낸다.
def post_msg(slack: Slacker, prefix: ('ERROR', 'PASS'), error_msg: str) -> None:
# Lambda 에서 slack 에 메시지를 보낼 때에는 이 함수만 사용
if prefix == 'ERROR':
emoji = ':interrobang:'
else:
emoji = ':marioluigi:'
slack.chat.post_message(
text=f'{prefix}{emoji}: {error_msg}',
channel='slack-uploader-test',
)
def send_err_msg_if_filenotfound(self, err: Exception, table_name: str) -> None:
# 코드 버킷에 {table_name}.json 가 없는 경우 에러 raise
post_msg(
self.slack,
'ERROR',
'error message'
)
raise err
파일이나 데이터에 아래와 같은 문제점을 발견한 경우 위와 같은 형식으로 에러를 내고 slack 에 메시지를 보낸다.
에러가 slack 에 통지되는 상황:
- slack 에 upload 한 CSV 파일 이름과 table 설정 파일 이름이 일치하지 않는 경우
- CSV 파일에 적혀 있는 header 와 table 설정 파일 속 header 정보가 일치하지 않는 경우
- CSV 파일에 있는 column 수와 table 설정 파일 속 column 수가 일치하지 않는 경우
- CSV 파일에 있는 데이터 형태와 table 설정 파일 속 데이터 형태가 일치하지 않는 경우 ( index 가 될 데이터는 가장 앞열이 되어야 함 )
- 코드 버킷에 table 설정 파일이 upload 되어있지 않은 경우
- Slacker 를 인증하는 정보가 틀렸을 경우
- 봇이 모든 작업을 처리하기 전에 upload 한 파일을 slack 에서 삭제하는 경우
- 설정파일이 TableConfigType 형태에 알맞지 않을 경우
- S3 에 upload 하는 작업이 실패했을 경우
- Lambda 함수가 실행된 뒤 연속해서 실행되어야 하는 ( boto3를 이용해 Lambda 함수에서 Glue job 을 실행시킴 ) job 이 없을 경우 ( job 이름이 명명규칙을 따르지 않았을 경우 )
- upload 되어야 할 path 로 upload 되지 못했을 경우
에러를 raise 하고 slack 에 메시지를 보내는 프로세스는 uploader_checker.py
에만 구현해 놓고 import 해서 사용한다.
S3 에 업로드하기
pandas.DataFrame
에서 CSV 파일로 변환할 때에도 고정 파일을 작성하지 않도록 tempfile
을 이용한다.
def upload(self) -> None:
# ProcessChecker 로 upload 프로세스를 감시
with tempfile.NamedTemporaryFile() as tmp:
self._data_frame.to_csv(tmp.name, encoding='utf-8')
try:
self._bucket.upload_file(tmp.name, self._path)
except botocore.exceptions.ClientError as client_err:
self._process_checker.trace_uploading(client_err.response["Error"]["Code"], self.table_name)
Lambda 함수가 한 번 실행되었을 때 retry 하는 것 막기
Lambda 함수는 실패할 경우 대기열에 60분간 쟁여두고 2번 retry 하는 것이 기본 설정 값이다.
이대로 Lambda 함수를 실행시키면 아래와 같이 Slack 에 메시지 파티가 일어나므로 미연에 방지해야 한다.
에러 내용을 보면 Lambda 함수가 통째로 반복 실행되고 있다는 것을 알 수 있다.
먼저 비동기 호출 retry 의 default 설정을 최소로 변경한다.
그래도 5 번 메시지가 오던 것이 4 번으로 밖에 줄어들질 않았다.
아마도 Lambda 함수가 1회 실행될 때 4번을 돌고 있은 것이라 생각된다.
const LambdaUploader = new lambda.Function(
stack,
UploaderLambdaName,
{
code: lambda.Code.fromAsset(
path.join(
__dirname,
"../../../src/spreadsheets/{uploader}/{csv_directory}/"
)
),
functionName: LambdaUploaderName,
handler: "lambda_function.lambda_handler",
timeout: cdk.Duration.seconds(180), // 이번 작업은 10초 안에 끝나므로
runtime: lambda.Runtime.PYTHON_3_6,
role: LambdaUploaderRole,
layers: [pandasLayer, slackerLayer],
maxEventAge: cdk.Duration.seconds(60), // default 는 6시간으로 대기열에 보존되는 시간을 최소로 설정
retryAttempts: 0, // default 는 2로 error 가 발생해도 retry 할 필요가 없으므로 ( 외부 요인으로 발생할 것이기 때문 ) 0으로 설정
}
);
구글링 결과 비동기적으로 Lambda 함수를 실행하면 API Gateway 로 한 번 AWS 측에서 한 번 대기열에 추가해 놓았다가 다시한 번 실행한다고 한다. 동기적으로 실행하면 되는 것일까 생각해 찾아보았다.
현재는 decrete 될 예정인 InvokeFunction()
을 통해 동기적으로 호출했다고 하는데 아마 이것의 새로운 버전이 LambdaInvoke()
인 것 같다.LambdaInvoke()
는 최신 버전에서만 사용할 수 있다.
// 1.39.0 だとしようできないので、moduleアップグレード
const submitJob = new tasks.LambdaInvoke(
stack,
'{LambdaUnloader}',
{
lambdaFunction: {LambdaUnloader},
payload: sfn.TaskInput.fromDataAt('$.Payload')
}
)
cdk deploy 해서 실험해 본 결과 메시지 파티 ( 4번 ) 가 되는 것은 똑같았다.
그러므로 아래 방법을 사용한다.
아래 방법은 Lambda 함수가 1회 실행할 때 lambda_handler
가 2회 이상 실행되는 것을 막는다.
import json
import random
idempotent = None
def lambda_handler(event, context):
global idempotent
first_invoke_seed = random.seed
local_idempotent = None
if idempotent is None:
idempotent = first_invoke_seed
local_idempotent = first_invoke_seed
if idempotent == local_idempotent:
pass
else:
raise KeyboardInterrupt('lambda tried to retry.')
사실 어떤 글에는 tempfile.NamedTemporaryFile()
을 사용할 때 temp.name
으로 처리하면 된다고 했는데 첫 실행부터 KeyboardInterrupt
가 raise 되었다.
사실 이렇게 hash 값으로 조절하는 것은 bad know-how 라고 하니 다른 방안을 찾을 때 까지만 유지하는 것이 좋겠다.
'AWS 노트' 카테고리의 다른 글
Docker이미지를 push할 때 Error response from daemon: No such image: {image}:{tag} 에러가 나면서 실패할 때 (0) | 2020.07.13 |
---|---|
An image does not exist locally with the tag: {account_id}.dkr.ecr.{region}.amazonaws.com/{image} 에러가 날 때 (0) | 2020.07.13 |
AWS 서비스 상에서 Python 라이브러리 사용하기 (0) | 2020.06.11 |
Parameter Store의 Parameter를 참조한 CDK stack을 deploy할 때 주의할 점 (0) | 2020.05.25 |
Redshit로 COPY가 되지 않을 때 대처법 (0) | 2020.05.20 |