GaussianHMM으로 주식 시장 국면을 분류하는 방법 — AI 미국주식 분석 플랫폼 Passive 개발일지 #1


이 글은 AI 기반 미국주식 분석 플랫폼 Passive를 혼자 구축하면서 겪은 기술적 선택과 실패를 기록한 개발일지 시리즈입니다. 첫 번째 주제는 GaussianHMM을 활용한 시장 국면 분류기 구현 과정입니다.


GaussianHMM 시장 국면 분류란 무엇인가

GaussianHMM(Gaussian Hidden Markov Model)을 활용한 주식 시장 국면 분류는 머신러닝 기반 퀀트 투자 전략의 핵심 기법 중 하나다. 시장이 현재 이성적으로 움직이는지, 감정에 의해 움직이는지를 데이터로 정량화하는 것이 목표다.

ETF 장기 투자자에게 가장 위험한 순간은 “지금 시장이 어떤 국면인가”를 구분하지 못할 때다. 주가가 오르고 있어도 펀더멘털 대비 고평가된 감정 주도 상승이라면 매수 타이밍이 아니다. 반대로 급락 구간도 펀더멘털이 견고하다면 공포에 의한 과매도일 수 있다.

이 판단을 자동화하는 것이 Passive 펀더멘털 탭의 출발점이었다.


왜 임계값 기반 분류 대신 HMM을 선택했는가

처음 시도한 방법은 단순 임계값 기반 분류였다. CAPE 30 이상이면 고평가, VIX 25 이상이면 공포 구간이라는 룰 기반 접근이었다. 문제는 각 지표가 독립적으로 신호를 내고, 충돌할 때 우선순위 기준이 없다는 점이었다.

Hidden Markov Model은 이 문제를 자연스럽게 해결한다. 시장의 실제 국면(“숨겨진 상태”)은 직접 관측할 수 없고, 우리가 보는 건 그 상태에서 나온 관측값(지표들)뿐이라는 구조가 금융 시장과 정확히 일치한다.

GaussianHMM을 선택한 이유는 연속형 관측값 처리 때문이다. CAPE, HY 스프레드, 실현 변동성 같은 금융 지표는 범주형이 아닌 연속형이고, GaussianHMM은 각 상태에서 관측값이 정규분포를 따른다고 가정해 이를 잘 처리한다.

from hmmlearn.hmm import GaussianHMM
from sklearn.preprocessing import RobustScaler
import numpy as np

# RobustScaler: 금융 데이터 이상치(2008, 2020 급락) 대응
scaler = RobustScaler()
X_scaled = scaler.fit_transform(features)

# full covariance 먼저 시도, 수치 불안정 시 diag 폴백
try:
    model = GaussianHMM(
        n_components=4,
        covariance_type="full",
        n_iter=200,
        random_state=42
    )
    model.fit(X_scaled)
except Exception:
    model = GaussianHMM(
        n_components=4,
        covariance_type="diag",
        n_iter=200,
        random_state=42
    )
    model.fit(X_scaled)

8개 피처 설계: 퀀트 투자 전략의 핵심

머신러닝 기반 시장 분류에서 피처 설계가 모델 성능을 좌우한다. Passive는 18년치 월별 데이터(약 219개월)를 기반으로 8개 피처를 사용한다.

fundamental_gap — 가격-이익 괴리도

12개월 주가 누적 로그 수익률에서 12개월 EPS 누적 로그 성장률을 뺀 값이다. 양수면 가격이 이익보다 빠르게 올랐다는 의미로 과열 신호다.

def calc_fundamental_gap(shiller_df):
    # Shiller CAPE 데이터 기반
    price_return = np.log(shiller_df['price']).diff(12)
    eps_growth = np.log(shiller_df['earnings']).diff(12)
    return (price_return - eps_growth).fillna(0)

erp_zscore — 주식 위험 프리미엄 z-score

주식 수익률(1/CAPE)에서 TIPS 실질금리를 뺀 ERP의 120개월 롤링 z-score 절대값이다. 절대값이 클수록 현재 프리미엄이 역사적으로 이례적인 수준임을 의미한다.

