Análisis de Redes en Finanzas
Introducción: De Píxeles a Empresas
Los enfoques de redes en finanzas toman prestadas técnicas del análisis de imágenes y las adaptan para entender relaciones entre activos financieros. Similar a como los filtros convolucionales detectan patrones en imágenes usando relaciones espaciales, podemos detectar patrones financieros usando relaciones económicas.
Conceptos Fundamentales
Redes Financieras vs Redes Tradicionales
Diferencias Clave:
| Aspecto | Redes de Imágenes | Redes Financieras |
|---|---|---|
| Nodos | Píxeles | Empresas/Activos |
| Conexiones | Proximidad espacial | Relaciones económicas |
| Distancia | Euclidiana | Correlación/Causalidad |
| Estabilidad | Estática | Dinámica en el tiempo |
Construcción de Redes Financieras
import networkx as nx
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
class FinancialNetworkBuilder:
    """Builds financial networks (graphs over companies/assets) from several sources.

    Supported sources: return correlations, news co-mentions, and supply-chain
    relationships. Nodes are companies; edge semantics depend on the builder used.
    """

    def __init__(self, method='correlation', threshold=0.3):
        # `method` is kept for caller compatibility; the builder methods below
        # are selected explicitly by the caller rather than dispatched on it.
        self.method = method
        # Minimum |correlation| for an edge in build_correlation_network.
        # Previously hard-coded; exposed as a parameter (default unchanged).
        self.threshold = threshold

    def build_correlation_network(self, returns_df):
        """Build an undirected graph from pairwise return correlations.

        Parameters
        ----------
        returns_df : pandas.DataFrame
            Returns with one column per company.

        Returns
        -------
        networkx.Graph
            One node per column; an edge (weight=|corr|, correlation=corr)
            for every pair whose |correlation| exceeds ``self.threshold``.
        """
        corr_matrix = returns_df.corr()
        G = nx.Graph()
        G.add_nodes_from(returns_df.columns)
        columns = list(returns_df.columns)
        for i, company1 in enumerate(columns):
            for company2 in columns[i + 1:]:
                correlation = corr_matrix.loc[company1, company2]
                if abs(correlation) > self.threshold:
                    G.add_edge(company1, company2,
                               weight=abs(correlation),
                               correlation=correlation)
        return G

    def build_news_network(self, news_mentions):
        """Build an undirected co-mention graph from news articles.

        Each article is a mapping with a 'companies' list; every pair of
        companies mentioned in the same article gains (or strengthens) an
        edge whose 'weight' counts the number of co-mentions.
        """
        G = nx.Graph()
        for article in news_mentions:
            companies_mentioned = article['companies']
            for i, company1 in enumerate(companies_mentioned):
                for company2 in companies_mentioned[i + 1:]:
                    if G.has_edge(company1, company2):
                        G[company1][company2]['weight'] += 1
                    else:
                        G.add_edge(company1, company2, weight=1)
        return G

    def build_supply_chain_network(self, supply_relationships):
        """Build a directed supplier -> customer graph.

        Each relationship is a mapping with 'supplier', 'customer' and
        'revenue_percentage' (used as the edge weight).
        """
        G = nx.DiGraph()  # directed: supplier -> customer
        for relationship in supply_relationships:
            G.add_edge(relationship['supplier'],
                       relationship['customer'],
                       weight=relationship['revenue_percentage'],
                       relationship_type='supply_chain')
        return G
