@@ -12,56 +12,56 @@ class TopologyAnalyzer:
12
12
def __init__ (self , topology_data : Dict ):
13
13
"""
14
14
Initialize the analyzer with topology data.
15
-
15
+
16
16
Args:
17
17
topology_data (dict): Dictionary containing nodes and edges information
18
18
"""
19
19
self .topology_data = topology_data
20
20
self .graph = nx .DiGraph ()
21
21
self ._build_graph ()
22
-
22
+
23
23
def _build_graph (self ):
24
24
"""Build the directed graph from topology data."""
25
25
# Create a set of all node IDs for quick lookup
26
26
node_ids = {node ["id" ] for node in self .topology_data .get ("nodes" , [])}
27
-
27
+
28
28
# Add nodes with their attributes
29
29
for node in self .topology_data .get ("nodes" , []):
30
30
self .graph .add_node (node ["id" ], ** node .get ("attributes" , {}))
31
-
31
+
32
32
# Track missing nodes referenced in edges
33
33
missing_nodes = set ()
34
-
34
+
35
35
# Add edges with their attributes
36
36
for edge in self .topology_data .get ("edges" , []):
37
37
source = edge ["source" ]
38
38
target = edge ["target" ]
39
-
39
+
40
40
# Check if both source and target nodes exist
41
41
if source not in node_ids :
42
42
missing_nodes .add (source )
43
43
logger .warning (f"Edge references missing source node: { source } " )
44
44
if target not in node_ids :
45
45
missing_nodes .add (target )
46
46
logger .warning (f"Edge references missing target node: { target } " )
47
-
47
+
48
48
# Add the edge even if nodes are missing - we'll handle them as placeholder nodes
49
49
if source not in self .graph :
50
50
self .graph .add_node (source , kind = "Unknown" , name = f"missing-{ source } " )
51
51
if target not in self .graph :
52
52
self .graph .add_node (target , kind = "Unknown" , name = f"missing-{ target } " )
53
-
53
+
54
54
self .graph .add_edge (source , target , ** edge .get ("attributes" , {}))
55
-
55
+
56
56
if missing_nodes :
57
57
logger .warning (f"Found { len (missing_nodes )} nodes referenced in edges but not in nodes list" )
58
-
58
+
59
59
logger .info (f"Graph built with { self .graph .number_of_nodes ()} nodes and { self .graph .number_of_edges ()} edges" )
60
-
60
+
61
61
def _determine_position (self , node : str , subgraph : nx .DiGraph ) -> str :
62
62
in_degree = subgraph .in_degree (node )
63
63
out_degree = subgraph .out_degree (node )
64
-
64
+
65
65
# Parent: No incoming edges but has outgoing edges, OR isolated node
66
66
if in_degree == 0 :
67
67
return "parent"
@@ -71,37 +71,37 @@ def _determine_position(self, node: str, subgraph: nx.DiGraph) -> str:
71
71
# Leaf: Has incoming edges but no outgoing edges
72
72
elif in_degree > 0 and out_degree == 0 :
73
73
return "leaf"
74
-
74
+
75
75
return "parent" # Default case for any unhandled scenarios
76
-
76
+
77
77
def analyze (self ) -> List [Dict ]:
78
78
# Get weakly connected components
79
79
components = list (nx .weakly_connected_components (self .graph ))
80
80
result = []
81
-
81
+
82
82
logger .info (f"Found { len (components )} disconnected components" )
83
-
83
+
84
84
for i , component in enumerate (components , 1 ):
85
85
# Create subgraph for each component
86
86
subgraph = self .graph .subgraph (component )
87
87
logger .info (f"Processing component { i } with { len (component )} nodes and { subgraph .number_of_edges ()} edges" )
88
-
88
+
89
89
# Process nodes with position classification
90
90
nodes = []
91
91
node_positions = {} # Keep track of positions for debugging
92
92
for node in subgraph .nodes ():
93
93
position = self ._determine_position (node , subgraph )
94
94
node_positions [position ] = node_positions .get (position , 0 ) + 1
95
-
95
+
96
96
node_data = {
97
97
"id" : node ,
98
98
"attributes" : dict (subgraph .nodes [node ]),
99
99
"position" : position
100
100
}
101
101
nodes .append (node_data )
102
-
102
+
103
103
logger .info (f"Component { i } node positions: { node_positions } " )
104
-
104
+
105
105
# Process edges
106
106
edges = []
107
107
for source , target , data in subgraph .edges (data = True ):
@@ -111,7 +111,7 @@ def analyze(self) -> List[Dict]:
111
111
"attributes" : data
112
112
}
113
113
edges .append (edge_data )
114
-
114
+
115
115
# Create component data
116
116
component_data = {
117
117
"nodes" : sorted (nodes , key = lambda x : (
@@ -120,11 +120,11 @@ def analyze(self) -> List[Dict]:
120
120
)),
121
121
"edges" : edges
122
122
}
123
-
123
+
124
124
result .append (component_data )
125
-
125
+
126
126
return result
127
-
127
+
128
128
def export_to_json (self , filename : str ):
129
129
analysis_results = self .analyze ()
130
130
with open (filename , 'w' ) as f :
@@ -140,20 +140,20 @@ def main():
140
140
help = 'Output JSON file for analyzed results' )
141
141
parser .add_argument ('--debug' , action = 'store_true' ,
142
142
help = 'Enable debug logging' )
143
-
143
+
144
144
args = parser .parse_args ()
145
-
145
+
146
146
if args .debug :
147
147
logging .getLogger ().setLevel (logging .DEBUG )
148
-
148
+
149
149
try :
150
150
with open (args .input_file , 'r' ) as f :
151
151
topology_data = json .load (f )
152
-
152
+
153
153
analyzer = TopologyAnalyzer (topology_data )
154
-
154
+
155
155
analyzer .export_to_json (args .output_file )
156
-
156
+
157
157
except FileNotFoundError as e :
158
158
logger .error (f"Could not find file - { e .filename } " )
159
159
sys .exit (1 )
@@ -165,4 +165,4 @@ def main():
165
165
sys .exit (1 )
166
166
167
167
if __name__ == "__main__" :
168
- main ()
168
+ main ()
0 commit comments