본문으로 건너뛰기

03-100. 데이터 통합 및 병합

데이터 통합 및 병합

데이터 통합 및 병합은 여러 데이터 소스(Data source, 데이터를 가져오는 곳)에서 가져온 데이터를 하나의 일관된 데이터셋으로 결합하는 과정이다. 데이터 분석에서 매우 중요한 단계로, 다양한 데이터를 효과적으로 활용하기 위해 필수적이다.

데이터 통합, 병합의 유형

1. 수직 통합 (Vertical Integration)

  • 여러 데이터셋을 위아래로 쌓아서 결합
  • 테이블 데이터에서 행을 덧붙이는 결합
  • 동일한 구조를 가진 데이터를 결합할 때 사용
  • 예: 여러 기간의 판매 데이터를 하나로 합치기

2. 수평 통합 (Horizontal Integration)

  • 여러 데이터셋을 좌우로 결합
  • 테이블 데이터에서 열을 덧붙이는 결합
  • 공통 키(Key)를 기준으로 데이터를 병합
  • 예: 고객 정보와 구매 이력을 결합

데이터 병합 방법

데이터 통합 및 병합의 중요성

데이터 통합 및 병합은 데이터 분석의 출발점이자 핵심 단계입니다. 다양한 출처에서 수집된 데이터가 하나의 일관된 데이터셋으로 결합되어야만, 신뢰할 수 있는 분석 결과를 도출할 수 있습니다. 데이터가 분산되어 있거나 중복, 누락, 불일치가 존재한다면 분석의 정확도가 크게 떨어질 수 있습니다.

실제 활용 예시

  • 여러 부서에서 관리하는 고객 데이터를 하나로 통합하여, 마케팅 전략 수립에 활용
  • 온라인과 오프라인 매장의 판매 데이터를 병합하여, 전체 매출 추이 분석
  • 외부 데이터(날씨, 경제 지표 등)와 내부 데이터를 결합해 인사이트 도출

통합/병합 시 주의할 점

  • 데이터의 스키마(구조) 일치 여부 확인: 컬럼명, 데이터 타입, 단위 등
  • 중복 데이터 처리: 통합 과정에서 중복된 행이 생길 수 있으므로, 중복 제거 필요
  • 결측치 처리: 병합 후 결측치가 발생하는 경우 적절한 처리 방법(삭제, 대체 등) 선택
  • 키 값의 일관성: 병합 기준이 되는 키 값이 정확히 일치하는지 확인

1. concat을 사용한 데이터 결합

import pandas as pd

# 수직 통합 예시
df1 = pd.DataFrame({
'날짜': ['2023-01-01', '2023-01-02'],
'판매량': [100, 150]
})

df2 = pd.DataFrame({
'날짜': ['2023-01-03', '2023-01-04'],
'판매량': [200, 250]
})

# 수직 통합
vertical_combined = pd.concat([df1, df2], axis=0)
print("수직 통합 결과:")
print(vertical_combined)

# 수평 통합 예시
df3 = pd.DataFrame({
'고객ID': [1, 2, 3],
'이름': ['김철수', '이영희', '박민수']
})

df4 = pd.DataFrame({
'고객ID': [1, 2, 3],
'구매금액': [50000, 75000, 100000]
})

# 수평 통합
horizontal_combined = pd.concat([df3, df4], axis=1)
print("\n수평 통합 결과:")
print(horizontal_combined)

2. merge를 사용한 데이터 병합

# merge를 사용한 데이터 병합
merged_df = pd.merge(df3, df4, on='고객ID')
print("\nmerge를 사용한 병합 결과:")
print(merged_df)

# 다양한 병합 방식
# left join
left_merged = pd.merge(df3, df4, on='고객ID', how='left')
print("\nleft join 결과:")
print(left_merged)

# right join
right_merged = pd.merge(df3, df4, on='고객ID', how='right')
print("\nright join 결과:")
print(right_merged)

# outer join
outer_merged = pd.merge(df3, df4, on='고객ID', how='outer')
print("\nouter join 결과:")
print(outer_merged)

# inner join
inner_merged = pd.merge(df3, df4, on='고객ID', how='inner')
print("\ninner join 결과:")
print(inner_merged)

데이터 통합 시 고려사항

1. 데이터 품질

  • 결측치 처리
  • 중복 데이터 처리
  • 데이터 일관성 확인

2. 성능 최적화

  • 대용량 데이터 처리 전략
  • 인덱스 활용
  • 병렬 처리 고려

3. 데이터 무결성

  • 키 값의 유일성 확인
  • 참조 무결성 검증
  • 데이터 타입 일치 확인

실무 적용 예시

1. 고객 데이터 통합

class CustomerDataIntegrator:
def __init__(self):
self.customer_data = None
self.purchase_data = None
self.integrated_data = None

def load_data(self, customer_file, purchase_file):
"""데이터 로드"""
self.customer_data = pd.read_csv(customer_file)
self.purchase_data = pd.read_csv(purchase_file)
print("데이터 로드 완료")

def preprocess_data(self):
"""데이터 전처리"""
# 결측치 처리
self.customer_data.fillna({'이메일': 'unknown'}, inplace=True)
self.purchase_data.fillna({'구매금액': 0}, inplace=True)

# 데이터 타입 변환
self.customer_data['가입일'] = pd.to_datetime(self.customer_data['가입일'])
self.purchase_data['구매일'] = pd.to_datetime(self.purchase_data['구매일'])

print("데이터 전처리 완료")

def integrate_data(self):
"""데이터 통합"""
# 고객 정보와 구매 정보 병합
self.integrated_data = pd.merge(
self.customer_data,
self.purchase_data,
on='고객ID',
how='left'
)