Graph Neural Networks para Finanzas
Arquitectura Base
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, GATConv
class FinancialGNN(nn.Module):
    """Two-layer graph neural network for node-level financial prediction.

    Supports GCN and GAT convolutions; the output is a linear classifier
    over the final node embeddings.
    """

    def __init__(self, num_features, hidden_dim, num_classes, network_type='gcn'):
        super().__init__()
        self.network_type = network_type
        if network_type == 'gcn':
            self.conv1 = GCNConv(num_features, hidden_dim)
            self.conv2 = GCNConv(hidden_dim, hidden_dim)
        elif network_type == 'gat':
            # 8 attention heads; the second layer collapses back to hidden_dim.
            self.conv1 = GATConv(num_features, hidden_dim, heads=8, dropout=0.1)
            self.conv2 = GATConv(hidden_dim * 8, hidden_dim, heads=1, dropout=0.1)
        else:
            # BUG FIX: an unknown type previously left conv1/conv2 undefined
            # and only failed later with AttributeError inside forward().
            raise ValueError(f"Unsupported network_type: {network_type!r}")
        self.classifier = nn.Linear(hidden_dim, num_classes)
        self.dropout = nn.Dropout(0.2)

    def forward(self, x, edge_index, edge_attr=None):
        """Apply two conv layers (ReLU + dropout) and classify each node.

        `edge_attr` is accepted for interface compatibility but is not
        consumed by the current convolution layers.
        """
        # First convolution layer
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = self.dropout(x)
        # Second convolution layer
        x = self.conv2(x, edge_index)
        x = F.relu(x)
        x = self.dropout(x)
        # Per-node classification / prediction
        return self.classifier(x)
Aplicación: Predicción de Riesgo Sectorial
class SectorRiskPredictor:
    """Builds a company network for a GNN and simulates risk contagion on it."""

    def __init__(self, gnn_model):
        # Trained GNN (FinancialGNN-compatible) used for contagion prediction.
        self.model = gnn_model
        # company -> node index; populated by prepare_sector_network.
        # BUG FIX: this was previously never stored, so predict_contagion_risk
        # failed with AttributeError when reading self.node_mapping.
        self.node_mapping = {}
        # Optional list of {'supplier', 'customer', 'strength'} dicts; callers
        # may assign their own before prepare_sector_network. Previously read
        # without ever being initialized.
        self.supply_chain_connections = []

    def prepare_sector_network(self, company_data, sector_classifications):
        """Build tensors describing the sector network.

        Parameters
        ----------
        company_data : dict company -> dict of fundamental features
        sector_classifications : dict sector -> list of companies

        Returns
        -------
        (features, edge_index, edge_attr) : torch tensors
            features: float (n_companies, 7); edge_index: int64 (2, n_edges);
            edge_attr: float (n_edges, 2) as [edge_type_score, strength].
        """
        features = []
        node_mapping = {}
        for idx, (company, data) in enumerate(company_data.items()):
            node_mapping[company] = idx
            features.append([
                data['market_cap_log'],
                data['pe_ratio'],
                data['debt_to_equity'],
                data['beta'],
                data['roa'],
                data['current_ratio'],
                data['revenue_growth'],
            ])
        self.node_mapping = node_mapping  # needed later by predict_contagion_risk

        edge_index = []
        edge_attr = []
        # Intra-sector edges, added in both directions (undirected relation).
        for sector, companies in sector_classifications.items():
            for i, company1 in enumerate(companies):
                for company2 in companies[i + 1:]:
                    if company1 in node_mapping and company2 in node_mapping:
                        idx1, idx2 = node_mapping[company1], node_mapping[company2]
                        edge_index.extend([[idx1, idx2], [idx2, idx1]])
                        # Attributes: [sector similarity = 1.0, pairwise correlation].
                        correlation = self.calculate_correlation(company1, company2)
                        edge_attr.extend([[1.0, correlation], [1.0, correlation]])
        # Inter-sector (supply-chain) edges, also bidirectional.
        for connection in self.supply_chain_connections:
            if connection['supplier'] in node_mapping and connection['customer'] in node_mapping:
                idx1 = node_mapping[connection['supplier']]
                idx2 = node_mapping[connection['customer']]
                edge_index.extend([[idx1, idx2], [idx2, idx1]])
                edge_attr.extend([[0.5, connection['strength']], [0.5, connection['strength']]])
        # BUG FIX: pin dtypes — edge_index must be int64 for GNN layers and
        # features/attrs must be floating point; the plain torch.tensor(...)
        # calls could previously yield integer feature tensors.
        return (torch.tensor(features, dtype=torch.float),
                torch.tensor(edge_index, dtype=torch.long).t(),
                torch.tensor(edge_attr, dtype=torch.float))

    def predict_contagion_risk(self, shock_companies, network_data):
        """Simulate 10 steps of shock propagation through the network.

        Parameters
        ----------
        shock_companies : iterable of company names receiving the initial shock
        network_data : (features, edge_index, edge_attr) from prepare_sector_network

        Returns
        -------
        list of tensors, one detached model output per propagation step.
        """
        features, edge_index, edge_attr = network_data
        # One-hot initial shock over the nodes.
        shock_vector = torch.zeros(len(features))
        for company in shock_companies:
            if company in self.node_mapping:
                shock_vector[self.node_mapping[company]] = 1.0
        propagation_steps = []
        current_shock = shock_vector.clone()
        for step in range(10):  # fixed 10-step propagation horizon
            # Append the current shock as an extra node feature for the GNN.
            model_input = torch.cat([features, current_shock.unsqueeze(1)], dim=1)
            next_shock = self.model(model_input, edge_index)
            propagation_steps.append(next_shock.detach().clone())
            current_shock = next_shock.squeeze()
        return propagation_steps
