Automated Trading System Architecture
Overall Architecture Design
System Components
from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Callable
from abc import ABC, abstractmethod
import asyncio
import logging
from datetime import datetime, timedelta
import json
import os
import threading
import queue
class SystemComponent(Enum):
"""Componentes del sistema"""
DATA_MANAGER = "data_manager"
STRATEGY_ENGINE = "strategy_engine"
RISK_MANAGER = "risk_manager"
EXECUTION_ENGINE = "execution_engine"
PERFORMANCE_TRACKER = "performance_tracker"
ALERT_SYSTEM = "alert_system"
WEB_DASHBOARD = "web_dashboard"
ORDER_MANAGER = "order_manager"
class SystemState(Enum):
"""Estados del sistema"""
STARTING = "starting"
RUNNING = "running"
PAUSED = "paused"
STOPPING = "stopping"
STOPPED = "stopped"
ERROR = "error"
@dataclass
class SystemConfig:
"""Configuración del sistema"""
# Trading parameters
account_size: float = 100000
max_positions: int = 5
max_daily_loss_pct: float = 0.05
max_position_size_pct: float = 0.20
# System parameters
update_frequency_ms: int = 1000
data_retention_days: int = 30
log_level: str = "INFO"
# Strategies to run
active_strategies: List[str] = field(default_factory=list)
# Market hours
market_open_time: str = "09:30"
market_close_time: str = "16:00"
premarket_start: str = "04:00"
afterhours_end: str = "20:00"
# Broker settings
primary_broker: str = "ibkr"
backup_broker: Optional[str] = "alpaca"
    # Data providers
    primary_data_provider: str = "polygon"
    backup_data_provider: str = "yahoo"
    polygon_api_key: Optional[str] = None  # required by the Polygon provider factory in DataManager
class TradingSystemCore:
"""Core del sistema de trading automatizado"""
def __init__(self, config: SystemConfig):
self.config = config
self.state = SystemState.STOPPED
self.components: Dict[SystemComponent, Any] = {}
self.message_queues: Dict[str, queue.Queue] = {}
self.running = False
# Setup logging
self.logger = self._setup_logging()
# Initialize message queues
self._initialize_message_queues()
# Initialize components
self._initialize_components()
    def _setup_logging(self):
        """Configure the logging system"""
        # Make sure the log directory exists before attaching the FileHandler
        os.makedirs('logs', exist_ok=True)
        logging.basicConfig(
            level=getattr(logging, self.config.log_level),
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(f'logs/trading_system_{datetime.now().strftime("%Y%m%d")}.log'),
                logging.StreamHandler()
            ]
        )
        return logging.getLogger('TradingSystem')
def _initialize_message_queues(self):
"""Inicializar colas de mensajes"""
queue_names = [
'market_data',
'trade_signals',
'order_commands',
'risk_alerts',
'performance_updates',
'system_events'
]
for queue_name in queue_names:
self.message_queues[queue_name] = queue.Queue(maxsize=1000)
def _initialize_components(self):
"""Inicializar componentes del sistema"""
try:
# Data Manager
self.components[SystemComponent.DATA_MANAGER] = DataManager(
self.config,
self.message_queues['market_data']
)
# Strategy Engine
self.components[SystemComponent.STRATEGY_ENGINE] = StrategyEngine(
self.config,
self.message_queues['market_data'],
self.message_queues['trade_signals']
)
# Risk Manager
self.components[SystemComponent.RISK_MANAGER] = RiskManager(
self.config,
self.message_queues['trade_signals'],
self.message_queues['order_commands'],
self.message_queues['risk_alerts']
)
# Execution Engine
self.components[SystemComponent.EXECUTION_ENGINE] = ExecutionEngine(
self.config,
self.message_queues['order_commands']
)
# Performance Tracker
self.components[SystemComponent.PERFORMANCE_TRACKER] = PerformanceTracker(
self.config,
self.message_queues['performance_updates']
)
# Alert System
self.components[SystemComponent.ALERT_SYSTEM] = AlertSystem(
self.config,
self.message_queues['risk_alerts']
)
self.logger.info("All components initialized successfully")
except Exception as e:
self.logger.error(f"Error initializing components: {e}")
self.state = SystemState.ERROR
raise
async def start_system(self):
"""Iniciar sistema completo"""
try:
self.logger.info("Starting trading system...")
self.state = SystemState.STARTING
# Start all components
for component_type, component in self.components.items():
self.logger.info(f"Starting {component_type.value}...")
await component.start()
# Start main loop
self.running = True
self.state = SystemState.RUNNING
# Start monitoring loop
asyncio.create_task(self._main_system_loop())
asyncio.create_task(self._health_monitor_loop())
self.logger.info("Trading system started successfully")
except Exception as e:
self.logger.error(f"Error starting system: {e}")
self.state = SystemState.ERROR
raise
async def stop_system(self):
"""Detener sistema"""
try:
self.logger.info("Stopping trading system...")
self.state = SystemState.STOPPING
self.running = False
# Stop all components
for component_type, component in self.components.items():
self.logger.info(f"Stopping {component_type.value}...")
await component.stop()
self.state = SystemState.STOPPED
self.logger.info("Trading system stopped")
except Exception as e:
self.logger.error(f"Error stopping system: {e}")
self.state = SystemState.ERROR
async def _main_system_loop(self):
"""Loop principal del sistema"""
while self.running:
try:
# Check if market is open
if self._is_market_open():
# Process any pending system events
await self._process_system_events()
# Check component health
await self._check_component_health()
# Sleep until next iteration
await asyncio.sleep(self.config.update_frequency_ms / 1000)
except Exception as e:
self.logger.error(f"Error in main system loop: {e}")
await asyncio.sleep(1)
async def _health_monitor_loop(self):
"""Loop de monitoreo de salud del sistema"""
while self.running:
try:
# Check system health
health_status = await self._get_system_health()
# Log health status
if health_status['overall_status'] != 'healthy':
self.logger.warning(f"System health warning: {health_status}")
# Sleep for 30 seconds
await asyncio.sleep(30)
except Exception as e:
self.logger.error(f"Error in health monitor: {e}")
await asyncio.sleep(30)
def _is_market_open(self) -> bool:
"""Verificar si el mercado está abierto"""
now = datetime.now()
market_open = now.replace(
hour=int(self.config.market_open_time.split(':')[0]),
minute=int(self.config.market_open_time.split(':')[1]),
second=0, microsecond=0
)
market_close = now.replace(
hour=int(self.config.market_close_time.split(':')[0]),
minute=int(self.config.market_close_time.split(':')[1]),
second=0, microsecond=0
)
# Check if weekday and within market hours
return now.weekday() < 5 and market_open <= now <= market_close
async def _process_system_events(self):
"""Procesar eventos del sistema"""
try:
while not self.message_queues['system_events'].empty():
event = self.message_queues['system_events'].get_nowait()
await self._handle_system_event(event)
except queue.Empty:
pass
async def _handle_system_event(self, event: Dict):
"""Manejar evento del sistema"""
event_type = event.get('type')
if event_type == 'pause_trading':
await self._pause_trading()
elif event_type == 'resume_trading':
await self._resume_trading()
elif event_type == 'emergency_stop':
await self._emergency_stop()
elif event_type == 'rebalance_portfolio':
await self._rebalance_portfolio()
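    # The four handlers dispatched above are not defined elsewhere in this listing.
    # Minimal placeholder implementations (assumed behavior, sketched for completeness)
    # keep the event dispatch from failing at runtime.
    async def _pause_trading(self):
        """Pause trading without shutting components down"""
        self.logger.warning("Trading paused by system event")
        self.state = SystemState.PAUSED

    async def _resume_trading(self):
        """Resume trading after a pause"""
        self.logger.info("Trading resumed by system event")
        self.state = SystemState.RUNNING

    async def _emergency_stop(self):
        """Emergency stop: halt all trading and shut the system down"""
        self.logger.critical("Emergency stop triggered")
        await self.stop_system()

    async def _rebalance_portfolio(self):
        """Portfolio rebalancing hook (left as a placeholder)"""
        self.logger.info("Portfolio rebalance requested (not implemented in this sketch)")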
async def _check_component_health(self):
"""Verificar salud de componentes"""
for component_type, component in self.components.items():
try:
health = await component.get_health()
if not health['is_healthy']:
self.logger.warning(f"Component {component_type.value} is unhealthy: {health}")
except Exception as e:
self.logger.error(f"Error checking health of {component_type.value}: {e}")
async def _get_system_health(self) -> Dict:
"""Obtener estado de salud del sistema"""
health_status = {
'timestamp': datetime.now(),
'overall_status': 'healthy',
'components': {},
'issues': []
}
# Check each component
for component_type, component in self.components.items():
try:
component_health = await component.get_health()
health_status['components'][component_type.value] = component_health
if not component_health['is_healthy']:
health_status['overall_status'] = 'warning'
health_status['issues'].append(f"{component_type.value}: {component_health}")
except Exception as e:
health_status['overall_status'] = 'error'
health_status['issues'].append(f"{component_type.value}: Error - {e}")
return health_status
class BaseComponent(ABC):
"""Clase base para componentes del sistema"""
def __init__(self, config: SystemConfig, name: str):
self.config = config
self.name = name
self.logger = logging.getLogger(f'TradingSystem.{name}')
self.running = False
self.last_heartbeat = datetime.now()
@abstractmethod
async def start(self):
"""Iniciar componente"""
pass
@abstractmethod
async def stop(self):
"""Detener componente"""
pass
@abstractmethod
async def process(self):
"""Procesar lógica principal del componente"""
pass
async def get_health(self) -> Dict:
"""Obtener estado de salud del componente"""
time_since_heartbeat = datetime.now() - self.last_heartbeat
return {
'is_healthy': time_since_heartbeat < timedelta(minutes=5),
'last_heartbeat': self.last_heartbeat,
'running': self.running,
'component_name': self.name
}
def update_heartbeat(self):
"""Actualizar heartbeat"""
self.last_heartbeat = datetime.now()
class DataManager(BaseComponent):
"""Gestor de datos de mercado"""
def __init__(self, config: SystemConfig, market_data_queue: queue.Queue):
super().__init__(config, "DataManager")
self.market_data_queue = market_data_queue
self.data_providers = {}
self.subscribed_symbols = set()
async def start(self):
"""Iniciar data manager"""
self.logger.info("Starting Data Manager...")
# Initialize data providers
await self._initialize_data_providers()
# Start data processing loop
self.running = True
asyncio.create_task(self._data_processing_loop())
self.logger.info("Data Manager started")
async def stop(self):
"""Detener data manager"""
self.logger.info("Stopping Data Manager...")
self.running = False
# Disconnect from data providers
for provider in self.data_providers.values():
await provider.disconnect()
self.logger.info("Data Manager stopped")
async def process(self):
"""Procesar datos de mercado"""
# This is handled by the data processing loop
pass
async def _initialize_data_providers(self):
"""Inicializar proveedores de datos"""
# Initialize primary data provider
primary_provider = self._create_data_provider(self.config.primary_data_provider)
await primary_provider.connect()
self.data_providers['primary'] = primary_provider
# Initialize backup data provider if configured
if self.config.backup_data_provider:
backup_provider = self._create_data_provider(self.config.backup_data_provider)
await backup_provider.connect()
self.data_providers['backup'] = backup_provider
def _create_data_provider(self, provider_name: str):
"""Crear proveedor de datos"""
# Factory method to create data providers
if provider_name == "polygon":
from src.data_acquisition.polygon_provider import PolygonDataProvider
return PolygonDataProvider(self.config.polygon_api_key)
elif provider_name == "yahoo":
from src.data_acquisition.yahoo_provider import YahooDataProvider
return YahooDataProvider()
else:
raise ValueError(f"Unknown data provider: {provider_name}")
async def _data_processing_loop(self):
"""Loop de procesamiento de datos"""
while self.running:
try:
# Get data from primary provider
market_data = await self._get_market_data()
if market_data:
# Put data in queue for strategy engine
try:
self.market_data_queue.put_nowait(market_data)
except queue.Full:
self.logger.warning("Market data queue is full")
self.update_heartbeat()
await asyncio.sleep(1) # 1 second intervals
except Exception as e:
self.logger.error(f"Error in data processing loop: {e}")
await asyncio.sleep(5)
async def _get_market_data(self) -> Optional[Dict]:
"""Obtener datos de mercado"""
try:
# Try primary provider first
primary_provider = self.data_providers.get('primary')
if primary_provider:
data = await primary_provider.get_real_time_data(list(self.subscribed_symbols))
if data:
return data
# Fallback to backup provider
backup_provider = self.data_providers.get('backup')
if backup_provider:
data = await backup_provider.get_real_time_data(list(self.subscribed_symbols))
return data
except Exception as e:
self.logger.error(f"Error getting market data: {e}")
return None
def subscribe_symbol(self, symbol: str):
"""Suscribirse a datos de un símbolo"""
self.subscribed_symbols.add(symbol)
self.logger.info(f"Subscribed to {symbol}")
def unsubscribe_symbol(self, symbol: str):
"""Desuscribirse de datos de un símbolo"""
self.subscribed_symbols.discard(symbol)
self.logger.info(f"Unsubscribed from {symbol}")
class StrategyEngine(BaseComponent):
"""Motor de estrategias"""
def __init__(self, config: SystemConfig, market_data_queue: queue.Queue,
signals_queue: queue.Queue):
super().__init__(config, "StrategyEngine")
self.market_data_queue = market_data_queue
self.signals_queue = signals_queue
self.strategies = {}
self.strategy_positions = {}
async def start(self):
"""Iniciar strategy engine"""
self.logger.info("Starting Strategy Engine...")
# Load and initialize strategies
await self._load_strategies()
# Start strategy processing loop
self.running = True
asyncio.create_task(self._strategy_processing_loop())
self.logger.info("Strategy Engine started")
async def stop(self):
"""Detener strategy engine"""
self.logger.info("Stopping Strategy Engine...")
self.running = False
self.logger.info("Strategy Engine stopped")
async def process(self):
"""Procesar estrategias"""
# This is handled by the strategy processing loop
pass
async def _load_strategies(self):
"""Cargar estrategias configuradas"""
for strategy_name in self.config.active_strategies:
try:
strategy = self._create_strategy(strategy_name)
self.strategies[strategy_name] = strategy
self.strategy_positions[strategy_name] = {}
self.logger.info(f"Loaded strategy: {strategy_name}")
except Exception as e:
self.logger.error(f"Error loading strategy {strategy_name}: {e}")
def _create_strategy(self, strategy_name: str):
"""Crear instancia de estrategia"""
# Factory method to create strategies
if strategy_name == "gap_and_go":
from src.strategies.gap_and_go import GapAndGoStrategy
return GapAndGoStrategy()
elif strategy_name == "vwap_reclaim":
from src.strategies.vwap_reclaim import VWAPReclaimStrategy
return VWAPReclaimStrategy()
else:
raise ValueError(f"Unknown strategy: {strategy_name}")
async def _strategy_processing_loop(self):
"""Loop de procesamiento de estrategias"""
while self.running:
try:
# Get market data
try:
market_data = self.market_data_queue.get_nowait()
except queue.Empty:
await asyncio.sleep(0.1)
continue
# Process data with each strategy
for strategy_name, strategy in self.strategies.items():
try:
signals = await strategy.process_market_data(market_data)
# Put signals in queue for risk manager
for signal in signals:
signal['strategy'] = strategy_name
try:
self.signals_queue.put_nowait(signal)
except queue.Full:
self.logger.warning("Signals queue is full")
except Exception as e:
self.logger.error(f"Error processing strategy {strategy_name}: {e}")
self.update_heartbeat()
except Exception as e:
self.logger.error(f"Error in strategy processing loop: {e}")
await asyncio.sleep(1)
class RiskManager(BaseComponent):
"""Gestor de riesgo"""
def __init__(self, config: SystemConfig, signals_queue: queue.Queue,
orders_queue: queue.Queue, alerts_queue: queue.Queue):
super().__init__(config, "RiskManager")
self.signals_queue = signals_queue
self.orders_queue = orders_queue
self.alerts_queue = alerts_queue
self.current_positions = {}
self.daily_pnl = 0.0
self.account_equity = config.account_size
async def start(self):
"""Iniciar risk manager"""
self.logger.info("Starting Risk Manager...")
# Start risk processing loop
self.running = True
asyncio.create_task(self._risk_processing_loop())
self.logger.info("Risk Manager started")
async def stop(self):
"""Detener risk manager"""
self.logger.info("Stopping Risk Manager...")
self.running = False
self.logger.info("Risk Manager stopped")
async def process(self):
"""Procesar gestión de riesgo"""
# This is handled by the risk processing loop
pass
async def _risk_processing_loop(self):
"""Loop de procesamiento de riesgo"""
while self.running:
try:
# Get trade signals
try:
signal = self.signals_queue.get_nowait()
except queue.Empty:
await asyncio.sleep(0.1)
continue
# Validate signal against risk rules
if await self._validate_signal(signal):
# Convert signal to order
order = await self._create_order_from_signal(signal)
if order:
try:
self.orders_queue.put_nowait(order)
self.logger.info(f"Order created: {order}")
except queue.Full:
self.logger.warning("Orders queue is full")
else:
self.logger.info(f"Signal rejected by risk management: {signal}")
self.update_heartbeat()
except Exception as e:
self.logger.error(f"Error in risk processing loop: {e}")
await asyncio.sleep(1)
async def _validate_signal(self, signal: Dict) -> bool:
"""Validar señal contra reglas de riesgo"""
# Check daily loss limit
daily_loss_pct = self.daily_pnl / self.account_equity
if daily_loss_pct <= -self.config.max_daily_loss_pct:
await self._send_risk_alert("Daily loss limit exceeded", signal)
return False
# Check maximum positions
if len(self.current_positions) >= self.config.max_positions:
return False
# Check position size
position_value = signal.get('quantity', 0) * signal.get('price', 0)
position_pct = position_value / self.account_equity
if position_pct > self.config.max_position_size_pct:
return False
return True
async def _create_order_from_signal(self, signal: Dict) -> Optional[Dict]:
"""Crear orden desde señal"""
return {
'symbol': signal['symbol'],
'side': signal['side'],
'quantity': signal['quantity'],
'order_type': signal.get('order_type', 'market'),
'price': signal.get('price'),
'stop_price': signal.get('stop_price'),
'strategy': signal['strategy'],
'timestamp': datetime.now()
}
async def _send_risk_alert(self, message: str, data: Dict):
"""Enviar alerta de riesgo"""
alert = {
'type': 'risk_alert',
'message': message,
'data': data,
'timestamp': datetime.now()
}
try:
self.alerts_queue.put_nowait(alert)
except queue.Full:
self.logger.error("Alerts queue is full")
class ExecutionEngine(BaseComponent):
"""Motor de ejecución de órdenes"""
def __init__(self, config: SystemConfig, orders_queue: queue.Queue):
super().__init__(config, "ExecutionEngine")
self.orders_queue = orders_queue
self.brokers = {}
self.order_history = []
async def start(self):
"""Iniciar execution engine"""
self.logger.info("Starting Execution Engine...")
# Initialize brokers
await self._initialize_brokers()
# Start order processing loop
self.running = True
asyncio.create_task(self._order_processing_loop())
self.logger.info("Execution Engine started")
async def stop(self):
"""Detener execution engine"""
self.logger.info("Stopping Execution Engine...")
self.running = False
# Disconnect from brokers
for broker in self.brokers.values():
await broker.disconnect()
self.logger.info("Execution Engine stopped")
async def process(self):
"""Procesar ejecución de órdenes"""
# This is handled by the order processing loop
pass
async def _initialize_brokers(self):
"""Inicializar conexiones a brokers"""
# Initialize primary broker
primary_broker = self._create_broker(self.config.primary_broker)
await primary_broker.connect()
self.brokers['primary'] = primary_broker
# Initialize backup broker if configured
if self.config.backup_broker:
backup_broker = self._create_broker(self.config.backup_broker)
await backup_broker.connect()
self.brokers['backup'] = backup_broker
def _create_broker(self, broker_name: str):
"""Crear conexión a broker"""
if broker_name == "ibkr":
from src.execution.ibkr_broker import IBKRBroker
return IBKRBroker()
elif broker_name == "alpaca":
from src.execution.alpaca_broker import AlpacaBroker
return AlpacaBroker()
else:
raise ValueError(f"Unknown broker: {broker_name}")
async def _order_processing_loop(self):
"""Loop de procesamiento de órdenes"""
while self.running:
try:
# Get orders
try:
order = self.orders_queue.get_nowait()
except queue.Empty:
await asyncio.sleep(0.1)
continue
# Execute order
execution_result = await self._execute_order(order)
# Log execution result
self.order_history.append({
'order': order,
'result': execution_result,
'timestamp': datetime.now()
})
self.logger.info(f"Order execution result: {execution_result}")
self.update_heartbeat()
except Exception as e:
self.logger.error(f"Error in order processing loop: {e}")
await asyncio.sleep(1)
async def _execute_order(self, order: Dict) -> Dict:
"""Ejecutar orden"""
try:
# Try primary broker first
primary_broker = self.brokers.get('primary')
if primary_broker:
result = await primary_broker.place_order(order)
if result.get('success'):
return result
# Fallback to backup broker
backup_broker = self.brokers.get('backup')
if backup_broker:
result = await backup_broker.place_order(order)
return result
return {'success': False, 'error': 'No available brokers'}
except Exception as e:
self.logger.error(f"Error executing order: {e}")
return {'success': False, 'error': str(e)}
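# PerformanceTracker and AlertSystem are instantiated in TradingSystemCore._initialize_components
# but are not defined in this listing. Minimal placeholder sketches (assumed implementations,
# not the original design) are added here so the demo below can run end to end.
class PerformanceTracker(BaseComponent):
    """Performance tracker (minimal sketch)"""
    def __init__(self, config: SystemConfig, performance_queue: queue.Queue):
        super().__init__(config, "PerformanceTracker")
        self.performance_queue = performance_queue

    async def start(self):
        self.logger.info("Starting Performance Tracker...")
        self.running = True
        asyncio.create_task(self._tracking_loop())

    async def stop(self):
        self.logger.info("Stopping Performance Tracker...")
        self.running = False

    async def process(self):
        pass

    async def _tracking_loop(self):
        while self.running:
            try:
                # Drain performance updates (fills, P&L snapshots) as they arrive
                while not self.performance_queue.empty():
                    update = self.performance_queue.get_nowait()
                    self.logger.info(f"Performance update: {update}")
                self.update_heartbeat()
                await asyncio.sleep(1)
            except Exception as e:
                self.logger.error(f"Error in performance tracking loop: {e}")
                await asyncio.sleep(5)

class AlertSystem(BaseComponent):
    """Alert system (minimal sketch)"""
    def __init__(self, config: SystemConfig, alerts_queue: queue.Queue):
        super().__init__(config, "AlertSystem")
        self.alerts_queue = alerts_queue

    async def start(self):
        self.logger.info("Starting Alert System...")
        self.running = True
        asyncio.create_task(self._alert_loop())

    async def stop(self):
        self.logger.info("Stopping Alert System...")
        self.running = False

    async def process(self):
        pass

    async def _alert_loop(self):
        while self.running:
            try:
                while not self.alerts_queue.empty():
                    alert = self.alerts_queue.get_nowait()
                    # A real deployment would notify via email/Slack/SMS; here we only log
                    self.logger.warning(f"ALERT: {alert}")
                self.update_heartbeat()
                await asyncio.sleep(1)
            except Exception as e:
                self.logger.error(f"Error in alert loop: {e}")
                await asyncio.sleep(5)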
# Demo de la arquitectura
async def demo_trading_system():
"""Demo del sistema de trading"""
# Configuración
config = SystemConfig(
account_size=100000,
max_positions=3,
max_daily_loss_pct=0.03,
active_strategies=['gap_and_go', 'vwap_reclaim'],
primary_broker='ibkr',
primary_data_provider='polygon'
)
# Crear sistema
trading_system = TradingSystemCore(config)
try:
# Iniciar sistema
await trading_system.start_system()
# Ejecutar por 30 segundos para demo
await asyncio.sleep(30)
# Obtener estado de salud
health = await trading_system._get_system_health()
print(f"System health: {health}")
finally:
# Detener sistema
await trading_system.stop_system()
if __name__ == "__main__":
asyncio.run(demo_trading_system())
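DataManager and ExecutionEngine import concrete providers and brokers from src.data_acquisition and src.execution, which are not listed here. Based on the calls the core makes (connect, disconnect, get_real_time_data, place_order), the expected interfaces look roughly like the sketch below; the base-class names are assumptions for illustration only.
# Hypothetical interface sketch for data providers and brokers, inferred from the calls
# made by DataManager and ExecutionEngine above. Names are illustrative assumptions.
from abc import ABC, abstractmethod
from typing import Dict, List, Optional


class BaseDataProvider(ABC):
    """Interface expected by DataManager (assumed, not part of the original code)"""

    @abstractmethod
    async def connect(self) -> None: ...

    @abstractmethod
    async def disconnect(self) -> None: ...

    @abstractmethod
    async def get_real_time_data(self, symbols: List[str]) -> Optional[Dict]:
        """Return the latest quotes/bars for the given symbols, or None if unavailable"""


class BaseBroker(ABC):
    """Interface expected by ExecutionEngine (assumed, not part of the original code)"""

    @abstractmethod
    async def connect(self) -> None: ...

    @abstractmethod
    async def disconnect(self) -> None: ...

    @abstractmethod
    async def place_order(self, order: Dict) -> Dict:
        """Submit an order and return a result dict containing at least a 'success' key"""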
Deployment and Infrastructure
Docker Configuration
# Dockerfile
FROM python:3.9-slim
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
g++ \
make \
libc6-dev \
&& rm -rf /var/lib/apt/lists/*
# Set working directory
WORKDIR /app
# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create necessary directories
RUN mkdir -p logs data/raw data/processed
# Set environment variables
ENV PYTHONPATH=/app
ENV ENVIRONMENT=production
# Expose ports
EXPOSE 8080 8501
# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD python scripts/health_check.py || exit 1
# Start command
CMD ["python", "main.py"]
Docker Compose Configuration
# docker-compose.yml
version: '3.8'
services:
trading-system:
build: .
container_name: trading_system
environment:
- ENVIRONMENT=production
- LOG_LEVEL=INFO
volumes:
- ./logs:/app/logs
- ./data:/app/data
- ./config:/app/config
    ports:
      - "8080:8080"   # the dashboard on 8501 is published by the separate streamlit service below
depends_on:
- redis
- postgres
restart: unless-stopped
healthcheck:
test: ["CMD", "python", "scripts/health_check.py"]
interval: 30s
timeout: 10s
retries: 3
redis:
image: redis:7-alpine
container_name: trading_redis
ports:
- "6379:6379"
volumes:
- redis_data:/data
restart: unless-stopped
postgres:
image: postgres:14-alpine
container_name: trading_postgres
environment:
POSTGRES_DB: trading_db
POSTGRES_USER: trading_user
POSTGRES_PASSWORD: trading_password
volumes:
- postgres_data:/var/lib/postgresql/data
- ./db/init.sql:/docker-entrypoint-initdb.d/init.sql
ports:
- "5432:5432"
restart: unless-stopped
streamlit:
build: .
container_name: trading_dashboard
command: streamlit run dashboard/main.py --server.port=8501 --server.address=0.0.0.0
ports:
- "8501:8501"
depends_on:
- trading-system
restart: unless-stopped
prometheus:
image: prom/prometheus:latest
container_name: trading_prometheus
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
restart: unless-stopped
grafana:
image: grafana/grafana:latest
container_name: trading_grafana
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
volumes:
- grafana_data:/var/lib/grafana
- ./monitoring/grafana/dashboards:/etc/grafana/provisioning/dashboards
depends_on:
- prometheus
restart: unless-stopped
volumes:
redis_data:
postgres_data:
prometheus_data:
grafana_data:
networks:
default:
name: trading_network
Kubernetes Deployment
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: trading-system
labels:
app: trading-system
spec:
replicas: 1
selector:
matchLabels:
app: trading-system
template:
metadata:
labels:
app: trading-system
spec:
containers:
- name: trading-system
image: trading-system:latest
ports:
- containerPort: 8080
env:
- name: ENVIRONMENT
value: "production"
- name: LOG_LEVEL
value: "INFO"
- name: REDIS_URL
value: "redis://redis-service:6379"
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: trading-secrets
key: database-url
volumeMounts:
- name: logs-volume
mountPath: /app/logs
- name: data-volume
mountPath: /app/data
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "2000m"
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 30
readinessProbe:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 5
periodSeconds: 5
volumes:
- name: logs-volume
persistentVolumeClaim:
claimName: logs-pvc
- name: data-volume
persistentVolumeClaim:
claimName: data-pvc
---
apiVersion: v1
kind: Service
metadata:
name: trading-system-service
spec:
selector:
app: trading-system
ports:
- protocol: TCP
port: 80
targetPort: 8080
type: LoadBalancer
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: logs-pvc
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Gi
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: data-pvc
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 50Gi
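The liveness and readiness probes above expect /health and /ready HTTP endpoints on port 8080, which the main application would need to serve. A minimal sketch using aiohttp (an assumption; the original code does not specify an HTTP framework) running alongside the trading system's event loop:
# Minimal sketch (assumed, not part of the original code) of the /health and /ready
# endpoints expected by the Kubernetes probes. aiohttp is an assumption here.
from aiohttp import web


def create_health_app(trading_system) -> web.Application:
    """Build a tiny HTTP app exposing liveness and readiness endpoints"""

    async def health(request: web.Request) -> web.Response:
        status = await trading_system._get_system_health()
        healthy = status['overall_status'] != 'error'
        return web.json_response({'status': status['overall_status']},
                                 status=200 if healthy else 500)

    async def ready(request: web.Request) -> web.Response:
        # Ready only once the core reports the RUNNING state
        is_ready = trading_system.state.value == "running"
        return web.json_response({'ready': is_ready}, status=200 if is_ready else 503)

    app = web.Application()
    app.add_routes([web.get('/health', health), web.get('/ready', ready)])
    return app


async def start_health_server(trading_system, port: int = 8080) -> web.AppRunner:
    """Start the probe server next to the trading system on the shared asyncio loop"""
    runner = web.AppRunner(create_health_app(trading_system))
    await runner.setup()
    await web.TCPSite(runner, '0.0.0.0', port).start()
    return runner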
Monitoring and Observability
# monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
from functools import wraps
# Métricas de Prometheus
TRADES_TOTAL = Counter('trading_trades_total', 'Total number of trades', ['strategy', 'side', 'status'])
TRADE_DURATION = Histogram('trading_trade_duration_seconds', 'Trade duration in seconds')
ACCOUNT_EQUITY = Gauge('trading_account_equity_dollars', 'Current account equity in dollars')
POSITIONS_COUNT = Gauge('trading_positions_count', 'Number of open positions')
DAILY_PNL = Gauge('trading_daily_pnl_dollars', 'Daily P&L in dollars')
SYSTEM_ERRORS = Counter('trading_system_errors_total', 'Total system errors', ['component', 'error_type'])
class TradingMetrics:
"""Sistema de métricas para trading"""
def __init__(self, port=8000):
self.port = port
def start_metrics_server(self):
"""Iniciar servidor de métricas"""
start_http_server(self.port)
print(f"Metrics server started on port {self.port}")
def record_trade(self, strategy: str, side: str, status: str):
"""Registrar trade"""
TRADES_TOTAL.labels(strategy=strategy, side=side, status=status).inc()
def record_trade_duration(self, duration: float):
"""Registrar duración de trade"""
TRADE_DURATION.observe(duration)
def update_account_equity(self, equity: float):
"""Actualizar equity de cuenta"""
ACCOUNT_EQUITY.set(equity)
def update_positions_count(self, count: int):
"""Actualizar conteo de posiciones"""
POSITIONS_COUNT.set(count)
def update_daily_pnl(self, pnl: float):
"""Actualizar P&L diario"""
DAILY_PNL.set(pnl)
def record_system_error(self, component: str, error_type: str):
"""Registrar error de sistema"""
SYSTEM_ERRORS.labels(component=component, error_type=error_type).inc()
def measure_time(histogram=TRADE_DURATION):
    """Decorator to record an async function's execution time in the given histogram"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                return await func(*args, **kwargs)
            finally:
                histogram.observe(time.time() - start_time)
        return wrapper
    return decorator
# Configuración de Grafana Dashboard
GRAFANA_DASHBOARD = {
"dashboard": {
"title": "Trading System Dashboard",
"panels": [
{
"title": "Account Equity",
"type": "stat",
"targets": [
{
"expr": "trading_account_equity_dollars",
"legendFormat": "Equity"
}
]
},
{
"title": "Daily P&L",
"type": "stat",
"targets": [
{
"expr": "trading_daily_pnl_dollars",
"legendFormat": "Daily P&L"
}
]
},
{
"title": "Trade Rate",
"type": "graph",
"targets": [
{
"expr": "rate(trading_trades_total[5m])",
"legendFormat": "Trades/sec"
}
]
},
{
"title": "Open Positions",
"type": "stat",
"targets": [
{
"expr": "trading_positions_count",
"legendFormat": "Positions"
}
]
},
{
"title": "System Errors",
"type": "graph",
"targets": [
{
"expr": "rate(trading_system_errors_total[5m])",
"legendFormat": " - "
}
]
}
]
}
}
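A short usage sketch (illustrative, not part of the original module) showing how the system might wire TradingMetrics into its components:
# Usage sketch (illustrative): wiring TradingMetrics into the running system
metrics = TradingMetrics(port=8000)
metrics.start_metrics_server()

# e.g. inside ExecutionEngine, after an order fill
metrics.record_trade(strategy="gap_and_go", side="buy", status="filled")

# e.g. inside RiskManager, on each equity / P&L update
metrics.update_account_equity(102_500.0)
metrics.update_daily_pnl(2_500.0)
metrics.update_positions_count(3)

# Timing an async call with the decorator defined above
@measure_time(TRADE_DURATION)
async def execute_and_time(order):
    ...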
This architecture provides a robust, scalable, and observable trading system, with deployment options for both plain containers (Docker Compose) and Kubernetes, along with full monitoring of system metrics and health.