import sqlite3
from copy import deepcopy
from typing import Tuple

import pandas as pd

from utils.configs import sheets_mapping, different_names_pos, sheet_columns, guidelines, converters, has_merged_names
from utils.filler_utils import get_requirements_columns, get_columns_count_for_guideline, split_sheet, \
    get_version_name_for_database, get_guideline_name_for_database, is_double_guideline, get_first_col_for_guideline, \
    get_column

dataframe = pd.read_excel("guidelines.xlsx", header=[0, 1], sheet_name=list(sheets_mapping.keys()),
                          converters=converters, dtype=str)

sheet_with_extra_table = {
    "TLS extensions": ("applies to version", "TlsVersionExtension")
}

conn = sqlite3.connect("requirements.db")
cur = conn.cursor()
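# Note (assumption, not stated in this file): requirements.db is expected to already contain the schema
# (the Guideline table, the per-sheet tables and the extra tables such as TlsVersionExtension);
# prepare_database() below only empties the tables, it never creates them.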


def prepare_database():
    cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
    for table in cur.fetchall():
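        # Table names cannot be bound as "?" parameters in sqlite3, so the name is concatenated into the
        # statement; this is safe here because the names come straight from sqlite_master.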
        cur.execute("DELETE FROM " + table[0])
    conn.commit()


def insert_guideline_info():
    cur.executemany("INSERT OR REPLACE INTO Guideline VALUES (?, ?)",
                    [(guideline, guidelines[guideline]) for guideline in guidelines])


def get_cell_for_df(df: pd.DataFrame, row_index: int, header):
    col_index = 0
    for col_index, col in enumerate(df.columns):
        if col[0] == header[0]:
            break
    return df.iloc[row_index: row_index + 1, col_index:col_index + 1].iat[0, 0]
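    # Note: only the first header level (header[0]) is matched above, so the returned cell always comes from
    # the first column that belongs to that guideline; the callers below pair this with
    # get_first_col_for_guideline() when they need the level column.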


def get_name_from_index_for_sheet(index, sheet_name: str) -> str:
    """
    Gets the name of the item for that row. Some sheets have the name column in a different position; for those
    cases see the different_names_pos dictionary.
    :param index: row index
    :param sheet_name: sheet in which the search should be done
    :return: item_name: the name for the row at index in the sheet
    """
    column = different_names_pos.get(sheet_name, (0, 1))[0]
    return dataframe[sheet_name].iloc[index:index + 1, column:column + 1].iat[0, 0]


def get_additional_info(index, sheet_name: str):
    """
    Returns the extra values (beyond the name column) that different_names_pos declares for the given sheet.
    :param index: row index
    :param sheet_name: sheet in which the search should be done
    :return: a list with the values of the extra columns, empty if the sheet has none
    """
    column, lengths = different_names_pos.get(sheet_name, (0, 1))
    return_vals = []
    tmp_df = dataframe[sheet_name].iloc[index:index + 1, column:column + lengths]
    if lengths > 1:
        for i in range(1, lengths):
            val = tmp_df.iat[0, i]
            return_vals.append(val)
    return return_vals
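# Illustrative note (hypothetical entry, not taken from utils.configs): different_names_pos maps a sheet name
# to a (column, length) pair. An entry such as {"Cipher Suites": (1, 2)} would mean that the item name for that
# sheet is in column 1 and that get_additional_info() returns the single extra column that follows it.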


def already_parsed(col_name: str) -> bool:
    """Returns True if the column is one of those handled separately through sheet_with_extra_table."""
    for _, c2 in sheet_with_extra_table.items():
        if c2[0] == col_name.strip():
            return True
    return False


def values_to_add(r: pd.Series, columns: pd.Index) -> Tuple:
    """Given a row of values, drops the ones whose columns were already parsed elsewhere and returns the rest
    :param r: The row (Series) containing the values that need to be checked
    :param columns: The columns of the dataframe from which the row is taken
    """
    val_list = r.to_list()
    i = 0
    for c in columns:
        if already_parsed(c[0]):
            val_list.pop(i)
        else:
            i += 1
    return tuple(val_list)
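# Minimal illustration of values_to_add (hypothetical columns, not taken from the workbook): with
# columns = [("Extension", "name"), ("applies to version", "TLS 1.2")] and a row holding
# ["status_request", "x"], the second value is dropped because already_parsed() matches its header,
# leaving ("status_request",).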


def has_extra_table(sheet_name: str) -> Tuple:
    return sheet_with_extra_table.get(sheet_name, ())


def fill_extra_table(sheet_name: str) -> bool:
    """
    Takes the name of a sheet, looks it up in the sheet_with_extra_table dictionary to get the columns from
    which the data should be read and the table into which it should be inserted, and then adds that data to
    the database.

    :param sheet_name: the sheet that has an extra table
    :return: False if the sheet doesn't have an extra table, True if it committed to the database
    """
    column, table = sheet_with_extra_table.get(sheet_name, (None, None))
    if not column or not table:
        return False
    file_sheet: pd.DataFrame = dataframe[sheet_name]
    # The first column is almost always the names column
    names: pd.Series = get_column(file_sheet, 0)
    # Get only the columns that must be inserted in the extra table
    versions = file_sheet.filter(like=column)
    versions_names = {}
    insertion_query = f"INSERT OR REPLACE INTO {table} VALUES (?, ?)"
    values_to_insert = []
    # prepare the mapping from index to column
    for pos, version in enumerate(versions.columns.to_list()):
        versions_names[pos] = version[1]

    for pos, content in versions.iterrows():
        name = names[pos]
        # This variable i is used to cycle through the column's name without having to add it to the dataframe
        # It can probably be avoided by using the join in pandas, but I can't get it to work
        i = 0
        for c in content:
            if pd.notna(c):
                values_to_insert.append(
                    (versions_names[i % len(versions.columns)], name))
            i += 1
    cur.executemany(insertion_query, values_to_insert)
    conn.commit()
    return True
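# For the "TLS extensions" sheet this produces one (version, extension name) row in TlsVersionExtension for
# every non-empty cell of the "applies to version" columns. A quick way to inspect the result once the script
# has run (a sketch, assuming the layout implied by the insertion query above):
#
#   cur.execute("SELECT * FROM TlsVersionExtension").fetchall()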


if __name__ == "__main__":
    prepare_database()
    insert_guideline_info()
    guidelines_mapping = {}
    for guideline in guidelines:
        guidelines_mapping[guideline.upper()] = guideline
    for sheet in dataframe:
        sheet_mapped = sheets_mapping.get(sheet.strip())
        if isinstance(sheet, str) and sheet_mapped:
            done = False
            values = []
            if has_extra_table(sheet):
                fill_extra_table(sheet)
            general_dataframe, guidelines_dataframe = split_sheet(dataframe[sheet])
            values_tuple = ()
            # old_values is needed for some strange cases like key_signature
            old_values = []
            for row in general_dataframe.iterrows():
                # row[0] is the index, row[1] is the actual content of the line
                values_tuple = values_to_add(row[1], general_dataframe.columns)
                if not len(old_values):
                    old_values = [v for v in values_tuple]
                else:
                    tmp_list = []
                    for i, v in enumerate(values_tuple):
                        if pd.isna(v) and v != old_values[i]:
                            tmp_list.append(old_values[i])
                        else:
                            tmp_list.append(v)
                    values_tuple = tuple(tmp_list)
                    old_values = tmp_list
                if values_tuple[0] != "Certificate Type":
                    values.append(values_tuple)
            values_string = "("
            values_string += "?," * len(values_tuple)
            # Remove last ',' and replace it with ')'
            values_string = values_string[:-1] + ")"
            sql_query = f"INSERT OR REPLACE INTO {sheet_mapped} VALUES " + values_string
            cur.executemany(sql_query, values)
            conn.commit()
            values = []

            # Start of guideline specific part
            requirements_columns = get_requirements_columns(guidelines_dataframe, sheet)
            guidelines_columns_count = get_columns_count_for_guideline(guidelines_dataframe)

            values_dict = {}
            last_item = ""

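            # values_dict maps each per-guideline table name (sheet + guideline + version) to a dictionary of
            # {row index: [guideline name, item name, extra columns..., level/condition values...]}; the row
            # index is only prepended later, when the list turns out to be shorter than the target table.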
            # maybe this whole part can be rewritten using iloc
            old_name = ""
            for row in guidelines_dataframe.iterrows():
                row_dictionary = row[1].to_dict()
                for header in row_dictionary:
                    # header[0] is guideline_name
                    item_name = get_name_from_index_for_sheet(row[0], sheet)
                    if pd.isna(item_name) and sheet in has_merged_names:
                        item_name = old_name
                    else:
                        old_name = item_name
                    guideline = get_guideline_name_for_database(header[0])
                    version_name = get_version_name_for_database(header[1])
                    table_name = sheet_mapped + guideline + version_name
                    content = row_dictionary[header]
                    if header[1] in requirements_columns[header[0]]:
                        # This is the case for sheets like cipher suite
                        if sheet_columns.get(sheet, {}).get(header[0]):
                            level_column = get_first_col_for_guideline(guidelines_dataframe, guideline)
                            level = get_cell_for_df(guidelines_dataframe, row[0], (guideline, level_column))
                            # If the cell is empty and the level isn't negative (must not, not recommended)
                            # then "must not" is used as the level.
                            if level == "<Not mentioned>":
                                content = level
                            if pd.notna(content) or level in ["not recommended", "must not"]:
                                if content not in ["recommended", "must"]:
                                    content = level
                            else:
                                content = "must not"

                        # this block is to prepare the dictionary
                        if not values_dict.get(table_name):
                            values_dict[table_name] = {row[0]: []}
                        if not values_dict[table_name].get(row[0]):
                            values_dict[table_name][row[0]] = []
                        # end of the block

                        # Vertically merged cells contain the value only in the first cell
                        if pd.isna(item_name) and not pd.isna(content):
                            item_name = values_dict[table_name][row[0] - 1][0]

                        # First the guideline name is added
                        values_dict[table_name][row[0]].append(guidelines_mapping.get(guideline, guideline))

                        # Then the name of the row is added
                        values_dict[table_name][row[0]].append(item_name)
                        # If this table needs extra data it gets added here
                        for el in get_additional_info(row[0], sheet):
                            values_dict[table_name][row[0]].append(el)

                        values_dict[table_name][row[0]].append(content)

                    elif pd.notna(header[1]) and \
                            get_first_col_for_guideline(guidelines_dataframe, header[0]) != header[1]:
                        # update all the lists of the same guideline with the condition
                        columns_to_apply = []
                        if " [" in header[1]:
                            columns_to_apply = header[1].split(" [")[1].replace("]", "").split(",")
                            columns_to_apply = [int(c.strip()) for c in columns_to_apply]
                        counter = 0
                        for t_name in values_dict:
                            guideline_db_name = get_guideline_name_for_database(header[0])
                            # this is needed only for the case of KeyLengthsBSI and KeyLengths BSI (from ...)
                            has_valid_underscore = "_" in guideline_db_name and "_" in t_name
                            if t_name.startswith(sheet_mapped + guideline_db_name):
                                if "_" not in t_name or has_valid_underscore:
                                    counter += 1
                                    if " [" in header[1] and counter not in columns_to_apply:
                                        continue
                                    values_dict[t_name][row[0]].append(content)
                if is_double_guideline(header[0]):
                    tokens = header[0].split("+")
                    base_guideline = tokens[0].replace("(", "").strip()
                    for other_guideline in tokens[1:]:
                        other_name = get_guideline_name_for_database(other_guideline)
                        other_table = sheet_mapped + other_name + version_name
                        values_dict[other_table] = deepcopy(values_dict[table_name])
                        for el in values_dict[other_table]:
                            # Update the guideline name
                            for i, entry in enumerate(values_dict[other_table][el]):
                                if isinstance(entry, str) and entry.upper() == base_guideline.upper():
                                    values_dict[other_table][el][i] = other_name

        # Convert all the data into tuples to add them to the database and group them by guideline name
        values_groups = {}
        for table in values_dict:
            # Get the number of columns for the actual table
            table_columns_count = len(cur.execute(f"PRAGMA table_info({table})").fetchall())
            entries = values_dict[table]

            # # This is to prevent the "this or X" condition to appear in tables that don't need it
            # # this condition checks if the guideline has multiple versions for this sheet
            # if table.startswith("Protocol") and table[len("Protocol"):] not in [g.upper() for g in guidelines]:
            #     for entry in entries:
            #         entry = entries[entry]
            #         # Since the problem is a condition, and it only verifies if there are four elements.
            #         # Last element is the condition
            #         # Second to last is the level
            #         print(entry)
            #         if len(entry) > 3 and pd.notna(entry[-1]):
            #             if entry[-2][-1] != "°":
            #                 entry[-1] = None
            last_level = None

            # This is to prevent the "this or X" condition from appearing in tables that don't need it; it only
            # works for the Protocol sheet and only if the conditions are on adjacent lines
            if table.startswith("Protocol"):
                for index, entry in entries.items():
                    # skip the first element
                    if index == 0:
                        continue
                    if len(entry) > 3 and pd.notna(entry[-1]) and pd.notna(entries[index - 1][-1]):
                        if entry[-2] != entries[index - 1][-2]:
                            entry[-1] = None
                            entries[index - 1][-1] = None

            if not values_groups.get(table):
                values_groups[table] = []
            for index in entries:
                entry = entries[index]
                if pd.notna(entry[1]) and entry[1] != "Certificate Type" and entry[1] != "NIST":
                    # The double check is needed because of the case Mozilla + AGID, which share the same pointer
                    # to the list of values
                    if len(entry) < table_columns_count:
                        entry.insert(0, index)
                    # Every remaining column is filled with None
                    while len(entry) < table_columns_count:
                        entry.append(None)
                    values_groups[table].append(tuple(entry))
        for table in values_groups:
            values = values_groups[table]
            values_string = "("
            # The values list should contain tuples that are all the same size
            values_string += "?," * len(values[0])
            # Remove the last ',' and replace it with ')'
            values_string = values_string[:-1] + ")"
            sql_query = f"INSERT OR REPLACE INTO {table} VALUES " + values_string
            cur.executemany(sql_query, values)
            conn.commit()
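
    # Optional sanity check (illustrative sketch, not part of the original workflow): uncomment to print how
    # many rows ended up in each table after the import.
    # for (t_name,) in cur.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall():
    #     print(t_name, cur.execute(f"SELECT COUNT(*) FROM {t_name}").fetchone()[0])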