Parabolic Reversal Strategy

Concepto Base

La estrategia Parabolic Reversal se enfoca en identificar y capitalizar reversiones en stocks que han experimentado movimientos parabólicos extremos. Estos setups ofrecen algunas de las mejores oportunidades risk/reward cuando se ejecutan correctamente.

Anatomía de un Movimiento Parabólico

Características de Movimientos Parabólicos

class ParabolicMovementAnalyzer:
    def __init__(self):
        self.parabolic_criteria = {
            'magnitude': 100,        # >100% move from base
            'timeframe': 21,         # Within 21 days
            'acceleration': 1.5,     # Accelerating daily gains
            'volume_profile': 'increasing_then_decreasing'
        }
        
    def identify_parabolic_move(self, price_data):
        """Identificar si un stock está en movimiento parabólico"""
        
        # Calculate move from base
        lookback_period = 30
        base_price = price_data.tail(lookback_period)['low'].min()
        current_price = price_data.iloc[-1]['close']
        total_move = (current_price - base_price) / base_price
        
        # Calculate time to move
        base_date_idx = price_data.tail(lookback_period)['low'].idxmin()
        base_date_position = price_data.index.get_loc(base_date_idx)
        days_to_move = len(price_data) - base_date_position
        
        # Acceleration analysis
        acceleration_score = self.calculate_acceleration(price_data.tail(10))
        
        # Volume profile analysis
        volume_profile = self.analyze_volume_profile(price_data.tail(15))
        
        # Determine if parabolic
        is_parabolic = (
            total_move > 1.0 and          # >100% move
            days_to_move <= 21 and        # Within 21 days
            acceleration_score > 60       # Strong acceleration
        )
        
        return {
            'is_parabolic': is_parabolic,
            'total_move_pct': total_move,
            'days_to_move': days_to_move,
            'acceleration_score': acceleration_score,
            'volume_profile': volume_profile,
            'exhaustion_signals': self.detect_exhaustion_signals(price_data),
            'reversal_probability': self.calculate_reversal_probability(
                total_move, days_to_move, acceleration_score, volume_profile
            )
        }
    
    def calculate_acceleration(self, recent_data):
        """Calcular score de aceleración del momentum"""
        
        daily_returns = recent_data['close'].pct_change().fillna(0)
        
        # Check for accelerating pattern
        early_period = daily_returns.head(5).mean()
        late_period = daily_returns.tail(5).mean()
        
        # Acceleration ratio
        acceleration_ratio = late_period / early_period if early_period != 0 else 1
        
        # Consistency of gains
        positive_days = (daily_returns > 0).sum()
        consistency_score = (positive_days / len(daily_returns)) * 100
        
        # Combined acceleration score
        acceleration_score = min(100, (acceleration_ratio * 30) + consistency_score)
        
        return acceleration_score
    
    def analyze_volume_profile(self, volume_data):
        """Analizar perfil de volumen durante movimiento parabólico"""
        
        early_volume = volume_data.head(7)['volume'].mean()
        middle_volume = volume_data.iloc[4:11]['volume'].mean()
        late_volume = volume_data.tail(7)['volume'].mean()
        
        # Classic parabolic pattern: increasing then decreasing volume
        volume_pattern = {
            'early_avg': early_volume,
            'middle_avg': middle_volume,
            'late_avg': late_volume,
            'peak_volume': volume_data['volume'].max(),
            'current_vs_peak': volume_data.iloc[-1]['volume'] / volume_data['volume'].max()
        }
        
        # Determine pattern type
        if middle_volume > early_volume and late_volume < middle_volume:
            pattern_type = 'classic_parabolic'  # Good for reversal
        elif late_volume > middle_volume > early_volume:
            pattern_type = 'accelerating'       # Still building
        else:
            pattern_type = 'irregular'          # Mixed signals
        
        volume_pattern['pattern_type'] = pattern_type
        
        return volume_pattern
    
    def detect_exhaustion_signals(self, price_data):
        """Detectar señales de agotamiento"""
        
        signals = {}
        recent_data = price_data.tail(5)
        
        # 1. Shooting star / doji patterns
        signals['bearish_candlestick'] = self.detect_bearish_patterns(recent_data)
        
        # 2. Lower highs despite volume
        signals['lower_highs'] = self.detect_lower_highs(recent_data)
        
        # 3. Gap ups with immediate selling
        signals['gap_fade'] = self.detect_gap_fade_pattern(recent_data)
        
        # 4. Volume divergence
        signals['volume_divergence'] = self.detect_volume_divergence(price_data.tail(10))
        
        # 5. Extended time at highs
        signals['time_at_highs'] = self.calculate_time_at_highs(recent_data)
        
        # 6. Failed breakout attempts
        signals['failed_breakouts'] = self.count_failed_breakouts(recent_data)
        
        # Composite exhaustion score
        exhaustion_score = sum([
            signals['bearish_candlestick'] * 20,
            signals['lower_highs'] * 25,
            signals['gap_fade'] * 15,
            signals['volume_divergence'] * 20,
            signals['time_at_highs'] * 10,
            signals['failed_breakouts'] * 10
        ])
        
        signals['exhaustion_score'] = min(100, exhaustion_score)
        signals['reversal_imminent'] = exhaustion_score > 70
        
        return signals

