Integración Práctica entre Plataformas

Workflow Completo: Yahoo Finance → Polygon → IBKR → QuantConnect

Pipeline de Datos Unificado

import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import asyncio
import aiohttp
import requests
from typing import Dict, List, Optional, Union
import logging
from dataclasses import dataclass
from abc import ABC, abstractmethod
import json
import time

@dataclass
class UnifiedQuote:
    """Quote unificado entre plataformas"""
    symbol: str
    timestamp: datetime
    bid: float
    ask: float
    last: float
    volume: int
    source: str

@dataclass
class UnifiedBar:
    """Barra OHLCV unificada"""
    symbol: str
    timestamp: datetime
    open: float
    high: float
    low: float
    close: float
    volume: int
    source: str

class DataProvider(ABC):
    """Clase base para proveedores de datos"""
    
    @abstractmethod
    async def get_quote(self, symbol: str) -> Optional[UnifiedQuote]:
        pass
    
    @abstractmethod
    async def get_historical_data(self, symbol: str, start_date: str, 
                                end_date: str, interval: str) -> pd.DataFrame:
        pass
    
    @abstractmethod
    def is_available(self) -> bool:
        pass

class YahooFinanceProvider(DataProvider):
    """Proveedor Yahoo Finance"""
    
    def __init__(self):
        self.name = "yahoo"
        self.session = requests.Session()
    
    async def get_quote(self, symbol: str) -> Optional[UnifiedQuote]:
        """Obtener quote en tiempo real"""
        try:
            ticker = yf.Ticker(symbol)
            info = ticker.info
            
            # Yahoo no siempre tiene bid/ask en tiempo real
            last_price = info.get('regularMarketPrice', 0)
            bid = info.get('bid', last_price * 0.999)
            ask = info.get('ask', last_price * 1.001)
            volume = info.get('regularMarketVolume', 0)
            
            return UnifiedQuote(
                symbol=symbol,
                timestamp=datetime.now(),
                bid=bid,
                ask=ask,
                last=last_price,
                volume=volume,
                source=self.name
            )
        except Exception as e:
            logging.error(f"Yahoo Finance error for {symbol}: {e}")
            return None
    
    async def get_historical_data(self, symbol: str, start_date: str, 
                                end_date: str, interval: str = "1d") -> pd.DataFrame:
        """Obtener datos históricos"""
        try:
            ticker = yf.Ticker(symbol)
            data = ticker.history(start=start_date, end=end_date, interval=interval)
            
            if not data.empty:
                # Agregar metadatos
                data['source'] = self.name
                data['symbol'] = symbol
            
            return data
        except Exception as e:
            logging.error(f"Yahoo Finance historical error for {symbol}: {e}")
            return pd.DataFrame()
    
    def is_available(self) -> bool:
        """Verificar disponibilidad"""
        try:
            test_ticker = yf.Ticker("AAPL")
            info = test_ticker.info
            return 'regularMarketPrice' in info
        except:
            return False

class PolygonProvider(DataProvider):
    """Proveedor Polygon.io"""
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.name = "polygon"
        self.base_url = "https://api.polygon.io"
        self.session = aiohttp.ClientSession()
    
    async def get_quote(self, symbol: str) -> Optional[UnifiedQuote]:
        """Obtener quote en tiempo real"""
        try:
            url = f"{self.base_url}/v2/last/nbbo/{symbol}"
            params = {"apikey": self.api_key}
            
            async with self.session.get(url, params=params) as response:
                if response.status == 200:
                    data = await response.json()
                    
                    if data.get('status') == 'OK' and 'results' in data:
                        result = data['results']
                        
                        return UnifiedQuote(
                            symbol=symbol,
                            timestamp=datetime.fromtimestamp(result['t'] / 1000),
                            bid=result.get('P', 0),
                            ask=result.get('p', 0),
                            last=result.get('p', 0),  # Usar ask como aproximación
                            volume=result.get('S', 0),
                            source=self.name
                        )
        except Exception as e:
            logging.error(f"Polygon error for {symbol}: {e}")
        
        return None
    
    async def get_historical_data(self, symbol: str, start_date: str, 
                                end_date: str, interval: str = "1d") -> pd.DataFrame:
        """Obtener datos históricos"""
        try:
            # Convertir intervalo a formato Polygon
            timespan_map = {
                "1m": ("minute", 1),
                "5m": ("minute", 5),
                "1h": ("hour", 1),
                "1d": ("day", 1)
            }
            
            if interval not in timespan_map:
                interval = "1d"
            
            timespan, multiplier = timespan_map[interval]
            
            url = f"{self.base_url}/v2/aggs/ticker/{symbol}/range/{multiplier}/{timespan}/{start_date}/{end_date}"
            params = {
                "apikey": self.api_key,
                "adjusted": "true",
                "sort": "asc"
            }
            
            async with self.session.get(url, params=params) as response:
                if response.status == 200:
                    data = await response.json()
                    
                    if data.get('status') == 'OK' and 'results' in data:
                        results = data['results']
                        
                        df_data = []
                        for bar in results:
                            df_data.append({
                                'timestamp': pd.to_datetime(bar['t'], unit='ms'),
                                'open': bar['o'],
                                'high': bar['h'],
                                'low': bar['l'],
                                'close': bar['c'],
                                'volume': bar['v']
                            })
                        
                        df = pd.DataFrame(df_data)
                        if not df.empty:
                            df.set_index('timestamp', inplace=True)
                            df['source'] = self.name
                            df['symbol'] = symbol
                        
                        return df
        except Exception as e:
            logging.error(f"Polygon historical error for {symbol}: {e}")
        
        return pd.DataFrame()
    
    def is_available(self) -> bool:
        """Verificar disponibilidad (simplificado)"""
        return bool(self.api_key)
    
    async def close(self):
        """Cerrar sesión"""
        await self.session.close()