Casos de Estudio
Caso 1: Crisis Financiera 2008 - Análisis de Red
def analyze_2008_crisis_network():
    """Track how the financial correlation network evolved across the 2008 crisis.

    For each of four sub-periods (pre-crisis, onset, peak, recovery) this
    builds a correlation network from that period's returns and records
    connectivity metrics plus the most central nodes.

    Returns
    -------
    dict period_name -> {'graph', 'metrics', 'central_nodes'}
    """
    periods = {
        'pre_crisis': ('2006-01-01', '2007-06-30'),
        'crisis_onset': ('2007-07-01', '2008-03-31'),
        'peak_crisis': ('2008-04-01', '2008-12-31'),
        'recovery': ('2009-01-01', '2009-12-31'),
    }
    evolution = {}
    for name, (start, end) in periods.items():
        returns = get_returns_data(start, end)
        graph = FinancialNetworkBuilder().build_correlation_network(returns)
        # average_shortest_path_length is undefined on disconnected graphs.
        if nx.is_connected(graph):
            path_length = nx.average_shortest_path_length(graph)
        else:
            path_length = float('inf')
        evolution[name] = {
            'graph': graph,
            'metrics': {
                'density': nx.density(graph),
                'clustering': nx.average_clustering(graph),
                'path_length': path_length,
                'centrality_concentration': calculate_centrality_concentration(graph),
                'modularity': calculate_modularity_by_sector(graph),
            },
            'central_nodes': identify_central_nodes(graph),
        }
    return evolution
def analyze_crisis_propagation():
    """Compare the pre-crisis and crisis networks and trace contagion from Lehman ('LEH').

    Returns a dict with the density ratio, Lehman's betweenness centrality in
    both regimes, financial-sector clustering during the crisis, and the
    contagion paths originating at 'LEH'.
    """
    before = build_network('2006-01-01', '2007-06-30')
    during = build_network('2008-01-01', '2008-12-31')
    lehman_pre = nx.betweenness_centrality(before)['LEH']
    # Lehman may have dropped out of the crisis-period network entirely.
    if 'LEH' in during:
        lehman_during = nx.betweenness_centrality(during)['LEH']
    else:
        lehman_during = 0
    return {
        'density_change': nx.density(during) / nx.density(before),
        'lehman_centrality': {
            'pre_crisis': lehman_pre,
            'during_crisis': lehman_during,
        },
        'financial_sector_clustering': analyze_sector_clustering(during, 'Financial'),
        'contagion_paths': find_contagion_paths(during, source='LEH'),
    }
