온라인 데이터 연동
현대의 데이터 분석에서는 클라우드 기반 데이터 소스와의 연동이 필수적이다. 클라우드 스토리지, 온라인 서비스, SaaS 플랫폼 등에서 데이터를 효율적으로 가져오는 방법을 알아보자.
클라우드 스토리지 사용하기
AWS S3 데이터 접근과 boto3
Amazon S3는 가장 널리 사용되는 클라우드 스토리지 서비스 중 하나다.
import boto3
import pandas as pd
from io import StringIO
# AWS 자격 증명 설정 (환경 변수 또는 AWS CLI 설정 필요)
s3_client = boto3.client('s3')
# S3 버킷 목록 확인
response = s3_client.list_buckets()
print("S3 버킷 목록:")
for bucket in response['Buckets']:
print(f"- {bucket['Name']}")
# S3에서 CSV 파일 읽기
def read_csv_from_s3(bucket_name, file_key):
"""S3에서 CSV 파일을 직접 pandas DataFrame으로 읽기"""
try:
response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
csv_content = response['Body'].read().decode('utf-8')
df = pd.read_csv(StringIO(csv_content))
return df
except Exception as e:
print(f"S3 파일 읽기 오류: {e}")
return None
# 사용 예시
df = read_csv_from_s3('my-data-bucket', 'data/sales_2023.csv')
# S3에 DataFrame 저장
def save_df_to_s3(df, bucket_name, file_key):
"""DataFrame을 S3에 CSV로 저장"""
csv_buffer = StringIO()
df.to_csv(csv_buffer, index=False)
s3_client.put_object(
Bucket=bucket_name,
Key=file_key,
Body=csv_buffer.getvalue()
)
print(f"파일 저장 완료: s3://{bucket_name}/{file_key}")
# S3 폴더의 모든 파일 처리
def process_s3_folder(bucket_name, folder_prefix):
"""S3 폴더 내 모든 CSV 파일을 처리"""
response = s3_client.list_objects_v2(
Bucket=bucket_name,
Prefix=folder_prefix
)
dataframes = []
if 'Contents' in response:
for obj in response['Contents']:
if obj['Key'].endswith('.csv'):
print(f"처리 중: {obj['Key']}")
df = read_csv_from_s3(bucket_name, obj['Key'])
if df is not None:
df['source_file'] = obj['Key']
dataframes.append(df)
if dataframes:
combined_df = pd.concat(dataframes, ignore_index=True)
return combined_df
else:
return pd.DataFrame()
# 사용 예시
combined_data = process_s3_folder('my-data-bucket', 'monthly_reports/')
Google Cloud Storage 연동
from google.cloud import storage
import pandas as pd
from io import StringIO
# Google Cloud Storage 클라이언트 생성
# 서비스 계정 키 파일이 필요함
client = storage.Client.from_service_account_json('path/to/service-account-key.json')
def read_csv_from_gcs(bucket_name, blob_name):
"""GCS에서 CSV 파일 읽기"""
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
# 파일 내용을 문자열로 다운로드
content = blob.download_as_text()
df = pd.read_csv(StringIO(content))
return df
def upload_df_to_gcs(df, bucket_name, blob_name):
"""DataFrame을 GCS에 업로드"""
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
# DataFrame을 CSV 문자열로 변환
csv_string = df.to_csv(index=False)
# GCS에 업로드
blob.upload_from_string(csv_string, content_type='text/csv')
print(f"업로드 완료: gs://{bucket_name}/{blob_name}")
# GCS 버킷의 모든 파일 나열
def list_gcs_files(bucket_name, prefix=''):
"""GCS 버킷의 파일 목록 조회"""
bucket = client.bucket(bucket_name)
blobs = bucket.list_blobs(prefix=prefix)
files = []
for blob in blobs:
files.append({
'name': blob.name,
'size': blob.size,
'created': blob.time_created
})
return pd.DataFrame(files)
# 사용 예시
files_df = list_gcs_files('my-gcs-bucket', 'data/')
print(files_df)
클라우드 비용 최적화 팁
import boto3
from datetime import datetime, timedelta
def optimize_s3_costs():
"""S3 비용 최적화를 위한 분석"""
s3 = boto3.client('s3')
# 버킷별 객체 크기 분석
def analyze_bucket_usage(bucket_name):
response = s3.list_objects_v2(Bucket=bucket_name)
total_size = 0
file_count = 0
old_files = []
if 'Contents' in response:
for obj in response['Contents']:
total_size += obj['Size']
file_count += 1
# 90일 이상 된 파일 찾기
if obj['LastModified'] < datetime.now(obj['LastModified'].tzinfo) - timedelta(days=90):
old_files.append({
'key': obj['Key'],
'size': obj['Size'],
'last_modified': obj['LastModified']
})
return {
'total_size_gb': total_size / (1024**3),
'file_count': file_count,
'old_files': old_files
}
# 사용 예시
analysis = analyze_bucket_usage('my-data-bucket')
print(f"총 크기: {analysis['total_size_gb']:.2f} GB")
print(f"파일 수: {analysis['file_count']}")
print(f"90일 이상 된 파일: {len(analysis['old_files'])}개")
# 자동 아카이빙 설정
def setup_s3_lifecycle_policy(bucket_name):
"""S3 라이프사이클 정책 설정"""
s3 = boto3.client('s3')
lifecycle_config = {
'Rules': [
{
'ID': 'ArchiveOldFiles',
'Status': 'Enabled',
'Transitions': [
{
'Days': 30,
'StorageClass': 'STANDARD_IA' # Infrequent Access
},
{
'Days': 90,
'StorageClass': 'GLACIER' # Glacier
}
]
}
]
}
s3.put_bucket_lifecycle_configuration(
Bucket=bucket_name,
LifecycleConfiguration=lifecycle_config
)
print(f"라이프사이클 정책 설정 완료: {bucket_name}")
Google Drive와 Colab 연동
Google Drive API 활용법
from google.colab import drive
from google.colab import auth
from googleapiclient.discovery import build
import pandas as pd
# Google Drive 마운트 (Colab에서)
drive.mount('/content/drive')
# 파일 직접 읽기
df = pd.read_csv('/content/drive/MyDrive/data/sales_data.csv')
# Google Drive API 사용
auth.authenticate_user()
def list_drive_files():
"""Google Drive 파일 목록 조회"""
service = build('drive', 'v3')
# 파일 목록 조회
results = service.files().list(
pageSize=10,
fields="nextPageToken, files(id, name, mimeType, size)"
).execute()
items = results.get('files', [])
if not items:
print('파일을 찾을 수 없습니다.')
else:
files_data = []
for item in items:
files_data.append({
'name': item['name'],
'id': item['id'],
'type': item['mimeType'],
'size': item.get('size', 'N/A')
})
return pd.DataFrame(files_data)
# Google Sheets 연동
def read_google_sheet(sheet_id, range_name):
"""Google Sheets에서 데이터 읽기"""
service = build('sheets', 'v4')
result = service.spreadsheets().values().get(
spreadsheetId=sheet_id,
range=range_name
).execute()
values = result.get('values', [])
if values:
# 첫 번째 행을 헤더로 사용
df = pd.DataFrame(values[1:], columns=values[0])
return df
else:
return pd.DataFrame()
# 사용 예시
# sheet_id = '1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms'
# df = read_google_sheet(sheet_id, 'A1:E10')
Colab에서 파일 업로드/다운로드
from google.colab import files
import pandas as pd
import io
# 파일 업로드
def upload_and_process():
"""파일 업로드 후 처리"""
uploaded = files.upload()
for filename in uploaded.keys():
print(f'업로드된 파일: {filename}')
# 파일 확장자에 따라 처리
if filename.endswith('.csv'):
df = pd.read_csv(io.BytesIO(uploaded[filename]))
elif filename.endswith('.xlsx'):
df = pd.read_excel(io.BytesIO(uploaded[filename]))
else:
print(f"지원하지 않는 파일 형식: {filename}")
continue
print(f"데이터 형태: {df.shape}")
print(df.head())
return df
# 처리된 데이터 다운로드
def download_processed_data(df, filename):
"""처리된 데이터를 다운로드"""
if filename.endswith('.csv'):
df.to_csv(filename, index=False)
elif filename.endswith('.xlsx'):
df.to_excel(filename, index=False)
files.download(filename)
print(f"다운로드 완료: {filename}")
# 사용 예시
# df = upload_and_process()
# processed_df = df.groupby('category').sum()
# download_processed_data(processed_df, 'processed_data.csv')
대용량 파일 처리 전략
import os
from google.colab import drive
def process_large_file_in_chunks(file_path, chunk_size=10000):
"""대용량 파일을 청크 단위로 처리"""
# 파일 크기 확인
file_size = os.path.getsize(file_path)
print(f"파일 크기: {file_size / (1024**2):.2f} MB")
# 청크 단위로 처리
chunks = []
for chunk in pd.read_csv(file_path, chunksize=chunk_size):
# 각 청크에 대한 처리 (예: 필터링, 집계 등)
processed_chunk = chunk[chunk['value'] > 0] # 예시 필터링
# 메모리 사용량 모니터링
if len(chunks) > 0 and len(chunks) % 10 == 0:
print(f"처리된 청크 수: {len(chunks)}")
chunks.append(processed_chunk)
# 결과 병합
result = pd.concat(chunks, ignore_index=True)
print(f"최종 데이터 형태: {result.shape}")
return result
# 메모리 효율적인 집계
def memory_efficient_aggregation(file_path, group_col, agg_cols):
"""메모리 효율적인 집계 처리"""
aggregated_data = {}
for chunk in pd.read_csv(file_path, chunksize=5000):
chunk_agg = chunk.groupby(group_col)[agg_cols].sum()
for idx, row in chunk_agg.iterrows():
if idx in aggregated_data:
for col in agg_cols:
aggregated_data[idx][col] += row[col]
else:
aggregated_data[idx] = {col: row[col] for col in agg_cols}
# 결과를 DataFrame으로 변환
result_data = []
for key, values in aggregated_data.items():
row = {group_col: key}
row.update(values)
result_data.append(row)
return pd.DataFrame(result_data)
# 사용 예시
# result = memory_efficient_aggregation(
# '/content/drive/MyDrive/large_sales_data.csv',
# 'category',
# ['sales_amount', 'quantity']
# )
실시간 데이터 스트리밍
웹소켓을 통한 실시간 데이터 수집
import websocket
import json
import pandas as pd
from datetime import datetime
import threading
class RealTimeDataCollector:
def __init__(self):
self.data_buffer = []
self.max_buffer_size = 1000
def on_message(self, ws, message):
"""웹소켓 메시지 처리"""
try:
data = json.loads(message)
data['timestamp'] = datetime.now()
self.data_buffer.append(data)
# 버퍼가 가득 차면 처리
if len(self.data_buffer) >= self.max_buffer_size:
self.process_buffer()
except json.JSONDecodeError:
print(f"JSON 파싱 오류: {message}")
def on_error(self, ws, error):
print(f"웹소켓 오류: {error}")
def on_close(self, ws, close_status_code, close_msg):
print("웹소켓 연결 종료")
def process_buffer(self):
"""버퍼의 데이터를 처리"""
if self.data_buffer:
df = pd.DataFrame(self.data_buffer)
# 데이터 처리 로직 (예: 집계, 필터링 등)
processed_df = self.analyze_data(df)
# 결과 저장 또는 전송
self.save_processed_data(processed_df)
# 버퍼 초기화
self.data_buffer = []
def analyze_data(self, df):
"""실시간 데이터 분석"""
# 예시: 시간대별 집계
df['hour'] = df['timestamp'].dt.hour
hourly_stats = df.groupby('hour').agg({
'value': ['mean', 'sum', 'count']
}).round(2)
return hourly_stats
def save_processed_data(self, df):
"""처리된 데이터 저장"""
filename = f"realtime_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
df.to_csv(filename)
print(f"데이터 저장 완료: {filename}")
def start_collection(self, websocket_url):
"""실시간 데이터 수집 시작"""
ws = websocket.WebSocketApp(
websocket_url,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close
)
ws.run_forever()
# 사용 예시
# collector = RealTimeDataCollector()
# collector.start_collection("wss://api.example.com/realtime")
정리
온라인 데이터 연동은 현대 데이터 분석의 핵심 요소다. 클라우드 스토리지 활용부터 실시간 데이터 수집까지, 다양한 온라인 데이터 소스와 효율적으로 연동하는 방법을 익혀두면 분석의 범위와 깊이를 크게 확장할 수 있다. 특히 비용 최적화와 대용량 데이터 처리 전략은 실무에서 매우 중요한 고려사항이다.
다음 섹션에서는 데이터 제공 사이트 활용에 대해 알아보겠다.