class UnifiedDataManager:
    """Gestor unificado de datos con fallback"""
    
    def __init__(self):
        self.providers: Dict[str, DataProvider] = {}
        self.provider_priority = []
        self.cache = {}
        self.cache_ttl = 60  # 60 segundos
    
    def add_provider(self, provider: DataProvider, priority: int = 0):
        """Agregar proveedor con prioridad"""
        self.providers[provider.name] = provider
        
        # Insertar en orden de prioridad
        inserted = False
        for i, (name, prio) in enumerate(self.provider_priority):
            if priority > prio:
                self.provider_priority.insert(i, (provider.name, priority))
                inserted = True
                break
        
        if not inserted:
            self.provider_priority.append((provider.name, priority))
    
    async def get_quote(self, symbol: str) -> Optional[UnifiedQuote]:
        """Obtener quote con fallback automático"""
        
        # Verificar cache
        cache_key = f"quote_{symbol}"
        if cache_key in self.cache:
            cached_data, timestamp = self.cache[cache_key]
            if (datetime.now() - timestamp).seconds < self.cache_ttl:
                return cached_data
        
        # Intentar proveedores en orden de prioridad
        for provider_name, _ in self.provider_priority:
            provider = self.providers.get(provider_name)
            
            if provider and provider.is_available():
                try:
                    quote = await provider.get_quote(symbol)
                    if quote:
                        # Guardar en cache
                        self.cache[cache_key] = (quote, datetime.now())
                        logging.info(f"Quote for {symbol} from {provider_name}")
                        return quote
                except Exception as e:
                    logging.warning(f"Provider {provider_name} failed: {e}")
                    continue
        
        logging.error(f"No provider could fetch quote for {symbol}")
        return None
    
    async def get_historical_data(self, symbol: str, start_date: str, 
                                end_date: str, interval: str = "1d") -> pd.DataFrame:
        """Obtener datos históricos con fallback"""
        
        # Intentar proveedores en orden de prioridad
        for provider_name, _ in self.provider_priority:
            provider = self.providers.get(provider_name)
            
            if provider and provider.is_available():
                try:
                    data = await provider.get_historical_data(symbol, start_date, end_date, interval)
                    if not data.empty:
                        logging.info(f"Historical data for {symbol} from {provider_name}")
                        return data
                except Exception as e:
                    logging.warning(f"Provider {provider_name} failed: {e}")
                    continue
        
        logging.error(f"No provider could fetch historical data for {symbol}")
        return pd.DataFrame()
    
    def get_provider_status(self) -> Dict:
        """Obtener estado de todos los proveedores"""
        status = {}
        for name, provider in self.providers.items():
            status[name] = {
                'available': provider.is_available(),
                'priority': next((p for pname, p in self.provider_priority if pname == name), 0)
            }
        return status

# Demo del sistema unificado
async def demo_unified_data_system():
    """Demo del sistema unificado de datos"""
    
    print("🔄 Inicializando sistema unificado de datos...")
    
    # Crear manager
    data_manager = UnifiedDataManager()
    
    # Agregar proveedores (Yahoo como backup, Polygon como primario)
    yahoo_provider = YahooFinanceProvider()
    data_manager.add_provider(yahoo_provider, priority=1)
    
    # Polygon solo si tenemos API key
    polygon_api_key = "YOUR_POLYGON_API_KEY"  # Reemplazar con tu API key
    if polygon_api_key != "YOUR_POLYGON_API_KEY":
        polygon_provider = PolygonProvider(polygon_api_key)
        data_manager.add_provider(polygon_provider, priority=2)
    
    # Verificar estado
    status = data_manager.get_provider_status()
    print("📊 Estado de proveedores:")
    for name, info in status.items():
        print(f"  {name}: {'✅' if info['available'] else '❌'} (prioridad: {info['priority']})")
    
    # Obtener quotes
    symbols = ["AAPL", "TSLA", "NVDA"]
    
    print(f"\n💰 Obteniendo quotes...")
    for symbol in symbols:
        quote = await data_manager.get_quote(symbol)
        if quote:
            print(f"  {symbol}: ${quote.last:.2f} (bid: ${quote.bid:.2f}, ask: ${quote.ask:.2f}) [{quote.source}]")
        else:
            print(f"  {symbol}: ❌ No disponible")
    
    # Obtener datos históricos
    print(f"\n📈 Obteniendo datos históricos...")
    start_date = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')
    end_date = datetime.now().strftime('%Y-%m-%d')
    
    historical_data = await data_manager.get_historical_data("AAPL", start_date, end_date)
    if not historical_data.empty:
        print(f"  AAPL: {len(historical_data)} días de datos [{historical_data['source'].iloc[0]}]")
        print(f"  Rango: ${historical_data['Low'].min():.2f} - ${historical_data['High'].max():.2f}")
    
    # Cleanup
    for provider in data_manager.providers.values():
        if hasattr(provider, 'close'):
            await provider.close()

# Ejecutar demo
if __name__ == "__main__":
    asyncio.run(demo_unified_data_system())

Integración con Interactive Brokers TWS

Conexión y Ejecución de Órdenes