Entry Strategies para Reversals

Long Setup (Bounce from Oversold)

class ParabolicBounceStrategy:
    def __init__(self):
        self.strategy_type = "oversold_bounce"
        self.min_decline = 0.30  # 30% decline from high
        
    def identify_bounce_setup(self, stock_data, parabolic_data):
        """Identificar setup para bounce después de crash parabólico"""
        
        # Verify parabolic crash
        recent_high = stock_data.tail(10)['high'].max()
        current_price = stock_data.iloc[-1]['close']
        decline_from_high = (recent_high - current_price) / recent_high
        
        if decline_from_high < self.min_decline:
            return None
        
        # Check for bounce signals
        bounce_signals = self.detect_bounce_signals(stock_data)
        
        if bounce_signals['signal_count'] >= 3:
            return self.calculate_bounce_entry_levels(stock_data, bounce_signals)
        
        return None
    
    def detect_bounce_signals(self, stock_data):
        """Detectar señales de bounce potencial"""
        
        signals = {}
        recent_data = stock_data.tail(5)
        
        # 1. Oversold RSI
        rsi = calculate_rsi(stock_data['close'], 14)
        signals['oversold_rsi'] = rsi.iloc[-1] < 25
        
        # 2. Volume spike on decline
        current_volume = stock_data.iloc[-1]['volume']
        avg_volume = stock_data['volume'].rolling(20).mean().iloc[-1]
        signals['volume_spike'] = current_volume > avg_volume * 2
        
        # 3. Hammer/doji formation
        signals['reversal_candle'] = self.detect_reversal_candlestick(recent_data.iloc[-1])
        
        # 4. Support level test
        signals['support_test'] = self.check_support_level_test(stock_data)
        
        # 5. Positive divergence
        signals['bullish_divergence'] = self.detect_bullish_divergence(stock_data.tail(10))
        
        # 6. Gap down with recovery
        signals['gap_recovery'] = self.detect_gap_recovery(recent_data)
        
        signal_count = sum([signals[key] for key in signals if isinstance(signals[key], bool)])
        signals['signal_count'] = signal_count
        
        return signals
    
    def calculate_bounce_entry_levels(self, stock_data, signals):
        """Calcular niveles de entrada para bounce"""
        
        current_price = stock_data.iloc[-1]['close']
        recent_low = stock_data.tail(5)['low'].min()
        support_level = self.identify_support_level(stock_data)
        
        return {
            'strategy': 'parabolic_bounce',
            'entry_levels': {
                'aggressive': {
                    'price': current_price * 1.02,  # 2% above current
                    'size_pct': 0.40,
                    'trigger': 'reversal_confirmation',
                    'confirmation': 'green_candle_with_volume'
                },
                'conservative': {
                    'price': support_level * 1.05,  # 5% above support
                    'size_pct': 0.60,
                    'trigger': 'support_hold_confirmation',
                    'confirmation': 'multiple_green_candles'
                }
            },
            'stop_loss': recent_low * 0.95,  # 5% below recent low
            'targets': {
                'target_1': current_price * 1.20,   # 20% bounce
                'target_2': current_price * 1.40,   # 40% bounce  
                'fibonacci_618': self.calculate_fib_retracement(stock_data, 0.618)
            },
            'time_limits': {
                'entry_window': '2_hours',
                'max_hold': '3_days'
            },
            'risk_reward': 2.5,  # Target 2.5:1 R/R minimum
            'confidence': self.calculate_setup_confidence(signals)
        }

Short Setup (Parabolic Top)

