Amazon Redshift 웨어하우스를 대상으로 구성
저번 포스팅들을 통해 원하는 소스 시스템에서 데이터를 추출했는데 이제 Redshift 데이터 웨어하우스에 데이터를 로드하여 데이터 수집을 완료할 차례이다!
=> 로드 방법은 데이터 추출 산출물이 어떤 모습인지에 따라 다름
데이터 웨어하우스로 Amazon Redshift를 사용하는 경우 데이터를 추출한 후 로드하기 위해 S3와 통합하는 것은 매우 간단한다
우선 S3에서 읽기와 관련된 권한을 Redshift 클러스터에 직접 할당할 IAM 역할을 생성해야 하는데
IAM의 탐색 메뉴에서 역할을 선택하고 [역할 만들기]를 클릭한 후
선택할 AWS 서비스 목록에서 Redshift를 찾아 선택하고 [사용 사례 선택] 에서 'Redshift - Customizable' 을 선택한다
권한 정책 연결에서 AmazonS3ReadOnlyAccess 를 클릭하고 역할의 이름을 RedshiftLoadRole로 지정해주면 됨
이제 이렇게 생성한 IAM 역할을 Redshift 클러스터와 연결할 수 있음
비용 이슈 때문에 Amazon Redshift Serverless를 이용했는데 설정은 기본 설정 그대로 하였고
IAM 역할 연결에서 아까 만든 RedshiftLoadRole을 선택했다!
The list of Amazon Resource Names (ARNs) for customer roles must contain the ARN of the default IAM role when a default is specified.
=> IAM 역할 연결을 할 때 위와 같은 에러가 발생하면 RedshiftLoadRole을 기본값으로 설정해주면 됨!
이제 [작업 그룹 생성] 을 해줘야 한다
먼저 작업 그룹 생성 후, 퍼블릭 액세스를 허용하고 보안 그룹에서 Redshift에 대한 인바운드 규칙을 0.0.0.0/0 으로 해준다
그리고 새로운 네임스페이스를 생성해주며 작업 그룹을 하나 만들어주면 된다!
이제 pipeline.conf 파일에 하나의 섹션을 추가해줘야 하는데
Redshift 웨어하우스에 데이터 로드
S3에서 Redshift로 데이터를 로드하는 가장 효율적인 방법은 COPY 명령을 사용하는 것이다
=> COPY는 로드 중인 데이터를 대상 테이블의 기존 행에 추가함
(COPY는 Redshift 클러스터를 쿼리하는 데 사용하는 SQL 클라이언트나 Boto3 라이브러리를 사용하는 파이썬 스크립트에서 SQL 문으로 실행할 수 있다!)
기본적으로 COPY 명령은 입력 파일의 필드와 동일한 순서로 대상 테이블의 열에 데이터를 삽입하기 때문에
달리 지정하지 않는 한 이 예제에서 로드하는 CSV의 필드 순서는 Redshift의 대상 테이블에 있는 열의 순서와 일치해야 함!
(env) pip install psycopg2
=> 앞에서 구성한 Redshift 클러스터와 상호작용하려면 psycopg2 라이브러리를 설치해야 한다
이제 copy_to_redshift.py라는 새 파일을 만들고 세 개의 코드 블록을 추가한다
1. S3 버킷과 상호 작용하기 위해 boto3, Redshift 클러스터에서 COPY 명령을 실행하기 위해 psycopg2, 그리고 pipeline.conf 파일을 읽기 위해 configparser 라이브러리를 가져오기
2. psycopg2. connect 함수와 pipeline.conf 파일에 저장된 자격증명을 사용하여 Redshift 클러스터에 연결하기
3. psycopg2 Cursor 객체를 사용하여 COPY 명령을 실행
import boto3
import configparser
import psycopg2
parser = configparser.ConfigParser()
parser.read("../pipeline.conf")
dbname = parser.get("aws_creds", "database")
user = parser.get("aws_creds", "username")
password = parser.get("aws_creds", "password")
host = parser.get("aws_creds", "host")
port = parser.get("aws_creds", "port")
# redshift 클러스터 연결
rs_conn = psycopg2.connect( "dbname=" + dbname + " user=" + user + " password=" + password + " host=" + host + " port=" + port)
# account_id 및 iam_role을 로드
account_id = parser.get("aws_boto_credentials", "account_id")
iam_role = parser.get("aws_creds", "iam_role")
bucket_name = parser.get("aws_boto_credentials", "bucket_name")
# Redshift에 파일을 로드하기 위해 COPY 명령을 실행
file_path = ("s3://" + bucket_name + "/order_extract.csv")
role_string = ("arn:aws:iam::" + account_id + ":role/" + iam_role)
sql = "COPY public.Oreders"
sql = sql + " from %s "
sql = sql + " iam_role %s;"
# cursor 객체를 생성하고 COPY를 실행
cur = rs_conn.cursor()
cur.execute(sql, (file_path, role_string))
# cursor를 종료하고 트랜잭션을 커밋
cur.close()
rs_conn.commit()
# 연결을 종료
rs_conn.close()
위 스크립트를 실행하기 전에 대상 테이블이 아직 존재하지 않는 경우 먼저 생성해야 한다!!
=> Redshift 에서 쿼리 편집기를 통해 위의 SQL을 실행하면 됨
잘 로드된 것을 확인할 수 있다!
증분 및 전체 로드
데이터 전부가 추출된 경우 로딩 스크립트에 추가해야 할 사항이 하나 있다
=> COPY 작업을 실행 하기 전에 Redshift에서 대상 테이블을 잘라야 한다(TRUNCATE 사용)
# 대상 테이블을 truncate
sql = "TRUNCATE public.Orders;"
cur = rs_conn.cursor()
cur.execute(sql)
cur.close()
rs_conn.commit()
=> 이 부분을 COPY 명령 전에 추가해준다!
데이터가 증분 추출됐다면 대상 테이블을 자르면 안된다!
=> 테이블을 자르면 추출 작업의 마지막 실행에서 업데이트된 레코드만 남게 된다
(증분 추출의 방식에 따라 기존 데이터들이 사라질 수 있음)
이 경우 TRUNCATE 말고 COPY 명령을 사용하여 데이터를 로드하고 레코드가 마지막으로 업데이트된 시간을 나타내는 타임스탬프에 의존하여 나중에 어떤 레코드가 최신인지 식별하거나 과거 레코드를 다시 볼 수 있다!
기록 보관의 관점에서 볼 때 대상 테이블에 이 두 기록을 모두 가지고 있는 것이 이상적임
=> 예를 들면 Orders 테이블에서 주문이 이월 주문된 상태에 얼마나 오래 있었는지 알고 싶다면 기존 주문 정보와 새로 로드된 주문 정보 모두를 사용해야 함
CDC 로그에서 추출한 데이터 로드
데이터가 CDC 방법을 통해 추출된 경우 증분 추출된 데이터를 로드하는 프로세스와 유사하지만 삽입 및 업데이트된 레코드뿐만 아니라 삭제된 레코드에도 엑세스할 수 있다!
insert|1|Backordered|2020-06-01 12:00:00
update|1|Shipped|2020-06-09 12:00:25
delete|1|Shipped|2020-06-10 09:05:12
=> 주문 레코드가 업데이트된 다음 날 삭제되었음을 보여줌
위와 같은 상황에서 전체 추출에서는 레코드가 완전히 사라졌을 것이고, 증분 추출에서는 delete를 가져오지 않을 것이다
그러나, CDC의 경우 삭제 이벤트가 선택되어 CSV 파일에 포함된다!
삭제된 레코드를 수용하려면 이벤트 유형을 저장할 Redshift 웨어하우스의 대상 테이블에 열을 추가해야 함!
(Redshift는 열 기반 데이터 웨어하우스)
=> EventType 열을 추가하여 특정 OrderId에 대한 Insert, update, delete 등을 포함
데이터 파이프라인에서 데이터 수집의 목표는 소스에서 데이터를 효율적으로 추출하여 대상으로 로드하는 것
=> 특정 사용 사례에 대한 데이터를 모델링하는 로직은 파이프라인의 변화 단계에서 구현
파일 스토리지를 데이터 레이크로 사용
S3 버킷(또는 다른 클라우드 스토리지)에서 데이터를 추출하고 데이터 웨어하우스에 로드하지 않을 때도 있다!
=> 이렇게 정형화 또는 반정형화된 형태로 저장된 데이터를 데이터 레이크라고 한다
데이터 웨어하우스와 달리 데이터 레이크는 다양한 형식의 데이터를 원본 형식 또는 때에 따라 비정형 형식으로 저장한다
=> 데이터 레이크는 데이터를 저장하기에는 더 저렴하지만 웨어하우스의 정형화된 데이터와 같은 방식으로 쿼리하는 데 최적화 되어 있지 X
하지만, 최근 몇 년 동안 SQL에 익숙한 사용자가 데이터 레이크의 데이터 쿼리에 훨씬 더 쉽게 접근할 수 있도록 도와주는 도구 들이 등장함
- Amazon Athena는 사용자가 SQL을 사용하여 S3에 저장된 데이터를 쿼리할 수 있는 AWS 서비스
- Amazon Redshift Spectrum은 Redshift가 S3의 데이터에 외부 테이블로 액세스하고 Redshift 웨어하우스의 테이블과 함께 쿼리에서 이를 참조할 수 있도록 하는 서비스
정리를 해보자면 클라우드 스토리지 기반 데이터 레이크에 대용량 데이터를 저장하는 것이 웨어하우스에 저장하는 것보다 비용이 저렴하고
비정형 또는 반정형 데이터는 미리 정의된 스키마가 없기 때문에 저장된 데이터의 유형이나 속성을 변경하는 것이 웨어하우스 스키마를 수정하는 것보다 훨씬 쉽다
(JSON 문서는 데이터 레이크에서 접할 수 있는 반정형 데이터 유형의 예)
1. 데이터 구조가 자주 변경되는 경우 적어도 일정 시간 동안 데이터 레이크에 저장하는 것을 고려할 수 있다!
2. 데이터 과학자 또는 머신러닝 엔지니어는 프로젝트의 탐색 단계에서 데이터가 필요한 shape(모양)이 무엇인지 모를 수 있기에 원본 형태로 데이터 레이크에 대한 엑세스 권한을 부여하여 데이터의 속성을 결정할 수 있음
(그 후 웨어하우스의 테이블에 로드하는 것이 합리적인지 여부를 결정 -> 쿼리 최적화)
'Data Engineering > Data Pipeline' 카테고리의 다른 글
데이터 수집 : 데이터 추출 (MongoDB, REST API, 카프카 및 Debezium) (1) | 2024.07.02 |
---|---|
데이터 수집 : 데이터 추출 (MySQL) (1) | 2024.06.29 |
일반적인 데이터 파이프라인 패턴 (1) | 2024.05.15 |
최신 데이터 인프라 (0) | 2024.05.12 |
데이터 파이프라인 소개 (0) | 2024.05.06 |