from ib_insync import IB, Stock, MarketOrder, LimitOrder, Contract
import pandas as pd
from typing import Dict, List, Optional
import asyncio

class IBKRIntegration:
    """Integración con Interactive Brokers TWS"""
    
    def __init__(self, host: str = "127.0.0.1", port: int = 7497, client_id: int = 1):
        self.ib = IB()
        self.host = host
        self.port = port
        self.client_id = client_id
        self.connected = False
        
        # Configurar callbacks
        self.ib.orderStatusEvent += self._on_order_status
        self.ib.openOrderEvent += self._on_open_order
        self.ib.execDetailsEvent += self._on_execution
    
    async def connect(self) -> bool:
        """Conectar a TWS/Gateway"""
        try:
            await self.ib.connectAsync(self.host, self.port, clientId=self.client_id)
            self.connected = True
            print(f"✅ Conectado a IBKR TWS en {self.host}:{self.port}")
            return True
        except Exception as e:
            print(f"❌ Error conectando a IBKR: {e}")
            return False
    
    def disconnect(self):
        """Desconectar de TWS"""
        if self.connected:
            self.ib.disconnect()
            self.connected = False
            print("📴 Desconectado de IBKR TWS")
    
    async def get_account_info(self) -> Dict:
        """Obtener información de cuenta"""
        if not self.connected:
            return {}
        
        account_values = self.ib.accountValues()
        portfolio = self.ib.portfolio()
        positions = self.ib.positions()
        
        # Procesar valores de cuenta
        account_info = {}
        for av in account_values:
            if av.tag in ['NetLiquidation', 'TotalCashValue', 'BuyingPower']:
                account_info[av.tag] = float(av.value)
        
        # Procesar posiciones
        position_info = []
        for pos in positions:
            position_info.append({
                'symbol': pos.contract.symbol,
                'position': pos.position,
                'market_price': pos.marketPrice,
                'market_value': pos.marketValue,
                'avg_cost': pos.averageCost,
                'unrealized_pnl': pos.unrealizedPNL
            })
        
        return {
            'account_values': account_info,
            'positions': position_info,
            'portfolio_items': len(portfolio)
        }
    
    async def get_market_data(self, symbol: str) -> Optional[Dict]:
        """Obtener datos de mercado en tiempo real"""
        if not self.connected:
            return None
        
        try:
            contract = Stock(symbol, 'SMART', 'USD')
            
            # Solicitar datos de mercado
            ticker = self.ib.reqMktData(contract, '', False, False)
            
            # Esperar datos
            await asyncio.sleep(2)
            
            if ticker.bid and ticker.ask:
                return {
                    'symbol': symbol,
                    'bid': ticker.bid,
                    'ask': ticker.ask,
                    'last': ticker.last,
                    'volume': ticker.volume,
                    'timestamp': datetime.now()
                }
        except Exception as e:
            print(f"Error obteniendo datos de {symbol}: {e}")
        
        return None
    
    async def place_order(self, symbol: str, action: str, quantity: int, 
                         order_type: str = "MKT", limit_price: float = None) -> Optional[int]:
        """Colocar orden"""
        if not self.connected:
            return None
        
        try:
            contract = Stock(symbol, 'SMART', 'USD')
            
            # Crear orden según tipo
            if order_type.upper() == "MKT":
                order = MarketOrder(action.upper(), quantity)
            elif order_type.upper() == "LMT" and limit_price:
                order = LimitOrder(action.upper(), quantity, limit_price)
            else:
                print(f"Tipo de orden no soportado: {order_type}")
                return None
            
            # Colocar orden
            trade = self.ib.placeOrder(contract, order)
            
            print(f"📝 Orden colocada: {action} {quantity} {symbol} @ {order_type}")
            if limit_price:
                print(f"   Precio límite: ${limit_price:.2f}")
            
            return trade.order.orderId
            
        except Exception as e:
            print(f"Error colocando orden: {e}")
            return None
    
    def _on_order_status(self, trade):
        """Callback para cambios de estado de orden"""
        order = trade.order
        status = trade.orderStatus
        
        print(f"🔄 Orden {order.orderId}: {status.status}")
        if status.status == 'Filled':
            print(f"   ✅ Ejecutada: {status.filled} @ ${status.avgFillPrice:.2f}")
    
    def _on_open_order(self, trade):
        """Callback para órdenes abiertas"""
        order = trade.order
        print(f"📋 Orden abierta: {order.orderId} - {order.action} {order.totalQuantity} {trade.contract.symbol}")
    
    def _on_execution(self, trade, fill):
        """Callback para ejecuciones"""
        print(f"⚡ Ejecución: {fill.shares} shares @ ${fill.price:.2f}")