class ParabolicTopStrategy:
    def __init__(self):
        self.strategy_type = "parabolic_top_short"
        self.min_exhaustion_score = 70
        
    def identify_top_setup(self, stock_data, parabolic_data):
        """Identificar setup para short en top parabólico"""
        
        exhaustion_signals = parabolic_data['exhaustion_signals']
        
        if exhaustion_signals['exhaustion_score'] < self.min_exhaustion_score:
            return None
        
        # Additional confirmation signals
        top_confirmation = self.analyze_top_formation(stock_data)
        
        if top_confirmation['confidence'] >= 0.7:
            return self.calculate_short_entry_levels(stock_data, exhaustion_signals)
        
        return None
    
    def analyze_top_formation(self, stock_data):
        """Analizar formación de top"""
        
        recent_data = stock_data.tail(10)
        
        analysis = {}
        
        # 1. Multiple tests of resistance
        analysis['resistance_tests'] = self.count_resistance_tests(recent_data)
        
        # 2. Decreasing volume on up moves
        analysis['volume_exhaustion'] = self.detect_volume_exhaustion(recent_data)
        
        # 3. Bearish divergence
        analysis['bearish_divergence'] = self.detect_bearish_divergence(recent_data)
        
        # 4. Failed breakout pattern
        analysis['failed_breakout'] = self.detect_failed_breakout(recent_data)
        
        # 5. Time spent at highs
        analysis['distribution_time'] = self.calculate_distribution_time(recent_data)
        
        # Calculate confidence
        confidence_factors = [
            analysis['resistance_tests'] >= 2,
            analysis['volume_exhaustion'],
            analysis['bearish_divergence'],
            analysis['failed_breakout'],
            analysis['distribution_time'] > 3  # 3+ days at highs
        ]
        
        confidence = sum(confidence_factors) / len(confidence_factors)
        analysis['confidence'] = confidence
        
        return analysis
    
    def calculate_short_entry_levels(self, stock_data, exhaustion_signals):
        """Calcular niveles de entrada para short"""
        
        current_price = stock_data.iloc[-1]['close']
        recent_high = stock_data.tail(5)['high'].max()
        resistance_level = self.identify_resistance_level(stock_data)
        
        return {
            'strategy': 'parabolic_top_short',
            'entry_levels': {
                'aggressive': {
                    'price': current_price * 0.98,  # 2% below current
                    'size_pct': 0.50,
                    'trigger': 'first_weakness',
                    'confirmation': 'volume_on_decline'
                },
                'conservative': {
                    'price': resistance_level * 0.95,  # 5% below resistance
                    'size_pct': 0.50,
                    'trigger': 'confirmed_breakdown',
                    'confirmation': 'sustained_selling'
                }
            },
            'stop_loss': recent_high * 1.05,  # 5% above recent high
            'targets': {
                'target_1': current_price * 0.80,   # 20% decline
                'target_2': current_price * 0.65,   # 35% decline
                'fibonacci_618': self.calculate_fib_retracement(stock_data, 0.382)
            },
            'risk_management': {
                'max_hold_time': '5_days',
                'profit_taking_schedule': [0.30, 0.40, 0.30],  # 30%, 40%, 30%
                'stop_tightening': 'after_10pct_profit'
            }
        }

Pattern Recognition System

Machine Learning para Pattern Detection

import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler

