Skip to content

Commit 22399a1

Browse files
authored
Config arguments not being passed with correct types (#159)
When trying to update an existing cluster, the head node root volume size is being flagged because it is str(200), not int(200). It turns out that when passing the config to the CloudFormation CustomResource for the cluster, the integer types aren't preserved and their values are passed as strings. Pass the configuration to the CustomResource as a json string and parse the json string back into a Python structure so that int and string types are correctly preserved. Also combine the custom resources for the S3 config objects and the ParallelCluster so that a change to the json config string is assured to also trigger an update of the cluster. Move the custom resource lambda to create_parallel_cluster_lambdas() where it belongs. Save the config as json and yaml in a fixed s3 key in the assets bucket. Delete the CreateParallelClusterConfig lambda; its functionality is moved into the CreateParallelCluster lambda. Updates weren't happening if the fleet was running. Added checking for this condition; the fleet is now stopped and the update retried when necessary. Resolves #158
1 parent 1ab863a commit 22399a1

File tree

4 files changed

+179
-174
lines changed

4 files changed

+179
-174
lines changed

source/cdk/cdk_slurm_stack.py

Lines changed: 68 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
115115

116116
self.check_config()
117117

118+
self.cluster_region = self.config['Region']
119+
118120
if self.use_parallel_cluster:
119121
self.create_vpc()
120122

@@ -1096,6 +1098,58 @@ def create_lambdas(self):
10961098
def create_parallel_cluster_lambdas(self):
    """
    Create the Lambda resources used with ParallelCluster.

    Calls create_callSlurmRestApiLambda(), then defines:

    * self.parallel_cluster_lambda_layer: a LayerVersion whose code is the
      lambda-layer.zip object for the configured ParallelCluster version in
      the "<cluster_region>-aws-parallelcluster" bucket.
    * self.create_parallel_cluster_lambda: the CreateParallelCluster function,
      built from resources/lambdas/CreateParallelCluster, which uses that layer
      and runs in self.vpc.

    Two ALLOW policy statements are then added to the function's role.
    """
    self.create_callSlurmRestApiLambda()

    # Locate the ParallelCluster layer zip for the configured version.
    layer_bucket = s3.Bucket.from_bucket_name(
        self,
        'ParallelClusterBucket',
        f"{self.cluster_region}-aws-parallelcluster",
    )
    layer_key = f"parallelcluster/{self.config['slurm']['ParallelClusterConfig']['Version']}/layers/aws-parallelcluster/lambda-layer.zip"

    layer_architectures = [
        aws_lambda.Architecture.ARM_64,
        aws_lambda.Architecture.X86_64,
    ]
    # PYTHON_3_11 is left out on purpose. Doesn't work: No module named 'rpds.rpds'
    layer_runtimes = [
        aws_lambda.Runtime.PYTHON_3_9,
        aws_lambda.Runtime.PYTHON_3_10,
    ]
    self.parallel_cluster_lambda_layer = aws_lambda.LayerVersion(
        self,
        "ParallelClusterLambdaLayer",
        description='ParallelCluster Layer',
        code=aws_lambda.Code.from_bucket(layer_bucket, layer_key),
        compatible_architectures=layer_architectures,
        compatible_runtimes=layer_runtimes,
    )

    # Package the handler source as an S3 asset and build the function from it.
    handler_asset = s3_assets.Asset(
        self,
        "CreateParallelClusterAsset",
        path="resources/lambdas/CreateParallelCluster",
    )
    handler_code = aws_lambda.Code.from_bucket(handler_asset.bucket, handler_asset.s3_object_key)
    self.create_parallel_cluster_lambda = aws_lambda.Function(
        self,
        "CreateParallelClusterLambda",
        function_name=f"{self.stack_name}-CreateParallelCluster",
        description="Create ParallelCluster from json string",
        memory_size=2048,
        runtime=aws_lambda.Runtime.PYTHON_3_9,
        architecture=aws_lambda.Architecture.X86_64,
        timeout=Duration.minutes(15),
        log_retention=logs.RetentionDays.INFINITE,
        handler="CreateParallelCluster.lambda_handler",
        code=handler_code,
        layers=[self.parallel_cluster_lambda_layer],
        vpc=self.vpc,
        allow_all_outbound=True,
    )

    # Add the same two statements, in the same order: first all S3 actions,
    # then all actions, each on all resources.
    for allowed_actions in (['s3:*'], ['*']):
        self.create_parallel_cluster_lambda.add_to_role_policy(
            statement=iam.PolicyStatement(
                effect=iam.Effect.ALLOW,
                actions=allowed_actions,
                resources=['*'],
            )
        )
1152+
10991153
def create_callSlurmRestApiLambda(self):
11001154
callSlurmRestApiLambdaAsset = s3_assets.Asset(self, "CallSlurmRestApiLambdaAsset", path="resources/lambdas/CallSlurmRestApi")
11011155
self.call_slurm_rest_api_lambda = aws_lambda.Function(
@@ -4143,102 +4197,29 @@ def create_parallel_cluster_config(self):
41434197
}
41444198
self.parallel_cluster_config['SharedStorage'].append(parallel_cluster_storage_dict)
41454199

4146-
fh = NamedTemporaryFile()
4147-
yaml.dump(self.parallel_cluster_config, fh, encoding='utf-8', sort_keys=False)
4148-
self.parallelClusterConfigAsset = s3_assets.Asset(self, "ParallelClusterConfigAsset", path=fh.name)
4149-
self.parallelClusterConfigAsset.grant_read(self.parallel_cluster_asset_read_policy)
4150-
4151-
createParallelClusterConfigAsset = s3_assets.Asset(self, "CreateParallelClusterConfigAsset", path="resources/lambdas/CreateParallelClusterConfig")
4152-
self.create_parallel_cluster_config_lambda = aws_lambda.Function(
4153-
self, "CreateParallelClusterConfigLambda",
4154-
function_name=f"{self.stack_name}-CreateParallelClusterConfig",
4155-
description="Create ParallelCluster config file in s3",
4156-
memory_size=128,
4157-
runtime=aws_lambda.Runtime.PYTHON_3_8,
4158-
architecture=aws_lambda.Architecture.ARM_64,
4159-
timeout=Duration.minutes(3),
4160-
log_retention=logs.RetentionDays.INFINITE,
4161-
handler="CreateParallelClusterConfig.lambda_handler",
4162-
code=aws_lambda.Code.from_bucket(createParallelClusterConfigAsset.bucket, createParallelClusterConfigAsset.s3_object_key),
4163-
vpc = self.vpc,
4164-
allow_all_outbound = True
4165-
)
4166-
self.create_parallel_cluster_config_lambda.add_to_role_policy(
4167-
statement=iam.PolicyStatement(
4168-
effect=iam.Effect.ALLOW,
4169-
actions=[
4170-
's3:*',
4171-
],
4172-
resources=['*']
4173-
)
4174-
)
4175-
# This is created as a custom resource so that resources get resolved in the config string before it gets passed to the lambda.
4176-
self.parallel_cluster_config_s3_url = CustomResource(
4177-
self, "ParallelClusterConfigS3Object",
4178-
service_token = self.create_parallel_cluster_config_lambda.function_arn,
4179-
properties = {
4180-
'ParallelClusterConfigYaml': yaml.dump(self.parallel_cluster_config, sort_keys=False),
4181-
'S3Bucket': self.parallelClusterConfigAsset.s3_bucket_name,
4182-
'S3Key': self.parallelClusterConfigAsset.s3_object_key,
4183-
'S3ObjectUrl': self.parallelClusterConfigAsset.s3_object_url
4184-
}
4185-
).get_att_string('S3ObjectUrl')
4200+
self.parallel_cluster_config_json_s3_key = f"{self.assets_base_key}/ParallelClusterConfig.json"
4201+
self.parallel_cluster_config_yaml_s3_key = f"{self.assets_base_key}/ParallelClusterConfig.yml"
41864202

4187-
self.parallel_cluster_lambda_layer = aws_lambda.LayerVersion(self, "ParallelClusterLambdaLayer",
4188-
description = 'ParallelCluster Layer',
4189-
code = aws_lambda.Code.from_bucket(
4190-
s3.Bucket.from_bucket_name(self, 'ParallelClusterBucket', f"{self.config['Region']}-aws-parallelcluster"),
4191-
f"parallelcluster/{self.config['slurm']['ParallelClusterConfig']['Version']}/layers/aws-parallelcluster/lambda-layer.zip"
4192-
),
4193-
compatible_architectures = [
4194-
aws_lambda.Architecture.ARM_64,
4195-
aws_lambda.Architecture.X86_64,
4196-
],
4197-
compatible_runtimes = [
4198-
aws_lambda.Runtime.PYTHON_3_8,
4199-
aws_lambda.Runtime.PYTHON_3_9,
4200-
# aws_lambda.Runtime.PYTHON_3_10,
4201-
# aws_lambda.Runtime.PYTHON_3_11,
4202-
],
4203-
)
4204-
createParallelClusterAsset = s3_assets.Asset(self, "CreateParallelClusterAsset", path="resources/lambdas/CreateParallelCluster")
4205-
self.create_parallel_cluster_lambda = aws_lambda.Function(
4206-
self, "CreateParallelClusterLambda",
4207-
function_name=f"{self.stack_name}-CreateParallelCluster",
4208-
description="Create ParallelCluster from config file in s3",
4209-
memory_size=2048,
4210-
runtime=aws_lambda.Runtime.PYTHON_3_9,
4211-
architecture=aws_lambda.Architecture.X86_64,
4212-
timeout=Duration.minutes(15),
4213-
log_retention=logs.RetentionDays.INFINITE,
4214-
handler="CreateParallelCluster.lambda_handler",
4215-
code=aws_lambda.Code.from_bucket(createParallelClusterAsset.bucket, createParallelClusterAsset.s3_object_key),
4216-
layers=[self.parallel_cluster_lambda_layer],
4217-
vpc = self.vpc,
4218-
allow_all_outbound = True
4219-
)
4220-
self.create_parallel_cluster_lambda.add_to_role_policy(
4221-
statement=iam.PolicyStatement(
4222-
effect=iam.Effect.ALLOW,
4223-
actions=[
4224-
'*',
4225-
],
4226-
resources=['*']
4227-
)
4228-
)
42294203
self.parallel_cluster = CustomResource(
4230-
self, "CreatedParallelCluster",
4204+
self, "ParallelCluster",
42314205
service_token = self.create_parallel_cluster_lambda.function_arn,
42324206
properties = {
4207+
'ParallelClusterConfigJson': json.dumps(self.parallel_cluster_config, sort_keys=False),
4208+
'ParallelClusterConfigS3Bucket': self.assets_bucket,
4209+
'ParallelClusterConfigJsonS3Key': self.parallel_cluster_config_json_s3_key,
4210+
'ParallelClusterConfigYamlS3Key': self.parallel_cluster_config_yaml_s3_key,
42334211
'Region': self.config['Region'],
42344212
'ClusterName': self.config['slurm']['ClusterName'],
4235-
'ParallelClusterConfigJson': self.parallel_cluster_config,
4236-
'ParallelClusterConfigS3Url': self.parallel_cluster_config_s3_url,
42374213
}
42384214
)
4215+
self.parallel_cluster_config_json_s3_url = self.parallel_cluster.get_att_string('ConfigJsonS3Url')
4216+
self.parallel_cluster_config_yaml_s3_url = self.parallel_cluster.get_att_string('ConfigYamlS3Url')
42394217

4240-
CfnOutput(self, "ParallelClusterConfigS3Url",
4241-
value = self.parallelClusterConfigAsset.s3_object_url
4218+
CfnOutput(self, "ParallelClusterConfigJsonS3Url",
4219+
value = self.parallel_cluster_config_json_s3_url
4220+
)
4221+
CfnOutput(self, "ParallelClusterConfigYamlS3Url",
4222+
value = self.parallel_cluster_config_yaml_s3_url
42424223
)
42434224
CfnOutput(self, "MungeParameterName",
42444225
value = self.munge_key_ssm_parameter.parameter_name

source/resources/lambdas/CreateParallelCluster/CreateParallelCluster.py

Lines changed: 111 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,17 @@
1717
"""
1818

1919
'''
20-
Create/update/delete ParallelCluster cluster.
20+
Create/update/delete ParallelCluster cluster and save config to S3 as json and yaml.
2121
'''
22+
import boto3
2223
import cfnresponse
2324
import json
2425
import logging
2526
from os import environ as environ
2627
import pcluster.lib as pc
28+
from pcluster.api.errors import BadRequestException, UpdateClusterBadRequestException
29+
from time import sleep
30+
import yaml
2731

2832
logger=logging.getLogger(__file__)
2933
logger_formatter = logging.Formatter('%(levelname)s: %(message)s')
@@ -40,10 +44,12 @@ def lambda_handler(event, context):
4044
requestType = event['RequestType']
4145
properties = event['ResourceProperties']
4246
required_properties = [
43-
'Region',
44-
'ClusterName',
4547
'ParallelClusterConfigJson',
46-
'ParallelClusterConfigS3Url'
48+
'ParallelClusterConfigS3Bucket',
49+
'ParallelClusterConfigJsonS3Key',
50+
'ParallelClusterConfigYamlS3Key',
51+
'Region',
52+
'ClusterName'
4753
]
4854
error_message = ""
4955
for property in required_properties:
@@ -59,6 +65,42 @@ def lambda_handler(event, context):
5965
else:
6066
raise KeyError(error_message)
6167

68+
s3_resource = boto3.resource('s3')
69+
70+
json_key = f"{properties['ParallelClusterConfigJsonS3Key']}"
71+
json_s3_url = f"s3://{properties['ParallelClusterConfigS3Bucket']}/{json_key}"
72+
json_config_object = s3_resource.Object(
73+
bucket_name = properties['ParallelClusterConfigS3Bucket'],
74+
key = json_key
75+
)
76+
77+
if not properties['ParallelClusterConfigJson']:
78+
# Read the config from S3 for debug so don't need a complex test event
79+
logger.info(f"Reading ParallelClusterConfigJson from {json_s3_url}")
80+
properties['ParallelClusterConfigJson'] = json_config_object.get()['Body'].read().decode('utf-8')
81+
82+
parallel_cluster_config = json.loads(properties['ParallelClusterConfigJson'])
83+
84+
if requestType == 'Delete':
85+
logging.info(f"Deleting Parallel Cluster json config in {json_s3_url}")
86+
json_config_object.delete()
87+
else:
88+
logging.info(f"Saving Parallel Cluster json config in {json_s3_url}")
89+
json_config_object.put(Body=json.dumps(parallel_cluster_config, indent=4, sort_keys=False))
90+
91+
yaml_key = f"{properties['ParallelClusterConfigYamlS3Key']}"
92+
yaml_s3_url = f"s3://{properties['ParallelClusterConfigS3Bucket']}/{yaml_key}"
93+
yaml_config_object = s3_resource.Object(
94+
bucket_name = properties['ParallelClusterConfigS3Bucket'],
95+
key = yaml_key
96+
)
97+
if requestType == 'Delete':
98+
logging.info(f"Deleting Parallel Cluster yaml config in {yaml_s3_url}")
99+
yaml_config_object.delete()
100+
else:
101+
logging.info(f"Saving Parallel Cluster yaml config in {yaml_s3_url}")
102+
yaml_config_object.put(Body=yaml.dump(parallel_cluster_config, sort_keys=False))
103+
62104
cluster_name = properties['ClusterName']
63105
cluster_region = properties['Region']
64106
logger.info(f"{requestType} request for {cluster_name} in {cluster_region}")
@@ -94,24 +136,84 @@ def lambda_handler(event, context):
94136
try:
95137
pc.create_cluster(
96138
cluster_name = properties['ClusterName'],
97-
cluster_configuration = properties['ParallelClusterConfigJson'],
139+
cluster_configuration = parallel_cluster_config,
98140
region = properties['Region'],
99141
rollback_on_failure = False,
100142
)
101143
logger.info("Create call succeeded.")
102144
except:
103145
logger.exception("ParallelCluster create failed. Ignoring exception")
104146
elif requestType == "Update":
147+
logger.info("Checking compute fleet status.")
148+
compute_fleet_status = pc.describe_compute_fleet(
149+
cluster_name = properties['ClusterName'],
150+
region = properties['Region'])['status']
151+
logger.info(f"compute fleet status: {compute_fleet_status}")
152+
105153
logger.info(f"Updating {properties['ClusterName']}")
154+
stop_and_retry = False
106155
try:
107156
pc.update_cluster(
108157
cluster_name = properties['ClusterName'],
109-
cluster_configuration = properties['ParallelClusterConfigJson'],
158+
cluster_configuration = parallel_cluster_config,
110159
region = properties['Region']
111160
)
112161
logger.info("Update call succeeded")
113-
except:
114-
logger.exception("ParallelCluster Update failed. Ignoring exception.")
162+
except BadRequestException as e:
163+
message = e.content.message
164+
if 'No changes found in your cluster configuration' in message:
165+
logger.info('No changes found in your cluster configuration.')
166+
else:
167+
logger.error(message)
168+
169+
except UpdateClusterBadRequestException as e:
170+
message = e.content.message
171+
logger.info(message)
172+
logger.info(f"{e.content.__dict__}")
173+
if 'All compute nodes must be stopped' in str(e.content.__dict__):
174+
stop_and_retry = True
175+
else:
176+
logger.error(f"{message}")
177+
178+
if stop_and_retry:
179+
logger.info(f"Stopping the cluster and retrying the update.")
180+
try:
181+
pc.update_compute_fleet(
182+
cluster_name = properties['ClusterName'],
183+
status = 'STOP_REQUESTED',
184+
region = properties['Region']
185+
)
186+
except BadRequestException as e:
187+
message = e.content.message
188+
logger.error(e.content)
189+
logger.error(e.content.message)
190+
raise
191+
except Exception as e:
192+
logger.exception("update_compute_fleet failed")
193+
logger.error(f"{type(e)} {e.__dict__}")
194+
raise
195+
logger.info(f"Stop requested. Waiting for cluster to be STOPPED.")
196+
while compute_fleet_status != 'STOPPED':
197+
compute_fleet_status = pc.describe_compute_fleet(
198+
cluster_name = properties['ClusterName'],
199+
region = properties['Region'])['status']
200+
logger.info(f"compute fleet status: {compute_fleet_status}")
201+
sleep(1)
202+
logger.info("Compute fleet is stopped. Retrying update.")
203+
try:
204+
pc.update_cluster(
205+
cluster_name = properties['ClusterName'],
206+
cluster_configuration = parallel_cluster_config,
207+
region = properties['Region']
208+
)
209+
logger.info("Update call succeeded")
210+
except (BadRequestException, UpdateClusterBadRequestException) as e:
211+
message = e.content.message
212+
logger.error(message)
213+
logger.error(f"{e.content.__dict__}")
214+
except Exception as e:
215+
logger.exception("ParallelCluster Update failed.")
216+
115217
elif requestType == 'Delete':
116218
logger.info(f"Deleting {properties['ClusterName']}")
117219
try:
@@ -130,4 +232,4 @@ def lambda_handler(event, context):
130232
cfnresponse.send(event, context, cfnresponse.FAILED, {'error': str(e)}, physicalResourceId=cluster_name)
131233
raise
132234

133-
cfnresponse.send(event, context, cfnresponse.SUCCESS, {}, physicalResourceId=cluster_name)
235+
cfnresponse.send(event, context, cfnresponse.SUCCESS, {'ConfigJsonS3Url': json_s3_url, 'ConfigYamlS3Url': yaml_s3_url}, physicalResourceId=cluster_name)

0 commit comments

Comments
 (0)