Hallazgos Clave:
- La red se volvió altamente conectada durante la crisis
- Lehman Brothers emergió como nodo central
- La modularidad sectorial desapareció (todos los sectores se correlacionaron)
- Paths de contagio fueron principalmente a través del sector financiero
Caso 2: Análisis de Sentiment en Redes
class SentimentNetworkAnalyzer:
    """Builds sentiment co-mention networks from news and simulates sentiment diffusion."""

    def __init__(self):
        # NOTE(review): `pipeline` (HuggingFace transformers) is not imported
        # in the visible part of this file — confirm the import exists elsewhere.
        self.sentiment_analyzer = pipeline("sentiment-analysis")

    def build_sentiment_network(self, news_data, companies):
        """Build an undirected graph whose edges accumulate shared article sentiment.

        Edge attributes:
            sentiment_correlation -- summed article sentiment over co-mentions
            co_mentions           -- number of articles mentioning both companies
        """
        G = nx.Graph()
        # Per-company signed sentiment history (negative score for NEGATIVE label).
        # NOTE(review): computed but never used or returned — kept for
        # compatibility; confirm whether it should be exposed.
        company_sentiments = {}
        for article in news_data:
            mentioned_companies = self.extract_companies(article['text'], companies)
            article_sentiment = self.sentiment_analyzer(article['text'])[0]
            sentiment_score = article_sentiment['score'] if article_sentiment['label'] == 'POSITIVE' else -article_sentiment['score']
            for company in mentioned_companies:
                company_sentiments.setdefault(company, []).append(sentiment_score)
        # Connect companies co-mentioned within the same article.
        for article in news_data:
            mentioned_companies = self.extract_companies(article['text'], companies)
            if len(mentioned_companies) > 1:
                article_sentiment = self.get_article_sentiment(article['text'])
                for i, company1 in enumerate(mentioned_companies):
                    for company2 in mentioned_companies[i + 1:]:
                        if G.has_edge(company1, company2):
                            G[company1][company2]['sentiment_correlation'] += article_sentiment
                            G[company1][company2]['co_mentions'] += 1
                        else:
                            G.add_edge(company1, company2,
                                       sentiment_correlation=article_sentiment,
                                       co_mentions=1)
        return G

    def predict_sentiment_contagion(self, network, initial_sentiment_shock):
        """Diffuse a sentiment shock over the network for 10 iterations.

        Each node keeps 70% of its own sentiment and absorbs 30% of the
        weighted average sentiment of its neighbours, where the weights are
        the edges' 'sentiment_correlation' normalised by total |weight|.

        Parameters
        ----------
        network : graph exposing nodes(), neighbors() and edge-dict indexing
        initial_sentiment_shock : dict company -> initial sentiment value

        Returns
        -------
        dict node -> sentiment after 10 diffusion steps
        """
        sentiment_states = {node: 0 for node in network.nodes()}
        for company, sentiment in initial_sentiment_shock.items():
            if company in sentiment_states:
                sentiment_states[company] = sentiment
        for iteration in range(10):
            new_states = sentiment_states.copy()
            for node in network.nodes():
                neighbor_influence = 0
                total_weight = 0
                for neighbor in network.neighbors(node):
                    edge_data = network[node][neighbor]
                    weight = edge_data.get('sentiment_correlation', 0)
                    neighbor_influence += weight * sentiment_states[neighbor]
                    total_weight += abs(weight)
                if total_weight > 0:
                    new_states[node] = 0.7 * sentiment_states[node] + 0.3 * (neighbor_influence / total_weight)
            # BUG FIX: carry the updated state into the next iteration.
            # Previously each of the 10 iterations restarted from the initial
            # state, so the loop performed only a single diffusion step.
            sentiment_states = new_states
        return sentiment_states