class IBKRDataFeed:
    """Feed de datos en tiempo real desde IBKR"""
    
    def __init__(self, ibkr_integration: IBKRIntegration):
        self.ibkr = ibkr_integration
        self.subscriptions = {}
        self.data_callbacks = []
    
    def subscribe(self, symbol: str, callback=None):
        """Suscribirse a datos de un símbolo"""
        if symbol not in self.subscriptions:
            self.subscriptions[symbol] = []
        
        if callback:
            self.subscriptions[symbol].append(callback)
    
    def add_data_callback(self, callback):
        """Agregar callback global para datos"""
        self.data_callbacks.append(callback)
    
    async def start_feed(self):
        """Iniciar feed de datos"""
        if not self.ibkr.connected:
            print("❌ IBKR no conectado")
            return
        
        print("🔄 Iniciando feed de datos...")
        
        # Suscribirse a cada símbolo
        for symbol in self.subscriptions.keys():
            try:
                contract = Stock(symbol, 'SMART', 'USD')
                ticker = self.ibkr.ib.reqMktData(contract, '', False, False)
                print(f"✅ Suscrito a {symbol}")
            except Exception as e:
                print(f"❌ Error suscribiendo a {symbol}: {e}")
        
        # Procesar datos en loop
        while True:
            try:
                for symbol in self.subscriptions.keys():
                    data = await self.ibkr.get_market_data(symbol)
                    if data:
                        # Llamar callbacks específicos del símbolo
                        for callback in self.subscriptions[symbol]:
                            callback(data)
                        
                        # Llamar callbacks globales
                        for callback in self.data_callbacks:
                            callback(data)
                
                await asyncio.sleep(1)  # Actualizar cada segundo
                
            except Exception as e:
                print(f"Error en feed: {e}")
                await asyncio.sleep(5)

# Demo de integración con IBKR
async def demo_ibkr_integration():
    """Demo de integración con IBKR"""
    
    print("🔌 Demo de integración con Interactive Brokers...")
    
    # Crear integración
    ibkr = IBKRIntegration()
    
    # Conectar (requiere TWS/Gateway ejecutándose)
    connected = await ibkr.connect()
    if not connected:
        print("❌ No se pudo conectar a TWS. Asegúrate de que esté ejecutándose.")
        return
    
    try:
        # Obtener info de cuenta
        account_info = await ibkr.get_account_info()
        print(f"\n💰 Información de cuenta:")
        for key, value in account_info.get('account_values', {}).items():
            if isinstance(value, float):
                print(f"  {key}: ${value:,.2f}")
        
        print(f"\n📊 Posiciones actuales:")
        for pos in account_info.get('positions', []):
            if pos['position'] != 0:
                print(f"  {pos['symbol']}: {pos['position']} shares @ ${pos['avg_cost']:.2f}")
                print(f"    P&L no realizado: ${pos['unrealized_pnl']:.2f}")
        
        # Obtener datos de mercado
        print(f"\n📈 Datos de mercado:")
        symbols = ["AAPL", "TSLA"]
        for symbol in symbols:
            data = await ibkr.get_market_data(symbol)
            if data:
                print(f"  {symbol}: ${data['last']:.2f} (bid: ${data['bid']:.2f}, ask: ${data['ask']:.2f})")
        
        # Demo de orden (comentado para seguridad)
        # order_id = await ibkr.place_order("AAPL", "BUY", 1, "LMT", 150.00)
        # if order_id:
        #     print(f"Orden colocada con ID: {order_id}")
        
    finally:
        # Desconectar
        ibkr.disconnect()

if __name__ == "__main__":
    asyncio.run(demo_ibkr_integration())

Integración con QuantConnect

Estrategia Híbrida Local/Cloud

# Local strategy development que se puede portar a QuantConnect
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Optional

