글
S3 키 메타데이터 (이름/용량) 크롤러
IT/Python3
2021. 3. 18. 20:31
반응형
S3 전체가 약 50페타.
그런 버킷이 8세트...
전체를 크롤링 시, 프리픽스가 없는 상태라면 어쩔 수 없이 시간싸움으로 가게 된댜....
import boto3, json, pathlib, sys, asyncio
from datetime import datetime
class S3Util :
def __init__(self, 분석할_버킷명, 파일명, 게임id=None, 테이블명=None, 연=None, 월=None, 일=None) :
self.버킷명 = 분석할_버킷명
self.게임id, self.테이블명 = 게임id, 테이블명
self.연, self.월, self.일 = 연, 월, 일
self.파일명 = 파일명
# s3 오브젝트 크롤링
def s3_오브젝트_크롤링(self, prefix=None) :
pathlib.Path("./result").mkdir(parents=True, exist_ok=True)
s3c_lister = boto3.client("s3").get_paginator("list_objects_v2")
페이저 = s3c_lister.paginate(Bucket = self.버킷명)
if prefix != None : 페이저 = s3c_lister.paginate(Bucket = self.버킷명, Prefix=prefix)
with open("./result/" + self.파일명, "w") as 로컬_원본파일 :
for 페이지 in 페이저 :
for 응답 in 페이지["Contents"] :
응답키 = 응답["Key"].replace("/", ",")
로컬_원본파일.write( ",".join( ( 응답키, str(응답["Size"]), "\n" ) ) )
# s3 오브젝트 크롤링 비동기 업그레이드
def s3_오브젝트_비동기_크롤링(self, prefix=None) :
pathlib.Path("./result").mkdir(parents=True, exist_ok=True)
s3c_lister = boto3.client("s3").get_paginator("list_objects_v2")
페이저 = s3c_lister.paginate(Bucket = self.버킷명)
if prefix != None : 페이저 = s3c_lister.paginate(Bucket = self.버킷명, Prefix=prefix)
async def _s3_obj_list_rslt_file_write(파일, 응답키, 응답) :
파일.write(",".join( ( 응답키, str(응답["Size"]), "\n" ) ))
async def _s3_obj_list_async(파일) :
async for 페이지 in 페이저 :
for 응답 in 페이지["Contents"] :
응답키 = 응답["Key"].replace("/", ",")
await _s3_obj_list_rslt_file_write(파일, 응답키, 응답)
with open("./result/" + self.파일명, "w") as 로컬_원본파일 :
이벤트루프 = asyncio.get_event_loop()
이벤트루프.run_until_complete(_s3_obj_list_async(로컬_원본파일))
# 날짜별 총합
def s3_크롤링오브젝트_날짜별총합(self, 취합할_파일명) :
취합결과 = { "dummy" : "dummy" }
with open('./result/' + 취합할_파일명, 'r') as 파일:
for 파일한줄 in 파일 :
스플릿된_라인 = 파일한줄.split(",")
try :
yyyyMMdd = "-".join((스플릿된_라인[2], 스플릿된_라인[3], 스플릿된_라인[4]))
이_파일의_용량 = int(스플릿된_라인[-2])
try : 취합결과[yyyyMMdd] += 이_파일의_용량
except Exception as e : 취합결과[yyyyMMdd] = 이_파일의_용량
except Exception as e :
print("날짜별 파싱 오류 ")
print(파일한줄)
날짜출력_리스트 = []
with open("yyyyMMddSum_" + 취합할_파일명 +".txt", "w") as file :
for key in 취합결과.keys():
날짜출력_리스트.append(key)
날짜출력_리스트.sort()
for element in 날짜출력_리스트 :
쓸_내용 = ",".join( [ element, str( 취합결과[element] ) ] )
file.write( 쓸_내용 + "\n" )
# 게임id별 총합
def s3_크롤링오브젝트_게임id별총합(self, 취합할_파일명) :
취합결과 = { "dummy" : "dummy" }
with open('./result/' + 취합할_파일명, 'r') as 파일:
for 파일한줄 in 파일 :
스플릿된_라인 = 파일한줄.split(",")
게임id = str(스플릿된_라인[0])
이_파일의_용량 = int(스플릿된_라인[-2])
try : 취합결과[게임id] += 이_파일의_용량
except Exception as e : 취합결과[게임id] = 이_파일의_용량
with open("gameIdSum_" + 취합할_파일명 +".txt", "w") as file :
for k, v in 취합결과.items():
쓸_내용 = ",".join( [ k, str(v) ] )
file.write( 쓸_내용 + "\n")
# 게임id-테이블id별 총합
def s3_크롤링오브젝트_게임id_테이블별총합(self, 취합할_파일명) :
취합결과 = {
"dummy" : {
"a" : 0
}}
with open('./result/' + 취합할_파일명, 'r') as 파일:
for 파일한줄 in 파일 :
스플릿된_라인 = 파일한줄.split(",")
게임id = str(스플릿된_라인[0])
테이블id = str(스플릿된_라인[1])
이_파일의_용량 = int(스플릿된_라인[-2])
try : 취합결과[게임id]
except Exception as e : 취합결과[게임id] = {}
try : 취합결과[게임id][테이블id] += 이_파일의_용량
except Exception as e : 취합결과[게임id][테이블id] = 이_파일의_용량
with open("gameId_and_tableId_Sum_" + 취합할_파일명 +".txt", "w") as file :
for k, v in 취합결과.items():
for x, y in v.items() :
쓸_내용 = ",".join( [ k, x, str(y) ] )
file.write( 쓸_내용 + "\n")
# 게임id-테이블id-날짜별 총합
def s3_크롤링오브젝트_게임id_테이블id_날짜별총합(self, 취합할_파일명) :
취합결과 = {
"gameid" : {
"tableid" : {
"2021-01-01" : 0
}}}
with open('./result/' + 취합할_파일명, 'r') as 파일:
for 파일한줄 in 파일 :
스플릿된_라인 = 파일한줄.split(",")
게임id = str(스플릿된_라인[0])
테이블id = str(스플릿된_라인[1])
try :
yyyyMMdd = "-".join((스플릿된_라인[2], 스플릿된_라인[3], 스플릿된_라인[4]))
이_파일의_용량 = int(스플릿된_라인[-2])
try : 취합결과[게임id]
except Exception as e : 취합결과[게임id] = {}
try : 취합결과[게임id][테이블id]
except Exception as e : 취합결과[게임id][테이블id] = {}
try : 취합결과[게임id][테이블id][yyyyMMdd] += 이_파일의_용량
except Exception as e : 취합결과[게임id][테이블id][yyyyMMdd] = 이_파일의_용량
except Exception as e :
print("날짜 파싱 오류 데이터 발생")
print(파일한줄)
with open("gameId_and_tableId_and_yyyyMMdd_Sum_" + 취합할_파일명 +".txt", "w") as file :
for k, v in 취합결과.items():
for x, y in v.items() :
for z, w in y.items() :
쓸_내용 = ",".join( [k, x, z, str(w)] )
file.write(쓸_내용 + "\n")
if __name__ == "__main__" :
버킷명, 게임id, 테이블id = None, None, None
try :
버킷명 = sys.argv[1]
게임id = sys.argv[2]
테이블id = sys.argv[3]
except Exception as e :
if 버킷명 == None :
print("버킷명 필수입니다.")
exit(-1)
if 게임id == None and 테이블id != None :
print("게임id 없이 테이블id로 크롤링할 수 없습니다.")
exit(-1)
# (1) 파일로 결과물을 출력 시, 이에 해당하는 날짜로 네이밍을 완성합니다.
# 또한, 결과물을 해당 경로의 result에 디렉토리로 만듭니다.
# 실행 시간 측정용으로, exeinfo 파일에 실행/종료 시간을 씁니다.
today = str(datetime.today()).replace(" ", "_")
파일명 = 버킷명 + "__" + str(게임id) + "__" + str(테이블id) + "__" + today
pathlib.Path("./result").mkdir(parents=True, exist_ok=True)
로컬_실행결과파일 = open("./result/" + 버킷명 + "_exeinfo_" + today, "w")
로컬_실행결과파일.write( f"실행 시작시간 : '{today}' \n" )
# (2) 버킷명 / 게임id / 테이블id 아규먼트를 기준으로, s3를 크롤링합니다.
데이터분석 = S3Util(버킷명, 파일명, 게임id, 테이블id)
if 게임id == None : 데이터분석.s3_오브젝트_크롤링()
elif 게임id != None and 테이블id == None : 데이터분석.s3_오브젝트_크롤링(게임id)
elif 게임id != None and 테이블id != None : 데이터분석.s3_오브젝트_크롤링(게임id + "/" + 테이블id)
else : print("입력값 분기처리 오류")
# (3) 종료 시간을 작성 후, 파일스트림 파이프를 닫습니다.
로컬_실행결과파일.write( f"실행 종료시간 : '{today}' \n" )
로컬_실행결과파일.close()
반응형
'IT > Python3' 카테고리의 다른 글
Venv (0) | 2021.08.24 |
---|---|
초대용량 처리 시, 이터레이터 (0) | 2021.03.22 |
비동기 처리 sample (0) | 2021.03.18 |
Python3 딕셔너리 엘리먼트 타입의 리스트 아이템 중복제거 (0) | 2020.12.16 |
쿠버네티스 노드 관제 커스터마이즈 (0) | 2020.11.06 |