Skip to content

Commit 2d49942

Browse files
Add parallel cleanup (#92)
1 parent 070f2c2 commit 2d49942

File tree

3 files changed

+443
-34
lines changed

3 files changed

+443
-34
lines changed

shared/python/utils.py

Lines changed: 187 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import secrets
1616
import base64
1717
import inspect
18+
import threading
19+
from concurrent.futures import ThreadPoolExecutor, as_completed
1820
from pathlib import Path
1921

2022
from typing import Any, Optional, Tuple
@@ -31,10 +33,19 @@
3133
BOLD_G = '\x1b[1;32m' # green
3234
BOLD_R = '\x1b[1;31m' # red
3335
BOLD_Y = '\x1b[1;33m' # yellow
36+
BOLD_C = '\x1b[1;36m' # cyan
37+
BOLD_M = '\x1b[1;35m' # magenta
38+
BOLD_W = '\x1b[1;37m' # white
3439
RESET = '\x1b[0m'
3540

41+
# Thread colors for parallel operations
42+
THREAD_COLORS = [BOLD_B, BOLD_G, BOLD_Y, BOLD_C, BOLD_M, BOLD_W]
43+
3644
CONSOLE_WIDTH = 175
3745

46+
# Thread-safe print lock
47+
_print_lock = threading.Lock()
48+
3849

3950
# ------------------------------
4051
# HELPER FUNCTIONS
@@ -1212,10 +1223,111 @@ def read_policy_xml(policy_xml_filepath_or_filename: str, named_values: dict[str
12121223
return policy_template_xml
12131224

12141225

1226+
def _cleanup_resources_thread_safe(deployment_name: str, rg_name: str, thread_prefix: str, thread_color: str) -> tuple[bool, str]:
1227+
"""
1228+
Thread-safe wrapper for _cleanup_resources with formatted output.
1229+
1230+
Args:
1231+
deployment_name (str): The deployment name (string).
1232+
rg_name (str): The resource group name.
1233+
thread_prefix (str): The thread prefix for output formatting.
1234+
thread_color (str): ANSI color code for this thread.
1235+
1236+
Returns:
1237+
tuple[bool, str]: (success, error_message)
1238+
"""
1239+
try:
1240+
with _print_lock:
1241+
_print_log(f"{thread_prefix}Starting cleanup for resource group: {rg_name}", '👉🏽 ', thread_color)
1242+
1243+
# Create a modified version of _cleanup_resources that uses thread-safe printing
1244+
_cleanup_resources_with_thread_safe_printing(deployment_name, rg_name, thread_prefix, thread_color)
1245+
1246+
with _print_lock:
1247+
_print_log(f"{thread_prefix}Completed cleanup for resource group: {rg_name}", '👉🏽 ', thread_color)
1248+
1249+
return True, ""
1250+
1251+
except Exception as e:
1252+
error_msg = f'An error occurred during cleanup of {rg_name}: {str(e)}'
1253+
with _print_lock:
1254+
_print_log(f"{thread_prefix}{error_msg}", '⛔ ', BOLD_R, show_time=True)
1255+
traceback.print_exc()
1256+
return False, error_msg
1257+
1258+
1259+
def _cleanup_resources_with_thread_safe_printing(deployment_name: str, rg_name: str, thread_prefix: str, thread_color: str) -> None:
1260+
"""
1261+
Clean up resources with thread-safe printing (internal implementation for parallel execution).
1262+
This is a modified version of _cleanup_resources that uses thread-safe output.
1263+
"""
1264+
if not deployment_name:
1265+
with _print_lock:
1266+
_print_log(f"{thread_prefix}Missing deployment name parameter.", '⛔ ', BOLD_R)
1267+
return
1268+
1269+
if not rg_name:
1270+
with _print_lock:
1271+
_print_log(f"{thread_prefix}Missing resource group name parameter.", '⛔ ', BOLD_R)
1272+
return
1273+
1274+
try:
1275+
with _print_lock:
1276+
_print_log(f"{thread_prefix}Resource group : {rg_name}", '👉🏽 ', thread_color)
1277+
1278+
# Show the deployment details
1279+
output = run(f'az deployment group show --name {deployment_name} -g {rg_name} -o json', 'Deployment retrieved', 'Failed to retrieve the deployment', print_command_to_run = False)
1280+
1281+
if output.success and output.json_data:
1282+
# Delete and purge CognitiveService accounts
1283+
output = run(f' az cognitiveservices account list -g {rg_name}', f'Listed CognitiveService accounts', f'Failed to list CognitiveService accounts', print_command_to_run = False)
1284+
1285+
if output.success and output.json_data:
1286+
for resource in output.json_data:
1287+
with _print_lock:
1288+
_print_log(f"{thread_prefix}Deleting and purging Cognitive Service Account '{resource['name']}'...", '👉🏽 ', thread_color)
1289+
output = run(f"az cognitiveservices account delete -g {rg_name} -n {resource['name']}", f"Cognitive Services '{resource['name']}' deleted", f"Failed to delete Cognitive Services '{resource['name']}'", print_command_to_run = False)
1290+
output = run(f"az cognitiveservices account purge -g {rg_name} -n {resource['name']} --location \"{resource['location']}\"", f"Cognitive Services '{resource['name']}' purged", f"Failed to purge Cognitive Services '{resource['name']}'", print_command_to_run = False)
1291+
1292+
# Delete and purge APIM resources
1293+
output = run(f' az apim list -g {rg_name}', f'Listed APIM resources', f'Failed to list APIM resources', print_command_to_run = False)
1294+
1295+
if output.success and output.json_data:
1296+
for resource in output.json_data:
1297+
with _print_lock:
1298+
_print_log(f"{thread_prefix}Deleting and purging API Management '{resource['name']}'...", '👉🏽 ', thread_color)
1299+
output = run(f"az apim delete -n {resource['name']} -g {rg_name} -y", f"API Management '{resource['name']}' deleted", f"Failed to delete API Management '{resource['name']}'", print_command_to_run = False)
1300+
output = run(f"az apim deletedservice purge --service-name {resource['name']} --location \"{resource['location']}\"", f"API Management '{resource['name']}' purged", f"Failed to purge API Management '{resource['name']}'", print_command_to_run = False)
1301+
1302+
# Delete and purge Key Vault resources
1303+
output = run(f' az keyvault list -g {rg_name}', f'Listed Key Vault resources', f'Failed to list Key Vault resources', print_command_to_run = False)
1304+
1305+
if output.success and output.json_data:
1306+
for resource in output.json_data:
1307+
with _print_lock:
1308+
_print_log(f"{thread_prefix}Deleting and purging Key Vault '{resource['name']}'...", '👉🏽 ', thread_color)
1309+
output = run(f"az keyvault delete -n {resource['name']} -g {rg_name}", f"Key Vault '{resource['name']}' deleted", f"Failed to delete Key Vault '{resource['name']}'", print_command_to_run = False)
1310+
output = run(f"az keyvault purge -n {resource['name']} --location \"{resource['location']}\"", f"Key Vault '{resource['name']}' purged", f"Failed to purge Key Vault '{resource['name']}'", print_command_to_run = False)
1311+
1312+
# Delete the resource group last
1313+
with _print_lock:
1314+
_print_log(f"{thread_prefix}Deleting resource group '{rg_name}'...", 'ℹ️ ', thread_color, show_time=True)
1315+
output = run(f'az group delete --name {rg_name} -y', f"Resource group '{rg_name}' deleted', f'Failed to delete resource group '{rg_name}'", print_command_to_run = False)
1316+
1317+
with _print_lock:
1318+
_print_log(f"{thread_prefix}Cleanup completed.", 'ℹ️ ', thread_color, show_time=True)
1319+
1320+
except Exception as e:
1321+
with _print_lock:
1322+
_print_log(f"{thread_prefix}An error occurred during cleanup: {e}", '⛔ ', BOLD_R)
1323+
traceback.print_exc()
1324+
1325+
12151326
def cleanup_infra_deployments(deployment: INFRASTRUCTURE, indexes: int | list[int] | None = None) -> None:
12161327
"""
12171328
Clean up infrastructure deployments by deployment enum and index/indexes.
12181329
Obtains the infra resource group name for each index and calls the private cleanup method.
1330+
For multiple indexes, runs cleanup operations in parallel for better performance.
12191331
12201332
Args:
12211333
deployment (INFRASTRUCTURE): The infrastructure deployment enum value.
@@ -1229,13 +1341,84 @@ def cleanup_infra_deployments(deployment: INFRASTRUCTURE, indexes: int | list[in
12291341
else:
12301342
indexes_list = [indexes]
12311343

1232-
i = 1
1233-
for idx in indexes_list:
1234-
print_info(f'{i}/{len(indexes_list)}: Cleaning up resources for {deployment} - {idx}', True)
1344+
# If only one index, run sequentially (no need for threading overhead)
1345+
if len(indexes_list) <= 1:
1346+
idx = indexes_list[0] if indexes_list else None
1347+
print_info(f'Cleaning up resources for {deployment.value} - {idx}', True)
12351348
rg_name = get_infra_rg_name(deployment, idx)
12361349
_cleanup_resources(deployment.value, rg_name)
1237-
i += 1
1350+
print_ok('Cleanup completed!')
1351+
return
1352+
1353+
# For multiple indexes, run in parallel
1354+
print_info(f'Starting parallel cleanup for {len(indexes_list)} infrastructure instances', True)
1355+
print_info(f'Infrastructure: {deployment.value}')
1356+
print_info(f'Indexes: {indexes_list}')
1357+
print()
1358+
1359+
# Determine max workers (reasonable limit to avoid overwhelming the system)
1360+
max_workers = min(len(indexes_list), 4) # Cap at 4 concurrent threads
1361+
1362+
cleanup_tasks = []
1363+
for i, idx in enumerate(indexes_list):
1364+
rg_name = get_infra_rg_name(deployment, idx)
1365+
thread_color = THREAD_COLORS[i % len(THREAD_COLORS)]
1366+
thread_prefix = f"{thread_color}[{deployment.value}-{idx}]{RESET}: "
1367+
1368+
cleanup_tasks.append({
1369+
'deployment_name': deployment.value,
1370+
'rg_name': rg_name,
1371+
'thread_prefix': thread_prefix,
1372+
'thread_color': thread_color,
1373+
'index': idx
1374+
})
12381375

1376+
# Execute cleanup tasks in parallel
1377+
with ThreadPoolExecutor(max_workers=max_workers) as executor:
1378+
# Submit all tasks
1379+
future_to_task = {
1380+
executor.submit(
1381+
_cleanup_resources_thread_safe,
1382+
task['deployment_name'],
1383+
task['rg_name'],
1384+
task['thread_prefix'],
1385+
task['thread_color']
1386+
): task for task in cleanup_tasks
1387+
}
1388+
1389+
# Track results
1390+
completed_count = 0
1391+
failed_count = 0
1392+
1393+
# Wait for completion and handle results
1394+
for future in as_completed(future_to_task):
1395+
task = future_to_task[future]
1396+
try:
1397+
success, error_msg = future.result()
1398+
completed_count += 1
1399+
1400+
if success:
1401+
with _print_lock:
1402+
print_ok(f"Completed cleanup for {deployment.value}-{task['index']} ({completed_count}/{len(indexes_list)})")
1403+
else:
1404+
failed_count += 1
1405+
with _print_lock:
1406+
print_error(f"❌ Failed cleanup for {deployment.value}-{task['index']}: {error_msg}")
1407+
1408+
except Exception as e:
1409+
failed_count += 1
1410+
with _print_lock:
1411+
print_error(f"❌ Exception during cleanup for {deployment.value}-{task['index']}: {str(e)}")
1412+
1413+
# Final summary
1414+
print()
1415+
if failed_count == 0:
1416+
print_ok(f'All {len(indexes_list)} infrastructure cleanups completed successfully!')
1417+
else:
1418+
print_warning(f'Completed with {failed_count} failures out of {len(indexes_list)} total cleanups.')
1419+
if completed_count > 0:
1420+
print_info(f'{completed_count} cleanups succeeded.')
1421+
12391422
print_ok('All done!')
12401423

12411424
def extract_json(text: str) -> Any:

tests/python/test_infrastructures.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ def test_infrastructure_base_policy_fragments_creation(mock_utils):
170170
)
171171

172172
# Initialize policy fragments
173-
pfs = infra._define_policy_fragments()
173+
infra._define_policy_fragments()
174174

175175
# Check that all base policy fragments are created
176176
expected_fragment_names = [
@@ -195,7 +195,7 @@ def test_infrastructure_base_apis_creation(mock_utils):
195195
)
196196

197197
# Initialize APIs
198-
apis = infra._define_apis()
198+
infra._define_apis()
199199

200200
# Check that hello-world API is created
201201
assert len(infra.base_apis) == 1

0 commit comments

Comments
 (0)