Nginx Access Log 엘라스틱서치 조회 / S3 백업 람다

반응형

API  : https://elasticsearch-py.readthedocs.io/en/7.x/

 

Python Elasticsearch Client — Elasticsearch 7.16.0 documentation

This client was designed as very thin wrapper around Elasticsearch’s REST API to allow for maximum flexibility. This means that there are no opinions in this client; it also means that some of the APIs are a little cumbersome to use from Python. We have

elasticsearch-py.readthedocs.io

 

 

Nginx Access Log가 들어있는 ES 데이터를 1분단위 (혹은 시간 범위) + 매칭쿼리.

AWS S3로 파일적재. (코드 최적화는 아직 안된 상태며 기능 구현에 초점)

Nginx 로그 1만줄을 S3 파일로 (비동기로) 옮기는데 대략 10초가 안될 정도로 소요됨.

 

람다 스펙은 10G 최대로 테스트해본 상태였댜.

현재 단순 산술식으로 하면, 특정 필터랑 쿼리로 걸러진 Nginx 로그들을 6만줄 (분당 6만건 요청. 6만 TPM) 까지는 처리가 가능하다는 얘기가 나온댜

 

이건 1분마다 도는 그 시간인, 1분안에 끝내야 한다는 조건이 굳이 없다면

크론을 1분마다 돌리고 타임아웃 5분정도로 설정하면 충분히 Nginx Access Log를 정제해서 SRE 요소로 뽑을 수 있다는 얘기가 성립될 수 있댜 ㅇㅇ

즉 실제로 데이터 보여주는 기간에 시간 차를 두어서 1~5분 딜레이 정도면 SLI/SLO 를 준.실시간으로 (semi-realtime) 보여줄 수 있댜

 

 

조합해서 가져오는 파이썬 런타임 람다 업로드한댜.

람다 레이어와 환경 설정은 본 항목에선 배제한댜.

 

- 람다레이어 : 서버리스 런타임 중 필요로 하는 라이브러리 / 디펜던시를 가져오는 기능. 리눅스 기반에서 돌아가기에, 경로가 아마 /opt 이하로 라이브러리들이 떨궈짐

 

import json, os
import pprint as ppr
from elasticsearch import Elasticsearch
from dateutil import tz
from datetime import timedelta, datetime
import boto3


"""

"""

def lambda_handler(event, context):

    # (0) 환경설정
    환경 = os.environ.get('env')
    서비스명 = os.environ.get('servicename')
    서비스_path명 = f"/{서비스명}/"
    seoul_tz = tz.gettz('Asia/Seoul')
    
    호스트 = ""
    포트 = 80
    if 환경 == 'dev' :
        호스트 = "(개발환경_엘사_호스트)"
    elif 환경 == 'prd' : 
        호스트 = "(운영.라이브.상용 환경 엘사_호스트)"
    
    es = Elasticsearch(hosts=호스트, port=포트)
    
    # (1) 시간 설정 및 조회
    지금시간 = datetime.now(tz=seoul_tz)
    기준시간 = datetime(지금시간.year, 지금시간.month, 지금시간.day, 지금시간.hour, 지금시간.minute, 00, tzinfo=seoul_tz)
    기준시간2 = 기준시간 - timedelta(minutes=1)
    기준시간1 = 기준시간 - timedelta(minutes=2)
    
    results = es.search(
    	index="k8s-gateway*",
    	body={
    		"size" : 10000,
    		"_source": {"includes": [ "log" ],},
    		"query": {
    			"bool": {
    				"must": 
    				[
    				 	{
    						"range": {
    							"time": {
    								"from"	: 기준시간1,
    								"to"	: 기준시간2
    							}}},
    				# 		"range": {
    				# 			"time": {
    				# 				"from"	: datetime(2021, 12, 22, 9, 00, 00 , tzinfo=seoul_tz),
    				# 				"to"	: datetime(2021, 12, 22, 9, 30, 00 , tzinfo=seoul_tz)
    				# 			}}},
    					{ 
    						"match": {"log": f"{서비스_path명}"} , }
    				] }} })
    
    리스트_nginx로그 = []
    
    for result in results['hits']['hits']: 
        리스트_nginx로그.append(result['_source'])
        #print('score:', result['_score'], 'source:', result['_source'])
    print("----------------")
    #for i in 리스트_nginx로그: print(i)
    #print(len(리스트_nginx로그))
    #print(리스트_nginx로그)
    
    # (2) s3 파일 생성한다
    s3 = boto3.Session().resource('s3')
    s3 = boto3.client('s3')

    버킷명 = "앙몬드가_넣을_버킷명"
    키파일 = 서비스명 + "/" + str(기준시간).replace("+09:00", "").replace(" ","_")
    
    # 안에 내용물이 딕셔너리다보니, join이 안되는거 같음 ㅇㅇ..  
    # 페이로드 = "\n".join(리스트_nginx로그)
    페이로드 = []
    
    print(f"지금 로그 개수 {len(리스트_nginx로그)}" )
    
    for 로그한줄 in 리스트_nginx로그 : 
        버퍼 = str(로그한줄).split(" ")
        시간 = 버퍼[4] + 버퍼[5]
        URL = 버퍼[6] + " " + 버퍼[7] + " " + 버퍼[8]
        리퀘스트코드 = 버퍼[9]
        
        리퀘스트레이턴시 = ""
        for index, value in enumerate(버퍼):
        	if "[gateway-to-" in value:
        		리퀘스트레이턴시 = 버퍼[index-1]
        		break
        
        리스판스코드 = 버퍼[-2]
        리스판스레이턴시 = 버퍼[-3]
        
        #print(f"{시간}  |  {URL}  |  {리퀘스트코드}  |  {리퀘스트레이턴시}  |  {리스판스코드}  |  {리스판스레이턴시}")
        
        변환한_한줄 = "|".join([시간,URL,리퀘스트코드,리퀘스트레이턴시,리스판스코드,리스판스레이턴시])
        페이로드.append( 변환한_한줄 )
        
    페이로드 = '\n'.join(페이로드) + "\n"

    response = s3.put_object( 
        Bucket=버킷명,
        Body=페이로드,
        Key=키파일
    )

 

 

반응형

설정

트랙백

댓글