S3 키 메타데이터 (이름/용량) 크롤러

IT/Python3 2021. 3. 18. 20:31
반응형

S3 전체가 약 50페타.

그런 버킷이 8세트...

전체를 크롤링 시, 프리픽스가 없는 상태라면 어쩔 수 없이 시간싸움으로 가게 된댜....

 

import boto3, json, pathlib, sys, asyncio
from datetime import datetime

class S3Util :
	def __init__(self, 분석할_버킷명, 파일명, 게임id=None, 테이블명=None, 연=None, 월=None, 일=None) :
		self.버킷명 = 분석할_버킷명
		self.게임id, self.테이블명 = 게임id, 테이블명 
		self.연, self.월, self.일 = 연, 월, 일
		self.파일명 = 파일명
	
	# s3 오브젝트 크롤링
	def s3_오브젝트_크롤링(self, prefix=None) :
		pathlib.Path("./result").mkdir(parents=True, exist_ok=True)

		s3c_lister = boto3.client("s3").get_paginator("list_objects_v2")
		페이저 = s3c_lister.paginate(Bucket = self.버킷명)
		if prefix != None : 페이저 = s3c_lister.paginate(Bucket = self.버킷명, Prefix=prefix)
		
		with open("./result/" + self.파일명, "w") as 로컬_원본파일 : 
			for 페이지 in 페이저 :
				for 응답 in 페이지["Contents"] :
					응답키 = 응답["Key"].replace("/", ",")
					로컬_원본파일.write( ",".join( ( 응답키, str(응답["Size"]), "\n" ) ) )
	
	# s3 오브젝트 크롤링 비동기 업그레이드 
	def s3_오브젝트_비동기_크롤링(self, prefix=None) :
		pathlib.Path("./result").mkdir(parents=True, exist_ok=True)

		s3c_lister = boto3.client("s3").get_paginator("list_objects_v2")
		페이저 = s3c_lister.paginate(Bucket = self.버킷명)
		if prefix != None : 페이저 = s3c_lister.paginate(Bucket = self.버킷명, Prefix=prefix)

		async def _s3_obj_list_rslt_file_write(파일, 응답키, 응답) :
			파일.write(",".join( ( 응답키, str(응답["Size"]), "\n" ) ))

		async def _s3_obj_list_async(파일) :
			async for 페이지 in 페이저 :
				for 응답 in 페이지["Contents"] :
					응답키 = 응답["Key"].replace("/", ",")
					await _s3_obj_list_rslt_file_write(파일, 응답키, 응답)

		with open("./result/" + self.파일명, "w") as 로컬_원본파일 :
			이벤트루프 = asyncio.get_event_loop()
			이벤트루프.run_until_complete(_s3_obj_list_async(로컬_원본파일))

	# 날짜별 총합
	def s3_크롤링오브젝트_날짜별총합(self, 취합할_파일명) :
		취합결과 = { "dummy" : "dummy" }
		with open('./result/' + 취합할_파일명, 'r') as 파일:
			for 파일한줄 in 파일 :
				스플릿된_라인 = 파일한줄.split(",")
				try : 
					yyyyMMdd = "-".join((스플릿된_라인[2], 스플릿된_라인[3], 스플릿된_라인[4]))
					이_파일의_용량 = int(스플릿된_라인[-2])
					try : 취합결과[yyyyMMdd] += 이_파일의_용량 
					except Exception as e : 취합결과[yyyyMMdd] = 이_파일의_용량
				except Exception as e :
					print("날짜별 파싱 오류 ")
					print(파일한줄)
	
		날짜출력_리스트 = []
		with open("yyyyMMddSum_" + 취합할_파일명 +".txt", "w") as file :
			for key in 취합결과.keys():
				날짜출력_리스트.append(key)

			날짜출력_리스트.sort()
			for element in 날짜출력_리스트 :
				쓸_내용 = ",".join( [ element, str( 취합결과[element] ) ] )
				file.write( 쓸_내용 + "\n" )

	# 게임id별 총합
	def s3_크롤링오브젝트_게임id별총합(self, 취합할_파일명) :
		취합결과 = { "dummy" : "dummy" }

		with open('./result/' + 취합할_파일명, 'r') as 파일:
			for 파일한줄 in 파일 :
				스플릿된_라인 = 파일한줄.split(",")
				게임id = str(스플릿된_라인[0])
				이_파일의_용량 = int(스플릿된_라인[-2])
				try : 취합결과[게임id] += 이_파일의_용량 
				except Exception as e : 취합결과[게임id] = 이_파일의_용량

		with open("gameIdSum_" + 취합할_파일명 +".txt", "w") as file :
			for k, v in 취합결과.items():
				쓸_내용 = ",".join( [ k, str(v) ] )
				file.write( 쓸_내용 + "\n")

	# 게임id-테이블id별 총합
	def s3_크롤링오브젝트_게임id_테이블별총합(self, 취합할_파일명) :
		취합결과 = { 
			"dummy" : { 
				"a" : 0 
			}}

		with open('./result/' + 취합할_파일명, 'r') as 파일:
			for 파일한줄 in 파일 :
				스플릿된_라인 = 파일한줄.split(",")
				게임id = str(스플릿된_라인[0])
				테이블id = str(스플릿된_라인[1])
				이_파일의_용량 = int(스플릿된_라인[-2])

				try : 취합결과[게임id] 
				except Exception as e : 취합결과[게임id] = {}

				try : 취합결과[게임id][테이블id] += 이_파일의_용량
				except Exception as e : 취합결과[게임id][테이블id] = 이_파일의_용량

		with open("gameId_and_tableId_Sum_" + 취합할_파일명 +".txt", "w") as file :
			for k, v in 취합결과.items():
				for x, y in v.items() :
					쓸_내용 = ",".join( [ k, x, str(y) ] )
					file.write( 쓸_내용 + "\n")
		
	# 게임id-테이블id-날짜별 총합
	def s3_크롤링오브젝트_게임id_테이블id_날짜별총합(self, 취합할_파일명) :
		취합결과 = {
			"gameid" : {
				"tableid" : {
					"2021-01-01" : 0
				}}}

		with open('./result/' + 취합할_파일명, 'r') as 파일:
			for 파일한줄 in 파일 :
				스플릿된_라인 = 파일한줄.split(",")
				게임id = str(스플릿된_라인[0])
				테이블id = str(스플릿된_라인[1])
				try : 
					yyyyMMdd = "-".join((스플릿된_라인[2], 스플릿된_라인[3], 스플릿된_라인[4]))
					이_파일의_용량 = int(스플릿된_라인[-2])

					try : 취합결과[게임id] 
					except Exception as e : 취합결과[게임id] = {}

					try : 취합결과[게임id][테이블id]
					except Exception as e : 취합결과[게임id][테이블id] = {}

					try : 취합결과[게임id][테이블id][yyyyMMdd] += 이_파일의_용량
					except Exception as e : 취합결과[게임id][테이블id][yyyyMMdd] = 이_파일의_용량 
				except Exception as e : 
					print("날짜 파싱 오류 데이터 발생")
					print(파일한줄)

		with open("gameId_and_tableId_and_yyyyMMdd_Sum_" + 취합할_파일명 +".txt", "w") as file :
			for k, v in 취합결과.items():
				for x, y in v.items() :
					for z, w in y.items() :
						쓸_내용 = ",".join( [k, x, z, str(w)] )
						file.write(쓸_내용 + "\n")