def calc_erp_zscore(cape, tips_rate):
    erp = (1 / cape) - tips_rate
    rolling_mean = erp.rolling(120).mean()
    rolling_std = erp.rolling(120).std()
    zscore = (erp - rolling_mean) / rolling_std
    return zscore.abs().fillna(0)

residual_corr — 잔차 수익률 상관관계

SPY 베타를 제거한 잔차 수익률 간 상관관계다. 감정 지배 구간에서는 개별 종목이 펀더멘털과 무관하게 같은 방향으로 움직이기 때문에 잔차 상관이 높아진다. 5개 섹터, 25개 종목 기준으로 계산한다.

def calc_residual_corr(returns_df, spy_returns):
    residuals = {}
    for ticker in returns_df.columns:
        beta = np.cov(returns_df[ticker], spy_returns)[0,1] / np.var(spy_returns)
        residuals[ticker] = returns_df[ticker] - beta * spy_returns
    residual_df = pd.DataFrame(residuals)
    corr_matrix = residual_df.rolling(60).corr()
    # 대각선 제외 평균 상관계수
    return corr_matrix.groupby(level=0).apply(
        lambda x: (x.sum().sum() - len(x)) / (len(x) * (len(x) - 1))
    )

vix_term — VIX 기간 구조

VIX를 VIX3M으로 나눈 값이다. 1보다 크면 단기 공포가 장기보다 큰 역전 상태를 의미한다. 감정적 시장의 대표적인 신호다.

나머지 네 개 피처인 dispersion(종목간 수익률 분산), amihud(비유동성 지표), hy_spread(HY 신용 스프레드), realized_vol(실현 변동성)은 시장 유동성과 신용 환경을 반영한다.


RobustScaler를 선택한 이유

금융 데이터에서 StandardScaler 대신 RobustScaler를 써야 하는 이유가 있다.

2008년 금융위기나 2020년 코로나 급락 같은 극단적 사건이 StandardScaler의 평균과 분산을 왜곡한다. 이 상태에서 스케일링하면 정상 구간 데이터가 오히려 이상치처럼 처리된다.

RobustScaler는 중앙값(median)과 IQR을 기준으로 스케일링하기 때문에 극단값의 영향을 받지 않는다.

# StandardScaler vs RobustScaler 비교
# 2008년 금융위기 구간이 포함된 데이터에서:

from sklearn.preprocessing import StandardScaler, RobustScaler
import pandas as pd

std_scaler = StandardScaler()
rob_scaler = RobustScaler()

# StandardScaler: 금융위기 극값이 mean/std를 왜곡
X_std = std_scaler.fit_transform(features)

# RobustScaler: median/IQR 기준, 극값에 강건
X_rob = rob_scaler.fit_transform(features)

# 정상 구간 데이터의 분포를 비교하면
# RobustScaler가 훨씬 균등하게 분포됨

4개 상태를 선택한 이유

상태 수 선택은 실험 기반이었다. 2개는 너무 단순하고 6개 이상은 해석 불가능한 상태가 생겼다. 4개가 이성, 약한 이성, 약한 감정, 강한 감정으로 해석 가능한 최적점이었다.

AIC/BIC 기준으로도 4개 상태가 가장 낮은 정보량 기준값을 보였다.

from hmmlearn.hmm import GaussianHMM

# 상태 수별 BIC 비교
bic_scores = {}
for n in range(2, 8):
    model = GaussianHMM(n_components=n, covariance_type="diag", n_iter=200)
    model.fit(X_scaled)
    # log-likelihood 기반 BIC 근사
    log_likelihood = model.score(X_scaled)
    n_params = n * n + n * X_scaled.shape[1] * 2  # 전이확률 + 가우시안 파라미터
    bic = -2 * log_likelihood * len(X_scaled) + n_params * np.log(len(X_scaled))
    bic_scores[n] = bic

optimal_n = min(bic_scores, key=bic_scores.get)
print(f"최적 상태 수: {optimal_n}")  # 결과: 4

noise_score 설계: 국면을 하나의 숫자로

HMM이 4개 상태를 분류해도 사용자에게 “현재 상태 2번”이라고 보여주는 건 의미가 없다. 직관적인 단일 점수로 변환해야 했다.

