1. 프로젝트 개요
•
프로젝트명: ServiceCodeCenter (SCC) - 데이터 전처리 파생 프로젝트
•
목표:
데이터를 수집, 가공 및 저장하는 일련의 ETL(Extract, Transform, Load) 파이프라인을 구축하여 개발자가 코드 기반으로 효율적인 데이터 처리 작업을 수행할 수 있도록 지원.
2. 프로젝트 목표 및 특징
1.
데이터 수집 (Ingestion)
•
소스(API, 파일, 데이터베이스)에서 데이터를 가져올 수 있는 모듈
2.
데이터 가공 (Processing)
•
결측치 처리, 형식 변환, 필터링 등의 전처리 로직.
3.
데이터 저장 (Storage)
•
TimescaleDB(PostgreSQL 기반) 또는 기타 데이터베이스에 저장.
4.
실행 환경 관리
•
Docker 및 Docker Compose를 통해 동일한 컨테이너 환경에서 개발 및 운영 지원.
•
Python 가상 환경(Conda) 관리를 통해 프로젝트의 의존성을 통제.
3. 주요 사용자 및 역할
사용자 유형 | 역할 | 목표 및 사용 목적 |
개발자 | 데이터 파이프라인 설계 및 구현 | 데이터 수집, 처리 및 저장 기능 개발 |
운영 관리자 | 실행 및 모니터링 | 파이프라인의 정상 작동 모니터링 및 장애 대응 |
4. 기능 요구사항
4.1 데이터 수집 (Ingestion)
1.
API, CSV, Excel, 데이터베이스 등 다양한 소스에서 데이터를 수집 가능.
2.
데이터 수집 주기를 Prefect를 통해 스케줄링 및 자동화.
3.
데이터 유입 시 로깅 및 모니터링 기능 제공.
4.2 데이터 가공 (Processing)
1.
데이터 정제(결측치 처리, 이상치 탐지, 타입 변환 등) 기능
2.
Pandas 및 NumPy를 활용한 사용자 정의 데이터 가공 지원.
3.
데이터 품질 검증 및 로깅 기능 포함.
4.3 데이터 저장 (Storage)
1.
TimescaleDB를 통해 시계열 및 관계형 데이터 저장.
4.4 모니터링 및 알림 (Monitoring)
1.
Prefect UI를 통해 파이프라인의 상태 및 리소스 사용량 모니터링
5. 워크플로우 예제 코드
from prefect import flow, task
import pandas as pd
import psycopg2
@task
def fetch_data():
"""CSV 파일로부터 데이터를 가져오기"""
df = pd.read_csv("data/sample_data.csv")
return df
@task
def clean_data(df):
"""데이터 정제 및 가공"""
df = df.dropna() # 결측값 제거
df['processed_value'] = df['value'] * 2
return df
@task
def save_to_db(df):
"""처리된 데이터를 DB에 저장"""
conn = psycopg2.connect("dbname=test user=postgres password=secret")
cursor = conn.cursor()
for _, row in df.iterrows():
cursor.execute("INSERT INTO processed_data (id, value) VALUES (%s, %s)",
(row['id'], row['processed_value']))
conn.commit()
cursor.close()
conn.close()
@flow
def etl_pipeline():
raw_data = fetch_data()
processed_data = clean_data(raw_data)
save_to_db(processed_data)
if __name__ == "__main__":
etl_pipeline()
Python
복사
실행 및 배포
python3 etl_pipeline.py
prefect deploy --name etl-pipeline --pool scc-process-pool
Shell
복사