class QuantConnectCompatibleStrategy:
    """Estrategia compatible con QuantConnect"""
    
    def __init__(self):
        self.name = "HybridMomentumStrategy"
        self.symbols = ["SPY", "QQQ", "IWM"]
        self.lookback_period = 20
        self.portfolio = {}
        self.universe = {}
        
        # Configuración
        self.config = {
            'rebalance_frequency': 'daily',
            'max_positions': 3,
            'risk_per_trade': 0.02,
            'momentum_threshold': 0.1
        }
    
    def initialize(self, data_manager):
        """Inicializar estrategia (compatible con QC.Initialize)"""
        self.data_manager = data_manager
        
        # Configurar universo
        for symbol in self.symbols:
            self.universe[symbol] = {
                'data': pd.DataFrame(),
                'indicators': {},
                'signals': []
            }
        
        print(f"✅ Estrategia {self.name} inicializada")
    
    async def on_data(self, data: Dict):
        """Procesar nuevos datos (compatible con QC.OnData)"""
        
        # Actualizar datos para cada símbolo
        for symbol, price_data in data.items():
            if symbol in self.universe:
                await self._update_symbol_data(symbol, price_data)
        
        # Generar señales
        signals = await self._generate_signals()
        
        # Ejecutar trades si hay señales
        if signals:
            await self._execute_signals(signals)
    
    async def _update_symbol_data(self, symbol: str, price_data: Dict):
        """Actualizar datos de símbolo"""
        
        # Crear nueva fila
        new_row = pd.DataFrame([{
            'timestamp': price_data['timestamp'],
            'open': price_data['open'],
            'high': price_data['high'],
            'low': price_data['low'],
            'close': price_data['close'],
            'volume': price_data['volume']
        }])
        
        # Agregar a datos existentes
        symbol_data = self.universe[symbol]['data']
        symbol_data = pd.concat([symbol_data, new_row], ignore_index=True)
        
        # Mantener solo los últimos N períodos
        if len(symbol_data) > self.lookback_period * 2:
            symbol_data = symbol_data.tail(self.lookback_period * 2)
        
        self.universe[symbol]['data'] = symbol_data
        
        # Actualizar indicadores
        await self._update_indicators(symbol)
    
    async def _update_indicators(self, symbol: str):
        """Actualizar indicadores técnicos"""
        
        data = self.universe[symbol]['data']
        
        if len(data) < self.lookback_period:
            return
        
        indicators = {}
        
        # RSI
        indicators['rsi'] = self._calculate_rsi(data['close'], 14)
        
        # Moving averages
        indicators['sma_20'] = data['close'].rolling(20).mean()
        indicators['ema_12'] = data['close'].ewm(span=12).mean()
        indicators['ema_26'] = data['close'].ewm(span=26).mean()
        
        # MACD
        indicators['macd'] = indicators['ema_12'] - indicators['ema_26']
        indicators['macd_signal'] = indicators['macd'].ewm(span=9).mean()
        
        # Momentum
        indicators['momentum'] = data['close'].pct_change(10)
        
        # Volatility
        indicators['volatility'] = data['close'].pct_change().rolling(20).std()
        
        self.universe[symbol]['indicators'] = indicators
    
    def _calculate_rsi(self, prices: pd.Series, period: int = 14) -> pd.Series:
        """Calcular RSI"""
        delta = prices.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rs = gain / loss
        return 100 - (100 / (1 + rs))
    
    async def _generate_signals(self) -> List[Dict]:
        """Generar señales de trading"""
        
        signals = []
        
        for symbol in self.symbols:
            symbol_info = self.universe[symbol]
            data = symbol_info['data']
            indicators = symbol_info['indicators']
            
            if len(data) < self.lookback_period:
                continue
            
            # Obtener valores actuales
            current_price = data['close'].iloc[-1]
            current_rsi = indicators['rsi'].iloc[-1]
            current_momentum = indicators['momentum'].iloc[-1]
            current_macd = indicators['macd'].iloc[-1]
            current_signal = indicators['macd_signal'].iloc[-1]
            
            # Lógica de señales
            signal_strength = 0
            signal_type = None
            
            # Momentum positivo
            if current_momentum > self.config['momentum_threshold']:
                signal_strength += 30
            
            # RSI no sobrecomprado
            if 30 < current_rsi < 70:
                signal_strength += 20
            
            # MACD bullish
            if current_macd > current_signal:
                signal_strength += 25
            
            # Price above moving average
            if current_price > indicators['sma_20'].iloc[-1]:
                signal_strength += 25
            
            # Determinar tipo de señal
            if signal_strength >= 70:
                signal_type = "BUY"
            elif signal_strength <= 30:
                signal_type = "SELL"
            
            if signal_type:
                signals.append({
                    'symbol': symbol,
                    'type': signal_type,
                    'strength': signal_strength,
                    'price': current_price,
                    'timestamp': data['timestamp'].iloc[-1],
                    'metadata': {
                        'rsi': current_rsi,
                        'momentum': current_momentum,
                        'macd': current_macd
                    }
                })
        
        return signals
    
    async def _execute_signals(self, signals: List[Dict]):
        """Ejecutar señales (placeholder para integración real)"""
        
        for signal in signals:
            symbol = signal['symbol']
            signal_type = signal['type']
            strength = signal['strength']
            
            print(f"🎯 Señal {signal_type} para {symbol} (fuerza: {strength})")
            
            # Aquí se integraría con el broker real
            # await self.broker.place_order(...)
    
    def get_performance_metrics(self) -> Dict:
        """Obtener métricas de performance"""
        
        # Placeholder - en implementación real calcularía métricas reales
        return {
            'strategy_name': self.name,
            'total_signals': sum(len(info['signals']) for info in self.universe.values()),
            'symbols_tracked': len(self.symbols),
            'last_update': datetime.now()
        }

# Código para QuantConnect (archivo separado: main.py)
QUANTCONNECT_CODE = '''
# QuantConnect Strategy Implementation
from AlgorithmImports import *

class HybridMomentumAlgorithm(QCAlgorithm):
    
    def Initialize(self):
        self.SetStartDate(2023, 1, 1)
        self.SetCash(100000)
        
        # Add securities
        self.symbols = {
            self.AddEquity("SPY", Resolution.Daily).Symbol: "SPY",
            self.AddEquity("QQQ", Resolution.Daily).Symbol: "QQQ", 
            self.AddEquity("IWM", Resolution.Daily).Symbol: "IWM"
        }
        
        # Strategy parameters
        self.lookback_period = 20
        self.momentum_threshold = 0.1
        self.max_positions = 3
        
        # Indicators
        self.indicators = {}
        for symbol in self.symbols.keys():
            self.indicators[symbol] = {
                'rsi': self.RSI(symbol, 14, Resolution.Daily),
                'sma_20': self.SMA(symbol, 20, Resolution.Daily),
                'ema_12': self.EMA(symbol, 12, Resolution.Daily),
                'ema_26': self.EMA(symbol, 26, Resolution.Daily),
                'momentum': self.MOMP(symbol, 10, Resolution.Daily)
            }
        
        # Schedule rebalancing
        self.Schedule.On(
            self.DateRules.EveryDay("SPY"),
            self.TimeRules.AfterMarketOpen("SPY", 30),
            self.Rebalance
        )
    
    def OnData(self, data):
        # Data processing happens in scheduled rebalance
        pass
    
    def Rebalance(self):
        # Generate signals
        signals = self._generate_signals()
        
        # Execute trades
        self._execute_signals(signals)
    
    def _generate_signals(self):
        signals = []
        
        for symbol in self.symbols.keys():
            if not self.indicators[symbol]['rsi'].IsReady:
                continue
            
            # Get current values
            current_price = self.Securities[symbol].Price
            current_rsi = self.indicators[symbol]['rsi'].Current.Value
            current_momentum = self.indicators[symbol]['momentum'].Current.Value
            
            # Signal logic (same as local version)
            signal_strength = 0
            
            if current_momentum > self.momentum_threshold:
                signal_strength += 30
            
            if 30 < current_rsi < 70:
                signal_strength += 20
            
            if current_price > self.indicators[symbol]['sma_20'].Current.Value:
                signal_strength += 25
            
            # MACD logic
            ema_12 = self.indicators[symbol]['ema_12'].Current.Value
            ema_26 = self.indicators[symbol]['ema_26'].Current.Value
            if ema_12 > ema_26:
                signal_strength += 25
            
            if signal_strength >= 70:
                signals.append({
                    'symbol': symbol,
                    'type': 'BUY',
                    'strength': signal_strength
                })
            elif signal_strength <= 30:
                signals.append({
                    'symbol': symbol,
                    'type': 'SELL', 
                    'strength': signal_strength
                })
        
        return signals
    
    def _execute_signals(self, signals):
        # Calculate position sizing
        target_positions = len([s for s in signals if s['type'] == 'BUY'])
        if target_positions == 0:
            self.Liquidate()
            return
        
        position_size = 1.0 / target_positions
        
        # Execute buy signals
        for signal in signals:
            symbol = signal['symbol']
            
            if signal['type'] == 'BUY':
                self.SetHoldings(symbol, position_size)
                self.Debug(f"Buying {self.symbols[symbol]} with {position_size:.2%} allocation")
            elif signal['type'] == 'SELL':
                self.Liquidate(symbol)
                self.Debug(f"Selling {self.symbols[symbol]}")
'''