noise_score는 8개 피처의 가중합으로 계산한다. 실험을 통해 결정한 가중치 구조는 다음과 같다.

def calculate_noise_score(features_dict):
    """
    noise_score: 양수 = 감정적 시장 (DB 저장 기준)
    API 응답 시 부호 반전 → 사용자에게는 양수 = 이성적 시장
    """
    score = (
        features_dict['vix_term'] * 2.0 +           # 단기 공포 (최고 가중치)
        features_dict['realized_vol'] * 2.0 +        # 실현 변동성
        features_dict['hy_spread'] * 1.5 +           # 신용 환경
        features_dict['residual_corr'] * 1.0 +       # 동조화 정도
        abs(features_dict['fundamental_gap']) * 0.5 + # 방향보다 괴리 크기
        abs(features_dict['erp_zscore']) * 0.3 +     # 밸류에이션 이례성
        features_dict['dispersion'] * 0.8 +          # 종목 분산도
        features_dict['amihud'] * 0.6                # 유동성
    )
    return score

# API 레이어에서 부호 반전 (DB 컨벤션 분리)
def get_noise_score_for_display(raw_score):
    return -raw_score  # DB: 양수=감정 → 표시: 양수=이성

경량 예측: 10분마다 실시간 업데이트

전체 파이프라인(18년치 데이터 수집 + 모델 학습)은 3시간마다 실행된다. 그 사이에도 오늘의 국면은 업데이트해야 한다.

월별 피처(fundamental_gap, erp_zscore, vix_term, hy_spread)는 모델 번들에 캐싱하고, 실시간 계산 가능한 피처(residual_corr, dispersion, amihud, realized_vol)는 최근 60일 데이터로 재계산한다.

def lightweight_predict(model_bundle: dict, ticker_data: pd.DataFrame) -> dict:
    """
    경량 파이프라인: 10분 주기 실행
    - 월별 피처: 캐시에서 로드
    - 실시간 피처: 최근 60일로 재계산
    """
    # 캐시된 월별 피처
    cached_features = model_bundle['cached_monthly_features']

    # 실시간 재계산 피처 (yfinance 최신 데이터 기반)
    recent_data = ticker_data.tail(60)
    realtime_features = {
        'realized_vol': recent_data['returns'].std() * np.sqrt(252),
        'dispersion': recent_data['sector_returns'].std(axis=1).mean(),
        'amihud': (recent_data['returns'].abs() / recent_data['volume']).mean(),
        'residual_corr': calc_residual_corr_fast(recent_data),
        # vix_term: Yahoo Finance 실시간 덮어쓰기
        'vix_term': fetch_vix_term_realtime()
    }

    # 병합 후 예측
    features = {**cached_features, **realtime_features}
    noise_score = calculate_noise_score(features)

    model = model_bundle['hmm_model']
    scaler = model_bundle['scaler']

    feature_vector = np.array([[features[k] for k in FEATURE_ORDER]])
    scaled_vector = scaler.transform(feature_vector)
    state = model.predict(scaled_vector)[0]

    return {
        'state': int(state),
        'noise_score': float(noise_score),
        'updated_at': datetime.utcnow().isoformat()
    }

50일 백필: 모델 일관성 유지

모델을 새로 학습하면 과거 50일치 국면도 새 모델로 재채점한다. 오래된 모델로 채점된 기존 데이터와의 불일치를 최소화하기 위해서다.

def backfill_regime_scores(model_bundle: dict, days: int = 50):
    """
    모델 재학습 후 50일 백필
    - 각 날짜의 20일치 이전 데이터로 피처 재계산
    - 새 모델로 국면 예측 후 DB 업서트
    """
    today = datetime.today()
    dates_to_fill = [today - timedelta(days=i) for i in range(days)]

    results = []
    for date in dates_to_fill:
        # 해당 시점 이전 20일치 데이터
        historical_window = fetch_data_until(date, lookback=20)

        if historical_window is None:
            continue

        features = extract_features(historical_window)
        noise_score = calculate_noise_score(features)

        results.append({
            'date': date.strftime('%Y-%m-%d'),
            'noise_score': noise_score,
            'state': predict_state(model_bundle, features)
        })

    # Supabase 업서트 (이미 존재하는 날짜는 건너뜀)
    upsert_to_supabase(results, conflict_column='date')
    print(f"백필 완료: {len(results)}개 레코드")