if __name__ == "__main__" : 
	버킷명, 게임id, 테이블id = None, None, None

	try :
		버킷명 = sys.argv[1]
		게임id = sys.argv[2]
		테이블id = sys.argv[3]
	except Exception as e :
		if 버킷명 == None :
			print("버킷명 필수입니다.")
			exit(-1)
		if 게임id == None and 테이블id != None :
			print("게임id 없이 테이블id로 크롤링할 수 없습니다.")
			exit(-1)
	
	# (1) 파일로 결과물을 출력 시, 이에 해당하는 날짜로 네이밍을 완성합니다.
	#		또한, 결과물을 해당 경로의 result에 디렉토리로 만듭니다.
	#		실행 시간 측정용으로, exeinfo 파일에 실행/종료 시간을 씁니다.
	today = str(datetime.today()).replace(" ", "_")
	파일명 = 버킷명 + "__" + str(게임id) + "__" + str(테이블id) + "__" + today

	pathlib.Path("./result").mkdir(parents=True, exist_ok=True)
	로컬_실행결과파일 = open("./result/" + 버킷명 + "_exeinfo_" + today, "w")
	로컬_실행결과파일.write( f"실행 시작시간 : '{today}' \n" )

	# (2) 버킷명 / 게임id / 테이블id 아규먼트를 기준으로, s3를 크롤링합니다.
	데이터분석 = S3Util(버킷명, 파일명, 게임id, 테이블id)
	if 게임id == None : 데이터분석.s3_오브젝트_크롤링() 
	elif 게임id != None and 테이블id == None : 데이터분석.s3_오브젝트_크롤링(게임id)
	elif 게임id != None and 테이블id != None : 데이터분석.s3_오브젝트_크롤링(게임id + "/" + 테이블id)
	else : print("입력값 분기처리 오류")

	# (3) 종료 시간을 작성 후, 파일스트림 파이프를 닫습니다.
	로컬_실행결과파일.write( f"실행 종료시간 : '{today}' \n" )
	로컬_실행결과파일.close()
반응형

설정

트랙백

댓글