MongoDB์์ ๋ฐ์ดํฐ ์ถ์ถ
์ด๋ฒ ์์ ์์๋ ์งํฉ(collection)์์ MongoDB ๋ฌธ์(document)์ ํ์ ์งํฉ์ ์ถ์ถํ๋ ๋ฐฉ๋ฒ์ ๋ค๋ฃจ๊ณ ์์ต๋๋ค!
=> ์ด ์์ ์์ MongoDB ์งํฉ์์ ๋ฌธ์๋ ์น ์๋ฒ์ ๊ฐ์ ์ผ๋ถ ์์คํ ์์ ๊ธฐ๋ก๋ ์ด๋ฒคํธ๋ฅผ ๋ํ๋
(env) pip install pymongo
=> MongoDB ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ฐ๊ฒฐํ๋ ค๋ฉด ๋จผ์ PyMongo ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ค์นํด์ผ ํ๋ค
๊ทธ ํ MongoDB์ Atlas์์ ๋ฌด๋ฃ MongoDB ํด๋ฌ์คํฐ๋ฅผ ์์ฑํ๊ณ , ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ฅผ ์์ฑํ๋ค!
(env) pip install dnspython
=> MongoDB Atlas์์ ํธ์คํ ํ๋ ํด๋ฌ์คํฐ์ ์ฐ๊ฒฐํ ๋ pymongo๋ฅผ ์ฌ์ฉํ๋ ค๋ฉด dnspython์ด๋ผ๋ ํ์ด์ฌ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ํ๋ ๋ ์ค์นํด์ผ ํจ
=> ๋ค์์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ถ์ถํ MongoDB ์ธ์คํด์ค์ ๋ํ ์ฐ๊ฒฐ ์ ๋ณด๋ฅผ pipeline.conf ํ์ผ์ ์ ์น์ ์ผ๋ก ์ถ๊ฐํ๋ค
์ถ์ถ ์คํฌ๋ฆฝํธ๋ฅผ ๋ง๋ค๊ณ ์คํํ๊ธฐ ์ ์ ์์ ํ ์ํ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ ํ sample_mongodb.py ๋ผ๋ ํ์ผ์ ๋ง๋ค์ด์ค๋ค!
from pymongo import MongoClient
import datetime
import configparser
# mongo_config ๊ฐ์ ๋ก๋
parser = configparser.ConfigParser()
parser.read("../pipeline.conf")
hostname = parser.get("mongo_config", "hostname")
username = parser.get("mongo_config", "username")
password = parser.get("mongo_config", "password")
database_name= parser.get("mongo_config", "database")
collection_name = parser.get("mongo_config", "collection")
mongo_client = MongoClient("mongodb+srv://" + username
+ ":" + password
+ "@" + hostname
+ "/" + database_name
+ "?retryWrites=true&"
+ "w=majority&ssl=true&"
+ "ssl_cert_reqs=CERT_NONE")
# ์ปฌ๋ ์
์ด ์์นํ db์ ์ฐ๊ฒฐ
mongo_db = mongo_client[database_name]
# ๋ฌธ์๋ฅผ ์ฟผ๋ฆฌํ ์ปฌ๋ ์
์ ์ ํ
mongo_collection = mongo_db[collection_name]
event_1 = {
"event_id" : 1,
"event_timestamp" : datetime.datetime.today(),
"event_name" : "signup"
}
event_2 = {
"event_id" : 2,
"event_timestamp" : datetime.datetime.today(),
"event_name" : "pageview"
}
event_3 = {
"event_id" : 3,
"event_timestamp" : datetime.datetime.today(),
"event_name" : "login"
}
# 3๊ฐ ๋ฌธ์ ์
๋ ฅ
mongo_collection.insert_one(event_1)
mongo_collection.insert_one(event_2)
mongo_collection.insert_one(event_3)
=> ์ด๋ ๊ฒ collection์ ๋ฌธ์ 3๊ฐ๊ฐ insert ๋์๋ค
์ด์ mongo_extract.py ๋ผ๋ ์๋ก์ด ํ์ด์ฌ ์คํฌ๋ฆฝํธ๋ฅผ ๋ง๋ค๋ฉด ๋๋ค!
from pymongo import MongoClient
import csv
import boto3
import datetime
from datetime import timedelta
import configparser
# mongo_config ๊ฐ์ ๋ก๋
parser = configparser.ConfigParser()
parser.read("../pipeline.conf")
hostname = parser.get("mongo_config", "hostname")
username = parser.get("mongo_config", "username")
password = parser.get("mongo_config", "password")
database_name= parser.get("mongo_config", "database")
collection_name = parser.get("mongo_config", "collection")
mongo_client = MongoClient("mongodb+srv://" + username
+ ":" + password
+ "@" + hostname
+ "/" + database_name
+ "?retryWrites=true&"
+ "w=majority&ssl=true&"
+ "ssl_cert_reqs=CERT_NONE")
# ์ปฌ๋ ์
์ด ์์นํ db์ ์ฐ๊ฒฐ
mongo_db = mongo_client[database_name]
# ๋ฌธ์๋ฅผ ์ฟผ๋ฆฌํ ์ปฌ๋ ์
์ ์ ํ
mongo_collection = mongo_db[collection_name]
์ฝ๋๋ฅผ ์ดํด๋ณด๋ฉด csv ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ๊ฐ์ ธ์ ์ถ์ถ๋ ๋ฐ์ดํฐ๋ฅผ ์์ง ๋ก๋ ๋จ๊ณ์์ ๋ฐ์ดํฐ ์จ์ดํ์ฐ์ค๋ก ์ฝ๊ฒ ๊ฐ์ ธ์ฌ ์ ์๋ ํ๋ซ ํ์ผ๋ก ์ ํํํ๊ณ ์ธ ์ ์๋ค!
๋ํ, MongoDB ์ปฌ๋ ์ ์ ์ํ ์ด๋ฒคํธ ๋ฐ์ดํฐ๋ฅผ ๋ฐ๋ณตํ ์ ์๊ฒ ๋ช ๊ฐ์ง datetime ํจ์๋ฅผ ์ฌ์ฉํ๋ค!
์ด์ ์ถ์ถํ ๋ฌธ์๋ฅผ ์กฐํํ ์ฐจ๋ก์ธ๋ฐ mongo_collection์ .find() ๊ธฐ๋ฅ์ ํธ์ถํ์ฌ ์ฐพ์ผ๋ ค๋ ๋ฌธ์๋ฅผ ์ฟผ๋ฆฌํ ์ ์๋ค
start_date = datetime.datetime.today() + time
delta(days = -1)
end_date = start_date + timedelta(days = 1)
mongo_query = { "$and" : [ { "event_timestamp" : { "$gte" : start_date } }, { "event_timestamp" : { "$lt" : end_date } } ] }
event_docs = mongo_collection.find(mongo_query, batch_size = 3000)
=> ์ด ์ฝ๋์์๋ event_timestamp ํ๋์ ์คํฌ๋ฆฝํธ์์ ์ ์ํ ๋ ๋ ์ง ์ฌ์ด์ ๊ฐ์ ๊ฐ์ง ๋ชจ๋ ๋ฌธ์๋ฅผ ๊ฐ์ ธ์ด!
(์ด ์ฝ๋์ ๊ฒฐ๊ณผ๋ ๊ฒฐ๊ณผ ๋ฌธ์๋ฅผ ๋ฐ๋ณตํ๋ ๋ฐ ์ฌ์ฉํ event_docs๋ผ๋ ์ปค์๋ค! )
PyMongo๋ ๊ฐ ๋ฐฐ์น์ ๋ํด MongoDB ํธ์คํธ๋ฅผ ์๋ณตํ๋๋ฐ ์๋ฅผ ๋ค์ด result_docs Cursor์ 6000๊ฐ์ ๊ฒฐ๊ณผ๊ฐ ์์ผ๋ฉด ํ์ด์ฌ ์คํฌ๋ฆฝํธ๊ฐ ์คํ ์ค์ธ ์์คํ ์ผ๋ก ๋ชจ๋ ๋ฌธ์๋ฅผ ๊ฐ์ ธ์ค๋ ค๋ฉด MongoDB ํธ์คํธ๋ก ๋ ๋ฒ ์ด๋ํ๊ฒ ๋๋ค!
# ๊ฒฐ๊ณผ๋ฅผ ์ ์ฅํ ๋น ๋ฆฌ์คํธ๋ฅผ ์์ฑ
all_events = []
# ์ปค์๋ฅผ ํตํด ๋ฐ๋ณต ์์
for doc in event_docs :
# ๊ธฐ๋ณธ ๊ฐ์ ํฌํจ
event_id = str(doc.get("event_id", -1))
event_timestamp = doc.get("event_timestamp", None)
event_name = doc.get("event_name", None)
# ๋ฆฌ์คํธ์ ๋ชจ๋ ์ด๋ฒคํธ ์์ฑ์ ์ถ๊ฐ
current_event = []
current_event.append(event_id)
current_event.append(event_timestamp)
current_event.append(event_name)
# ์ด๋ฒคํธ์ ์ต์ข
๋ฆฌ์คํธ์ ์ด๋ฒคํธ๋ฅผ ์ถ๊ฐ
all_events.append(current_event)
์ฌ๊ธฐ์๋ doc.get() ํจ์ ํธ์ถ(-1 ๋๋ none)์ ๊ธฐ๋ณธ๊ฐ์ ๋ฃ์๋๋ฐ ๊ทธ ์ด์ ๋ ๋น์ ํ ๋ฌธ์ ๋ฐ์ดํฐ์ ํน์ฑ์ ๋ฌธ์์์ ํ๋๊ฐ ๋ชจ๋ ๋๋ฝ๋ ์ ์๊ธฐ ๋๋ฌธ์ด๋ค
=> ๋ฐ๋ณตํ๋ ๊ฐ ๋ฌธ์์ 'event_name' ๋๋ ๋ค๋ฅธ ํ๋๊ฐ ์๋ค๊ณ ๊ฐ์ ํ ์ ์๋ค!
(์ด ๊ฒฝ์ฐ ์ค๋ฅ๋ฅผ ๋ฐ์์ํค๋ ๋์ none ๊ฐ์ ๋ฐํํ๋๋ก doc.get()์ ์ง์ํจ)
event_docs์ ๋ชจ๋ ์ด๋ฒคํธ๋ฅผ ๋ฐ๋ณตํ๊ณ ๋๋ฉด all_events ๋ชฉ๋ก์ CSV ํ์ผ์ ์ธ ์ค๋น๊ฐ ๋๋ค!
export_file = "events_export.csv"
with open(export_file, 'w') as fp :
csvw = csv.writer(fp, delimiter = "|")
csvw.writerows(all_events)
fp.close()
์ด์ ์ด๋ ๊ฒ ์ด CSV ํ์ผ์ Boto3 ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ด์ฉํด ์ง๋ ํฌ์คํ ์ ๊ตฌ์ฑํ๋ S3 ๋ฒํท์ ์ ๋ก๋ํ๋ค
# aws_boto_credentials ๊ฐ์ ๋ก๋
parser = configparser.ConfigParser()
parser.read("../pipeline.conf")
access_key = parser.get("aws_boto_credentials", "access_key")
secret_key = parser.get("aws_boto_credentials", "secret_key")
bucket_name = parser.get("aws_boto_credentials", "bucket_name")
s3 = boto3.client("s3", aws_access_key_id = access_key, aws_secret_access_key = secret_key)
s3_file = export_file
s3.upload_file(export_file, bucket_name, s3_file)
์ด๋ ๊ฒ CSV ํ์ผ๋ ์ ์์ฑ๋๋๊ฑธ ํ์ธํ ์ ์๊ณ S3 ๋ฒํท์๋ ์ ๋ก๋ ๋จ!
REST API
REST API๋ ๋ฐ์ดํฐ๋ฅผ ์ถ์ถํ๋ ํํ ๋ฐฉ๋ฒ์ด๋ค
=> ์กฐ์ง์์ ๋ง๋ค๊ณ ์ ์ง๊ด๋ฆฌํ๊ฑฐ๋ ์ธ๋ถ ์๋น์ค/๊ณต๊ธ์ ์ฒด์ API์ ๊ด๊ณ์์ด ๋ฐ์ดํฐ ์ถ์ถ์๋ ๊ณตํต ํจํด์ด ์๋ค!
1. API ์๋ํฌ์ธํธ๋ก HTTP GET ์์ฒญ์ ๋ณด๋ธ๋ค.
2. JSON ํ์์ผ ๊ฐ๋ฅ์ฑ์ด ๋์ ์๋ต์ ์๋ฝํ๋ค.
3. ์๋ต์ ๊ตฌ๋ฌธ ๋ถ์ํ๊ณ ๋์ค์ ๋ฐ์ดํฐ ์จ์ดํ์ฐ์ค์ ๋ก๋ํ ์ ์๋ CSV ํ์ผ๋ก ๋ณํ(ํํํ) ํ๋ค.
๊ฐ์ธ์ ์ผ๋ก๋ JSON ํ์์ผ๋ก ์จ ์๋ต์ ๊ตฌ๋ฌธ ๋ถ์(parsing)ํ์ฌ ํ๋ซ ํ์ผ(CSV)์ ์ ์ฅํ์ง๋ง, ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ดํฐ ์จ์ดํ์ฐ์ค์ ๋ก๋ํ๊ธฐ ์ํด JSON ํ์์ผ๋ก ์ ์ฅํ ์๋ ์๋ค!
์ด ์์ ์์๋ Open Notify๋ผ๋ API์ ์ฐ๊ฒฐํ๋ค!
API์๋ ์ฌ๋ฌ ๊ฐ์ ์๋ํฌ์ธํธ๊ฐ ์์ผ๋ฉฐ, ๊ฐ ์๋ํฌ์ธํธ์๋ ์ฐ์ฃผ์์ ์ผ์ด๋๋ ์ผ์ ๋ํ NASA์ ๋ฐ์ดํฐ๊ฐ ๋ฐํ๋๋ค
=> ์ฌ๊ธฐ์๋ ๊ตญ์ ์ฐ์ฃผ ์ ๊ฑฐ์ฅ์ด ์ฃผ์ด์ง ์์น๋ฅผ ํต๊ณผํ ๋ค์ฏ ๊ฐ์ ์์ฐจ์ ์๋ต์ ๋ฐํํ๋ ์๋ํฌ์ธํธ๋ฅผ ์ฟผ๋ฆฌํ ๊ฒ์ด๋ค
๊ทผ๋ฐ ์ด์ ํด๋น api ๊ฐ removed ๋์๋ค๋... ์ฝ๋๋ง ์ดํด๋ณด๊ฒ ์ต๋๋ค!
(env) pip install requests
=> API๋ฅผ ์ฟผ๋ฆฌํ๊ณ ํ์ด์ฌ์์ ์๋ต์ ์ฒ๋ฆฌํ๋ ค๋ฉด requests ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ค์นํด์ผ ํ๋ค
(Requests ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ํตํด ํ์ด์ฌ์์ HTTP ์์ฒญ ๋ฐ ์๋ต์ ์ฝ๊ฒ ์ฌ์ฉํ ์ ์๋ค)
import requests
import json
import configparser
import csv
import boto3
lat = 42.36
lon = 71.05
lat_log_params = {"lat" : lat, "lon" : lon}
api_response = requests.get("http://api.open-notify.org/iss-pass.json", params=lat_log_params)
# ์๋ต ๋ด์ฉ์์ json ๊ฐ์ฒด ์์ฑ
response_json = json.loads(api_response.content)
all_passes = []
for response in response_json['response'] :
current_pass = []
# ์์ฒญ์์ ์๋/๊ฒฝ๋๋ฅผ ์ ์ฅ
current_pass.append(lat)
current_pass.append(lon)
# ํต๊ณผ ์ ์ง์ ์๊ฐ๊ณผ ์์น ์๊ฐ์ ์ ์ฅ
current_pass.append(response['duration'])
current_pass.append(response['risetime'])
all_passes.append(current_pass)
export_file = "export_file.csv"
with open(export_file, 'w') as fp :
csvw = csv.writer(fp, delimiter='|')
csvw.writerows(all_passes)
fp.close()
# aws_boto_credentials ๊ฐ์ ๋ก๋
parser = configparser.ConfigParser()
parser.read("../pipeline.conf")
access_key = parser.get('aws_boto_credentials', 'access_key')
secret_key = parser.get('aws_boto_credentials', 'secret_key')
bucket_name = parser.get('aws_boto_credentials', 'bucket_name')
s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)
s3.upload_file(export_file, bucket_name, export_file)
์ด์ ์์ฒญ์ ์ฌ์ฉํ์ฌ API ์๋ํฌ์ธํธ๋ฅผ ์ฟผ๋ฆฌํ๊ณ ์๋ต์ ๋ฐ์ ํ ๊ฒฐ๊ณผ JSON์ ํ์ธํ ์ ์๋ค!
์นดํ์นด ๋ฐ Debezium์ ํตํ ์คํธ๋ฆฌ๋ฐ ๋ฐ์ดํฐ ์์ง
MySQL ์ด์ง ๋ก๊ทธ ๋๋ Postgres WALs์ ๊ฐ์ CDC ์์คํ ์ ํตํด ๋ฐ์ดํฐ๋ฅผ ์์งํ ๋ ํ๋ ์ ์ํฌ์ ๋์์ด ํ์ํ๋ค!
Debezium์ ์ฌ๋ฌ ์คํ ์์ค ์๋น์ค๋ก ๊ตฌ์ฑ๋ ๋ถ์ฐ ์์คํ ์ผ๋ก ์ผ๋ฐ์ ์ธ CDC ์์คํ ์์ ํ ์์ค ๋ณ๊ฒฝ์ ์บก์ฒํ ํ ๋ค๋ฅธ ์์คํ ์์ ์ฌ์ฉํ ์ ์๋ ์ด๋ฒคํธ๋ก ์คํธ๋ฆฌ๋ฐํด์ฃผ๋ ์์คํ ์ด๋ค!
Debezium ์ค์น์๋ ์ธ ๊ฐ์ง ์ฃผ์ ๊ตฌ์ฑ ์์๊ฐ ์๋ค
- ์ํ์น ์ฃผํคํผ๋ ๋ถ์ฐ ํ๊ฒฝ์ ๊ด๋ฆฌํ๊ณ ๊ฐ ์๋น์ค์ ๊ตฌ์ฑ์ ์ฒ๋ฆฌ
- ์ํ์น ์นดํ์นด๋ ํ์ฅ์ฑ์ด ๋ฐ์ด๋ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ ๊ตฌ์ถํ๋ ๋ฐ ์ผ๋ฐ์ ์ผ๋ก ์ฌ์ฉ๋๋ ๋ถ์ฐ ์คํธ๋ฆฌ๋ฐ ํ๋ซํผ
- ์ํ์น ์นดํ์นด ์ปค๋ฅํธ๋ ๋ฐ์ดํฐ๋ฅผ ์นดํ์นด๋ฅผ ํตํด ์ฝ๊ฒ ์คํธ๋ฆฌ๋ฐํ ์ ์๋๋ก ์นดํ์นด๋ฅผ ๋ค๋ฅธ ์์คํ
๊ณผ ์ฐ๊ฒฐํ๋ ๋๊ตฌ
(์ปค๋ฅํฐ๋ MySQL ๋ฐ Postgres์ ๊ฐ์ ์์คํ ์ฉ์ผ๋ก ๊ตฌ์ถ๋์์ผ๋ฉฐ CDC ์์คํ ์ ๋ฐ์ดํฐ๋ฅผ ์นดํ์นด ํ ํฝ์ผ๋ก ๋ณํ)
์นดํ์นด๋ ํ ํฝ๋ณ๋ก ์ ๋ฆฌ๋ ๋ฉ์์ง๋ฅผ ๊ตํํ๋ค
=> ํ๋์ ์์คํ ์ ํ ํฝ์ ๊ฒ์(publish)ํ ์ ์๋ ๋ฐ๋ฉด, ํ๋ ์ด์์ ์์คํ ์ ํ ํฝ์ ์๋น(consume) ํ๊ฑฐ๋ ๊ตฌ๋ (subscribe)ํ ์ ์๋ค!
Debezium์ ์ด๋ฌํ ์์คํ ์ ํจ๊ป ์ฐ๊ฒฐํ๊ณ ์ผ๋ฐ์ ์ธ CDC ๊ตฌํ์ ์ํ ์ปค๋ฅํฐ๋ฅผ ํฌํจํ๋ค
=> MySQL ์ด์ง ๋ก๊ทธ ๋ฐ Postgres WAL์ ์์ (listen)ํ ์ ์๋ ์ปค๋ฅํฐ๊ฐ ์ด๋ฏธ ์๊ณ ์ด๋ฐ ๋ฐ์ดํฐ๋ ์นดํ์นด๋ฅผ ํตํด ํ ํฝ์ ๋ ์ฝ๋๋ก ๋ผ์ฐํ ๋๊ณ ๋ค๋ฅธ ์ปค๋ฅํฐ๋ฅผ ์ฌ์ฉํ์ฌ S3 ๋ฒํท, Snowflake ๋๋ Redshift ์ ๊ฐ์ ๋ฐ์ดํฐ ์จ์ดํ์ฐ์ค๋ฅผ ๋์์ผ๋ก ์๋น๋๋ค!
'๐ณ Data Engineering > Data Pipeline' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
๋ฐ์ดํฐ ์์ง : ๋ฐ์ดํฐ ๋ก๋ (1) | 2024.07.03 |
---|---|
๋ฐ์ดํฐ ์์ง : ๋ฐ์ดํฐ ์ถ์ถ (MySQL) (1) | 2024.06.29 |
์ผ๋ฐ์ ์ธ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ ํจํด (1) | 2024.05.15 |
์ต์ ๋ฐ์ดํฐ ์ธํ๋ผ (0) | 2024.05.12 |
๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ ์๊ฐ (0) | 2024.05.06 |