Modelado de Riesgo con Redes
Dynamic Risk Networks
class DynamicRiskNetwork:
    """Rolling-window correlation networks with per-date systemic-risk metrics."""

    def __init__(self, window_size=60):
        # Number of trailing observations per rolling window.
        self.window_size = window_size
        # date -> {'network', 'systemic_risk', 'network_density', 'avg_clustering'}
        self.risk_networks = {}

    def build_rolling_risk_networks(self, returns_data):
        """Populate self.risk_networks with one correlation network per date.

        NOTE(review): relies on self.build_correlation_network, which is not
        defined in this class — confirm it is supplied by a subclass or mixin.
        """
        for end in range(self.window_size, len(returns_data)):
            window = returns_data.iloc[end - self.window_size:end]
            graph = self.build_correlation_network(window)
            self.risk_networks[returns_data.index[end]] = {
                'network': graph,
                'systemic_risk': self.calculate_systemic_risk(graph, window),
                'network_density': nx.density(graph),
                'avg_clustering': nx.average_clustering(graph),
            }

    def calculate_systemic_risk(self, network, returns_data):
        """Compute systemic-risk metrics for one window's network and returns."""
        # Network density as a crude contagion proxy.
        density = nx.density(network)
        # Dispersion of eigenvector centrality as a concentration measure.
        concentration = np.std(list(nx.eigenvector_centrality(network).values()))
        # Pairwise CoVaR matrix and its network-level aggregation.
        covar_matrix = self.calculate_covar_matrix(returns_data)
        network_covar = self.calculate_network_covar(network, covar_matrix)
        return {
            'network_density': density,
            'centrality_concentration': concentration,
            'average_covar': np.mean(covar_matrix),
            'network_covar': network_covar,
            'systemic_risk_score': self.aggregate_risk_score(density, concentration, network_covar),
        }

    def calculate_covar_matrix(self, returns_data, quantile=0.05):
        """Pairwise CoVaR: the `quantile` of asset i's returns conditional on
        asset j sitting at or below its own `quantile` VaR. Diagonal stays zero.
        """
        companies = returns_data.columns
        covar = np.zeros((len(companies), len(companies)))
        for i, asset in enumerate(companies):
            for j, conditioning in enumerate(companies):
                if i == j:
                    continue
                # VaR of the conditioning asset.
                var_threshold = np.quantile(returns_data[conditioning], quantile)
                # Returns of `asset` while the conditioning asset is stressed.
                stressed = returns_data[conditioning] <= var_threshold
                conditional = returns_data[asset][stressed]
                if len(conditional) > 0:
                    covar[i, j] = np.quantile(conditional, quantile)
        return covar