class ParabolicPatternRecognizer:
    def __init__(self):
        self.model = None
        self.scaler = StandardScaler()
        self.is_trained = False
        
    def create_features(self, price_data):
        """Crear features para ML pattern recognition"""
        
        features = {}
        
        # Price-based features
        features['total_move_pct'] = self.calculate_total_move(price_data)
        features['days_to_peak'] = self.calculate_days_to_peak(price_data)
        features['volatility_increasing'] = self.is_volatility_increasing(price_data)
        features['price_acceleration'] = self.calculate_price_acceleration(price_data)
        
        # Volume features
        features['volume_pattern_score'] = self.score_volume_pattern(price_data)
        features['volume_divergence'] = self.detect_volume_divergence(price_data)
        features['avg_volume_ratio'] = self.calculate_avg_volume_ratio(price_data)
        
        # Technical features
        features['rsi_overbought'] = self.calculate_rsi_extreme(price_data)
        features['bb_position'] = self.calculate_bollinger_position(price_data)
        features['distance_from_ma'] = self.calculate_ma_distance(price_data)
        
        # Pattern features
        features['exhaustion_candles'] = self.count_exhaustion_candles(price_data)
        features['failed_breakouts'] = self.count_failed_breakouts(price_data)
        features['gap_behavior'] = self.analyze_gap_behavior(price_data)
        
        # Time features
        features['time_at_highs'] = self.calculate_time_at_highs(price_data)
        features['momentum_duration'] = self.calculate_momentum_duration(price_data)
        
        return features
    
    def train_model(self, historical_data, labels):
        """Entrenar modelo de reconocimiento de patrones"""
        
        # Create feature matrix
        X = []
        y = []
        
        for i, data in enumerate(historical_data):
            features = self.create_features(data)
            feature_vector = [features[key] for key in sorted(features.keys())]
            X.append(feature_vector)
            y.append(labels[i])
        
        X = np.array(X)
        y = np.array(y)
        
        # Scale features
        X_scaled = self.scaler.fit_transform(X)
        
        # Train model
        self.model = RandomForestClassifier(
            n_estimators=200,
            max_depth=10,
            min_samples_split=5,
            random_state=42
        )
        
        self.model.fit(X_scaled, y)
        self.is_trained = True
        
        # Feature importance
        feature_names = sorted(self.create_features(historical_data[0]).keys())
        importance_df = pd.DataFrame({
            'feature': feature_names,
            'importance': self.model.feature_importances_
        }).sort_values('importance', ascending=False)
        
        return {
            'training_accuracy': self.model.score(X_scaled, y),
            'feature_importance': importance_df,
            'model_trained': True
        }
    
    def predict_reversal_probability(self, current_data):
        """Predecir probabilidad de reversal"""
        
        if not self.is_trained:
            raise ValueError("Model must be trained first")
        
        features = self.create_features(current_data)
        feature_vector = np.array([[features[key] for key in sorted(features.keys())]])
        feature_vector_scaled = self.scaler.transform(feature_vector)
        
        # Get prediction probability
        reversal_probability = self.model.predict_proba(feature_vector_scaled)[0][1]
        
        # Get feature contributions
        feature_names = sorted(features.keys())
        feature_importance = dict(zip(feature_names, self.model.feature_importances_))
        
        return {
            'reversal_probability': reversal_probability,
            'confidence_level': 'high' if reversal_probability > 0.8 else 'medium' if reversal_probability > 0.6 else 'low',
            'key_features': self.get_top_contributing_features(features, feature_importance),
            'model_features': features
        }

Risk Management Específico

Gestión de Volatilidad Extrema

class ParabolicRiskManager:
    def __init__(self, account_size):
        self.account_size = account_size
        self.max_parabolic_exposure = 0.10  # Max 10% in parabolic plays
        
    def calculate_parabolic_position_size(self, stock_data, strategy_type, confidence_score):
        """Cálculo de position size para trades parabólicos"""
        
        # Base risk is lower for parabolic trades due to volatility
        base_risk_pct = 0.015  # 1.5% base risk
        
        # Volatility adjustment
        volatility_factor = self.calculate_volatility_adjustment(stock_data)
        
        # Confidence adjustment
        confidence_factor = max(0.5, confidence_score)
        
        # Strategy-specific adjustment
        strategy_factors = {
            'parabolic_bounce': 1.2,    # Slightly more size (better R/R)
            'parabolic_top_short': 0.8  # Less size (unlimited upside risk)
        }
        
        strategy_factor = strategy_factors.get(strategy_type, 1.0)
        
        # Final risk calculation
        adjusted_risk_pct = (base_risk_pct * volatility_factor * 
                           confidence_factor * strategy_factor)
        
        # Cap at maximum parabolic exposure
        max_position_value = self.account_size * self.max_parabolic_exposure
        
        return {
            'risk_percentage': adjusted_risk_pct,
            'max_position_value': max_position_value,
            'volatility_adjustment': volatility_factor,
            'confidence_adjustment': confidence_factor,
            'strategy_adjustment': strategy_factor,
            'recommended_risk': min(adjusted_risk_pct, self.max_parabolic_exposure)
        }
    
    def manage_parabolic_stops(self, position, current_data, strategy_type):
        """Gestión especializada de stops para trades parabólicos"""
        
        if strategy_type == 'parabolic_bounce':
            return self.manage_bounce_stops(position, current_data)
        else:  # parabolic_top_short
            return self.manage_short_stops(position, current_data)
    
    def manage_bounce_stops(self, position, current_data):
        """Stops para bounce trades"""
        
        entry_price = position['entry_price']
        current_price = current_data['price']
        
        # Initial stop below recent low
        initial_stop = position['recent_low'] * 0.95
        
        current_profit = (current_price - entry_price) / entry_price
        
        # Aggressive profit protection due to volatility
        if current_profit > 0.15:  # 15% profit
            # Lock in 8% profit
            return entry_price * 1.08
        elif current_profit > 0.08:  # 8% profit
            # Move to breakeven + 2%
            return entry_price * 1.02
        else:
            return initial_stop
    
    def manage_short_stops(self, position, current_data):
        """Stops para short trades en parabolic tops"""
        
        entry_price = position['entry_price']
        current_price = current_data['price']
        
        # Initial stop above recent high
        initial_stop = position['recent_high'] * 1.05
        
        current_profit = (entry_price - current_price) / entry_price
        
        # Quick profit protection for shorts
        if current_profit > 0.12:  # 12% profit
            # Lock in 6% profit
            return entry_price * 0.94
        elif current_profit > 0.06:  # 6% profit
            # Move to breakeven - 2%
            return entry_price * 0.98
        else:
            return initial_stop