# 구매 정보 집계
purchase_summary = self.purchase_data.groupby('고객ID').agg({
'구매금액': ['sum', 'mean', 'count']
}).reset_index()

# 집계 정보 병합
self.integrated_data = pd.merge(
self.integrated_data,
purchase_summary,
on='고객ID',
how='left'
)

print("데이터 통합 완료")

def analyze_integrated_data(self):
"""통합 데이터 분석"""
if self.integrated_data is None:
return "통합된 데이터가 없습니다."

analysis = {
'총 고객 수': len(self.integrated_data),
'평균 구매금액': self.integrated_data['구매금액'].mean(),
'최대 구매금액': self.integrated_data['구매금액'].max(),
'구매 고객 비율': (self.integrated_data['구매금액'].notna().sum() / len(self.integrated_data)) * 100
}

return analysis

# 사용 예시
integrator = CustomerDataIntegrator()

# 샘플 데이터 생성
customer_data = pd.DataFrame({
'고객ID': [1, 2, 3, 4],
'이름': ['김철수', '이영희', '박민수', '최지영'],
'가입일': ['2023-01-01', '2023-01-15', '2023-02-01', '2023-02-15'],
'이메일': ['kim@email.com', 'lee@email.com', 'park@email.com', None]
})

purchase_data = pd.DataFrame({
'고객ID': [1, 1, 2, 3],
'구매일': ['2023-01-10', '2023-02-10', '2023-01-20', '2023-02-05'],
'구매금액': [50000, 75000, 100000, 30000]
})

# 데이터 저장
customer_data.to_csv('customer_data.csv', index=False)
purchase_data.to_csv('purchase_data.csv', index=False)

# 데이터 통합 실행
integrator.load_data('customer_data.csv', 'purchase_data.csv')
integrator.preprocess_data()
integrator.integrate_data()

# 분석 결과 출력
analysis_results = integrator.analyze_integrated_data()
print("\n통합 데이터 분석 결과:")
for key, value in analysis_results.items():
print(f"{key}: {value}")

2. 실시간 데이터 통합

import time
from datetime import datetime

class RealTimeDataIntegrator:
def __init__(self):
self.data_buffer = []
self.integrated_data = pd.DataFrame()

def add_data(self, new_data):
"""새로운 데이터 추가"""
self.data_buffer.append({
'timestamp': datetime.now(),
'data': new_data
})

def process_buffer(self):
"""버퍼 데이터 처리"""
if not self.data_buffer:
return

# 버퍼의 데이터를 하나의 데이터프레임으로 변환
new_df = pd.DataFrame([item['data'] for item in self.data_buffer])

# 기존 데이터와 통합
self.integrated_data = pd.concat([self.integrated_data, new_df], ignore_index=True)

# 버퍼 비우기
self.data_buffer = []

print(f"데이터 처리 완료: {len(new_df)}건 추가됨")

def get_statistics(self):
"""현재 통합 데이터 통계"""
if self.integrated_data.empty:
return "통합된 데이터가 없습니다."

return {
'총 레코드 수': len(self.integrated_data),
'최근 업데이트': self.integrated_data.index.max(),
'컬럼 수': len(self.integrated_data.columns)
}

# 사용 예시
integrator = RealTimeDataIntegrator()

# 실시간 데이터 추가 시뮬레이션
for i in range(3):
new_data = {
'ID': i + 1,
'값': i * 100,
'시간': datetime.now().strftime('%H:%M:%S')
}
integrator.add_data(new_data)
time.sleep(1) # 1초 대기

# 데이터 처리
integrator.process_buffer()

# 통계 확인
stats = integrator.get_statistics()
print("\n실시간 데이터 통합 통계:")
for key, value in stats.items():
print(f"{key}: {value}")

데이터 통합 및 병합의 도전과제

1. 데이터 품질 문제

  • 서로 다른 소스의 데이터 형식 불일치
  • 결측치와 이상치 처리
  • 데이터 중복 문제

2. 성능 이슈

  • 대용량 데이터 처리 시간
  • 메모리 사용량 최적화
  • 실시간 처리 요구사항

3. 데이터 일관성

  • 스키마 변경 관리
  • 데이터 버전 관리
  • 데이터 정합성 검증

해결 방안

1. 데이터 품질 개선

def improve_data_quality(df):
"""데이터 품질 개선 함수"""
# 결측치 처리
df.fillna(method='ffill', inplace=True)

# 중복 제거
df.drop_duplicates(inplace=True)

# 데이터 타입 최적화
for col in df.select_dtypes(include=['object']):
if df[col].nunique() < 50:
df[col] = df[col].astype('category')

return df

2. 성능 최적화

def optimize_performance(df):
"""성능 최적화 함수"""
# 인덱스 설정
if 'ID' in df.columns:
df.set_index('ID', inplace=True)

# 메모리 사용량 최적화
for col in df.columns:
if df[col].dtype == 'float64':
df[col] = df[col].astype('float32')
elif df[col].dtype == 'int64':
df[col] = df[col].astype('int32')

return df

요약

핵심 포인트

  1. 데이터 통합은 여러 소스의 데이터를 하나로 결합하는 과정
  2. 수직 통합과 수평 통합의 차이점 이해
  3. 다양한 병합 방법(concat, merge)의 활용
  4. 데이터 품질과 성능 최적화의 중요성

실무 적용 시 고려사항

  1. 데이터 품질 관리
  2. 성능 최적화
  3. 데이터 무결성 유지
  4. 실시간 처리 요구사항

데이터 통합 및 병합은 데이터 분석의 기초가 되는 중요한 과정이다. 적절한 도구와 방법을 사용하여 효율적으로 데이터를 통합하고, 품질을 유지하면서 성능을 최적화하는 것이 중요하다.