# Source code for smx.graph.centrality

"""
LRC (Local Reaching Centrality) computation and cross-seed aggregation.
"""

from typing import Dict, List, Tuple

import networkx as nx
import numpy as np
import pandas as pd


def compute_lrc(graph: nx.DiGraph, predicates_df: pd.DataFrame) -> pd.DataFrame:
    """Compute Local Reaching Centrality (LRC) for every node of *graph*.

    LRC measures how well a node can reach other nodes in the graph,
    weighted by edge weights. Higher LRC → more central / important.

    Parameters
    ----------
    graph : nx.DiGraph
        Directed predicate graph (e.g., from
        :class:`smx.graph.builder.PredicateGraphBuilder`). Edge weights are
        read from the ``"weight"`` edge attribute.
    predicates_df : pd.DataFrame
        Predicate catalogue with columns ``'rule'``, ``'zone'``,
        ``'thresholds'``, ``'operator'``.

    Returns
    -------
    pd.DataFrame
        Columns: ``Node``, ``Local_Reaching_Centrality``, ``Zone``,
        ``Threshold``, ``Operator``. Sorted descending by LRC. An empty
        graph yields an empty frame with these columns.

    Notes
    -----
    * A node whose LRC computation raises ``ZeroDivisionError`` or
      ``nx.NetworkXError`` is assigned an LRC of ``0.0``.
    * String nodes starting with ``"Class_"`` get ``None`` for ``Zone``,
      ``Threshold`` and ``Operator``.
    * Any other node is matched against ``predicates_df['rule']``; the first
      matching row supplies the metadata. Nodes without a match get
      ``Zone == "Unknown"`` and ``None`` for the other two columns.
    """
    print("\nProcessing graph LRC…")

    if graph.number_of_nodes() == 0:
        return pd.DataFrame(
            columns=["Node", "Local_Reaching_Centrality", "Zone", "Threshold", "Operator"]
        )

    lrc_values: Dict[str, float] = {}
    for node in graph.nodes():
        try:
            lrc_values[node] = nx.local_reaching_centrality(graph, node, weight="weight")
        except (ZeroDivisionError, nx.NetworkXError):
            # Best-effort: a node whose centrality cannot be computed ranks last.
            lrc_values[node] = 0.0

    sorted_lrc = sorted(lrc_values.items(), key=lambda x: x[1], reverse=True)
    lrc_df = pd.DataFrame(sorted_lrc, columns=["Node", "Local_Reaching_Centrality"])

    nodes = list(lrc_df["Node"])
    # Only strings can carry the "Class_" prefix; the isinstance guard keeps
    # non-string node labels from raising AttributeError.
    is_class_node = [
        isinstance(node, str) and node.startswith("Class_") for node in nodes
    ]

    # Map each rule to the position of its first row, built once instead of
    # re-scanning the whole catalogue for every node. The catalogue is only
    # touched when at least one node actually needs a lookup.
    first_row_of_rule = {}
    if not all(is_class_node):
        for position, rule in enumerate(predicates_df["rule"]):
            first_row_of_rule.setdefault(rule, position)

    zones, thresholds, operators = [], [], []
    for node, node_is_class in zip(nodes, is_class_node):
        zone, threshold, operator = None, None, None
        if not node_is_class:
            position = first_row_of_rule.get(node)
            if position is None:
                zone = "Unknown"
            else:
                match = predicates_df.iloc[position]
                zone = match["zone"]
                threshold = match["thresholds"]
                operator = match["operator"]
        zones.append(zone)
        thresholds.append(threshold)
        operators.append(operator)

    lrc_df["Zone"] = zones
    lrc_df["Threshold"] = thresholds
    lrc_df["Operator"] = operators
    return lrc_df
def aggregate_lrc_across_seeds(
    lrc_by_seed: Dict[int, pd.DataFrame],
    random_seeds: List[int],
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Aggregate per-seed LRC DataFrames into a mean-aggregated ranking.

    Parameters
    ----------
    lrc_by_seed : dict
        ``{seed: lrc_df}`` where each *lrc_df* is returned by
        :func:`compute_lrc` (must have column ``'Node'`` plus
        ``'Local_Reaching_Centrality'``, ``'Zone'``, ``'Threshold'``,
        ``'Operator'``).
    random_seeds : list of int
        Seeds to include in the aggregation (keys of *lrc_by_seed*). Seeds
        that are not present in *lrc_by_seed* are skipped.

    Returns
    -------
    lrc_summed_df : pd.DataFrame
        Mean-aggregated LRC for all predicates, sorted descending. For each
        node the mean is taken over the included frames in which the node
        appears; ``Zone``, ``Threshold`` and ``Operator`` come from the
        ``"first"`` group aggregation. Nodes with equal mean LRC keep the
        order produced by the group-by on ``Node``.
    lrc_summed_unique_df : pd.DataFrame
        Zone-deduplicated version of *lrc_summed_df* (one row per zone),
        keeping the highest-ranked predicate per zone.

    Raises
    ------
    ValueError
        If none of *random_seeds* is a key of *lrc_by_seed*.

    Notes
    -----
    ``drop_duplicates`` treats missing values as equal, so all rows whose
    ``Zone`` is null collapse into a single row of *lrc_summed_unique_df*.
    """
    # pd.concat builds a new frame and does not mutate its inputs, so the
    # per-seed frames are passed through without defensive copies.
    frames = [lrc_by_seed[seed] for seed in random_seeds if seed in lrc_by_seed]
    if not frames:
        raise ValueError("lrc_by_seed contains none of the requested seeds.")

    lrc_all = pd.concat(frames, ignore_index=True)

    lrc_summed_df = (
        lrc_all.groupby("Node")
        .agg(
            Local_Reaching_Centrality=("Local_Reaching_Centrality", "mean"),
            Zone=("Zone", "first"),
            Threshold=("Threshold", "first"),
            Operator=("Operator", "first"),
        )
        .reset_index()
        # A stable sort makes the order of tied nodes deterministic, which in
        # turn fixes which predicate survives the zone de-duplication below.
        .sort_values("Local_Reaching_Centrality", ascending=False, kind="stable")
        .reset_index(drop=True)
    )

    # The frame is already ranked, so keeping the first row per zone keeps the
    # highest-ranked one and preserves the descending order; no re-sort needed.
    lrc_summed_unique_df = lrc_summed_df.drop_duplicates(
        subset=["Zone"], keep="first"
    ).reset_index(drop=True)

    return lrc_summed_df, lrc_summed_unique_df