Backtesting Framework

Parabolic-Specific Backtesting

class ParabolicBacktester:
    def __init__(self, start_date, end_date):
        self.start_date = start_date
        self.end_date = end_date
        self.parabolic_recognizer = ParabolicPatternRecognizer()
        
    def run_parabolic_backtest(self, stock_universe):
        """Ejecutar backtest específico para patrones parabólicos"""
        
        results = {
            'bounce_trades': [],
            'short_trades': [],
            'pattern_detection_accuracy': {},
            'performance_metrics': {}
        }
        
        for symbol in stock_universe:
            symbol_results = self.backtest_symbol(symbol)
            
            results['bounce_trades'].extend(symbol_results['bounce_trades'])
            results['short_trades'].extend(symbol_results['short_trades'])
        
        # Analyze results
        results['performance_metrics'] = self.analyze_parabolic_performance(results)
        
        return results
    
    def backtest_symbol(self, symbol):
        """Backtest para un símbolo específico"""
        
        # Get historical data
        historical_data = self.get_historical_data(symbol, self.start_date, self.end_date)
        
        symbol_results = {
            'bounce_trades': [],
            'short_trades': []
        }
        
        # Sliding window analysis
        window_size = 30
        
        for i in range(window_size, len(historical_data)):
            window_data = historical_data.iloc[i-window_size:i]
            
            # Check for parabolic patterns
            parabolic_analysis = ParabolicMovementAnalyzer().identify_parabolic_move(window_data)
            
            if parabolic_analysis['is_parabolic']:
                # Check for reversal setups
                bounce_setup = ParabolicBounceStrategy().identify_bounce_setup(
                    window_data, parabolic_analysis
                )
                
                top_setup = ParabolicTopStrategy().identify_top_setup(
                    window_data, parabolic_analysis
                )
                
                if bounce_setup:
                    trade_result = self.simulate_bounce_trade(
                        symbol, window_data, bounce_setup, historical_data.iloc[i:]
                    )
                    if trade_result:
                        symbol_results['bounce_trades'].append(trade_result)
                
                if top_setup:
                    trade_result = self.simulate_short_trade(
                        symbol, window_data, top_setup, historical_data.iloc[i:]
                    )
                    if trade_result:
                        symbol_results['short_trades'].append(trade_result)
        
        return symbol_results
    
    def analyze_parabolic_performance(self, results):
        """Analizar performance específica de trades parabólicos"""
        
        bounce_df = pd.DataFrame(results['bounce_trades'])
        short_df = pd.DataFrame(results['short_trades'])
        
        analysis = {}
        
        if len(bounce_df) > 0:
            analysis['bounce_performance'] = {
                'total_trades': len(bounce_df),
                'win_rate': (bounce_df['pnl'] > 0).mean(),
                'avg_pnl': bounce_df['pnl'].mean(),
                'avg_hold_time': bounce_df['hold_time_hours'].mean(),
                'best_trade': bounce_df['pnl'].max(),
                'worst_trade': bounce_df['pnl'].min()
            }
        
        if len(short_df) > 0:
            analysis['short_performance'] = {
                'total_trades': len(short_df),
                'win_rate': (short_df['pnl'] > 0).mean(),
                'avg_pnl': short_df['pnl'].mean(),
                'avg_hold_time': short_df['hold_time_hours'].mean(),
                'best_trade': short_df['pnl'].max(),
                'worst_trade': short_df['pnl'].min()
            }
        
        # Combined analysis
        all_trades = pd.concat([bounce_df, short_df], ignore_index=True)
        
        if len(all_trades) > 0:
            analysis['combined_performance'] = {
                'total_trades': len(all_trades),
                'overall_win_rate': (all_trades['pnl'] > 0).mean(),
                'total_pnl': all_trades['pnl'].sum(),
                'sharpe_ratio': all_trades['pnl'].mean() / all_trades['pnl'].std(),
                'max_drawdown': self.calculate_max_drawdown(all_trades['cumulative_pnl'])
            }
        
        return analysis

La estrategia Parabolic Reversal requiere timing extremadamente preciso y gestión de riesgo agresiva, pero puede ofrecer algunas de las mejores oportunidades risk/reward en small cap trading cuando se ejecuta correctamente.