MongoDB에서 데이터 추출
이번 예제에서는 집합(collection)에서 MongoDB 문서(document)의 하위 집합을 추출하는 방법을 다루고 있습니다!
=> 이 예제에서 MongoDB 집합에서 문서는 웹 서버와 같은 일부 시스템에서 기록된 이벤트를 나타냄
(env) pip install pymongo
=> MongoDB 데이터베이스에 연결하려면 먼저 PyMongo 라이브러리를 설치해야 한다
그 후 MongoDB의 Atlas에서 무료 MongoDB 클러스터를 생성하고, 데이터베이스를 생성한다!
(env) pip install dnspython
=> MongoDB Atlas에서 호스팅하는 클러스터에 연결할 때 pymongo를 사용하려면 dnspython이라는 파이썬 라이브러리를 하나 더 설치해야 함
=> 다음으로 데이터를 추출할 MongoDB 인스턴스에 대한 연결 정보를 pipeline.conf 파일에 새 섹션으로 추가한다
추출 스크립트를 만들고 실행하기 전에 작업할 샘플 데이터를 삽입할 sample_mongodb.py 라는 파일을 만들어준다!
from pymongo import MongoClient
import datetime
import configparser
# mongo_config 값을 로드
parser = configparser.ConfigParser()
parser.read("../pipeline.conf")
hostname = parser.get("mongo_config", "hostname")
username = parser.get("mongo_config", "username")
password = parser.get("mongo_config", "password")
database_name= parser.get("mongo_config", "database")
collection_name = parser.get("mongo_config", "collection")
mongo_client = MongoClient("mongodb+srv://" + username
+ ":" + password
+ "@" + hostname
+ "/" + database_name
+ "?retryWrites=true&"
+ "w=majority&ssl=true&"
+ "ssl_cert_reqs=CERT_NONE")
# 컬렉션이 위치한 db에 연결
mongo_db = mongo_client[database_name]
# 문서를 쿼리할 컬렉션을 선택
mongo_collection = mongo_db[collection_name]
event_1 = {
"event_id" : 1,
"event_timestamp" : datetime.datetime.today(),
"event_name" : "signup"
}
event_2 = {
"event_id" : 2,
"event_timestamp" : datetime.datetime.today(),
"event_name" : "pageview"
}
event_3 = {
"event_id" : 3,
"event_timestamp" : datetime.datetime.today(),
"event_name" : "login"
}
# 3개 문서 입력
mongo_collection.insert_one(event_1)
mongo_collection.insert_one(event_2)
mongo_collection.insert_one(event_3)
=> 이렇게 collection에 문서 3개가 insert 되었다
이제 mongo_extract.py 라는 새로운 파이썬 스크립트를 만들면 된다!
from pymongo import MongoClient
import csv
import boto3
import datetime
from datetime import timedelta
import configparser
# mongo_config 값을 로드
parser = configparser.ConfigParser()
parser.read("../pipeline.conf")
hostname = parser.get("mongo_config", "hostname")
username = parser.get("mongo_config", "username")
password = parser.get("mongo_config", "password")
database_name= parser.get("mongo_config", "database")
collection_name = parser.get("mongo_config", "collection")
mongo_client = MongoClient("mongodb+srv://" + username
+ ":" + password
+ "@" + hostname
+ "/" + database_name
+ "?retryWrites=true&"
+ "w=majority&ssl=true&"
+ "ssl_cert_reqs=CERT_NONE")
# 컬렉션이 위치한 db에 연결
mongo_db = mongo_client[database_name]
# 문서를 쿼리할 컬렉션을 선택
mongo_collection = mongo_db[collection_name]
코드를 살펴보면 csv 라이브러리를 가져와 추출된 데이터를 수집 로드 단계에서 데이터 웨어하우스로 쉽게 가져올 수 있는 플랫 파일로 정형화하고 쓸 수 있다!
또한, MongoDB 컬렉션의 샘플 이벤트 데이터를 반복할 수 있게 몇 가지 datetime 함수를 사용한다!
이제 추출할 문서를 조회할 차례인데 mongo_collection의 .find() 기능을 호출하여 찾으려는 문서를 쿼리할 수 있다
start_date = datetime.datetime.today() + time
delta(days = -1)
end_date = start_date + timedelta(days = 1)
mongo_query = { "$and" : [ { "event_timestamp" : { "$gte" : start_date } }, { "event_timestamp" : { "$lt" : end_date } } ] }
event_docs = mongo_collection.find(mongo_query, batch_size = 3000)
=> 이 코드에서는 event_timestamp 필드에 스크립트에서 정의한 두 날짜 사이의 값을 가진 모든 문서를 가져옴!
(이 코드의 결과는 결과 문서를 반복하는 데 사용할 event_docs라는 커서다! )
PyMongo는 각 배치에 대해 MongoDB 호스트를 왕복하는데 예를 들어 result_docs Cursor에 6000개의 결과가 있으면 파이썬 스크립트가 실행 중인 시스템으로 모든 문서를 가져오려면 MongoDB 호스트로 두 번 이동하게 된다!
# 결과를 저장할 빈 리스트를 생성
all_events = []
# 커서를 통해 반복 작업
for doc in event_docs :
# 기본 값을 포함
event_id = str(doc.get("event_id", -1))
event_timestamp = doc.get("event_timestamp", None)
event_name = doc.get("event_name", None)
# 리스트에 모든 이벤트 속성을 추가
current_event = []
current_event.append(event_id)
current_event.append(event_timestamp)
current_event.append(event_name)
# 이벤트의 최종 리스트에 이벤트를 추가
all_events.append(current_event)
여기서는 doc.get() 함수 호출(-1 또는 none)에 기본값을 넣었는데 그 이유는 비정형 문서 데이터의 특성상 문서에서 필드가 모두 누락될 수 있기 때문이다
=> 반복하는 각 문서에 'event_name' 또는 다른 필드가 있다고 가정할 수 없다!
(이 경우 오류를 발생시키는 대신 none 값을 반환하도록 doc.get()에 지시함)
event_docs의 모든 이벤트를 반복하고 나면 all_events 목록은 CSV 파일에 쓸 준비가 된다!
export_file = "events_export.csv"
with open(export_file, 'w') as fp :
csvw = csv.writer(fp, delimiter = "|")
csvw.writerows(all_events)
fp.close()
이제 이렇게 쓴 CSV 파일을 Boto3 라이브러리를 이용해 지난 포스팅에 구성했던 S3 버킷에 업로드한다
# aws_boto_credentials 값을 로드
parser = configparser.ConfigParser()
parser.read("../pipeline.conf")
access_key = parser.get("aws_boto_credentials", "access_key")
secret_key = parser.get("aws_boto_credentials", "secret_key")
bucket_name = parser.get("aws_boto_credentials", "bucket_name")
s3 = boto3.client("s3", aws_access_key_id = access_key, aws_secret_access_key = secret_key)
s3_file = export_file
s3.upload_file(export_file, bucket_name, s3_file)
이렇게 CSV 파일도 잘 작성되는걸 확인할 수 있고 S3 버킷에도 업로드 됨!
REST API
REST API는 데이터를 추출하는 흔한 방법이다
=> 조직에서 만들고 유지관리하거나 외부 서비스/공급업체의 API에 관계없이 데이터 추출에는 공통 패턴이 있다!
1. API 엔드포인트로 HTTP GET 요청을 보낸다.
2. JSON 형식일 가능성이 높은 응답을 수락한다.
3. 응답을 구문 분석하고 나중에 데이터 웨어하우스에 로드할 수 있는 CSV 파일로 변환(평탄화) 한다.
개인적으로는 JSON 형식으로 온 응답을 구문 분석(parsing)하여 플랫 파일(CSV)에 저장하지만, 데이터를 데이터 웨어하우스에 로드하기 위해 JSON 형식으로 저장할 수도 있다!
이 예제에서는 Open Notify라는 API에 연결한다!
API에는 여러 개의 엔드포인트가 있으며, 각 엔드포인트에는 우주에서 일어나는 일에 대한 NASA의 데이터가 반환된다
=> 여기서는 국제 우주 정거장이 주어진 위치를 통과할 다섯 개의 순차적 응답을 반환하는 엔드포인트를 쿼리할 것이다
근데 이제 해당 api 가 removed 되었다는... 코드만 살펴보겠습니다!
(env) pip install requests
=> API를 쿼리하고 파이썬에서 응답을 처리하려면 requests 라이브러리를 설치해야 한다
(Requests 라이브러리를 통해 파이썬에서 HTTP 요청 및 응답을 쉽게 사용할 수 있다)
import requests
import json
import configparser
import csv
import boto3
lat = 42.36
lon = 71.05
lat_log_params = {"lat" : lat, "lon" : lon}
api_response = requests.get("http://api.open-notify.org/iss-pass.json", params=lat_log_params)
# 응답 내용에서 json 객체 생성
response_json = json.loads(api_response.content)
all_passes = []
for response in response_json['response'] :
current_pass = []
# 요청에서 위도/경도를 저장
current_pass.append(lat)
current_pass.append(lon)
# 통과 시 지속 시간과 상승 시간을 저장
current_pass.append(response['duration'])
current_pass.append(response['risetime'])
all_passes.append(current_pass)
export_file = "export_file.csv"
with open(export_file, 'w') as fp :
csvw = csv.writer(fp, delimiter='|')
csvw.writerows(all_passes)
fp.close()
# aws_boto_credentials 값을 로드
parser = configparser.ConfigParser()
parser.read("../pipeline.conf")
access_key = parser.get('aws_boto_credentials', 'access_key')
secret_key = parser.get('aws_boto_credentials', 'secret_key')
bucket_name = parser.get('aws_boto_credentials', 'bucket_name')
s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)
s3.upload_file(export_file, bucket_name, export_file)
이제 요청을 사용하여 API 엔드포인트를 쿼리하고 응답을 받은 후 결과 JSON을 확인할 수 있다!
카프카 및 Debezium을 통한 스트리밍 데이터 수집
MySQL 이진 로그 또는 Postgres WALs와 같은 CDC 시스템을 통해 데이터를 수집할 때 프레임 워크의 도움이 필요하다!
Debezium은 여러 오픈 소스 서비스로 구성된 분산 시스템으로 일반적인 CDC 시스템에서 행 수준 변경을 캡처한 후 다른 시스템에서 사용할 수 있는 이벤트로 스트리밍해주는 시스템이다!
Debezium 설치에는 세 가지 주요 구성 요소가 있다
- 아파치 주키퍼는 분산 환경을 관리하고 각 서비스의 구성을 처리
- 아파치 카프카는 확장성이 뛰어난 데이터 파이프라인을 구축하는 데 일반적으로 사용되는 분산 스트리밍 플랫폼
- 아파치 카프카 커넥트는 데이터를 카프카를 통해 쉽게 스트리밍할 수 있도록 카프카를 다른 시스템과 연결하는 도구
(커넥터는 MySQL 및 Postgres와 같은 시스템용으로 구축되었으며 CDC 시스템의 데이터를 카프카 토픽으로 변환)
카프카는 토픽별로 정리된 메시지를 교환한다
=> 하나의 시스템은 토픽에 게시(publish)할 수 있는 반면, 하나 이상의 시스템은 토픽을 소비(consume) 하거나 구독(subscribe)할 수 있다!
Debezium은 이러한 시스템을 함께 연결하고 일반적인 CDC 구현을 위한 커넥터를 포함한다
=> MySQL 이진 로그 및 Postgres WAL을 수신(listen)할 수 있는 커넥터가 이미 있고 이런 데이터는 카프카를 통해 토픽의 레코드로 라우팅되고 다른 커넥터를 사용하여 S3 버킷, Snowflake 또는 Redshift 와 같은 데이터 웨어하우스를 대상으로 소비된다!
'Data Engineering > Data Pipeline' 카테고리의 다른 글
데이터 수집 : 데이터 로드 (1) | 2024.07.03 |
---|---|
데이터 수집 : 데이터 추출 (MySQL) (1) | 2024.06.29 |
일반적인 데이터 파이프라인 패턴 (1) | 2024.05.15 |
최신 데이터 인프라 (0) | 2024.05.12 |
데이터 파이프라인 소개 (0) | 2024.05.06 |