
Network Analysis in Finance

Introduction: From Pixels to Companies

Network approaches in finance borrow techniques from image analysis and adapt them to the relationships between financial assets. Just as convolutional filters exploit spatial relationships between pixels to detect patterns in images, network methods exploit economic relationships between assets to detect financial patterns.

Core Concepts

Financial Networks vs Traditional Networks

Key Differences:

Aspect      | Image Networks    | Financial Networks
----------- | ----------------- | ----------------------
Nodes       | Pixels            | Companies/Assets
Connections | Spatial proximity | Economic relationships
Distance    | Euclidean         | Correlation/Causality
Stability   | Static            | Dynamic over time
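The "Distance" row can be made concrete. One common choice in the financial-networks literature (e.g., minimum-spanning-tree studies of markets) maps a correlation to a proper distance via d = sqrt(2(1 - ρ)), so perfectly correlated assets sit at distance 0 and anti-correlated assets at distance 2. A minimal sketch:

```python
import numpy as np

def correlation_distance(rho):
    """Map a correlation in [-1, 1] to a distance in [0, 2].

    d = sqrt(2 * (1 - rho)): perfectly correlated assets are at
    distance 0, uncorrelated assets at sqrt(2), anti-correlated at 2.
    """
    return np.sqrt(2.0 * (1.0 - rho))

print(correlation_distance(1.0))   # 0.0
print(correlation_distance(0.0))   # ~1.414
print(correlation_distance(-1.0))  # 2.0
```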

Building Financial Networks

import networkx as nx
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler

class FinancialNetworkBuilder:
    def __init__(self, method='correlation', threshold=0.3):
        self.method = method
        self.threshold = threshold
        
    def build_correlation_network(self, returns_df):
        """
        Build network based on return correlations
        """
        # Calculate correlation matrix
        corr_matrix = returns_df.corr()
        
        # Create graph
        G = nx.Graph()
        
        # Add nodes (companies)
        for company in returns_df.columns:
            G.add_node(company)
        
        # Add edges based on correlation
        for i, company1 in enumerate(returns_df.columns):
            for j, company2 in enumerate(returns_df.columns[i+1:], i+1):
                correlation = corr_matrix.iloc[i, j]
                
                if abs(correlation) > self.threshold:
                    G.add_edge(company1, company2, 
                              weight=abs(correlation),
                              correlation=correlation)
        
        return G
    
    def build_news_network(self, news_mentions):
        """
        Build network based on news co-mentions
        """
        G = nx.Graph()
        
        # Process news articles
        for article in news_mentions:
            companies_mentioned = article['companies']
            
            # Create connections between companies mentioned together
            for i, company1 in enumerate(companies_mentioned):
                for company2 in companies_mentioned[i+1:]:
                    if G.has_edge(company1, company2):
                        G[company1][company2]['weight'] += 1
                    else:
                        G.add_edge(company1, company2, weight=1)
        
        return G
    
    def build_supply_chain_network(self, supply_relationships):
        """
        Build network based on supply chain relationships
        """
        G = nx.DiGraph()  # Directed for supplier -> customer
        
        for relationship in supply_relationships:
            supplier = relationship['supplier']
            customer = relationship['customer']
            importance = relationship['revenue_percentage']
            
            G.add_edge(supplier, customer, 
                      weight=importance,
                      relationship_type='supply_chain')
        
        return G
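A quick sanity check of the correlation-network construction, using synthetic returns (the tickers `A`, `B`, `C` and all parameters are made up): assets A and B load on a shared factor, so they should end up linked, while C is independent noise and should stay isolated.

```python
import numpy as np
import pandas as pd
import networkx as nx

rng = np.random.default_rng(42)
n = 500

# A and B share a common factor; C is independent noise
factor = rng.normal(0, 0.01, n)
returns = pd.DataFrame({
    'A': factor + rng.normal(0, 0.003, n),
    'B': factor + rng.normal(0, 0.003, n),
    'C': rng.normal(0, 0.01, n),
})

corr = returns.corr()
threshold = 0.3

# Same edge rule as build_correlation_network above
G = nx.Graph()
G.add_nodes_from(returns.columns)
cols = list(returns.columns)
for i, c1 in enumerate(cols):
    for c2 in cols[i + 1:]:
        rho = corr.loc[c1, c2]
        if abs(rho) > threshold:
            G.add_edge(c1, c2, weight=abs(rho), correlation=rho)

print(sorted(G.edges()))  # expect only the A-B edge
```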

Graph Neural Networks for Finance

Base Architecture

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, GATConv

class FinancialGNN(nn.Module):
    def __init__(self, num_features, hidden_dim, num_classes, network_type='gcn'):
        super().__init__()
        self.network_type = network_type
        
        if network_type == 'gcn':
            self.conv1 = GCNConv(num_features, hidden_dim)
            self.conv2 = GCNConv(hidden_dim, hidden_dim)
        elif network_type == 'gat':
            self.conv1 = GATConv(num_features, hidden_dim, heads=8, dropout=0.1)
            self.conv2 = GATConv(hidden_dim * 8, hidden_dim, heads=1, dropout=0.1)
        
        self.classifier = nn.Linear(hidden_dim, num_classes)
        self.dropout = nn.Dropout(0.2)
        
    def forward(self, x, edge_index, edge_attr=None):
        # First convolution layer
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = self.dropout(x)
        
        # Second layer
        x = self.conv2(x, edge_index)
        x = F.relu(x)
        x = self.dropout(x)
        
        # Classification/Prediction
        out = self.classifier(x)
        
        return out
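What `GCNConv` computes per layer can be sketched in plain NumPy: symmetric normalization of the adjacency matrix (with self-loops added) followed by a linear transform, H' = D^{-1/2}(A + I)D^{-1/2} H W. The toy graph below is illustrative; note the isolated node simply keeps its own features.

```python
import numpy as np

def gcn_layer(H, A, W):
    """One GCN propagation step: H' = D^-1/2 (A + I) D^-1/2 H W."""
    A_hat = A + np.eye(A.shape[0])           # add self-loops
    d = A_hat.sum(axis=1)                    # node degrees
    D_inv_sqrt = np.diag(1.0 / np.sqrt(d))   # symmetric normalization
    return D_inv_sqrt @ A_hat @ D_inv_sqrt @ H @ W

# Toy graph: 3 companies, nodes 0-1 connected, node 2 isolated
A = np.array([[0., 1., 0.],
              [1., 0., 0.],
              [0., 0., 0.]])
H = np.array([[1., 0.],   # per-node features
              [0., 1.],
              [1., 1.]])
W = np.eye(2)             # identity weights for illustration

H_next = gcn_layer(H, A, W)
print(H_next)  # connected nodes average each other; node 2 is unchanged
```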

Application: Sector Risk Prediction

class SectorRiskPredictor:
    def __init__(self, gnn_model):
        self.model = gnn_model
        
    def prepare_sector_network(self, company_data, sector_classifications):
        """
        Prepare network for sector risk analysis
        """
        # Features per company
        features = []
        node_mapping = {}
        
        for i, (company, data) in enumerate(company_data.items()):
            node_mapping[company] = i
            
            company_features = [
                data['market_cap_log'],
                data['pe_ratio'],
                data['debt_to_equity'],
                data['beta'],
                data['roa'],
                data['current_ratio'],
                data['revenue_growth']
            ]
            features.append(company_features)
        
        # Create edges based on sector and correlations
        edge_index = []
        edge_attr = []
        
        # Intra-sector connections
        for sector, companies in sector_classifications.items():
            for i, company1 in enumerate(companies):
                for company2 in companies[i+1:]:
                    if company1 in node_mapping and company2 in node_mapping:
                        idx1, idx2 = node_mapping[company1], node_mapping[company2]
                        
                        edge_index.extend([[idx1, idx2], [idx2, idx1]])
                        
                        # Edge attributes: sector similarity, correlation
                        correlation = self.calculate_correlation(company1, company2)
                        edge_attr.extend([[1.0, correlation], [1.0, correlation]])
        
        # Inter-sector connections (supply chain, etc.);
        # self.supply_chain_connections is assumed to be provided externally
        for connection in self.supply_chain_connections:
            if connection['supplier'] in node_mapping and connection['customer'] in node_mapping:
                idx1 = node_mapping[connection['supplier']]
                idx2 = node_mapping[connection['customer']]
                
                edge_index.extend([[idx1, idx2], [idx2, idx1]])
                edge_attr.extend([[0.5, connection['strength']], [0.5, connection['strength']]])
        
        # Keep the mapping for later lookups (used by predict_contagion_risk)
        self.node_mapping = node_mapping
        
        return (torch.tensor(features, dtype=torch.float),
                torch.tensor(edge_index, dtype=torch.long).t(),
                torch.tensor(edge_attr, dtype=torch.float))
    
    def predict_contagion_risk(self, shock_companies, network_data):
        """
        Predict risk propagation through the network
        """
        features, edge_index, edge_attr = network_data
        
        # Initialize shock
        shock_vector = torch.zeros(len(features))
        for company in shock_companies:
            if company in self.node_mapping:
                shock_vector[self.node_mapping[company]] = 1.0
        
        # Simulate propagation
        propagation_steps = []
        current_shock = shock_vector.clone()
        
        for step in range(10):  # 10 propagation steps
            # Apply GNN to predict next state; note the model must be built
            # with num_features + 1 inputs to accommodate the appended shock column
            model_input = torch.cat([features, current_shock.unsqueeze(1)], dim=1)
            next_shock = self.model(model_input, edge_index)
            
            propagation_steps.append(next_shock.detach().clone())
            current_shock = next_shock.squeeze()
        
        return propagation_steps

Case Studies

Case 1: 2008 Financial Crisis - Network Analysis

def analyze_2008_crisis_network():
    """
    Analyze the network evolution during the 2008 crisis
    """
    # Analysis periods
    periods = {
        'pre_crisis': ('2006-01-01', '2007-06-30'),
        'crisis_onset': ('2007-07-01', '2008-03-31'),
        'peak_crisis': ('2008-04-01', '2008-12-31'),
        'recovery': ('2009-01-01', '2009-12-31')
    }
    
    network_evolution = {}
    
    for period_name, (start_date, end_date) in periods.items():
        # Period data
        period_returns = get_returns_data(start_date, end_date)
        
        # Build network
        network_builder = FinancialNetworkBuilder()
        G = network_builder.build_correlation_network(period_returns)
        
        # Network metrics
        metrics = {
            'density': nx.density(G),
            'clustering': nx.average_clustering(G),
            'path_length': nx.average_shortest_path_length(G) if nx.is_connected(G) else float('inf'),
            'centrality_concentration': calculate_centrality_concentration(G),
            'modularity': calculate_modularity_by_sector(G)
        }
        
        network_evolution[period_name] = {
            'graph': G,
            'metrics': metrics,
            'central_nodes': identify_central_nodes(G)
        }
    
    return network_evolution

def analyze_crisis_propagation():
    """
    Specific analysis of propagation during the crisis
    """
    # Pre-crisis vs crisis network
    pre_crisis_network = build_network('2006-01-01', '2007-06-30')
    crisis_network = build_network('2008-01-01', '2008-12-31')
    
    findings = {
        'density_change': nx.density(crisis_network) / nx.density(pre_crisis_network),
        'lehman_centrality': {
            'pre_crisis': nx.betweenness_centrality(pre_crisis_network).get('LEH', 0),
            'during_crisis': nx.betweenness_centrality(crisis_network).get('LEH', 0)
        },
        'financial_sector_clustering': analyze_sector_clustering(crisis_network, 'Financial'),
        'contagion_paths': find_contagion_paths(crisis_network, source='LEH')
    }
    
    return findings

Key Findings:

  • The network became highly connected during the crisis
  • Lehman Brothers emerged as a central node
  • Sector modularity disappeared (all sectors became correlated)
  • Contagion paths were primarily through the financial sector
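The first finding can be reproduced qualitatively with synthetic data: raising the common-factor loading (a stand-in for crisis-driven co-movement) pushes pairwise correlations above the threshold and network density up. All numbers below are illustrative, not 2008 data.

```python
import numpy as np
import pandas as pd
import networkx as nx

def correlation_network(returns, threshold=0.3):
    corr = returns.corr().values
    G = nx.Graph()
    G.add_nodes_from(returns.columns)
    cols = list(returns.columns)
    for i in range(len(cols)):
        for j in range(i + 1, len(cols)):
            if abs(corr[i, j]) > threshold:
                G.add_edge(cols[i], cols[j])
    return G

def simulate_market(factor_loading, n_assets=20, n_days=500, seed=0):
    """Returns driven by one common factor plus idiosyncratic noise."""
    rng = np.random.default_rng(seed)
    factor = rng.normal(0, 0.01, n_days)
    data = {f'S{i}': factor_loading * factor + rng.normal(0, 0.01, n_days)
            for i in range(n_assets)}
    return pd.DataFrame(data)

calm = correlation_network(simulate_market(factor_loading=0.5))
crisis = correlation_network(simulate_market(factor_loading=3.0))

print(f"calm density:   {nx.density(calm):.2f}")
print(f"crisis density: {nx.density(crisis):.2f}")  # near-complete graph
```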

Case 2: Sentiment Analysis in Networks

from transformers import pipeline  # Hugging Face pipeline for sentiment scoring

class SentimentNetworkAnalyzer:
    def __init__(self):
        self.sentiment_analyzer = pipeline("sentiment-analysis")
        
    def build_sentiment_network(self, news_data, companies):
        """
        Build network based on news sentiment
        """
        G = nx.Graph()
        
        # Process news per company
        company_sentiments = {}
        
        for article in news_data:
            mentioned_companies = self.extract_companies(article['text'], companies)
            article_sentiment = self.sentiment_analyzer(article['text'])[0]
            
            sentiment_score = article_sentiment['score'] if article_sentiment['label'] == 'POSITIVE' else -article_sentiment['score']
            
            # Add sentiment to mentioned companies
            for company in mentioned_companies:
                if company not in company_sentiments:
                    company_sentiments[company] = []
                company_sentiments[company].append(sentiment_score)
        
        # Create connections based on sentiment co-occurrence
        for article in news_data:
            mentioned_companies = self.extract_companies(article['text'], companies)
            if len(mentioned_companies) > 1:
                article_sentiment = self.get_article_sentiment(article['text'])
                
                for i, company1 in enumerate(mentioned_companies):
                    for company2 in mentioned_companies[i+1:]:
                        if G.has_edge(company1, company2):
                            # Update weight based on shared sentiment
                            G[company1][company2]['sentiment_correlation'] += article_sentiment
                            G[company1][company2]['co_mentions'] += 1
                        else:
                            G.add_edge(company1, company2, 
                                     sentiment_correlation=article_sentiment,
                                     co_mentions=1)
        
        return G
    
    def predict_sentiment_contagion(self, network, initial_sentiment_shock):
        """
        Predict sentiment propagation through the network
        """
        # Sentiment diffusion model
        sentiment_states = {node: 0 for node in network.nodes()}
        
        # Apply initial shock
        for company, sentiment in initial_sentiment_shock.items():
            if company in sentiment_states:
                sentiment_states[company] = sentiment
        
        # Simulate propagation
        for iteration in range(10):
            new_states = sentiment_states.copy()
            
            for node in network.nodes():
                neighbor_influence = 0
                total_weight = 0
                
                for neighbor in network.neighbors(node):
                    edge_data = network[node][neighbor]
                    weight = edge_data.get('sentiment_correlation', 0)
                    
                    neighbor_influence += weight * sentiment_states[neighbor]
                    total_weight += abs(weight)
                
                if total_weight > 0:
                    # Combine own sentiment with neighbor influence
                    new_states[node] = 0.7 * sentiment_states[node] + 0.3 * (neighbor_influence / total_weight)
            
            # Carry updated states into the next iteration;
            # without this, every iteration recomputes the same first step
            sentiment_states = new_states
        
        return sentiment_states
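The diffusion update (70% inertia, 30% neighbor influence) can be checked by hand on a two-node graph. The tickers and weights below are purely illustrative; since the update preserves the total sentiment, both nodes converge toward the average of the initial shock.

```python
import networkx as nx

G = nx.Graph()
G.add_edge('AAPL', 'MSFT', sentiment_correlation=1.0)

states = {'AAPL': -1.0, 'MSFT': 0.0}  # negative sentiment shock on AAPL

for _ in range(10):
    new_states = {}
    for node in G.nodes():
        influence, total_w = 0.0, 0.0
        for nb in G.neighbors(node):
            w = G[node][nb]['sentiment_correlation']
            influence += w * states[nb]
            total_w += abs(w)
        if total_w > 0:
            # Same rule as predict_sentiment_contagion above
            new_states[node] = 0.7 * states[node] + 0.3 * influence / total_w
        else:
            new_states[node] = states[node]
    states = new_states

print(states)  # both converge toward -0.5, the average of the shock
```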

Risk Modeling with Networks

Dynamic Risk Networks

class DynamicRiskNetwork:
    def __init__(self, window_size=60):
        self.window_size = window_size
        self.risk_networks = {}
        
    def build_rolling_risk_networks(self, returns_data):
        """
        Build risk networks with rolling windows
        """
        for i in range(self.window_size, len(returns_data)):
            window_data = returns_data.iloc[i-self.window_size:i]
            date = returns_data.index[i]
            
            # Build correlation network (reusing FinancialNetworkBuilder from earlier)
            corr_network = FinancialNetworkBuilder().build_correlation_network(window_data)
            
            # Calculate systemic risk metrics
            systemic_risk_metrics = self.calculate_systemic_risk(corr_network, window_data)
            
            self.risk_networks[date] = {
                'network': corr_network,
                'systemic_risk': systemic_risk_metrics,
                'network_density': nx.density(corr_network),
                'avg_clustering': nx.average_clustering(corr_network)
            }
    
    def calculate_systemic_risk(self, network, returns_data):
        """
        Calculate systemic risk metrics
        """
        # 1. Network density as a contagion proxy
        density = nx.density(network)
        
        # 2. Centrality concentration
        centrality = nx.eigenvector_centrality(network)
        centrality_concentration = np.std(list(centrality.values()))
        
        # 3. CoVaR (Conditional Value at Risk)
        covar_matrix = self.calculate_covar_matrix(returns_data)
        
        # 4. Network CoVaR
        network_covar = self.calculate_network_covar(network, covar_matrix)
        
        return {
            'network_density': density,
            'centrality_concentration': centrality_concentration,
            'average_covar': np.mean(covar_matrix),
            'network_covar': network_covar,
            'systemic_risk_score': self.aggregate_risk_score(density, centrality_concentration, network_covar)
        }
    
    def calculate_covar_matrix(self, returns_data, quantile=0.05):
        """
        Calculate Conditional Value at Risk between asset pairs
        """
        companies = returns_data.columns
        covar_matrix = np.zeros((len(companies), len(companies)))
        
        for i, company1 in enumerate(companies):
            for j, company2 in enumerate(companies):
                if i != j:
                    # VaR of company2
                    var_company2 = np.quantile(returns_data[company2], quantile)
                    
                    # Returns of company1 when company2 is under stress
                    stress_condition = returns_data[company2] <= var_company2
                    conditional_returns = returns_data[company1][stress_condition]
                    
                    # CoVaR
                    if len(conditional_returns) > 0:
                        covar = np.quantile(conditional_returns, quantile)
                        covar_matrix[i, j] = covar
        
        return covar_matrix
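A minimal two-asset check of the pairwise CoVaR logic above: when B's stress days drag A down through a shared factor, A's quantile conditional on B's stress sits below its unconditional VaR. Tickers and parameters are synthetic.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(7)
n = 5000
factor = rng.normal(0, 0.01, n)
returns = pd.DataFrame({
    'A': factor + rng.normal(0, 0.005, n),
    'B': factor + rng.normal(0, 0.005, n),
})

q = 0.05
var_a = np.quantile(returns['A'], q)          # unconditional 5% VaR of A
var_b = np.quantile(returns['B'], q)

# A's returns on days when B is at or below its own VaR
stressed = returns['A'][returns['B'] <= var_b]
covar_a_given_b = np.quantile(stressed, q)    # CoVaR of A given B in stress

print(f"VaR(A)       = {var_a:.4f}")
print(f"CoVaR(A | B) = {covar_a_given_b:.4f}")  # deeper loss than VaR(A)
```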

Stress Testing with Networks

class NetworkStressTester:
    def __init__(self, risk_network):
        self.network = risk_network
        
    def node_removal_stress_test(self, removal_strategy='centrality'):
        """
        Stress test by removing critical nodes
        """
        original_network = self.network.copy()
        stress_results = {}
        
        if removal_strategy == 'centrality':
            # Remove nodes by centrality order
            centrality = nx.betweenness_centrality(original_network)
            nodes_to_remove = sorted(centrality.keys(), key=lambda x: centrality[x], reverse=True)
        elif removal_strategy == 'random':
            nodes_to_remove = list(original_network.nodes())
            np.random.shuffle(nodes_to_remove)
        
        for i, node in enumerate(nodes_to_remove[:10]):  # Top 10 nodes
            test_network = original_network.copy()
            test_network.remove_node(node)
            
            # Post-removal metrics (check connectivity once)
            connected = nx.is_connected(test_network)
            if connected:
                avg_path_length = nx.average_shortest_path_length(test_network)
                efficiency = nx.global_efficiency(test_network)
                largest_component_size = len(test_network)
                fragmentation_impact = 0.0
            else:
                # Fragmented network: measure the largest surviving component
                largest_component = max(nx.connected_components(test_network), key=len)
                subgraph = test_network.subgraph(largest_component)
                avg_path_length = nx.average_shortest_path_length(subgraph)
                efficiency = len(largest_component) / len(original_network)
                largest_component_size = len(largest_component)
                fragmentation_impact = 1 - len(largest_component) / len(original_network)
            
            stress_results[node] = {
                'removed_order': i + 1,
                'avg_path_length': avg_path_length,
                'network_efficiency': efficiency,
                'largest_component_size': largest_component_size,
                'fragmentation_impact': fragmentation_impact
            }
        
        return stress_results
    
    def cascading_failure_simulation(self, initial_shock_nodes, failure_threshold=0.3):
        """
        Simulate cascading failures in the network
        """
        network = self.network.copy()
        failed_nodes = set(initial_shock_nodes)
        cascade_steps = []
        
        step = 0
        while True:
            step += 1
            new_failures = set()
            
            # Calculate stress on remaining nodes
            for node in network.nodes():
                if node not in failed_nodes:
                    # Count failed neighbors
                    neighbors = list(network.neighbors(node))
                    failed_neighbors = len([n for n in neighbors if n in failed_nodes])
                    
                    # Failed neighbor ratio
                    if len(neighbors) > 0:
                        failure_ratio = failed_neighbors / len(neighbors)
                        
                        if failure_ratio >= failure_threshold:
                            new_failures.add(node)
            
            if not new_failures:
                break  # No more cascading failures
                
            failed_nodes.update(new_failures)
            cascade_steps.append({
                'step': step,
                'new_failures': list(new_failures),
                'total_failed': len(failed_nodes),
                'failure_percentage': len(failed_nodes) / len(self.network.nodes())
            })
            
            # Remove failed nodes for next iteration
            network.remove_nodes_from(new_failures)
        
        return {
            'total_steps': step,
            'cascade_progression': cascade_steps,
            'final_failure_rate': len(failed_nodes) / len(self.network.nodes()),
            'surviving_nodes': len(self.network.nodes()) - len(failed_nodes)
        }
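The threshold cascade above can be run by hand on a small graph. In a star network, knocking out the hub fails every leaf in a single step, since each leaf's only neighbor has failed. A minimal sketch using the same failure rule:

```python
import networkx as nx

def cascade(G, initial_failures, threshold=0.3):
    """Threshold cascade: a node fails once >= threshold of its neighbors fail."""
    failed = set(initial_failures)
    live = G.copy()
    steps = 0
    while True:
        new = set()
        for node in live.nodes():
            if node in failed:
                continue
            nbrs = list(live.neighbors(node))
            if nbrs and sum(n in failed for n in nbrs) / len(nbrs) >= threshold:
                new.add(node)
        if not new:
            return failed, steps
        failed |= new
        live.remove_nodes_from(new)  # failed nodes stop transmitting next round
        steps += 1

# Star network: hub (node 0) connected to 5 leaves
G = nx.star_graph(5)
failed, steps = cascade(G, initial_failures={0})
print(failed, steps)  # all 6 nodes failed after 1 cascade step
```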

Practical Applications

1. Portfolio Construction

class NetworkAwarePortfolio:
    def __init__(self, returns_data, risk_network):
        self.returns_data = returns_data
        self.network = risk_network
        
    def optimize_network_diversification(self, target_return=0.10):
        """
        Portfolio optimization considering network structure
        """
        from scipy.optimize import minimize
        
        n_assets = len(self.returns_data.columns)
        expected_returns = self.returns_data.mean()
        cov_matrix = self.returns_data.cov()
        
        # Objective function: minimize risk + network concentration penalty
        def objective(weights):
            portfolio_var = np.dot(weights.T, np.dot(cov_matrix, weights))
            network_concentration_penalty = self.calculate_network_concentration(weights)
            
            return portfolio_var + 0.1 * network_concentration_penalty
        
        # Constraints
        constraints = [
            {'type': 'eq', 'fun': lambda x: np.sum(x) - 1},  # Sum = 1
            {'type': 'ineq', 'fun': lambda x: np.dot(expected_returns, x) - target_return}  # Return target
        ]
        
        bounds = [(0, 0.1) for _ in range(n_assets)]  # Max 10% per asset (feasible only with 10+ assets)
        
        # Optimization
        result = minimize(
            objective,
            x0=np.ones(n_assets) / n_assets,
            method='SLSQP',
            bounds=bounds,
            constraints=constraints
        )
        
        return result.x
    
    def calculate_network_concentration(self, weights):
        """
        Penalize concentration in central network nodes
        """
        centrality = nx.eigenvector_centrality(self.network)
        companies = self.returns_data.columns
        
        concentration_penalty = 0
        for i, company in enumerate(companies):
            if company in centrality:
                # Penalize more weight in more central nodes
                concentration_penalty += weights[i] * centrality[company]
        
        return concentration_penalty
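A runnable miniature of this optimization with three synthetic assets. Everything here is illustrative: the centrality penalty uses degree centrality for simplicity, the 0.1 penalty weight is taken from the class above, and the return-target constraint is dropped for brevity. The optimizer should tilt weight toward the peripheral asset.

```python
import numpy as np
import networkx as nx
from scipy.optimize import minimize

rng = np.random.default_rng(1)
assets = ['A', 'B', 'C']
returns = rng.normal(0.0005, 0.01, size=(500, 3))  # synthetic daily returns
cov = np.cov(returns, rowvar=False)

# A and B are linked in the risk network; C is peripheral
G = nx.Graph([('A', 'B')])
G.add_node('C')
centrality = nx.degree_centrality(G)
cent_vec = np.array([centrality[a] for a in assets])

def objective(w):
    # Portfolio variance plus penalty for weight parked on central nodes
    return w @ cov @ w + 0.1 * (w @ cent_vec)

constraints = [{'type': 'eq', 'fun': lambda w: w.sum() - 1}]
bounds = [(0.0, 0.6)] * 3  # loose cap so 3 assets can still sum to 1

res = minimize(objective, x0=np.ones(3) / 3, method='SLSQP',
               bounds=bounds, constraints=constraints)
weights = res.x
print(dict(zip(assets, weights.round(3))))  # most weight on peripheral C
```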

2. Risk Management

class NetworkRiskManager:
    def __init__(self, portfolio_weights, risk_network, companies):
        self.weights = portfolio_weights
        self.network = risk_network
        self.companies = companies  # tickers aligned with portfolio_weights
        
    def calculate_network_var(self, confidence_level=0.95, holding_period=1):
        """
        Calculate VaR considering network effects
        """
        # Traditional VaR
        portfolio_returns = self.calculate_portfolio_returns()
        traditional_var = np.quantile(portfolio_returns, 1 - confidence_level)
        
        # Network concentration adjustment
        network_concentration = self.calculate_portfolio_network_exposure()
        concentration_multiplier = 1 + 0.5 * network_concentration  # Up to 50% increase
        
        # Network-adjusted VaR
        network_adjusted_var = traditional_var * concentration_multiplier
        
        return {
            'traditional_var': traditional_var,
            'network_adjusted_var': network_adjusted_var,
            'network_concentration': network_concentration,
            'concentration_multiplier': concentration_multiplier
        }
    
    def real_time_contagion_monitoring(self):
        """
        Real-time contagion risk monitoring
        """
        # Identify positions in critical nodes
        centrality = nx.betweenness_centrality(self.network)
        
        critical_exposures = {}
        for i, company in enumerate(self.companies):  # tickers aligned with weights
            if company in centrality and self.weights[i] > 0.05:  # >5% weight
                critical_exposures[company] = {
                    'weight': self.weights[i],
                    'centrality': centrality[company],
                    'risk_score': self.weights[i] * centrality[company]
                }
        
        # Alert if there is excessive concentration
        total_critical_exposure = sum([exp['weight'] for exp in critical_exposures.values()])
        
        return {
            'critical_exposures': critical_exposures,
            'total_critical_weight': total_critical_exposure,
            'alert_level': 'HIGH' if total_critical_exposure > 0.3 else 'MEDIUM' if total_critical_exposure > 0.2 else 'LOW'
        }
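The VaR adjustment in `calculate_network_var` is just a multiplier: with network concentration c in [0, 1], adjusted VaR = VaR × (1 + 0.5c). A quick numeric check with a synthetic return series (the concentration value is assumed for illustration):

```python
import numpy as np

rng = np.random.default_rng(3)
portfolio_returns = rng.normal(0.0, 0.01, 2000)   # synthetic daily returns

confidence = 0.95
traditional_var = np.quantile(portfolio_returns, 1 - confidence)

network_concentration = 0.4                        # assumed, in [0, 1]
multiplier = 1 + 0.5 * network_concentration       # 1.2
adjusted_var = traditional_var * multiplier

print(f"traditional VaR: {traditional_var:.4f}")
print(f"adjusted VaR:    {adjusted_var:.4f}")      # 20% deeper loss estimate
```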

Limitations and Considerations

Technical Challenges

1. Data Quality:

def validate_network_data_quality(network_data):
    """
    Quality validation for network data
    """
    quality_checks = {
        'missing_nodes': check_missing_company_data(network_data),
        'edge_stability': check_edge_temporal_stability(network_data),
        'spurious_correlations': detect_spurious_connections(network_data),
        'data_freshness': check_data_recency(network_data)
    }
    
    return quality_checks

2. Computational Complexity:

  • Large networks (>1000 nodes) require optimizations
  • Real-time updates are challenging
  • Trade-off between accuracy and speed

3. Model Overfitting:

  • Risk of overfitting to specific network patterns
  • Need for temporal cross-validation
  • Robustness to structural changes

Best Practices

1. Network Construction:

def robust_network_construction():
    """
    Best practices for robust network construction
    """
    best_practices = {
        'multiple_data_sources': 'Combine correlations, news, fundamentals',
        'dynamic_thresholds': 'Adjust thresholds based on market regime',
        'temporal_validation': 'Validate connection stability over time',
        'sector_awareness': 'Consider known sector structure',
        'outlier_handling': 'Detect and handle anomalous periods'
    }
    return best_practices

2. Risk Management:

  • Concentration limits on central nodes
  • Continuous monitoring of network metrics
  • Regular stress testing with contagion scenarios
  • Explicit diversification by network structure

Network analysis offers a unique perspective on the structure and dynamics of financial markets. By considering not only individual assets but also their interconnections, we can develop more robust strategies and more effective risk management systems.