# Advanced Quantitative Analysis

## Pump & Dump Detection with Machine Learning

### Model Features
```python
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report


class PumpDumpDetector:
    def __init__(self):
        self.model = RandomForestClassifier(
            n_estimators=200,
            max_depth=15,
            min_samples_split=10,
            random_state=42
        )
        self.feature_columns = []
        self.is_trained = False

    def create_features(self, df):
        """Build features for pump & dump detection."""
        features = pd.DataFrame(index=df.index)

        # Price-based features
        features['price_change_1d'] = df['close'].pct_change()
        features['price_change_3d'] = df['close'].pct_change(3)
        features['price_change_5d'] = df['close'].pct_change(5)
        features['price_volatility_5d'] = df['close'].pct_change().rolling(5).std()

        # Volume-based features
        features['volume_ratio_1d'] = df['volume'] / df['volume'].rolling(20).mean()
        features['volume_spike'] = (features['volume_ratio_1d'] > 5).astype(int)
        features['volume_trend_3d'] = df['volume'].rolling(3).mean() / df['volume'].rolling(10).mean()

        # Technical indicators
        features['rsi_14'] = self.calculate_rsi(df['close'], 14)
        features['distance_from_sma20'] = (df['close'] - df['close'].rolling(20).mean()) / df['close'].rolling(20).mean()
        features['bb_position'] = self.calculate_bollinger_position(df['close'])

        # Pattern-based features
        features['consecutive_green_days'] = self.count_consecutive_green(df)
        features['gap_up'] = ((df['open'] - df['close'].shift(1)) / df['close'].shift(1) > 0.10).astype(int)
        features['failed_breakout'] = self.detect_failed_breakout(df)

        # Momentum features
        features['momentum_5d'] = df['close'] / df['close'].shift(5) - 1
        features['acceleration'] = features['price_change_1d'] - features['price_change_1d'].shift(1)

        # Market cap and float features (if available)
        if 'market_cap' in df.columns:
            features['market_cap_category'] = pd.cut(
                df['market_cap'],
                bins=[0, 50e6, 200e6, 1e9, np.inf],
                labels=[0, 1, 2, 3]
            ).astype(float)

        return features.fillna(0)

    def calculate_rsi(self, prices, periods=14):
        """Compute the Relative Strength Index."""
        delta = prices.diff()
        gain = delta.where(delta > 0, 0).rolling(window=periods).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=periods).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        return rsi

    def calculate_bollinger_position(self, prices, periods=20):
        """Position within the Bollinger Bands (0 = lower band, 1 = upper band)."""
        sma = prices.rolling(periods).mean()
        std = prices.rolling(periods).std()
        upper_band = sma + (std * 2)
        lower_band = sma - (std * 2)
        bb_position = (prices - lower_band) / (upper_band - lower_band)
        return bb_position.clip(0, 1)

    def count_consecutive_green(self, df):
        """Count consecutive green (up) days."""
        green_days = (df['close'] > df['open']).astype(int)
        # Restart the running count every time the green/red streak flips
        consecutive = green_days * (green_days.groupby((green_days != green_days.shift()).cumsum()).cumcount() + 1)
        return consecutive

    def detect_failed_breakout(self, df, lookback=5):
        """Detect failed breakouts."""
        rolling_high = df['high'].rolling(lookback).max()
        breakout = df['high'] > rolling_high.shift(1)
        # Failed breakout: breaks the high but cannot hold it into the close
        failed = breakout & (df['close'] < df['high'] * 0.95)
        return failed.astype(int)

    def create_labels(self, df, forward_days=5, dump_threshold=-0.30):
        """Create training labels (1 = dump incoming, 0 = normal)."""
        # Look forward to see whether a dump follows
        future_returns = df['close'].shift(-forward_days) / df['close'] - 1
        # Label = 1 if the price falls more than 30% over the next 5 days
        labels = (future_returns < dump_threshold).astype(int)
        return labels.iloc[:-forward_days]  # Drop trailing rows with no future data

    def train_model(self, historical_data, symbols):
        """Train the model on historical data."""
        all_features = []
        all_labels = []
        for symbol in symbols:
            if symbol in historical_data:
                df = historical_data[symbol]
                features = self.create_features(df)
                labels = self.create_labels(df)
                # Align features with labels on the shared index
                # (labels lose the last forward_days rows)
                features = features.loc[labels.index]
                all_features.append(features)
                all_labels.extend(labels)

        # Combine all data
        X = pd.concat(all_features, ignore_index=True)
        y = np.array(all_labels)

        # Store feature columns
        self.feature_columns = X.columns.tolist()

        # Split data (note: a random split leaks look-ahead information in a
        # time series; a chronological split is more rigorous in production)
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        # Train model
        self.model.fit(X_train, y_train)
        self.is_trained = True

        # Evaluate
        y_pred = self.model.predict(X_test)
        print("Model Performance:")
        print(classification_report(y_test, y_pred))

        # Feature importance
        importance_df = pd.DataFrame({
            'feature': self.feature_columns,
            'importance': self.model.feature_importances_
        }).sort_values('importance', ascending=False)
        print("\nTop 10 Most Important Features:")
        print(importance_df.head(10))

        return {
            'train_accuracy': self.model.score(X_train, y_train),
            'test_accuracy': self.model.score(X_test, y_test),
            'feature_importance': importance_df
        }

    def predict_dump_probability(self, current_data):
        """Predict the probability of an imminent dump."""
        if not self.is_trained:
            raise ValueError("Model must be trained first")
        features = self.create_features(current_data)
        # Reindex so optional columns missing at prediction time default to 0
        latest_features = features.iloc[-1:].reindex(columns=self.feature_columns, fill_value=0)
        # Probability of the positive (dump) class
        dump_probability = self.model.predict_proba(latest_features)[0][1]
        return {
            'dump_probability': dump_probability,
            'risk_level': 'HIGH' if dump_probability > 0.7 else 'MEDIUM' if dump_probability > 0.4 else 'LOW',
            'features_used': latest_features.to_dict('records')[0]
        }
```
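To make the workflow concrete, here is a minimal usage sketch (not part of the framework itself): the tickers and the `make_ohlcv` helper are illustrative assumptions, and the synthetic data is made volatile enough that both label classes should appear in training.

```python
# Hypothetical example with synthetic OHLCV data, for illustration only
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)

def make_ohlcv(n=500):
    # Volatile random-walk prices so that >30% five-day drops actually occur
    close = 10 * np.exp(np.cumsum(rng.normal(0, 0.08, n)))
    return pd.DataFrame({
        'open': close * (1 + rng.normal(0, 0.01, n)),
        'high': close * (1 + np.abs(rng.normal(0, 0.02, n))),
        'low': close * (1 - np.abs(rng.normal(0, 0.02, n))),
        'close': close,
        'volume': rng.integers(100_000, 5_000_000, n).astype(float),
    }, index=pd.date_range('2022-01-03', periods=n, freq='B'))

historical_data = {s: make_ohlcv() for s in ['TICK1', 'TICK2', 'TICK3']}

detector = PumpDumpDetector()
detector.train_model(historical_data, list(historical_data))
print(detector.predict_dump_probability(historical_data['TICK1']))
```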
## Dynamic Correlation Analysis

### Rolling Correlations
```python
class DynamicCorrelationAnalysis:
    def __init__(self, window=30):
        self.window = window
        self.correlation_history = {}

    def calculate_rolling_correlations(self, returns_data, benchmark='SPY'):
        """Compute rolling correlations against a benchmark."""
        correlations = {}
        for symbol in returns_data.columns:
            if symbol != benchmark and benchmark in returns_data.columns:
                rolling_corr = returns_data[symbol].rolling(
                    window=self.window
                ).corr(returns_data[benchmark])
                correlations[symbol] = {
                    'current_correlation': rolling_corr.iloc[-1],
                    'avg_correlation': rolling_corr.mean(),
                    'correlation_trend': self.calculate_trend(rolling_corr),
                    'correlation_stability': rolling_corr.std(),
                    'rolling_series': rolling_corr
                }
        return correlations

    def calculate_trend(self, series, periods=10):
        """Measure the correlation trend: recent window vs. the window before it."""
        if len(series) < periods * 2:  # Need two non-overlapping windows
            return 0
        recent = series.tail(periods).mean()
        previous = series.tail(periods * 2).head(periods).mean()
        return (recent - previous) / abs(previous) if previous != 0 else 0

    def identify_correlation_breakdowns(self, correlations, threshold=0.3):
        """Identify correlation breakdowns (potential short opportunities)."""
        breakdowns = []
        for symbol, corr_data in correlations.items():
            # Breakdown = historically high correlation that is currently low
            if (corr_data['avg_correlation'] > 0.5 and
                    corr_data['current_correlation'] < threshold):
                severity = abs(corr_data['avg_correlation'] - corr_data['current_correlation'])
                breakdowns.append({
                    'symbol': symbol,
                    'current_corr': corr_data['current_correlation'],
                    'avg_corr': corr_data['avg_correlation'],
                    'severity': severity,
                    'trend': corr_data['correlation_trend'],
                    'opportunity_type': 'mean_reversion_short' if severity > 0.4 else 'momentum_short'
                })
        return sorted(breakdowns, key=lambda x: x['severity'], reverse=True)

    def sector_correlation_heatmap(self, returns_data, sector_mapping):
        """Build a sector-level correlation matrix for a heatmap."""
        # Group symbols by sector
        sectors = {}
        for symbol, sector in sector_mapping.items():
            if sector not in sectors:
                sectors[sector] = []
            if symbol in returns_data.columns:
                sectors[sector].append(symbol)

        # Average returns per sector (equal-weighted)
        sector_returns = {}
        for sector, symbols in sectors.items():
            if len(symbols) > 0:
                sector_returns[sector] = returns_data[symbols].mean(axis=1)

        # Correlation matrix across the sector return series
        sector_df = pd.DataFrame(sector_returns)
        correlation_matrix = sector_df.corr()
        return correlation_matrix
```
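A short, hypothetical usage example; the tickers and their loadings are made up, with `AAA` built to track `SPY` closely and `BBB` built to be mostly idiosyncratic:

```python
# Hypothetical example: daily returns for two tickers plus the SPY benchmark
import numpy as np
import pandas as pd

rng = np.random.default_rng(1)
n = 250
market = rng.normal(0, 0.01, n)
returns_data = pd.DataFrame({
    'SPY': market,
    'AAA': 0.9 * market + rng.normal(0, 0.005, n),  # tracks the market closely
    'BBB': 0.2 * market + rng.normal(0, 0.02, n),   # mostly idiosyncratic
}, index=pd.date_range('2023-01-02', periods=n, freq='B'))

analyzer = DynamicCorrelationAnalysis(window=30)
correlations = analyzer.calculate_rolling_correlations(returns_data, benchmark='SPY')
for symbol, data in correlations.items():
    print(symbol, round(data['current_correlation'], 2), round(data['avg_correlation'], 2))

# Symbols whose historically high correlation has recently broken down
print(analyzer.identify_correlation_breakdowns(correlations))
```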
## Statistical Arbitrage Framework

### Advanced Pairs Trading
```python
class StatisticalArbitrageFramework:
    def __init__(self):
        self.pairs = {}
        self.cointegration_results = {}

    def find_cointegrated_pairs(self, price_data, min_correlation=0.7):
        """Find cointegrated pairs."""
        from statsmodels.tsa.stattools import coint

        symbols = list(price_data.columns)
        cointegrated_pairs = []
        for i in range(len(symbols)):
            for j in range(i + 1, len(symbols)):
                symbol1, symbol2 = symbols[i], symbols[j]
                # Check basic correlation first to prune the search
                correlation = price_data[symbol1].corr(price_data[symbol2])
                if correlation > min_correlation:
                    # Test cointegration on the aligned series
                    series1 = price_data[symbol1].dropna()
                    series2 = price_data[symbol2].dropna()
                    aligned_data = pd.concat([series1, series2], axis=1).dropna()
                    if len(aligned_data) > 50:  # Minimum observations
                        try:
                            coint_stat, p_value, critical_values = coint(
                                aligned_data.iloc[:, 0],
                                aligned_data.iloc[:, 1]
                            )
                            if p_value < 0.05:  # Cointegrated at the 5% level
                                cointegrated_pairs.append({
                                    'pair': (symbol1, symbol2),
                                    'correlation': correlation,
                                    'coint_stat': coint_stat,
                                    'p_value': p_value,
                                    'critical_value_5pct': critical_values[1]
                                })
                        except Exception:
                            continue
        return sorted(cointegrated_pairs, key=lambda x: x['p_value'])

    def calculate_spread_metrics(self, price_data, pair):
        """Compute spread metrics for a pair."""
        from sklearn.linear_model import LinearRegression

        symbol1, symbol2 = pair
        aligned_data = pd.concat([
            price_data[symbol1],
            price_data[symbol2]
        ], axis=1).dropna()

        # Estimate the hedge ratio by regressing symbol1 on symbol2
        X = aligned_data.iloc[:, 1].values.reshape(-1, 1)  # symbol2
        y = aligned_data.iloc[:, 0].values                 # symbol1
        reg = LinearRegression().fit(X, y)
        hedge_ratio = reg.coef_[0]

        # Spread = symbol1 - hedge_ratio * symbol2
        spread = aligned_data.iloc[:, 0] - hedge_ratio * aligned_data.iloc[:, 1]

        spread_stats = {
            'hedge_ratio': hedge_ratio,
            'spread_mean': spread.mean(),
            'spread_std': spread.std(),
            'current_spread': spread.iloc[-1],
            'z_score': (spread.iloc[-1] - spread.mean()) / spread.std(),
            'spread_series': spread
        }
        return spread_stats

    def generate_pairs_signals(self, spread_stats, entry_threshold=2.0, exit_threshold=0.5):
        """Generate pairs trading signals."""
        z_score = spread_stats['z_score']
        if abs(z_score) > entry_threshold:
            if z_score > entry_threshold:
                # Spread too high: short symbol1, long symbol2
                signal = {
                    'action': 'enter',
                    'symbol1_position': 'short',
                    'symbol2_position': 'long',
                    'hedge_ratio': spread_stats['hedge_ratio'],
                    'confidence': min(abs(z_score) / entry_threshold, 2.0) / 2.0,
                    'expected_return': abs(z_score) * spread_stats['spread_std']
                }
            else:
                # Spread too low: long symbol1, short symbol2
                signal = {
                    'action': 'enter',
                    'symbol1_position': 'long',
                    'symbol2_position': 'short',
                    'hedge_ratio': spread_stats['hedge_ratio'],
                    'confidence': min(abs(z_score) / entry_threshold, 2.0) / 2.0,
                    'expected_return': abs(z_score) * spread_stats['spread_std']
                }
        elif abs(z_score) < exit_threshold:
            signal = {
                'action': 'exit',
                'reason': 'spread_normalized'
            }
        else:
            signal = {
                'action': 'hold',
                'z_score': z_score
            }
        return signal
```
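A minimal sketch of the full pairs workflow, assuming statsmodels is installed. `AAA` and `BBB` are synthetic series engineered to be cointegrated, so the scan should find them; `CCC` is an independent walk that should be rejected:

```python
# Hypothetical example: one engineered cointegrated pair plus an unrelated series
import numpy as np
import pandas as pd

rng = np.random.default_rng(2)
n = 500
x = 100 + np.cumsum(rng.normal(0, 1, n))          # random walk
price_data = pd.DataFrame({
    'AAA': x,
    'BBB': 0.5 * x + rng.normal(0, 1, n),         # cointegrated with AAA by construction
    'CCC': 100 + np.cumsum(rng.normal(0, 1, n)),  # independent random walk
}, index=pd.date_range('2022-01-03', periods=n, freq='B'))

framework = StatisticalArbitrageFramework()
pairs = framework.find_cointegrated_pairs(price_data)
if pairs:
    best = pairs[0]
    stats = framework.calculate_spread_metrics(price_data, best['pair'])
    print(best['pair'], 'p-value:', round(best['p_value'], 4),
          'z-score:', round(stats['z_score'], 2))
    print(framework.generate_pairs_signals(stats))
```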
## Regime Detection

### Market Regime Classification
```python
class MarketRegimeDetector:
    def __init__(self):
        self.regimes = {}
        self.current_regime = None

    def detect_volatility_regime(self, returns, lookback=20):
        """Detect the volatility regime."""
        rolling_vol = returns.rolling(lookback).std() * np.sqrt(252)  # Annualized
        vol_percentiles = {
            'low': rolling_vol.quantile(0.25),
            'medium': rolling_vol.quantile(0.75),
            'high': rolling_vol.quantile(0.95)
        }
        current_vol = rolling_vol.iloc[-1]
        if current_vol < vol_percentiles['low']:
            regime = 'low_volatility'
        elif current_vol < vol_percentiles['medium']:
            regime = 'normal_volatility'
        elif current_vol < vol_percentiles['high']:
            regime = 'high_volatility'
        else:
            regime = 'extreme_volatility'
        return {
            'regime': regime,
            'current_volatility': current_vol,
            'percentiles': vol_percentiles,
            'regime_persistence': self.calculate_regime_persistence(rolling_vol, regime)
        }

    def calculate_regime_persistence(self, rolling_vol, regime):
        """Consecutive recent observations in the current regime band.

        Minimal helper, added here because the original sketch referenced it
        without defining it; it reuses the same quantile thresholds as
        detect_volatility_regime.
        """
        low = rolling_vol.quantile(0.25)
        medium = rolling_vol.quantile(0.75)
        high = rolling_vol.quantile(0.95)

        def classify(vol):
            if vol < low:
                return 'low_volatility'
            elif vol < medium:
                return 'normal_volatility'
            elif vol < high:
                return 'high_volatility'
            return 'extreme_volatility'

        count = 0
        for vol in rolling_vol.dropna().iloc[::-1]:
            if classify(vol) == regime:
                count += 1
            else:
                break
        return count

    def detect_trend_regime(self, prices, short_window=20, long_window=50):
        """Detect the trend regime."""
        short_ma = prices.rolling(short_window).mean()
        long_ma = prices.rolling(long_window).mean()

        # Trend direction: +1 when the short MA is above the long MA
        trend_direction = np.where(short_ma > long_ma, 1, -1)

        # Trend strength: distance of price from the long MA
        price_position = (prices - long_ma) / long_ma
        trend_strength = abs(price_position.iloc[-1])

        # Classify regime
        current_trend = trend_direction[-1]
        if trend_strength > 0.1:  # Strong trend
            regime = 'strong_uptrend' if current_trend == 1 else 'strong_downtrend'
        elif trend_strength > 0.05:  # Moderate trend
            regime = 'uptrend' if current_trend == 1 else 'downtrend'
        else:
            regime = 'sideways'

        return {
            'regime': regime,
            'trend_strength': trend_strength,
            'price_position': price_position.iloc[-1],
            'ma_distance': (short_ma.iloc[-1] - long_ma.iloc[-1]) / long_ma.iloc[-1]
        }

    def adaptive_strategy_parameters(self, volatility_regime, trend_regime):
        """Adapt strategy parameters to the detected regime."""
        adaptations = {
            'position_sizing': 1.0,
            'stop_loss_multiplier': 1.0,
            'take_profit_multiplier': 1.0,
            'holding_period_adjustment': 1.0,
            'strategy_selection': []
        }

        # Volatility adaptations
        if volatility_regime['regime'] == 'low_volatility':
            adaptations['position_sizing'] = 1.5           # Increase size in low vol
            adaptations['stop_loss_multiplier'] = 0.8      # Tighter stops
            adaptations['strategy_selection'].append('mean_reversion')
        elif volatility_regime['regime'] == 'high_volatility':
            adaptations['position_sizing'] = 0.5           # Reduce size in high vol
            adaptations['stop_loss_multiplier'] = 1.5      # Wider stops
            adaptations['strategy_selection'].append('momentum')
        elif volatility_regime['regime'] == 'extreme_volatility':
            adaptations['position_sizing'] = 0.25          # Very small positions
            adaptations['stop_loss_multiplier'] = 2.0      # Much wider stops
            adaptations['holding_period_adjustment'] = 0.5  # Shorter holds

        # Trend adaptations
        if trend_regime['regime'] in ['strong_uptrend', 'strong_downtrend']:
            adaptations['strategy_selection'].append('trend_following')
            adaptations['take_profit_multiplier'] = 1.5    # Let winners run
        elif trend_regime['regime'] == 'sideways':
            adaptations['strategy_selection'].append('range_trading')
            adaptations['take_profit_multiplier'] = 0.8    # Take profits quickly

        return adaptations
```
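A brief usage sketch on a single synthetic price series (illustrative assumptions only):

```python
# Hypothetical example: classify the current regime of one synthetic series
import numpy as np
import pandas as pd

rng = np.random.default_rng(3)
n = 500
prices = pd.Series(
    100 * np.exp(np.cumsum(rng.normal(0.0005, 0.015, n))),
    index=pd.date_range('2022-01-03', periods=n, freq='B')
)
returns = prices.pct_change().dropna()

detector = MarketRegimeDetector()
vol_regime = detector.detect_volatility_regime(returns)
trend_regime = detector.detect_trend_regime(prices)
print(vol_regime['regime'], '/', trend_regime['regime'])
print(detector.adaptive_strategy_parameters(vol_regime, trend_regime))
```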
## Performance Attribution

### Factor-Based Analysis
```python
class PerformanceAttribution:
    def __init__(self):
        self.factor_exposures = {}
        self.attribution_results = {}

    def calculate_factor_exposures(self, returns, factor_returns):
        """Estimate factor exposures via linear regression."""
        from sklearn.linear_model import LinearRegression

        # Align strategy and factor returns on a common index
        aligned_data = pd.concat([returns, factor_returns], axis=1).dropna()
        if len(aligned_data) < 30:  # Minimum observations
            return None

        y = aligned_data.iloc[:, 0].values   # Strategy returns
        X = aligned_data.iloc[:, 1:].values  # Factor returns

        reg = LinearRegression().fit(X, y)
        exposures = dict(zip(factor_returns.columns, reg.coef_))
        alpha = reg.intercept_
        r_squared = reg.score(X, y)

        return {
            'factor_exposures': exposures,
            'alpha': alpha,
            'r_squared': r_squared,
            'residual_volatility': np.std(y - reg.predict(X))
        }

    def decompose_returns(self, strategy_returns, factor_returns, factor_exposures):
        """Decompose returns into factor contributions plus alpha."""
        decomposition = pd.DataFrame(index=strategy_returns.index)

        # Per-factor contributions
        for factor, exposure in factor_exposures['factor_exposures'].items():
            if factor in factor_returns.columns:
                decomposition[f'{factor}_contribution'] = (
                    factor_returns[factor] * exposure
                )

        # Total factor return (sum of the contribution columns above)
        decomposition['total_factor_return'] = decomposition.sum(axis=1)

        # Alpha (regression intercept, constant per period)
        decomposition['alpha'] = factor_exposures['alpha']

        # Residual (unexplained)
        decomposition['residual'] = (
            strategy_returns -
            decomposition['total_factor_return'] -
            decomposition['alpha']
        )
        return decomposition
```
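A minimal usage sketch with synthetic factors. The strategy series is built from known loadings (0.8 on market, -0.3 on momentum) plus a small alpha, so the recovered exposures can be checked against the ground truth:

```python
# Hypothetical example: attribute strategy returns to two synthetic factors
import numpy as np
import pandas as pd

rng = np.random.default_rng(4)
n = 250
idx = pd.date_range('2023-01-02', periods=n, freq='B')
factor_returns = pd.DataFrame({
    'market': rng.normal(0, 0.01, n),
    'momentum': rng.normal(0, 0.005, n),
}, index=idx)

# Known loadings + alpha + noise, so the regression output is verifiable
strategy_returns = (0.8 * factor_returns['market']
                    - 0.3 * factor_returns['momentum']
                    + 0.0002
                    + rng.normal(0, 0.002, n))

attribution = PerformanceAttribution()
exposures = attribution.calculate_factor_exposures(strategy_returns, factor_returns)
print(exposures['factor_exposures'], 'alpha:', round(exposures['alpha'], 5))
print(attribution.decompose_returns(strategy_returns, factor_returns, exposures).tail())
```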
This advanced quantitative analysis framework makes it possible to identify short-selling opportunities with greater precision and to manage risk with greater sophistication.