# Utilidad para sincronizar estrategias
class StrategySync:
    """Sincronizar estrategia entre local y QuantConnect"""
    
    def __init__(self, local_strategy: QuantConnectCompatibleStrategy):
        self.local_strategy = local_strategy
        self.qc_code_template = QUANTCONNECT_CODE
    
    def export_to_quantconnect(self, filename: str = "main.py"):
        """Exportar estrategia a formato QuantConnect"""
        
        # Personalizar código basado en configuración local
        qc_code = self.qc_code_template
        
        # Reemplazar parámetros
        qc_code = qc_code.replace(
            "self.momentum_threshold = 0.1",
            f"self.momentum_threshold = {self.local_strategy.config['momentum_threshold']}"
        )
        
        qc_code = qc_code.replace(
            "self.max_positions = 3",
            f"self.max_positions = {self.local_strategy.config['max_positions']}"
        )
        
        # Guardar archivo
        with open(filename, 'w') as f:
            f.write(qc_code)
        
        print(f"✅ Estrategia exportada a {filename}")
        print("📁 Sube este archivo a tu proyecto en QuantConnect")
    
    def backtest_locally(self, start_date: str, end_date: str):
        """Ejecutar backtest local antes de usar en QC"""
        
        print(f"🔬 Ejecutando backtest local...")
        
        # Simular datos
        symbols = self.local_strategy.symbols
        
        # Esta sería la integración con tu data manager real
        # data = await self.data_manager.get_historical_data(symbols, start_date, end_date)
        
        print(f"📊 Backtest completado para período {start_date} a {end_date}")
        
        # Retornar métricas de ejemplo
        return {
            'total_return': 0.15,
            'sharpe_ratio': 1.2,
            'max_drawdown': -0.08,
            'trades': 45
        }

# Demo de integración QuantConnect
async def demo_quantconnect_integration():
    """Demo de integración con QuantConnect"""
    
    print("🚀 Demo de integración con QuantConnect...")
    
    # Crear estrategia local
    strategy = QuantConnectCompatibleStrategy()
    
    # Simular inicialización (necesitarías tu data manager real)
    # await strategy.initialize(data_manager)
    
    # Simular algunos datos de ejemplo
    sample_data = {
        "SPY": {
            'timestamp': datetime.now(),
            'open': 400.0,
            'high': 402.0,
            'low': 399.0,
            'close': 401.5,
            'volume': 1000000
        }
    }
    
    # await strategy.on_data(sample_data)
    
    # Configurar sincronización
    sync = StrategySync(strategy)
    
    # Ejecutar backtest local
    backtest_results = sync.backtest_locally("2023-01-01", "2023-12-31")
    print(f"📈 Resultados del backtest:")
    for metric, value in backtest_results.items():
        if isinstance(value, float):
            print(f"  {metric}: {value:.2%}" if 'return' in metric or 'drawdown' in metric else f"  {metric}: {value:.2f}")
        else:
            print(f"  {metric}: {value}")
    
    # Exportar a QuantConnect
    sync.export_to_quantconnect("hybrid_momentum_strategy.py")
    
    print(f"\n📋 Próximos pasos:")
    print("1. Revisa el archivo hybrid_momentum_strategy.py")
    print("2. Sube el archivo a tu proyecto en QuantConnect")
    print("3. Ejecuta el backtest en la plataforma")
    print("4. Compara resultados con el backtest local")

if __name__ == "__main__":
    asyncio.run(demo_quantconnect_integration())

Dashboard de Monitoreo Multi-Plataforma

Sistema de Monitoreo Centralizado

import streamlit as st
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import asyncio
from typing import Dict, List