Stress Testing con Redes
class NetworkStressTester:
    """Stress-tests a risk network via node removal and cascading-failure simulation."""

    def __init__(self, risk_network):
        # Graph under test; all simulations operate on copies.
        self.network = risk_network

    def node_removal_stress_test(self, removal_strategy='centrality'):
        """Remove up to 10 nodes (by betweenness centrality or at random) and
        record connectivity metrics after each single-node removal.

        Each removal is evaluated against the ORIGINAL network; removals are
        not cumulative.
        """
        original = self.network.copy()
        if removal_strategy == 'centrality':
            centrality = nx.betweenness_centrality(original)
            ordered = sorted(centrality.keys(), key=lambda node: centrality[node], reverse=True)
        elif removal_strategy == 'random':
            ordered = list(original.nodes())
            np.random.shuffle(ordered)
        results = {}
        for rank, node in enumerate(ordered[:10], start=1):  # top 10 nodes
            trial = original.copy()
            trial.remove_node(node)
            if nx.is_connected(trial):
                path_length = nx.average_shortest_path_length(trial)
                efficiency = nx.global_efficiency(trial)
                component_size = len(trial)
                fragmentation = 0
            else:
                # Fragmented: measure the giant component instead.
                giant = max(nx.connected_components(trial), key=len)
                path_length = nx.average_shortest_path_length(trial.subgraph(giant))
                efficiency = len(giant) / len(original)
                component_size = len(giant)
                fragmentation = 1 - (len(giant) / len(original))
            results[node] = {
                'removed_order': rank,
                'avg_path_length': path_length,
                'network_efficiency': efficiency,
                'largest_component_size': component_size,
                'fragmentation_impact': fragmentation,
            }
        return results

    def cascading_failure_simulation(self, initial_shock_nodes, failure_threshold=0.3):
        """Simulate threshold-based cascading failures.

        A surviving node fails when the fraction of its still-present
        neighbours that have failed reaches `failure_threshold`. Failed
        nodes are removed from the working copy after each step, so later
        steps see the shrunken topology.

        Returns a summary dict with per-step progression and final rates.
        """
        working = self.network.copy()
        failed = set(initial_shock_nodes)
        progression = []
        total_nodes = len(self.network.nodes())
        step = 0
        while True:
            step += 1
            casualties = set()
            for node in working.nodes():
                if node in failed:
                    continue
                neighbors = list(working.neighbors(node))
                if not neighbors:
                    continue  # isolated nodes cannot be stressed by neighbours
                failed_fraction = sum(1 for n in neighbors if n in failed) / len(neighbors)
                if failed_fraction >= failure_threshold:
                    casualties.add(node)
            if not casualties:
                break  # cascade has stabilised
            failed |= casualties
            progression.append({
                'step': step,
                'new_failures': list(casualties),
                'total_failed': len(failed),
                'failure_percentage': len(failed) / total_nodes,
            })
            # Drop the newly failed nodes before the next iteration.
            working.remove_nodes_from(casualties)
        return {
            'total_steps': step,
            'cascade_progression': progression,
            'final_failure_rate': len(failed) / total_nodes,
            'surviving_nodes': total_nodes - len(failed),
        }
Aplicaciones Prácticas
1. Portfolio Construction
class NetworkAwarePortfolio:
    """Portfolio construction that penalises exposure to central network nodes."""

    def __init__(self, returns_data, risk_network):
        # Asset returns, one column per asset; column order fixes weight order.
        self.returns_data = returns_data
        # Graph of asset interconnections used for the concentration penalty.
        self.network = risk_network

    def optimize_network_diversification(self, target_return=0.10):
        """Minimise portfolio variance plus a network-concentration penalty.

        Constraints: weights sum to 1, expected return >= target_return,
        each weight capped at 10%.

        Returns the optimal weight vector (numpy array).
        """
        from scipy.optimize import minimize

        n_assets = len(self.returns_data.columns)
        mu = self.returns_data.mean()
        sigma = self.returns_data.cov()

        def objective(weights):
            # Variance plus a 0.1-weighted centrality-concentration penalty.
            variance = np.dot(weights.T, np.dot(sigma, weights))
            return variance + 0.1 * self.calculate_network_concentration(weights)

        constraints = [
            {'type': 'eq', 'fun': lambda w: np.sum(w) - 1},                    # fully invested
            {'type': 'ineq', 'fun': lambda w: np.dot(mu, w) - target_return},  # hit return target
        ]
        bounds = [(0, 0.1)] * n_assets  # max 10% per asset

        solution = minimize(
            objective,
            x0=np.ones(n_assets) / n_assets,
            method='SLSQP',
            bounds=bounds,
            constraints=constraints,
        )
        return solution.x

    def calculate_network_concentration(self, weights):
        """Weighted sum of eigenvector centrality over held assets.

        Higher values mean more weight parked on central (contagion-prone)
        nodes, so the optimiser is pushed toward peripheral assets.
        """
        centrality = nx.eigenvector_centrality(self.network)
        penalty = 0
        for idx, asset in enumerate(self.returns_data.columns):
            if asset in centrality:
                penalty += weights[idx] * centrality[asset]
        return penalty
