import glob
import json
import logging
import os
from pathlib import Path
from typing import List

from monty.json import MontyEncoder
from monty.serialization import loadfn, dumpfn

from apex.utils import (
    judge_flow,
    json2dict,
    update_dict,
    return_prop_list,
    load_config_file,
    generate_random_string
)
from apex.database.DatabaseFactory import DatabaseFactory
from apex.config import Config


class ResultStorage:
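    """Gather relaxation and property results found under a work directory into a single dict."""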
    def __init__(self, work_dir):
        self.work_dir = Path(work_dir).absolute()
        self.result_dict = {'work_path': str(self.work_dir)}

    @property
    def result_data(self):
        return self.result_dict

    @property
    def work_dir_path(self):
        return str(self.work_dir)

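    # NOTE: json2dict (from apex.utils) is assumed to coerce JSON file-path
    # arguments into dicts before the decorated method body runs.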
    @json2dict
    def sync_relax(self, relax_param: dict):
        # sync results from relaxation task
        confs = relax_param["structures"]
        interaction = relax_param["interaction"]
        conf_dirs = []
        for conf in confs:
            conf_dirs.extend(glob.glob(str(self.work_dir / conf)))
        conf_dirs.sort()
        for ii in conf_dirs:
            relax_task = os.path.join(ii, 'relaxation/relax_task')
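            # a finished relaxation task is expected to leave these four JSON
            # artifacts behind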
            inter, task, structure, result = [
                os.path.join(relax_task, ff)
                for ff in ['inter.json', 'task.json', 'structure.json', 'result.json']
            ]
            if not (
                os.path.isfile(inter)
                and os.path.isfile(task)
                and os.path.isfile(result)
            ):
                logging.warning(
                    f"relaxation result files are incomplete, will skip result extraction from {relax_task}"
                )
                continue
            logging.info(msg=f"extract results from {relax_task}")
            conf_key = os.path.relpath(ii, self.work_dir)
            conf_dict = {"interaction": loadfn(inter),
                         "parameter": loadfn(task),
                         "structure_info": loadfn(structure),
                         "result": loadfn(result)}
            new_dict = {conf_key: {"relaxation": conf_dict}}
            update_dict(self.result_dict, new_dict)

    @json2dict
    def sync_props(self, props_param: dict, archive_tasks: bool = False):
        # sync results from property tests
        confs = props_param["structures"]
        interaction = props_param["interaction"]
        properties = props_param["properties"]
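        # return_prop_list is assumed to map the property settings onto the
        # names of their task sub-directories under each configuration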
        prop_list = return_prop_list(properties)
        conf_dirs = []
        for conf in confs:
            conf_dirs.extend(glob.glob(str(self.work_dir / conf)))
        conf_dirs.sort()
        for ii in conf_dirs:
            for jj in prop_list:
                prop_dir = os.path.join(ii, jj)
                result = os.path.join(prop_dir, 'result.json')
                param = os.path.join(prop_dir, 'param.json')
                task_list_path = os.path.join(prop_dir, 'task_list.json')
                if not os.path.isfile(result):
                    logging.warning(
                        f"property post-processing is incomplete, will skip result extraction from {prop_dir}"
                    )
                    continue
                logging.info(msg=f"extract results from {prop_dir}")
                conf_key = os.path.relpath(ii, self.work_dir)
                result_dict = loadfn(result)
                try:
                    param_dict = loadfn(param)
                except FileNotFoundError:
                    logging.warning(f'{param} file not found')
                    param_dict = None
                prop_dict = {"parameter": param_dict, "result": result_dict}
                # extract running details of each task
                if archive_tasks:
                    logging.debug(msg='Archive running details of tasks...')
                    logging.warning(
                        msg='You are trying to archive the detailed running log of each task into the database, '
                            'which may exceed the size allowance of the database. '
                            'Please consider splitting the data or only archiving details of the most important property.'
                    )
                    try:
                        task_list = loadfn(task_list_path)
                        result_task_path = [os.path.join(prop_dir, task, 'result_task.json') for task in task_list]
                    except FileNotFoundError:
                        logging.warning(f'{task_list_path} file not found, will track all tasks listed in {prop_dir}')
                        result_task_path = glob.glob(os.path.join(prop_dir, 'task.*', 'result_task.json'))
                    task_result_list = [loadfn(kk) for kk in sorted(result_task_path)]
                    prop_dict["tasks"] = task_result_list

                new_dict = {conf_key: {jj: prop_dict}}
                update_dict(self.result_dict, new_dict)


def connect_database(config):
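    """Create a database backend ('mongodb' or 'dynamodb') according to ``config.database_type``."""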
    # connect to database
    if config.database_type == 'mongodb':
        logging.info(msg='Use database type: MongoDB')
        database = DatabaseFactory.create_database(
            'mongodb',
            'mongodb',
            config.mongodb_database,
            config.mongodb_collection,
            **config.mongodb_config_dict
        )
    elif config.database_type == 'dynamodb':
        logging.info(msg='Use database type: DynamoDB')
        database = DatabaseFactory.create_database(
            'dynamodb',
            'dynamodb',
            config.dynamodb_table_name,
            **config.dynamodb_config_dict
        )
    else:
        raise RuntimeError(f'unsupported database type: {config.database_type}')
    return database


def archive2db(config, data: dict, data_id: str):
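    """Archive ``data`` under ``data_id`` using the configured archive method ('sync' or 'record')."""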
    database = connect_database(config)
    # archive results into the database
    if config.archive_method == 'sync':
        logging.debug(msg='Archive method: sync')
        database.sync(data, data_id, depth=2)
    elif config.archive_method == 'record':
        logging.debug(msg='Archive method: record')
        database.record(data, data_id)
    else:
        raise TypeError(
            f'Unrecognized archive method: {config.archive_method}. '
            f"Should be either 'sync' or 'record'."
        )


def archive_workdir(relax_param, props_param, config, work_dir, flow_type):
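    """Extract results from one work directory into ``all_result.json`` and,
    unless ``config.database_type`` is 'local', archive them to the database.
    """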
    print(f'=> Begin archiving {work_dir}')
    # extract results json
    store = ResultStorage(work_dir)
    if relax_param and flow_type != 'props':
        store.sync_relax(relax_param)
    if props_param and flow_type != 'relax':
        store.sync_props(props_param, config.archive_tasks)

    dump_file = os.path.join(store.work_dir_path, 'all_result.json')
    default_id = generate_random_string(10)
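    # the archive key is persisted inside all_result.json, so re-archiving the
    # same work directory updates the same database record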
    if os.path.isfile(dump_file):
        logging.info(msg='all_result.json exists, and will be updated.')
        orig_data = loadfn(dump_file)
        try:
            default_id = orig_data['archive_key']
        except KeyError:
            store.result_data['archive_key'] = default_id
        update_dict(orig_data, store.result_data, depth=2)
        dumpfn(orig_data, dump_file, indent=4)
    else:
        store.result_data['archive_key'] = default_id
        dumpfn(store.result_data, dump_file, indent=4)

    # define the archive key: an explicitly configured archive_key takes
    # precedence over the one documented in all_result.json
    data_id = config.archive_key if config.archive_key else default_id

    if config.database_type != 'local':
        data_json_str = json.dumps(store.result_data, cls=MontyEncoder, indent=4)
        data_dict = json.loads(data_json_str)
        data_dict['_id'] = data_id

        archive2db(config, data_dict, data_id)


def archive2db_from_json(config, json_file):
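    """Load a previously dumped results json file and archive it into the database."""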
    logging.info(msg=f'Archive from local json file: {json_file}')
    data_dict = loadfn(json_file)
    # round-trip through MontyEncoder so the payload is plain, serializable json
    data_json_str = json.dumps(data_dict, cls=MontyEncoder, indent=4)
    data_dict = json.loads(data_json_str)
    # define the archive key
    if config.archive_key:
        data_id = config.archive_key
    else:
        data_id = data_dict['archive_key']
    data_dict['_id'] = data_id

    archive2db(config, data_dict, data_id)


def archive_result(
    parameters: List[os.PathLike],
    config_dict: dict,
    work_dir: List[os.PathLike],
    indicated_flow_type: str,
    database_type,
    method,
    archive_tasks,
    is_result
):
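    """Archive results either from pre-dumped result json files (``is_result=True``)
    or by extracting them from work directories.
    """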
    global_config = Config(**config_dict)
    # args passed explicitly override the config file
    if database_type:
        global_config.database_type = database_type
    if method:
        global_config.archive_method = method
    if archive_tasks:
        global_config.archive_tasks = archive_tasks

    if is_result:
        # archive local results json files directly;
        # each parameter here is a json file that stores test results
        json_file_list = []
        for ii in parameters:
            glob_list = glob.glob(os.path.abspath(ii))
            json_file_list.extend(glob_list)
        json_file_list.sort()
        for ii in json_file_list:
            archive2db_from_json(global_config, ii)
    else:
        _, _, flow_type, relax_param, props_param = judge_flow(
            [loadfn(jj) for jj in parameters],
            indicated_flow_type
        )
        # archive work directories
        work_dir_list = []
        for ii in work_dir:
            glob_list = glob.glob(os.path.abspath(ii))
            work_dir_list.extend(glob_list)
        work_dir_list.sort()
        for ii in work_dir_list:
            archive_workdir(relax_param, props_param, global_config, ii, flow_type)


def archive_from_args(
    parameters: List[os.PathLike],
    config_file: os.PathLike,
    work_dirs: List[os.PathLike],
    indicated_flow_type: str,
    database_type,
    method,
    archive_tasks,
    is_result
):
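    """Entry point for the archive mode: load the global config file and delegate to ``archive_result``."""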
    print('-------Archive result Mode-------')
    archive_result(
        parameters=parameters,
        config_dict=load_config_file(config_file),
        work_dir=work_dirs,
        indicated_flow_type=indicated_flow_type,
        database_type=database_type,
        method=method,
        archive_tasks=archive_tasks,
        is_result=is_result
    )
    print('Complete!')
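

# Minimal usage sketch (the file names, config layout, and the 'joint' flow
# type below are illustrative assumptions, not values defined in this module):
#
#     archive_from_args(
#         parameters=['param_relax.json', 'param_props.json'],
#         config_file='./global_config.yaml',
#         work_dirs=['./work_dir_*'],
#         indicated_flow_type='joint',
#         database_type='mongodb',
#         method='sync',
#         archive_tasks=False,
#         is_result=False
#     )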