class MultiPlatformDashboard:
    """Dashboard para monitorear múltiples plataformas"""
    
    def __init__(self):
        self.data_sources = {}
        self.trading_accounts = {}
        self.strategies = {}
        self.alerts = []
    
    def add_data_source(self, name: str, provider):
        """Agregar fuente de datos"""
        self.data_sources[name] = provider
    
    def add_trading_account(self, name: str, account_integration):
        """Agregar cuenta de trading"""
        self.trading_accounts[name] = account_integration
    
    def add_strategy(self, name: str, strategy):
        """Agregar estrategia"""
        self.strategies[name] = strategy
    
    async def get_unified_portfolio_view(self) -> Dict:
        """Obtener vista unificada del portfolio"""
        
        portfolio_summary = {
            'total_equity': 0,
            'total_pnl': 0,
            'positions': [],
            'account_breakdown': {}
        }
        
        # Agregar datos de cada cuenta
        for account_name, account in self.trading_accounts.items():
            try:
                account_info = await account.get_account_info()
                
                account_equity = account_info.get('account_values', {}).get('NetLiquidation', 0)
                portfolio_summary['total_equity'] += account_equity
                
                # Agregar posiciones
                for pos in account_info.get('positions', []):
                    if pos['position'] != 0:
                        portfolio_summary['positions'].append({
                            **pos,
                            'account': account_name
                        })
                        portfolio_summary['total_pnl'] += pos.get('unrealized_pnl', 0)
                
                portfolio_summary['account_breakdown'][account_name] = {
                    'equity': account_equity,
                    'positions_count': len([p for p in account_info.get('positions', []) if p['position'] != 0])
                }
                
            except Exception as e:
                st.error(f"Error obteniendo datos de {account_name}: {e}")
        
        return portfolio_summary
    
    async def get_data_feed_status(self) -> Dict:
        """Obtener estado de feeds de datos"""
        
        feed_status = {}
        
        for source_name, source in self.data_sources.items():
            try:
                # Test connection
                test_symbol = "AAPL"
                start_time = datetime.now()
                
                if hasattr(source, 'get_quote'):
                    quote = await source.get_quote(test_symbol)
                    success = quote is not None
                else:
                    success = source.is_available() if hasattr(source, 'is_available') else True
                
                response_time = (datetime.now() - start_time).total_seconds() * 1000
                
                feed_status[source_name] = {
                    'status': 'online' if success else 'offline',
                    'response_time_ms': response_time,
                    'last_check': datetime.now()
                }
                
            except Exception as e:
                feed_status[source_name] = {
                    'status': 'error',
                    'error': str(e),
                    'last_check': datetime.now()
                }
        
        return feed_status
    
    async def get_strategy_performance(self) -> Dict:
        """Obtener performance de estrategias"""
        
        strategy_performance = {}
        
        for strategy_name, strategy in self.strategies.items():
            try:
                if hasattr(strategy, 'get_performance_metrics'):
                    metrics = strategy.get_performance_metrics()
                    strategy_performance[strategy_name] = metrics
                else:
                    # Mock metrics para demo
                    strategy_performance[strategy_name] = {
                        'total_trades': np.random.randint(10, 100),
                        'win_rate': np.random.uniform(0.4, 0.8),
                        'profit_factor': np.random.uniform(1.0, 2.5),
                        'total_pnl': np.random.uniform(-1000, 5000)
                    }
            except Exception as e:
                strategy_performance[strategy_name] = {'error': str(e)}
        
        return strategy_performance

