IT/AWS Lambda & Resources Trouble Shooting

AWS 엘라스틱서치 스냅샷/특정인덱스 를 복원하는 람다

클라우드 개발자 앙몬드 2020. 10. 26. 16:16
반응형

앙몬드입니댜

이번에는 엘라스틱서치와 관련한 사항입니댜

 

현재 저희 환경의 경우, cost optimize를 위한 방안을 여러 각도로 시도해보고 있는데,

제 관할에 있는 AWS 엘라스틱서치 노드 클러스터도 비용 절약 측면에 포함되어 있습니댜

 

현재 상황은 다음과 같습니다

 (1) 실시간으로 엘라스틱서치에서 검색 가능한 다큐먼트는 2주치만 유지하고, 이외에는 삭제한다.

 (2) 단, 이에 대한 스냅샷은 유지하되, 스냅샷은 4주만 보관하고 그 이전의 날짜는 삭제한다.

 

다음과 같은 라이프사이클링을 실행하기 위한 람다가 배치성으로 실행되고 있죠

 

단, 개발팀 측 문의로는 과거 이슈에 대하여 디버깅할 필요가 있어, 이에 대한 로그를 복구해줄 것을 요구했는데요.

그에 따라 스냅샷으로 들어간 엘라스틱서치 데이터들을 복원하는 람다를 한번 개발해보았습니다.

 

물론 가장 쉬운 것은, 모든 스냅샷을 통틀어 복구하는 것이 가장 단순하지만, 어쨌든 회사 정책에 따라 비용 최적화란 흐름을 무시할 순 없다보니, 특정 스냅샷의 특정 인덱스만 복구해보는 람다를 개발하는 방향으로 가게 되었습니댜 :)

 

AWS 엘라스틱서치 복구의 경우, AWS 화이트페이퍼는 curl / http 로 호출하는 request 모듈을 사용하는 방법으로 가이드가 되어 있으니, 이 방법이 편하시다면 가이드를 따라가시는 것도 한 방법입니다 :)

docs.aws.amazon.com/ko_kr/elasticsearch-service/latest/developerguide/es-managedomains-snapshots.html

 

Amazon Elasticsearch Service 인덱스 스냅샷 작업 - Amazon Elasticsearch Service

기본 샤드 중 일부만 관련 인덱스에 사용할 수 있는 경우, 스냅샷의 state가 PARTIAL이 될 수 있습니다. 이 값은 최소한 샤드 하나의 데이터가 제대로 저장되지 않았음을 의미합니다. 부분 스냅샷에

docs.aws.amazon.com

 

자 이제 다시 돌아와서.. 앙몬드 쪽의 방향으로 돌려보면 람다 코드가 있는데요...

단, 이 람다를 돌리기 위한 조건!!!

 (1) requests_aws4auth 라이브러리와 elasticsearch 라이브러리가 람다 레이어에 포함되어 있어야 합니댜!!!!

 (2) 위에 꺼 딱 하나 입니댜 (아 그리고 IAM과 네트워크 환경설정은 당연히 되어있겠죠? 훗) 

 

환경변수로 입력값을 제어하면 됩니댜 :)

import json, boto3, os
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection

"""
AWS 엘라스틱서치 스냅샷 복구 람다

@since 2020-10-26
@author 클라우드 개발자 앙몬드

[2020-10-26] 최초생성 (클라우드 개발자 앙몬드)
"""

#### 환경변수 선언부
실행옵션 = os.environ['OPTION_FOR_EXECUTION']
스냅샷명 = os.environ["QUERY_SNAPSHOT_NAME"]
복구할_인덱스명 = os.environ["INDEX_TO_RESTORE"]
엘라스틱서치_엔드포인트 = os.environ["ES_ENDPOINT"]
레파지토리 = os.environ["ES_SNAPSHOT_REPOSITORY"]

호스트 = 엘라스틱서치_엔드포인트.replace("https://", "")
토큰 = boto3.Session().get_credentials()
aws_인증문서 = AWS4Auth(토큰.access_key, 토큰.secret_key, "ap-northeast-2", "es", session_token=토큰.token)
es_커넥터 = Elasticsearch(hosts = [{'host': 호스트, 'port': 443}],http_auth = aws_인증문서,use_ssl = True,verify_certs = True,connection_class = RequestsHttpConnection,timeout = 300)

def lambda_handler(event, context):
    
    """ 
        (0) 환경변수 설정 및 확인 
            [실행옵션]
                query_snapshot : 스냅샷 전체 조회
                query_snapshot_indices : 특정 스냅샷에 담긴 인덱스 전체조회 
                restore : 인덱스 복구
                default는 query_snapshot 모드입니다. 
            [스냅샷명]
                query_snapshot_indices 모드와 restore 모드에서 사용됩니다.
                2개 모드에서, 스냅샷명이 없을 경우 오류를 리턴합니다.
            [복구할_인덱스명]
                복구할 인덱스는 단건으로만 가능하게 제한한다.
                스냅샷명 / 인덱스명이 미리 지정되어 있지 않다면 오류를 리턴한다
    """
    print(f"{'*'*70}")
    print(f"입력된 실행옵션 : {실행옵션}")
    print(f"입력된 인덱스 : {복구할_인덱스명}")
    print(f"{'*'*70}")
    
    """
        (1) ES 스냅샷 조회 및 복구 처리 람다 코어
    """
    if 실행옵션 == "query_snapshot_indices" : # 스냅샷의 인덱스 조회모드
        print("\n\n\n")
        print("[query_snapshot_indices] 모드 - 특정 스냅샷의 인덱스 조회") 
        리스트_스냅샷 = es_커넥터.snapshot.get(repository=레파지토리, snapshot="_all")["snapshots"]
        필터링된_스냅샷 = list(filter(lambda x : x["snapshot"] == 스냅샷명, 리스트_스냅샷))[0]
        print(f"{'*'*70}")
        print(*필터링된_스냅샷['indices'], sep='\n')
        #print([ES인덱스 for ES인덱스 in 필터링된_스냅샷['indices']])
        print(f"{'*'*70}")
        
        
    elif 실행옵션 == "restore" : # 스냅샷 복구모드
        print("\n\n\n")
        print("[query_snapshot_indices] 모드 - 특정 스냅샷의 인덱스 조회") 
        결과 = es_커넥터.snapshot.restore(
            레파지토리, 스냅샷명, 
            body={
                "indices": 복구할_인덱스명,
                "include_global_state": False   # 얜 뭐지
            }
        ) 
        print(f"{'*'*70}")
        print(결과)
        
        
    else : # 복구가능 스냅샷 전체 조회모드
        print("\n\n\n")
        print("[query_snapshot / default] 모드 - 스냅샷 전체 조회")
        리스트_스냅샷 = es_커넥터.snapshot.get(repository=레파지토리, snapshot="_all")["snapshots"]
        print(f"{'*'*70}")
        for dict_스냅샷아이템 in 리스트_스냅샷 :
            print(dict_스냅샷아이템["snapshot"])
        print(f"{'*'*70}") 
    
    # TODO implement
    return {
        'statusCode': 200,
        'body': json.dumps('Success!!!')
    }

 

 

 

반응형