XGBoost + Optuna로 주식 폭락/급등 전조를 탐지하는 방법 — AI 미국주식 분석 플랫폼 Passive 개발일지 #2


이 글은 AI 기반 미국주식 분석 플랫폼 Passive를 혼자 구축하면서 겪은 기술적 선택과 실패를 기록한 개발일지 시리즈입니다. 두 번째 주제는 XGBoost + Optuna를 활용한 폭락/급등 전조 탐지기 구현 과정입니다.

1편: GaussianHMM으로 주식 시장 국면을 분류하는 방법


XGBoost 기반 주식 폭락 탐지란 무엇인가

XGBoost를 활용한 주식 폭락/급등 전조 탐지는 머신러닝 퀀트 투자 전략에서 단기 리스크 관리의 핵심 기법이다. 시장 국면을 파악하는 것(1편)과 그 국면이 방향 전환 직전인지를 구분하는 것은 완전히 다른 문제다.

감정적 시장이라고 해도 당장 폭락이 오는 건 아니다. 반대로 이성적으로 보이는 시장에서도 전조 신호가 이미 쌓이고 있는 경우가 있다. ETF 포트폴리오 투자자에게 이 두 가지를 실시간으로 구분하는 신호가 필요했다. 그게 Passive 신호 탭의 출발점이었다.


왜 HMM 국면 결과만으로는 부족했는가

GaussianHMM은 “지금 어떤 국면인가”를 알려줄 뿐, “이 국면이 얼마나 위험한가”는 말해주지 않는다. 폭락과 급등을 직접 타깃으로 예측하는 분류 모델이 필요했다.

XGBoost를 선택한 이유는 세 가지다.

첫째, 트리 기반이라 피처 간 비선형 관계를 잘 포착한다. 금융 데이터는 선형 관계보다 복잡한 패턴이 많다. 둘째, 노이즈가 많은 환경에서도 비교적 안정적이다. 셋째, SHAP을 활용해 예측 근거를 설명할 수 있어 서비스에 붙이기 적합하다.

import xgboost as xgb
from sklearn.model_selection import train_test_split

# 기본 XGBoost 분류기 설정
model = xgb.XGBClassifier(
    objective='multi:softprob',  # 3-class 확률 출력
    num_class=3,                 # 0: 중립, 1: 폭락 전조, 2: 급등 전조
    eval_metric='mlogloss',
    use_label_encoder=False,
    random_state=42
)

3-class 분류 설계: 폭락, 급등, 중립

타깃을 이진 분류가 아닌 3-class로 설계했다. 폭락 전조(0), 중립(1), 급등 전조(2) 세 가지다.

이진 분류로 “폭락 여부”만 예측하면 급등 전조를 놓친다. ETF 포트폴리오 투자자에게는 위험 신호와 기회 신호를 동시에 제공하는 게 더 실용적이다.

import pandas as pd
import numpy as np

def create_target_labels(returns: pd.Series,
                         crash_threshold: float = -0.05,
                         surge_threshold: float = 0.05,
                         forward_days: int = 5) -> pd.Series:
    """
    타깃 레이블 생성
    - 이후 5거래일 내 최대 낙폭 > crash_threshold → 폭락 전조 (0)
    - 이후 5거래일 내 최대 상승폭 > surge_threshold → 급등 전조 (2)
    - 그 외 → 중립 (1)
    """
    labels = pd.Series(1, index=returns.index)  # 기본값: 중립

    for i in range(len(returns) - forward_days):
        future_returns = returns.iloc[i+1:i+forward_days+1]
        max_drawdown = future_returns.min()
        max_surge = future_returns.max()

        if max_drawdown < crash_threshold:
            labels.iloc[i] = 0  # 폭락 전조
        elif max_surge > surge_threshold:
            labels.iloc[i] = 2  # 급등 전조
        # 둘 다 해당하면 낙폭 우선 (보수적 접근)

    return labels