2. Risk Management
class NetworkRiskManager:
    """Portfolio risk monitoring with network-based adjustments."""

    def __init__(self, portfolio_weights, risk_network):
        # Asset weights, aligned with the portfolio's asset ordering.
        self.weights = portfolio_weights
        # Interconnection graph used for centrality-based risk adjustments.
        self.network = risk_network

    def calculate_network_var(self, confidence_level=0.95, holding_period=1):
        """VaR with a multiplicative uplift for network concentration.

        NOTE(review): depends on calculate_portfolio_returns() and
        calculate_portfolio_network_exposure(), which are not defined in the
        visible part of this file — confirm they exist elsewhere.
        `holding_period` is currently unused.
        """
        # Plain historical VaR at the requested confidence level.
        portfolio_returns = self.calculate_portfolio_returns()
        base_var = np.quantile(portfolio_returns, 1 - confidence_level)
        # Scale VaR by network concentration (up to +50%).
        exposure = self.calculate_portfolio_network_exposure()
        multiplier = 1 + 0.5 * exposure
        return {
            'traditional_var': base_var,
            'network_adjusted_var': base_var * multiplier,
            'network_concentration': exposure,
            'concentration_multiplier': multiplier,
        }

    def real_time_contagion_monitoring(self):
        """Flag positions that are both large (>5%) and on central nodes.

        NOTE(review): reads self.portfolio.companies, but __init__ never sets
        self.portfolio — this raises AttributeError as written; confirm where
        the portfolio object is meant to be attached.
        """
        centrality = nx.betweenness_centrality(self.network)
        critical_exposures = {}
        for idx, company in enumerate(self.portfolio.companies):
            if company in centrality and self.weights[idx] > 0.05:  # >5% weight
                weight = self.weights[idx]
                critical_exposures[company] = {
                    'weight': weight,
                    'centrality': centrality[company],
                    'risk_score': weight * centrality[company],
                }
        total = sum(exposure['weight'] for exposure in critical_exposures.values())
        # Escalate when critical-node exposure piles up.
        if total > 0.3:
            alert = 'HIGH'
        elif total > 0.2:
            alert = 'MEDIUM'
        else:
            alert = 'LOW'
        return {
            'critical_exposures': critical_exposures,
            'total_critical_weight': total,
            'alert_level': alert,
        }
Limitaciones y Consideraciones
Desafíos Técnicos
1. Calidad de Datos:
def validate_network_data_quality(network_data):
    """Run the standard data-quality checks for network inputs.

    Returns a dict mapping check name to that check's result. Relies on
    helper checkers defined elsewhere in the project.
    """
    checks = (
        ('missing_nodes', check_missing_company_data),
        ('edge_stability', check_edge_temporal_stability),
        ('spurious_correlations', detect_spurious_connections),
        ('data_freshness', check_data_recency),
    )
    return {name: checker(network_data) for name, checker in checks}
2. Computational Complexity:
- Redes grandes (>1000 nodos) requieren optimizaciones
- La actualización en tiempo real es un desafío computacional
- Trade-off entre precisión y velocidad
3. Model Overfitting:
- Riesgo de overfitting a patrones específicos de red
- Necesidad de validación cruzada temporal
- Robustez a cambios estructurales
Mejores Prácticas
1. Network Construction:
def robust_network_construction():
    """Return the recommended practices for building robust financial networks.

    Keys are practice identifiers; values are short (Spanish) descriptions.
    """
    return {
        'multiple_data_sources': 'Combinar correlaciones, noticias, fundamentales',
        'dynamic_thresholds': 'Ajustar thresholds basado en régimen de mercado',
        'temporal_validation': 'Validar estabilidad de conexiones en el tiempo',
        'sector_awareness': 'Considerar estructura sectorial conocida',
        'outlier_handling': 'Detectar y manejar períodos anómalos',
    }
2. Risk Management:
- Límites de concentración en nodos centrales
- Monitoreo continuo de métricas de red
- Stress testing regular con escenarios de contagio
- Diversificación explícita por estructura de red
El análisis de redes ofrece una perspectiva única sobre la estructura y dinámicas de los mercados financieros. Al considerar no solo los activos individuales sino también sus interconexiones, podemos desarrollar estrategias más robustas y sistemas de gestión de riesgo más efectivos.