# Data Cleaning and Structuring
## Why It Matters
Garbage in, garbage out. You can have the best strategy in the world, but if your data is bad, your results will be fiction. I have lost days debugging "broken strategies" that actually had corrupt data.
## Common Problems in Trading Data
### 1. Missing Data (Gaps)
```text
# Problem: bars are missing during market hours
datetime              close    volume
2024-01-15 09:30:00   175.00   500000
2024-01-15 09:31:00   175.20   450000
2024-01-15 09:32:00   NaN      NaN      # <-- Missing!
2024-01-15 09:33:00   175.45   380000
```
Solution:
```python
import pandas as pd

def fill_missing_bars(df, market_hours_only=True):
    # Build the complete 1-minute index from the first to the last session
    full_index = pd.date_range(
        start=df.index[0].replace(hour=9, minute=30),
        end=df.index[-1].replace(hour=16, minute=0),
        freq='1min'
    )
    if market_hours_only:
        # Keep only regular market hours (09:30-16:00)
        full_index = full_index[(full_index.time >= pd.Timestamp('09:30').time()) &
                                (full_index.time <= pd.Timestamp('16:00').time())]

    # Reindex and forward-fill the close
    df = df.reindex(full_index)
    df['close'] = df['close'].ffill()
    df['volume'] = df['volume'].fillna(0)  # volume is 0 if no trades occurred

    # OHLC: forward-fill from the previous close
    df['open'] = df['open'].fillna(df['close'])
    df['high'] = df['high'].fillna(df['close'])
    df['low'] = df['low'].fillna(df['close'])
    return df
```
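A quick sanity check of the function, with a synthetic gap at 09:32 (the timestamps and prices below are made up for illustration):

```python
# Hypothetical minute bars with one missing row at 09:32
idx = pd.to_datetime(['2024-01-15 09:30', '2024-01-15 09:31', '2024-01-15 09:33'])
bars = pd.DataFrame({'open': [175.0, 175.1, 175.4],
                     'high': [175.2, 175.3, 175.5],
                     'low':  [174.9, 175.0, 175.3],
                     'close': [175.0, 175.2, 175.45],
                     'volume': [500000, 450000, 380000]}, index=idx)

filled = fill_missing_bars(bars)
print(filled.loc['2024-01-15 09:32'])  # close forward-filled to 175.20, volume 0
```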
### 2. Outliers and Fat Fingers
```python
# Problem: erroneous prints that distort the analysis
def detect_outliers(df, method='iqr', threshold=3):
    if method == 'iqr':
        # IQR (interquartile range) method
        Q1 = df['close'].quantile(0.25)
        Q3 = df['close'].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - threshold * IQR
        upper_bound = Q3 + threshold * IQR
    elif method == 'zscore':
        # Z-score method
        mean = df['close'].mean()
        std = df['close'].std()
        lower_bound = mean - threshold * std
        upper_bound = mean + threshold * std
    else:
        raise ValueError(f"Unknown method: {method}")

    # Flag outliers
    df['is_outlier'] = (df['close'] < lower_bound) | (df['close'] > upper_bound)

    # Option 1: drop them
    # df = df[~df['is_outlier']]

    # Option 2: cap them (better for trading)
    df.loc[df['close'] > upper_bound, 'close'] = upper_bound
    df.loc[df['close'] < lower_bound, 'close'] = lower_bound
    return df
```
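A minimal illustration with a made-up fat-finger print (all values hypothetical):

```python
prices = pd.DataFrame({'close': [100.0, 100.5, 99.8, 1000.0, 100.2]})
flagged = detect_outliers(prices.copy(), method='iqr', threshold=3)
print(flagged[flagged['is_outlier']])  # the 1000.0 print is flagged and capped
```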
### 3. Split and Dividend Adjustments
```python
# Problem: AAPL did a 4:1 split, so unadjusted historical data is wrong
def adjust_for_splits(df, splits_df):
    """
    splits_df must have columns: date, ratio
    """
    df = df.copy()
    for _, split in splits_df.iterrows():
        split_date = split['date']
        ratio = split['ratio']
        # Adjust prices before the split
        mask = df.index < split_date
        df.loc[mask, ['open', 'high', 'low', 'close']] /= ratio
        df.loc[mask, 'volume'] *= ratio
    return df
```
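For example, applying AAPL's 4:1 split (effective 2020-08-31) to a frame of unadjusted daily bars (`unadjusted_daily_bars` here is a placeholder for your own data):

```python
splits = pd.DataFrame({'date': [pd.Timestamp('2020-08-31')], 'ratio': [4.0]})
adjusted = adjust_for_splits(unadjusted_daily_bars, splits)
```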
```python
import yfinance as yf

# Using yfinance (data already comes adjusted)
ticker = yf.Ticker('AAPL')
# auto_adjust=True automatically adjusts for splits and dividends
df = ticker.history(period='2y', auto_adjust=True)
```
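If you need the raw split history instead, for example to feed `adjust_for_splits` yourself, yfinance exposes it as a Series indexed by date:

```python
# Values are split ratios (e.g. 4.0 for a 4:1 split)
split_history = yf.Ticker('AAPL').splits
print(split_history.tail())
```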
### 4. Timezone Hell
```python
# Problem: mixing timezones = disaster
def standardize_timezone(df, target_tz='America/New_York'):
    """
    Converts everything to Eastern Time (NYSE)
    """
    if df.index.tz is None:
        # Assume UTC if the index has no timezone
        df.index = df.index.tz_localize('UTC')
    # Convert to Eastern
    df.index = df.index.tz_convert(target_tz)
    # Optional: drop the timezone for faster computations
    # df.index = df.index.tz_localize(None)
    return df
```
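A quick demonstration with naive timestamps that are actually UTC, a common situation with crypto and some equity feeds:

```python
utc_bars = pd.DataFrame(
    {'close': [175.0, 175.2]},
    index=pd.to_datetime(['2024-01-15 14:30', '2024-01-15 14:31'])  # naive, but UTC
)
eastern = standardize_timezone(utc_bars)
print(eastern.index[0])  # 2024-01-15 09:30:00-05:00
```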
### 5. Duplicates
```python
# Problem: the same trade reported multiple times
def remove_duplicates(df):
    # Method 1: exact index duplicates
    df = df[~df.index.duplicated(keep='first')]

    # Method 2: duplicates within a time window (more aggressive)
    df = df.sort_index()
    time_diff = df.index.to_series().diff()
    # Drop a trade if another one arrived less than 1 second earlier
    mask = time_diff > pd.Timedelta('1s')
    mask.iloc[0] = True  # keep the first trade
    return df[mask]
```
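Note that Method 2 keeps only the first print in any sub-second burst, which also removes legitimate rapid trades. A toy example with fabricated tick times:

```python
ticks = pd.DataFrame(
    {'price': [100.0, 100.0, 100.1]},
    index=pd.to_datetime(['2024-01-15 09:30:00.000',
                          '2024-01-15 09:30:00.200',   # < 1s after the previous tick
                          '2024-01-15 09:30:02.000'])
)
print(remove_duplicates(ticks))  # the 09:30:00.200 tick is dropped
```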
## Complete Cleaning Pipeline
```python
class DataCleaner:
    def __init__(self, config=None):
        self.config = config or self.default_config()

    def default_config(self):
        return {
            'remove_outliers': True,
            'outlier_method': 'iqr',
            'outlier_threshold': 3,
            'fill_missing': True,
            'adjust_splits': True,
            'timezone': 'America/New_York',
            'validate_prices': True
        }

    def clean(self, df, ticker=None):
        """Full cleaning pipeline."""
        original_len = len(df)

        # 1. Basic validation
        df = self.validate_basic(df)

        # 2. Remove duplicates
        df = df[~df.index.duplicated(keep='first')]

        # 3. Timezone
        if self.config['timezone']:
            df = standardize_timezone(df, self.config['timezone'])

        # 4. Fill missing bars
        if self.config['fill_missing']:
            df = fill_missing_bars(df)

        # 5. Outliers
        if self.config['remove_outliers']:
            df = detect_outliers(
                df,
                method=self.config['outlier_method'],
                threshold=self.config['outlier_threshold']
            )

        # 6. Validate OHLC relationships
        if self.config['validate_prices']:
            df = self.validate_ohlc(df)

        # 7. Check for splits if we know the ticker
        if self.config['adjust_splits'] and ticker:
            df = self.auto_adjust_splits(df, ticker)

        print(f"Cleaned {ticker}: {original_len} -> {len(df)} rows")
        return df

    def validate_basic(self, df):
        """Basic sanity checks."""
        # Drop rows that are entirely NaN
        df = df.dropna(how='all')
        # Drop negative volume
        df = df[df['volume'] >= 0]
        # Drop zero or negative prices
        price_cols = ['open', 'high', 'low', 'close']
        df = df[(df[price_cols] > 0).all(axis=1)]
        return df

    def validate_ohlc(self, df):
        """Ensure that high >= low, etc."""
        # High must be >= open and close
        df['high'] = df[['high', 'open', 'close']].max(axis=1)
        # Low must be <= open and close
        df['low'] = df[['low', 'open', 'close']].min(axis=1)
        return df

    def auto_adjust_splits(self, df, ticker):
        """Detect and flag likely splits (large overnight jumps)."""
        # Flag overnight price changes larger than ~40%
        df['prev_close'] = df['close'].shift(1)
        df['overnight_change'] = df['open'] / df['prev_close']
        potential_splits = df[
            (df['overnight_change'] < 0.6) |  # possible split
            (df['overnight_change'] > 1.4)    # possible reverse split
        ]
        if len(potential_splits) > 0:
            print(f"Warning: potential splits detected for {ticker}")
            print(potential_splits[['close', 'prev_close', 'overnight_change']])
        df = df.drop(['prev_close', 'overnight_change'], axis=1)
        return df
```
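Putting it together (`raw_df` here stands for whatever unclean frame your provider returned; the ticker label is only used for split detection and logging):

```python
cleaner = DataCleaner()
clean_df = cleaner.clean(raw_df, ticker='AAPL')

# Or override parts of the default config
strict = DataCleaner(config={**cleaner.default_config(), 'outlier_threshold': 2})
```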
## Structuring for Analysis
### Adding Common Features
```python
import numpy as np

def add_technical_features(df):
    """Add common indicators for analysis."""
    # Returns
    df['returns'] = df['close'].pct_change()
    df['log_returns'] = np.log(df['close'] / df['close'].shift(1))

    # Volatility
    df['volatility'] = df['returns'].rolling(20).std()

    # Range
    df['range'] = (df['high'] - df['low']) / df['low'] * 100
    df['range_avg'] = df['range'].rolling(20).mean()

    # Volume
    df['volume_sma'] = df['volume'].rolling(20).mean()
    df['volume_ratio'] = df['volume'] / df['volume_sma']

    # VWAP (running over the whole frame)
    df['vwap'] = (df['close'] * df['volume']).cumsum() / df['volume'].cumsum()

    # Gaps
    df['gap'] = df['open'] / df['close'].shift(1) - 1
    return df
```
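The cumulative VWAP above never resets, which is usually wrong for multi-day intraday data. A per-session variant, assuming a DatetimeIndex (a sketch, not part of the original pipeline):

```python
def add_session_vwap(df):
    # Group by calendar date so VWAP resets at each session open
    day = df.index.date
    pv = (df['close'] * df['volume']).groupby(day).cumsum()
    v = df['volume'].groupby(day).cumsum()
    df['vwap'] = pv / v
    return df
```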
### Structure for Backtesting
```python
class BacktestData:
    def __init__(self, data, universe=None):
        self.data = data
        self.universe = universe or list(data.keys())
        self.current_index = 0

    def prepare_for_backtest(self):
        """Prepare the data for efficient backtesting."""
        for ticker in self.universe:
            df = self.data[ticker]
            # Pre-compute indicators
            df = add_technical_features(df)
            # Create columns for signals
            df['signal'] = 0
            df['position'] = 0
            df['pnl'] = 0
            # Sort by date for fast lookups
            df = df.sort_index()
            self.data[ticker] = df

    def get_snapshot(self, date, lookback=20):
        """Get a view of the data up to a given date."""
        snapshot = {}
        for ticker in self.universe:
            df = self.data[ticker]
            # Data up to the date, limited to the lookback window
            mask = df.index <= date
            historical = df[mask].tail(lookback)
            snapshot[ticker] = historical
        return snapshot
```
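Typical usage, assuming `cleaned_data` is a dict mapping ticker to DataFrame like the pipeline above produces (tickers and date are illustrative):

```python
bt_data = BacktestData(cleaned_data, universe=['AAPL', 'MSFT'])
bt_data.prepare_for_backtest()

# Point-in-time view: only bars at or before this date, last 20 rows per ticker
snapshot = bt_data.get_snapshot(pd.Timestamp('2023-06-15'), lookback=20)
print(snapshot['AAPL'][['close', 'volatility', 'vwap']].tail())
```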
## Final Validation
```python
def validate_dataset(data_dict, start_date, end_date):
    """Full dataset validation before a backtest."""
    report = {
        'tickers': len(data_dict),
        'issues': [],
        'stats': {}
    }
    for ticker, df in data_dict.items():
        # Check date coverage (match the index's timezone to avoid comparison errors)
        actual_start = df.index[0]
        actual_end = df.index[-1]
        if actual_start > pd.Timestamp(start_date, tz=df.index.tz):
            report['issues'].append(f"{ticker}: Data starts late ({actual_start})")
        if actual_end < pd.Timestamp(end_date, tz=df.index.tz):
            report['issues'].append(f"{ticker}: Data ends early ({actual_end})")

        # Per-ticker stats
        report['stats'][ticker] = {
            'rows': len(df),
            'missing_closes': df['close'].isna().sum(),
            'zero_volume_bars': (df['volume'] == 0).sum(),
            'date_range': f"{actual_start} to {actual_end}"
        }
    return report

# Usage
report = validate_dataset(cleaned_data, '2023-01-01', '2024-01-01')
if report['issues']:
    print("WARNING: Data issues found:")
    for issue in report['issues']:
        print(f"  - {issue}")
```
## My Personal Checklist
```python
# clean_all.py
def my_cleaning_pipeline(raw_data):
    # validate_ohlc and validate_basic live on DataCleaner
    cleaner = DataCleaner()
    steps = [
        ('Remove duplicates', lambda df: df[~df.index.duplicated()]),
        ('Fix timezone', lambda df: standardize_timezone(df)),
        ('Validate OHLC', lambda df: cleaner.validate_ohlc(df)),
        ('Fill missing', lambda df: fill_missing_bars(df)),
        ('Remove outliers', lambda df: detect_outliers(df, method='iqr')),
        ('Add features', lambda df: add_technical_features(df)),
        ('Final validation', lambda df: cleaner.validate_basic(df))
    ]
    for step_name, step_func in steps:
        print(f"Running: {step_name}")
        raw_data = step_func(raw_data)
    return raw_data
```
## Storage Best Practices
```python
import os

# Save the cleaned data
def save_clean_data(df, ticker, format='parquet'):
    os.makedirs('data/clean', exist_ok=True)  # make sure the target folder exists
    date_str = pd.Timestamp.now().strftime('%Y%m%d')
    if format == 'parquet':
        # Most efficient for reading
        df.to_parquet(f'data/clean/{ticker}_{date_str}.parquet')
    elif format == 'hdf':
        # Better for very large datasets
        df.to_hdf(f'data/clean/{ticker}_{date_str}.h5', key='data')
    elif format == 'csv':
        # Compatible but less efficient
        df.to_csv(f'data/clean/{ticker}_{date_str}.csv')
```
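Loading the saved file back is symmetric; parquet round-trips the DatetimeIndex and dtypes, which CSV does not (the date suffix below is a hypothetical example):

```python
df = pd.read_parquet('data/clean/AAPL_20240115.parquet')
```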
## Red Flags in Your Data
- Too many gaps: unreliable provider
- Erratic volume: possibly incorrect adjustments
- Perfectly round prices: synthetic data
- Repetitive patterns: possibly duplicated data
- Unrealistic volatility: undetected outliers
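Some of these red flags are easy to scan for automatically. A minimal sketch (the thresholds are arbitrary assumptions; tune them to your market):

```python
def scan_red_flags(df, round_price_frac=0.5, max_daily_move=0.25):
    flags = []
    # Perfectly round prices: a high share of closes on whole dollars is suspicious
    round_share = (df['close'] % 1 == 0).mean()
    if round_share > round_price_frac:
        flags.append(f"{round_share:.0%} of closes are whole numbers")
    # Unrealistic volatility: bar-to-bar moves beyond +/-25% hint at bad prints
    big_moves = df['close'].pct_change().abs() > max_daily_move
    if big_moves.any():
        flags.append(f"{big_moves.sum()} bars move more than {max_daily_move:.0%}")
    # Too many gaps: a large share of zero-volume bars points at a thin feed
    zero_vol = (df['volume'] == 0).mean()
    if zero_vol > 0.1:
        flags.append(f"{zero_vol:.0%} of bars have zero volume")
    return flags
```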
## Next Step
With clean data in hand, we can now build optimized Datasets for Backtesting.