# 클래스 불균형 확인
print(labels.value_counts(normalize=True))
# 중립: ~85%, 폭락 전조: ~8%, 급등 전조: ~7%

클래스 불균형 문제가 있었다. 폭락과 급등 전조는 전체 데이터의 극히 일부다. scale_pos_weight 파라미터로 클래스 가중치를 조정했다.

# 클래스 가중치 계산
class_counts = labels.value_counts()
total = len(labels)

# XGBoost sample_weight 방식으로 처리
sample_weights = labels.map({
    0: total / (3 * class_counts[0]),  # 폭락 전조 가중치 상향
    1: total / (3 * class_counts[1]),  # 중립 가중치 하향
    2: total / (3 * class_counts[2])   # 급등 전조 가중치 상향
})

피처 설계: 기술 지표 + 심리 지표 + 펀더멘털

모델 입력 피처는 기술적 지표, 심리 지표, 펀더멘털 계열 세 그룹으로 나뉜다.

기술적 지표 그룹

def calc_technical_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    기술적 지표 계산
    - 가격 자체가 아닌 상대적 위치와 변화 속도
    - ETF별 가격 스케일 차이 무관하게 적용 가능
    """
    features = pd.DataFrame(index=df.index)

    # 수익률 계열
    for period in [1, 2, 3, 5, 10, 20]:
        features[f'return_{period}d'] = df['close'].pct_change(period)

    # 이동평균 이격도
    for ma in [5, 20, 60, 120, 200]:
        features[f'ma_ratio_{ma}'] = df['close'] / df['close'].rolling(ma).mean() - 1

    # RSI
    delta = df['close'].diff()
    gain = delta.clip(lower=0).rolling(14).mean()
    loss = (-delta.clip(upper=0)).rolling(14).mean()
    features['rsi_14'] = 100 - (100 / (1 + gain / loss))

    # 볼린저 밴드 포지션
    sma20 = df['close'].rolling(20).mean()
    std20 = df['close'].rolling(20).std()
    features['bb_position'] = (df['close'] - sma20) / (2 * std20)

    # 실현 변동성
    features['vol_5d'] = df['close'].pct_change().rolling(5).std() * np.sqrt(252)
    features['vol_20d'] = df['close'].pct_change().rolling(20).std() * np.sqrt(252)
    features['vol_ratio'] = features['vol_5d'] / features['vol_20d']

    # 최대 낙폭 (60일)
    features['max_drawdown_60d'] = (
        df['close'] / df['close'].rolling(60).max() - 1
    )

    # MACD 히스토그램
    ema12 = df['close'].ewm(span=12).mean()
    ema26 = df['close'].ewm(span=26).mean()
    macd = ema12 - ema26
    signal = macd.ewm(span=9).mean()
    features['macd_hist'] = macd - signal

    return features

심리 지표 + 펀더멘털 그룹

def calc_macro_features(vix: pd.Series,
                        vix3m: pd.Series,
                        hy_spread: pd.Series,
                        noise_score: pd.Series) -> pd.DataFrame:
    """
    심리 + 펀더멘털 피처
    - GaussianHMM noise_score를 컨텍스트 피처로 연결
    - 두 모델이 느슨하게 연결되는 구조
    """
    features = pd.DataFrame()

    features['vix'] = vix
    features['vix_term'] = vix / vix3m        # 기간 구조 역전
    features['hy_spread'] = hy_spread          # 신용 환경
    features['noise_score'] = noise_score      # HMM 국면 점수 (컨텍스트)

    # VIX 변화율
    features['vix_chg_5d'] = vix.pct_change(5)
    features['vix_chg_20d'] = vix.pct_change(20)

    # HY 스프레드 변화
    features['hy_chg_20d'] = hy_spread.diff(20)

    return features

Optuna 하이퍼파라미터 최적화

XGBoost의 성능은 파라미터에 크게 좌우된다. Optuna로 50회 베이지안 최적화를 수행했다.

import optuna
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import f1_score
import xgboost as xgb

def objective(trial: optuna.Trial) -> float:
    """
    Optuna 목표 함수: F1-macro 최대화
    - 단순 정확도 대신 F1-macro 사용
    - 이유: 클래스 불균형에서 모두 중립 예측해도 정확도가 높게 나오는 함정 방지
    """
    params = {
        'max_depth': trial.suggest_int('max_depth', 3, 8),
        'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
        'subsample': trial.suggest_float('subsample', 0.5, 1.0),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.5, 1.0),
        'min_child_weight': trial.suggest_int('min_child_weight', 1, 10),
        'n_estimators': trial.suggest_int('n_estimators', 100, 500),
        'reg_alpha': trial.suggest_float('reg_alpha', 1e-8, 10.0, log=True),
        'reg_lambda': trial.suggest_float('reg_lambda', 1e-8, 10.0, log=True),
        'objective': 'multi:softprob',
        'num_class': 3,
        'eval_metric': 'mlogloss',
        'use_label_encoder': False,
        'random_state': 42
    }

    # StratifiedKFold: 클래스 비율 유지
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    f1_scores = []

    for train_idx, val_idx in cv.split(X, y):
        X_train, X_val = X[train_idx], X[val_idx]
        y_train, y_val = y[train_idx], y[val_idx]
        w_train = sample_weights[train_idx]

        model = xgb.XGBClassifier(**params)
        model.fit(
            X_train, y_train,
            sample_weight=w_train,
            eval_set=[(X_val, y_val)],
            early_stopping_rounds=30,
            verbose=False
        )

        y_pred = model.predict(X_val)
        f1_scores.append(f1_score(y_val, y_pred, average='macro'))

    return np.mean(f1_scores)


# 50회 탐색
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=50, show_progress_bar=True)

best_params = study.best_params
print(f"최적 F1-macro: {study.best_value:.4f}")
print(f"최적 파라미터: {best_params}")

Platt Scaling: 출력 확률 보정

XGBoost가 출력하는 예측 확률이 실제 발생 확률과 일치하지 않는 경우가 있다. 모델이 “70%“라고 해도 실제로는 50%에 불과할 수 있다.

사용자에게 “폭락 위험도 72%“라는 수치를 보여줄 때 그 수치가 실제 발생 확률과 유사해야 신뢰할 수 있다. Platt Scaling으로 확률을 보정했다.

from sklearn.calibration import CalibratedClassifierCV
from sklearn.calibration import calibration_curve
import matplotlib.pyplot as plt

# Platt Scaling: 로지스틱 회귀로 확률 보정
calibrated_model = CalibratedClassifierCV(
    base_model,
    method='sigmoid',  # Platt Scaling
    cv='prefit'        # 이미 학습된 모델에 적용
)

# 별도 검증 세트로 보정 학습
calibrated_model.fit(X_calib, y_calib)

# 보정 전후 비교 시각화
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
for i, class_name in enumerate(['폭락 전조', '중립', '급등 전조']):
    prob_true_before, prob_pred_before = calibration_curve(
        (y_calib == i).astype(int),
        base_model.predict_proba(X_calib)[:, i],
        n_bins=10
    )
    prob_true_after, prob_pred_after = calibration_curve(
        (y_calib == i).astype(int),
        calibrated_model.predict_proba(X_calib)[:, i],
        n_bins=10
    )

    axes[i].plot(prob_pred_before, prob_true_before, label='보정 전')
    axes[i].plot(prob_pred_after, prob_true_after, label='보정 후')
    axes[i].plot([0, 1], [0, 1], 'k--', label='완벽한 보정')
    axes[i].set_title(class_name)
    axes[i].legend()

plt.tight_layout()
plt.savefig('calibration_curve.png')

SHAP: 예측 근거 설명

XGBoost가 폭락 전조 확률 75%를 출력해도 “왜 이 신호가 나왔는가”를 설명할 수 없으면 서비스에 붙이기 어렵다. SHAP으로 각 예측의 피처 기여도를 계산했다.

import shap

# SHAP TreeExplainer: XGBoost 전용, 빠른 계산
explainer = shap.TreeExplainer(calibrated_model.estimator)
shap_values = explainer.shap_values(X_test)

# Global SHAP: 전체 피처 중요도
shap.summary_plot(shap_values[0], X_test,  # [0]: 폭락 전조 클래스
                  feature_names=feature_names,
                  plot_type='bar')

# Local SHAP: 특정 예측의 근거
def explain_prediction(model, explainer, x_single, feature_names):
    """
    단일 예측 설명 텍스트 생성
    서비스에 "이런 이유로 폭락 위험도가 높습니다" 형태로 표시
    """
    shap_vals = explainer.shap_values(x_single.reshape(1, -1))

    # 폭락 전조 클래스(0)의 SHAP 기여도
    crash_shap = shap_vals[0][0]
    top_features = sorted(
        zip(feature_names, crash_shap),
        key=lambda x: abs(x[1]),
        reverse=True
    )[:3]

    explanations = []
    for feat_name, shap_val in top_features:
        direction = "상승" if shap_val > 0 else "하락"
        explanations.append(f"{feat_name} ({direction} 기여)")

    return f"주요 원인: {', '.join(explanations)}"

# 결과 예시:
# "주요 원인: vix_term (상승 기여), hy_spread (상승 기여), noise_score (상승 기여)"

Walk-Forward 백테스트

시계열 데이터라 일반적인 k-fold 교차 검증을 쓸 수 없다. Walk-Forward 방식으로 검증했다.

def walk_forward_backtest(X: np.ndarray,
                          y: np.ndarray,
                          sample_weights: np.ndarray,
                          train_size: int = 756,   # 약 3년
                          test_size: int = 63,      # 약 3개월
                          step_size: int = 63) -> dict:
    """
    Walk-Forward 백테스트
    - 각 검증 시점에서 그 시점 이전 데이터만 학습
    - 미래 데이터 누수 완전 차단
    """
    results = {
        'f1_macro': [],
        'crash_precision': [],   # 폭락 전조 정밀도 (과탐지 억제)
        'surge_recall': []       # 급등 전조 재현율 (놓치지 않기 위해)
    }

    for start in range(0, len(X) - train_size - test_size, step_size):
        train_end = start + train_size
        test_end = train_end + test_size

        X_train = X[start:train_end]
        y_train = y[start:train_end]
        w_train = sample_weights[start:train_end]

        X_test = X[train_end:test_end]
        y_test = y[train_end:test_end]

        # 해당 시점까지의 데이터로만 학습
        model = xgb.XGBClassifier(**best_params)
        model.fit(X_train, y_train, sample_weight=w_train, verbose=False)

        # Platt Scaling 적용
        calib = CalibratedClassifierCV(model, method='sigmoid', cv='prefit')
        calib.fit(X_train[-100:], y_train[-100:])

        y_pred = calib.predict(X_test)

        results['f1_macro'].append(f1_score(y_test, y_pred, average='macro'))
        results['crash_precision'].append(
            precision_score(y_test, y_pred, labels=[0], average='macro', zero_division=0)
        )
        results['surge_recall'].append(
            recall_score(y_test, y_pred, labels=[2], average='macro', zero_division=0)
        )

    return {
        'f1_macro_mean': np.mean(results['f1_macro']),
        'crash_precision_mean': np.mean(results['crash_precision']),
        'surge_recall_mean': np.mean(results['surge_recall'])
    }

# 결과 예시:
# f1_macro: 0.58, crash_precision: 0.64, surge_recall: 0.61

배포 후 발견한 문제들

피처 스케일 불일치

학습 시점과 추론 시점에서 피처 분포가 조금씩 달라지는 현상이 생겼다. 특히 VIX 수준이 학습 기간과 크게 다른 시장 환경이 오면 모델이 극단적인 확률을 출력했다.

def clip_features_to_training_range(X: np.ndarray,
                                     train_stats: dict) -> np.ndarray:
    """
    피처 클리핑: 학습 데이터 1~99 퍼센타일로 제한
    - 극단적으로 벗어난 값이 들어오면 학습 분포의 최대/최소로 클리핑
    - 분포 이탈로 인한 극단적 예측 방지
    """
    X_clipped = X.copy()
    for i, feat_name in enumerate(feature_names):
        p01 = train_stats[feat_name]['p01']
        p99 = train_stats[feat_name]['p99']
        X_clipped[:, i] = np.clip(X_clipped[:, i], p01, p99)
    return X_clipped

# 학습 시 통계 저장
train_stats = {}
for i, feat_name in enumerate(feature_names):
    train_stats[feat_name] = {
        'p01': np.percentile(X_train[:, i], 1),
        'p99': np.percentile(X_train[:, i], 99)
    }

# pkl로 모델 번들에 포함
model_bundle = {
    'model': calibrated_model,
    'train_stats': train_stats,
    'feature_names': feature_names,
    'best_params': best_params
}

과탐지 문제

보정 전에는 폭락 전조 알림이 너무 자주 발생했다. Platt Scaling과 임계값 상향 조정으로 어느 정도 해결했다.

def predict_with_threshold(model, X: np.ndarray,
                            crash_threshold: float = 0.45,
                            surge_threshold: float = 0.40) -> np.ndarray:
    """
    임계값 조정: 기본 0.33이 아닌 상향된 임계값 적용
    - 폭락 전조: 45% 이상일 때만 신호 발생 (과탐지 억제)
    - 급등 전조: 40% 이상일 때만 신호 발생
    """
    proba = model.predict_proba(X)

    predictions = np.ones(len(X), dtype=int)  # 기본값: 중립

    crash_mask = proba[:, 0] >= crash_threshold
    surge_mask = proba[:, 2] >= surge_threshold

    predictions[crash_mask] = 0   # 폭락 전조
    predictions[surge_mask] = 2   # 급등 전조

    # 둘 다 해당하면 폭락 우선 (보수적 접근)
    predictions[crash_mask & surge_mask] = 0

    return predictions, proba

이 기능을 만들면서 배운 것

3-class 분류는 이진 분류보다 훨씬 어렵다. 특히 클래스 불균형과 클래스 간 임계값 조정이 복잡하게 얽혀 있다. 폭락 전조의 정밀도를 높이면 급등 전조의 재현율이 떨어지는 트레이드오프가 있다. 최적화할 단일 지표를 F1-macro 하나로 고정한 결정이 중요했다.

SHAP을 서비스에 붙이는 결정이 신뢰도를 높였다. 숫자 하나만 보여주는 것과 “이 숫자가 이런 이유로 나왔다”를 함께 보여주는 것은 사용자 반응이 달랐다. 금융 데이터 시각화에서 설명 가능성은 정확도만큼 중요하다.

Optuna 50회 탐색은 충분히 효과가 있었지만, 최적 파라미터가 매월 재학습 때마다 조금씩 달라진다는 것도 배웠다. 고정 파라미터를 쓰는 것보다 주기적으로 재탐색하는 구조가 낫다.

Passive 직접 확인하기


다음 글 예고

다음 개발일지에서는 5모델 앙상블 + GJR-GARCH로 ETF 30일 예측 파이프라인을 구축하는 방법을 다룬다. XGBoost, CatBoost, RandomForest, Ridge, SVR 앙상블 구조와 Monte Carlo 신뢰구간 생성까지 전 과정을 기록한다.