def create_streamlit_dashboard():
    """Crear dashboard con Streamlit"""
    
    st.set_page_config(
        page_title="Trading Multi-Platform Dashboard",
        page_icon="📊",
        layout="wide"
    )
    
    st.title("📊 Multi-Platform Trading Dashboard")
    
    # Inicializar dashboard (normalmente esto estaría en session_state)
    if 'dashboard' not in st.session_state:
        st.session_state.dashboard = MultiPlatformDashboard()
        
        # Agregar fuentes mock para demo
        st.session_state.dashboard.data_sources = {
            'Yahoo Finance': {'status': 'online'},
            'Polygon.io': {'status': 'online'},
            'IBKR TWS': {'status': 'online'}
        }
        
        st.session_state.dashboard.trading_accounts = {
            'IBKR Main': {'equity': 85000, 'positions': 3},
            'TD Ameritrade': {'equity': 25000, 'positions': 1}
        }
    
    dashboard = st.session_state.dashboard
    
    # Sidebar para controles
    with st.sidebar:
        st.header("🔧 Controles")
        
        auto_refresh = st.checkbox("Auto Refresh", value=True)
        refresh_interval = st.slider("Intervalo (seg)", 10, 300, 30)
        
        if st.button("🔄 Refresh Manual"):
            st.rerun()
        
        st.header("📡 Data Sources")
        for source, info in dashboard.data_sources.items():
            status_icon = "🟢" if info.get('status') == 'online' else "🔴"
            st.write(f"{status_icon} {source}")
    
    # Métricas principales
    col1, col2, col3, col4 = st.columns(4)
    
    # Mock data para demo
    total_equity = sum(acc['equity'] for acc in dashboard.trading_accounts.values())
    total_positions = sum(acc['positions'] for acc in dashboard.trading_accounts.values())
    daily_pnl = np.random.uniform(-2000, 3000)
    
    with col1:
        st.metric(
            "Total Equity",
            f"${total_equity:,.2f}",
            f"{daily_pnl:+.2f}"
        )
    
    with col2:
        st.metric(
            "Active Positions",
            total_positions,
            "+2"
        )
    
    with col3:
        st.metric(
            "Daily P&L",
            f"${daily_pnl:,.2f}",
            f"{daily_pnl/total_equity:.2%}"
        )
    
    with col4:
        data_sources_online = len([s for s in dashboard.data_sources.values() if s.get('status') == 'online'])
        st.metric(
            "Data Sources",
            f"{data_sources_online}/{len(dashboard.data_sources)}",
            "All Online" if data_sources_online == len(dashboard.data_sources) else "Some Offline"
        )
    
    # Gráficos principales
    col1, col2 = st.columns(2)
    
    with col1:
        st.subheader("📈 Portfolio Allocation")
        
        # Mock allocation data
        allocation_data = pd.DataFrame({
            'Account': list(dashboard.trading_accounts.keys()),
            'Value': [acc['equity'] for acc in dashboard.trading_accounts.values()]
        })
        
        fig_pie = px.pie(
            allocation_data,
            values='Value',
            names='Account',
            title="Allocation by Account"
        )
        st.plotly_chart(fig_pie, use_container_width=True)
    
    with col2:
        st.subheader("📊 Data Feed Status")
        
        # Feed status table
        feed_status_df = pd.DataFrame([
            {
                'Source': source,
                'Status': info.get('status', 'unknown'),
                'Response Time': f"{np.random.randint(50, 200)}ms"
            }
            for source, info in dashboard.data_sources.items()
        ])
        
        # Color code status
        def color_status(val):
            if val == 'online':
                return 'background-color: #90EE90'
            elif val == 'offline':
                return 'background-color: #FFB6C1'
            else:
                return 'background-color: #FFFFE0'
        
        styled_df = feed_status_df.style.applymap(color_status, subset=['Status'])
        st.dataframe(styled_df, use_container_width=True)
    
    # Posiciones actuales
    st.subheader("📋 Current Positions")
    
    # Mock positions data
    positions_data = []
    symbols = ["AAPL", "TSLA", "NVDA", "MSFT"]
    
    for i, symbol in enumerate(symbols):
        if i < total_positions:
            account = list(dashboard.trading_accounts.keys())[i % len(dashboard.trading_accounts)]
            positions_data.append({
                'Account': account,
                'Symbol': symbol,
                'Quantity': np.random.randint(10, 500),
                'Avg Price': np.random.uniform(100, 300),
                'Current Price': np.random.uniform(100, 300),
                'Unrealized P&L': np.random.uniform(-2000, 5000),
                'P&L %': np.random.uniform(-0.15, 0.25)
            })
    
    if positions_data:
        positions_df = pd.DataFrame(positions_data)
        
        # Color code P&L
        def color_pnl(val):
            if val > 0:
                return 'color: green'
            elif val < 0:
                return 'color: red'
            else:
                return 'color: black'
        
        styled_positions = positions_df.style.applymap(
            color_pnl, 
            subset=['Unrealized P&L', 'P&L %']
        ).format({
            'Avg Price': '${:.2f}',
            'Current Price': '${:.2f}',
            'Unrealized P&L': '${:.2f}',
            'P&L %': '{:.2%}'
        })
        
        st.dataframe(styled_positions, use_container_width=True)
    else:
        st.info("No open positions")
    
    # Strategy Performance
    st.subheader("🎯 Strategy Performance")
    
    col1, col2 = st.columns(2)
    
    with col1:
        # Mock strategy data
        strategy_data = {
            'Gap & Go': {'trades': 45, 'win_rate': 0.67, 'pnl': 3200},
            'VWAP Reclaim': {'trades': 32, 'win_rate': 0.59, 'pnl': 1800},
            'Momentum': {'trades': 28, 'win_rate': 0.71, 'pnl': 2400}
        }
        
        strategy_df = pd.DataFrame([
            {
                'Strategy': strategy,
                'Trades': data['trades'],
                'Win Rate': data['win_rate'],
                'Total P&L': data['pnl']
            }
            for strategy, data in strategy_data.items()
        ])
        
        st.dataframe(
            strategy_df.style.format({
                'Win Rate': '{:.1%}',
                'Total P&L': '${:.0f}'
            }),
            use_container_width=True
        )
    
    with col2:
        # Strategy P&L chart
        fig_strategy = go.Figure()
        
        for strategy, data in strategy_data.items():
            fig_strategy.add_trace(go.Bar(
                name=strategy,
                x=[strategy],
                y=[data['pnl']],
                text=f"${data['pnl']:.0f}",
                textposition='auto'
            ))
        
        fig_strategy.update_layout(
            title="Strategy P&L Comparison",
            showlegend=False,
            yaxis_title="P&L ($)"
        )
        
        st.plotly_chart(fig_strategy, use_container_width=True)
    
    # Alerts y notificaciones
    st.subheader("🚨 Alerts & Notifications")
    
    # Mock alerts
    mock_alerts = [
        {"time": "10:30 AM", "type": "INFO", "message": "AAPL gap up 3.2% with high volume"},
        {"time": "11:15 AM", "type": "WARNING", "message": "TSLA position approaching stop loss"},
        {"time": "12:00 PM", "type": "SUCCESS", "message": "NVDA target reached - position closed"},
    ]
    
    for alert in mock_alerts:
        alert_type = alert['type']
        if alert_type == "WARNING":
            st.warning(f"⚠️ {alert['time']}: {alert['message']}")
        elif alert_type == "SUCCESS":
            st.success(f"✅ {alert['time']}: {alert['message']}")
        else:
            st.info(f"ℹ️ {alert['time']}: {alert['message']}")
    
    # Auto refresh
    if auto_refresh:
        time.sleep(refresh_interval)
        st.rerun()

# Ejecutar dashboard
if __name__ == "__main__":
    create_streamlit_dashboard()

Este sistema de integración multi-plataforma proporciona una base sólida para conectar y coordinar diferentes fuentes de datos, brokers y estrategias en un workflow unificado de trading cuantitativo.