배포 후 발견한 문제들

FRED API 불안정성

FRED API는 간헐적으로 타임아웃이 나거나 빈 응답을 반환한다. hy_spread와 tips_rate 피처가 FRED 의존적이라 전체 파이프라인이 멈추는 문제가 발생했다.

import time
import requests

def fetch_fred_data(series_id: str, max_retries: int = 3) -> pd.Series:
    """
    FRED API 지수 백오프 재시도 + pkl 캐시 폴백
    """
    cache_path = f"cache/{series_id}.pkl"

    for attempt in range(max_retries):
        try:
            response = requests.get(
                f"https://api.stlouisfed.org/fred/series/observations",
                params={
                    'series_id': series_id,
                    'api_key': FRED_API_KEY,
                    'file_type': 'json'
                },
                timeout=10
            )
            data = response.json()
            series = pd.Series(...)  # 파싱 로직
            # 성공 시 캐시 저장
            series.to_pickle(cache_path)
            return series

        except Exception as e:
            wait_time = 3 * (2 ** attempt)  # 3초, 6초 대기
            print(f"FRED 재시도 {attempt+1}/{max_retries}, {wait_time}초 대기")
            time.sleep(wait_time)

    # 모든 재시도 실패 시 캐시 폴백
    if os.path.exists(cache_path):
        print(f"FRED 캐시 폴백: {series_id}")
        return pd.read_pickle(cache_path)

    raise RuntimeError(f"FRED 데이터 수집 완전 실패: {series_id}")

부호 컨벤션 혼선과 DB 마이그레이션

개발 초기에 “양수 = 이성, 음수 = 감정”으로 정의했다가 중간에 반대로 뒤집었다. 이미 DB에 저장된 데이터와 새 데이터가 다른 컨벤션을 가지게 됐고, 전체 DB를 마이그레이션해야 했다.

# 마이그레이션 스크립트: DB 저장값 전체 부호 반전
def migrate_sign_convention():
    """
    DB 부호 컨벤션 변경: 양수=이성 → 양수=감정
    API 레이어에서 반전해서 표시하는 방식으로 통일
    """
    response = supabase.table('market_regime') \
        .select('id, noise_score') \
        .execute()

    batch_updates = []
    for row in response.data:
        batch_updates.append({
            'id': row['id'],
            'noise_score': -row['noise_score']  # 부호 반전
        })

    # 배치 업서트
    supabase.table('market_regime') \
        .upsert(batch_updates) \
        .execute()

    print(f"마이그레이션 완료: {len(batch_updates)}개 레코드 부호 반전")

# 교훈: DB 컨벤션은 처음에 확정하고,
# 표시 변환은 항상 API 레이어에서만 처리

이 기능을 만들면서 배운 것

피처 설계가 모델 성능을 좌우한다. fundamental_gap 없이 VIX만 넣으면 변동성 높낮이로만 국면이 분류된다. ERP를 빼면 밸류에이션 신호가 사라진다. 퀀트 투자 전략에서 어떤 피처를 넣느냐가 곧 “시장을 어떻게 정의하느냐”의 문제다.

단일 점수로 압축하는 결정이 UX를 결정한다. HMM이 출력하는 4개 상태 확률보다 noise_score라는 단일 숫자가 사용자에게 훨씬 직관적으로 전달된다. 복잡한 모델의 결과를 단순화하는 것도 금융 데이터 시각화의 중요한 과제다.

DB 컨벤션은 처음에 확정해야 한다. 중간에 부호 방향을 바꾸면 마이그레이션 비용이 크다. API 레이어에서만 변환하고 DB 저장값은 항상 일관된 방향으로 유지하는 원칙을 이때 정립했다.

Passive 직접 확인하기


다음 글 예고

다음 개발일지에서는 XGBoost + Optuna로 폭락/급등 전조를 탐지하는 방법을 다룬다. 3-class 분류, Platt Scaling 확률 보정, SHAP 기반 설명 가능성 구현까지 전 과정을 기록한다.