From 9e4505ce01fdf8ae08ae5997e64e47c9c9096e0c Mon Sep 17 00:00:00 2001 From: Gabriel Broadwin Nongsiej Date: Thu, 17 Jul 2025 18:18:31 +0530 Subject: [PATCH 1/3] Fixes for MLE signature for Request for HTTP Signature --- .../core/MerchantConfiguration.py | 117 +++++++++++++--- authenticationsdk/util/Cache.py | 93 ++++++++++++- authenticationsdk/util/CertificateUtility.py | 131 ++++++++++++++++++ .../util/GlobalLabelParameters.py | 2 + authenticationsdk/util/MLEUtility.py | 131 ++++++++---------- 5 files changed, 372 insertions(+), 102 deletions(-) create mode 100644 authenticationsdk/util/CertificateUtility.py diff --git a/authenticationsdk/core/MerchantConfiguration.py b/authenticationsdk/core/MerchantConfiguration.py index 202bab91..f9dc974d 100644 --- a/authenticationsdk/core/MerchantConfiguration.py +++ b/authenticationsdk/core/MerchantConfiguration.py @@ -1,4 +1,6 @@ +import copy from CyberSource.logging.log_configuration import LogConfiguration +from authenticationsdk.util.CertificateUtility import CertificateUtility from authenticationsdk.util.GlobalLabelParameters import * from wsgiref.handlers import format_date_time from datetime import datetime @@ -52,9 +54,12 @@ def __init__(self): self.__jwePEMFileDirectory = None self.useMLEGlobally = None self.mapToControlMLEonAPI = None - self.mleKeyAlias = None + self.mleKeyAlias = None + self.mleForRequestPublicCertPath = None + self.p12KeyFilePath = None self.logger = LogFactory.setup_logger(self.__class__.__name__) +#region Getters and Setters def set_merchant_keyid(self, value): if not (value.get('merchant_keyid') is None): self.merchant_keyid = value['merchant_keyid'] @@ -88,7 +93,7 @@ def set_use_metakey(self, value): self.use_metakey = value['use_metakey'] else: self.use_metakey = False - + def set_portfolio_id(self, value): if not (value.get('portfolio_id') is None): self.portfolio_id = value['portfolio_id'] @@ -138,7 +143,7 @@ def set_enable_client_cert(self, value): self.enable_client_cert = value['enable_client_cert'] else: self.enable_client_cert = False - + def set_client_cert_dir(self, value): if not (value.get('client_cert_dir') is None): self.client_cert_dir = value['client_cert_dir'] @@ -183,7 +188,7 @@ def set_jwePEMFileDirectory(self, value): def get_jwePEMFileDirectory(self): return self.__jwePEMFileDirectory - + def set_useMLEGlobally(self, value): if not (value.get('useMLEGlobally') is None): self.useMLEGlobally = value['useMLEGlobally'] @@ -192,7 +197,7 @@ def set_useMLEGlobally(self, value): def get_useMLEGlobally(self): return self.useMLEGlobally - + def set_mapToControlMLEonAPI(self, value): map_to_control_mle_on_api = value.get('mapToControlMLEonAPI') if map_to_control_mle_on_api is not None: @@ -203,7 +208,7 @@ def set_mapToControlMLEonAPI(self, value): def get_mapToControlMLEonAPI(self): return self.mapToControlMLEonAPI - + def set_mleKeyAlias(self, value): if value.get('mleKeyAlias') is not None and value.get('mleKeyAlias').strip(): self.mleKeyAlias = value['mleKeyAlias'].strip() @@ -213,6 +218,25 @@ def set_mleKeyAlias(self, value): def get_mleKeyAlias(self): return self.mleKeyAlias + def set_mleForRequestPublicCertPath(self, value): + if value.get('mleForRequestPublicCertPath') is not None and value.get('mleForRequestPublicCertPath').strip(): + self.mleForRequestPublicCertPath = value['mleForRequestPublicCertPath'].strip() + else: + self.mleForRequestPublicCertPath = None + + def get_mleForRequestPublicCertPath(self): + return self.mleForRequestPublicCertPath + + def set_p12KeyFilePath(self, value): + if value.get('p12KeyFilePath') is not None and value.get('p12KeyFilePath').strip(): + self.p12KeyFilePath = value['p12KeyFilePath'].strip() + else: + self.p12KeyFilePath = None + + def get_p12KeyFilePath(self): + return self.p12KeyFilePath + +#endregion # This method sets the Merchant Configuration Variables to its respective values after reading from cybs.properties def set_merchantconfig(self, val): @@ -247,6 +271,7 @@ def set_merchantconfig(self, val): self.set_jwePEMFileDirectory(val) self.set_useMLEGlobally(val) self.set_mapToControlMLEonAPI(val) + self.set_mleForRequestPublicCertPath(val) self.set_mleKeyAlias(val) # Returns the time in format as defined by RFC7231 @@ -281,12 +306,12 @@ def validate_merchant_details(self, details, mconfig = None): authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger, GlobalLabelParameters.CLIENT_CERT_DIR_EMPTY, self.log_config) - + if self.ssl_client_cert is None or self.ssl_client_cert == "": authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger, GlobalLabelParameters.SSL_CLIENT_CERT_EMPTY, self.log_config) - + if self.private_key is None or self.private_key == "": authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger, GlobalLabelParameters.PRIVATE_KEY_EMPTY, @@ -302,7 +327,7 @@ def validate_merchant_details(self, details, mconfig = None): authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger, GlobalLabelParameters.MERCHANTID_REQ, self.log_config) - + if self.merchant_keyid is None or self.merchant_keyid == "": authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger, GlobalLabelParameters.MERCHANT_KEY_ID_REQ, @@ -318,7 +343,7 @@ def validate_merchant_details(self, details, mconfig = None): authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger, GlobalLabelParameters.MERCHANTID_REQ, self.log_config) - + if self.key_alias is None or self.key_alias == "": self.key_alias = self.merchant_id authenticationsdk.util.ExceptionAuth.validate_default_values(self.logger, @@ -349,23 +374,26 @@ def validate_merchant_details(self, details, mconfig = None): GlobalLabelParameters.KEY_FILE_EMPTY, self.log_config) - elif self.authentication_type.lower() == GlobalLabelParameters.OAUTH.lower(): + if not self.check_key_file(): + authenticationsdk.util.ExceptionAuth.log_exception(self.logger, f"Error finding or accessing the Key Directory or Key File. Please review the values in the merchant configuration.", self.log_config) + + elif self.authentication_type.lower() == GlobalLabelParameters.OAUTH.lower(): if self.access_token is None or self.access_token == "": authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger, GlobalLabelParameters.ACCESS_TOKEN_EMPTY, self.log_config) - + if self.ssl_client_cert is None or self.ssl_client_cert == "": authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger, GlobalLabelParameters.REFRESH_TOKEN_EMPTY, self.log_config) - - elif self.authentication_type.lower() == GlobalLabelParameters.MUTUAL_AUTH.lower(): + + elif self.authentication_type.lower() == GlobalLabelParameters.MUTUAL_AUTH.lower(): if self.client_id is None or self.client_id == "": authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger, GlobalLabelParameters.CLIENT_ID_EMPTY, self.log_config) - + if self.client_secret is None or self.client_secret == "": authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger, GlobalLabelParameters.CLIENT_SECRET_EMPTY, @@ -374,6 +402,7 @@ def validate_merchant_details(self, details, mconfig = None): authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger, GlobalLabelParameters.AUTH_ERROR, self.log_config) + # useMLEGlobally check for auth Type if self.useMLEGlobally is True or self.mapToControlMLEonAPI is not None: if self.useMLEGlobally is True and self.authentication_type.lower() != GlobalLabelParameters.JWT.lower(): @@ -388,14 +417,58 @@ def validate_merchant_details(self, details, mconfig = None): GlobalLabelParameters.MLE_AUTH_ERROR, self.log_config) + self.validate_MLE_configuration() + self.p12KeyFilePath = os.path.join(self.key_file_path, self.key_file_name) + GlobalLabelParameters.P12_PREFIX + log_items = GlobalLabelParameters.HIDE_MERCHANT_CONFIG_PROPS # This displays the logic for logging all cybs.json values + details_copy = copy.deepcopy(details) if self.log_config.enable_log is True: - for key, value in list(details.items()): + for key, value in list(details_copy.items()): if key in log_items: - del details[key] - - for keys, values in list(details.items()): - details[keys] = str(values) - - self.logger.info("Mconfig > " + str(ast.literal_eval(json.dumps(details)))) + del details_copy[key] + + for keys, values in list(details_copy.items()): + details_copy[keys] = str(values) + + self.logger.info("Mconfig > " + str(ast.literal_eval(json.dumps(details_copy)))) + + def check_key_file(self): + if not(self.key_file_name and self.key_file_name.strip()): + self.logger.error("Key Filename not provided. Assigning the value of Merchant ID") + if self.merchant_id and self.merchant_id.strip(): + self.key_file_name = self.merchant_id + + if not(self.key_file_path and self.key_file_path.strip()): + self.key_file_path = GlobalLabelParameters.DEFAULT_KEY_FILE_PATH + self.logger.error(f"Keys Directory not provided. Using Default Path: {self.key_file_path}") + + # Directory exists? + if not os.path.isdir(self.key_file_path): + self.logger.error(f"Keys Directory not found. Entered directory : {self.key_file_path}") + return False + + keyFilePath = os.path.join(self.key_file_path, self.key_file_name) + GlobalLabelParameters.P12_PREFIX + + # File exists? + if not os.path.isfile(keyFilePath): + self.logger.error(f"Key File not found. Check path/filename entered. Entered path/filename : {keyFilePath}") + return False + + self.logger.info(f"Entered value for Key File Path : {keyFilePath}") + + # Can file be opened for reading? + try: + with open(keyFilePath, 'rb'): + return True + except Exception: + self.logger.info(f"File cannot be accessed. Permission denied : {keyFilePath}") + return False + + def validate_MLE_configuration(self): + if self.mleForRequestPublicCertPath and self.mleForRequestPublicCertPath.strip(): + try: + CertificateUtility.validate_path_and_file(self.mleForRequestPublicCertPath, "mleForRequestPublicCertPath", self.log_config) + except Exception as err: + self.logger.error(err) + raise err diff --git a/authenticationsdk/util/Cache.py b/authenticationsdk/util/Cache.py index 37e4620a..6425e07c 100644 --- a/authenticationsdk/util/Cache.py +++ b/authenticationsdk/util/Cache.py @@ -2,7 +2,6 @@ import ssl from jwcrypto import jwk -from cryptography import x509 from cryptography.hazmat.primitives import serialization from cryptography.hazmat.backends import default_backend @@ -10,12 +9,19 @@ from typing_extensions import deprecated +from authenticationsdk.util.CertificateUtility import CertificateUtility from authenticationsdk.util.GlobalLabelParameters import * +import CyberSource.logging.log_factory as LogFactory +import threading +class CertInfo: + def __init__(self, mle_certificate, timestamp): + self.MLECertificate = mle_certificate + self.Timestamp = timestamp class FileCache: - _instance = None + logger = None def __new__(cls, *args, **kwargs): if not cls._instance: @@ -27,6 +33,8 @@ def __new__(cls, *args, **kwargs): def _initialize_cache(self): # Your cache initialization code here self.filecache = {} + self.mlecache = {} + self._cache_lock = threading.RLock() @deprecated("This method has been marked as Deprecated and will be removed in coming releases.") def get_private_key_from_pem(self, pem_file_path): @@ -34,14 +42,14 @@ def get_private_key_from_pem(self, pem_file_path): cert = pem_file.read() private_key = jwk.JWK.from_pem(cert.encode('utf-8')) return private_key - + def load_certificates(self, filepath, filename, password): return pkcs12.load_key_and_certificates( open(os.path.join(filepath, filename) + GlobalLabelParameters.P12_PREFIX, 'rb').read(), password=password.encode(), backend=default_backend() ) - + def get_cert_based_on_key_alias(self, certificate, additional_certificates, key_alias): target_cert = None @@ -67,7 +75,6 @@ def get_cert_based_on_key_alias(self, certificate, additional_certificates, key_ return target_cert - def update_cache(self, mconfig, filepath, filename): file_mod_time = os.stat(os.path.join(filepath, filename) + GlobalLabelParameters.P12_PREFIX).st_mtime private_key, certificate, additional_certificates = self.load_certificates(filepath, filename, mconfig.key_password) @@ -94,3 +101,79 @@ def get_cached_private_key_from_pem(self, file_path, cache_key): private_key = self.get_private_key_from_pem(file_path) self.filecache[str(cache_key)] = [private_key, file_mod_time] return self.filecache[str(cache_key)][0] + + def get_request_mle_cert_from_cache(self, merchant_config): + if FileCache.logger is None: + FileCache.logger = LogFactory.setup_logger(__name__, merchant_config.log_config) + logger = FileCache.logger + + merchant_id = merchant_config.merchant_id + certificate_identifier = None + certificate_file_path = None + + # Priority #1: Get cert from merchantConfig.mle_for_request_public_cert_path if certPath is provided + if merchant_config.mleForRequestPublicCertPath and merchant_config.mleForRequestPublicCertPath.strip(): + certificate_identifier = GlobalLabelParameters.MLE_CACHE_IDENTIFIER_FOR_CONFIG_CERT + certificate_file_path = merchant_config.mleForRequestPublicCertPath + # Priority #2: If mle_for_request_public_cert_path not provided, get mlecert from p12 if provided and jwt auth type + elif (GlobalLabelParameters.JWT.lower() == merchant_config.authentication_type.lower() and merchant_config.p12KeyFilePath): + certificate_identifier = GlobalLabelParameters.MLE_CACHE_IDENTIFIER_FOR_P12_CERT + certificate_file_path = merchant_config.p12KeyFilePath + # Priority #3: Get mlecert from default cert in SDK as per CAS or PROD env. + else: + logger.debug("The certificate to use for MLE for requests is not provided in the merchant configuration. Please ensure that the certificate path is provided.") + return None + + cache_key = f"{merchant_id}_{certificate_identifier}" + mle_certificate = self.get_mle_cert_based_on_cache_key(merchant_config, cache_key, certificate_file_path) + + CertificateUtility.validate_certificate_expiry(mle_certificate, merchant_config.key_alias, certificate_identifier, merchant_config.log_config) + + return mle_certificate + + def get_mle_cert_based_on_cache_key(self, merchant_config, cache_key, certificate_file_path): + with self._cache_lock: + cert_info = self.mlecache.get(cache_key) + + file_timestamp = os.path.getmtime(certificate_file_path) + if cert_info is None or cert_info.Timestamp != file_timestamp: + self.setup_mle_cache(merchant_config, cache_key, certificate_file_path) + cert_info = self.mlecache.get(cache_key) + + return cert_info.MLECertificate if cert_info else None + + def setup_mle_cache(self, merchant_config, cache_key, certificate_file_path): + if FileCache.logger is None: + FileCache.logger = LogFactory.setup_logger(__name__, merchant_config.log_config) + logger = FileCache.logger + + mle_certificate = None + + # Handle PEM certificate case + if cache_key.endswith(GlobalLabelParameters.MLE_CACHE_IDENTIFIER_FOR_CONFIG_CERT): + certificates = CertificateUtility.load_certificates_from_pem_file(certificate_file_path) + try: + mle_certificate = CertificateUtility.get_cert_based_on_key_alias(certificates, merchant_config.mleKeyAlias) + except Exception: + if mle_certificate is None: + file_name = os.path.basename(certificate_file_path) + logger.warning(f"No certificate found for the specified mle_key_alias '{merchant_config.mleKeyAlias}'. Using the first certificate from file {file_name} as the MLE request certificate.") + mle_certificate = certificates[0] # Take first certificate + + # Handle P12 certificate case + if cache_key.endswith(GlobalLabelParameters.MLE_CACHE_IDENTIFIER_FOR_P12_CERT): + try: + certificates = CertificateUtility.fetch_certificate_collection_from_p12_file(merchant_config.p12KeyFilePath, merchant_config.key_password) + mle_certificate = CertificateUtility.get_cert_based_on_key_alias(certificates, merchant_config.mleKeyAlias) + except Exception: + file_name = os.path.basename(merchant_config.p12KeyFilePath) + logger.error(f"No certificate found for the specified mle_key_alias '{merchant_config.mleKeyAlias}' in file {file_name}.") + raise ValueError(f"No certificate found for the specified mle_key_alias '{merchant_config.mleKeyAlias}' in file {file_name}.") + + # Save to cache (thread-safe) + cert_info = CertInfo( + mle_certificate, + os.path.getmtime(certificate_file_path) + ) + with self._cache_lock: + self.mlecache[cache_key] = cert_info \ No newline at end of file diff --git a/authenticationsdk/util/CertificateUtility.py b/authenticationsdk/util/CertificateUtility.py new file mode 100644 index 00000000..8934dff1 --- /dev/null +++ b/authenticationsdk/util/CertificateUtility.py @@ -0,0 +1,131 @@ +import os +import re +from datetime import datetime, timezone +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.x509.oid import NameOID +from authenticationsdk.util.GlobalLabelParameters import GlobalLabelParameters +import CyberSource.logging.log_factory as LogFactory +from CyberSource.logging.log_configuration import LogConfiguration +from cryptography.hazmat.primitives.serialization import pkcs12 + +class CertificateUtility: + logger = None + + @staticmethod + def validate_path_and_file(file_path, path_type, log_config): + if CertificateUtility.logger is None: + CertificateUtility.logger = LogFactory.setup_logger(__name__, log_config) + logger = CertificateUtility.logger + + if not file_path or not file_path.strip(): + logger.error(f"{path_type} path cannot be null or empty.") + raise ValueError(f"{path_type} path cannot be null or empty.") + + # Normalize Windows-style paths that start with a slash before the drive letter + normalized_path = file_path + if os.sep == '\\' and re.match(r'^/[A-Za-z]:.*', normalized_path): + normalized_path = normalized_path[1:] + + path = normalized_path + + if not os.path.exists(path): + logger.error(f"{path_type} does not exist: {path}") + raise IOError(f"{path_type} does not exist: {path}") + + if os.path.isdir(path): + logger.error(f"{path_type} does not have valid file: {path}") + raise IOError(f"{path_type} does not have valid file: {path}") + + try: + with open(path, "rb"): + return path + except Exception: + logger.error(f"{path_type} is not readable: {path}") + raise IOError(f"{path_type} is not readable: {path}") + + @staticmethod + def validate_certificate_expiry(certificate, key_alias, mle_cache_key_identifier, log_config): + if CertificateUtility.logger is None: + CertificateUtility.logger = LogFactory.setup_logger(__name__, log_config) + logger = CertificateUtility.logger + + warning_message_for_no_expiry_date = "Certificate does not have expiry date" + warning_message_for_certificate_expiring_soon = "Certificate with alias {} is going to expire on {}. Please update the certificate before then." + warning_message_for_expired_certificate = "Certificate with alias {} is expired as of {}. Please update the certificate." + + if GlobalLabelParameters.MLE_CACHE_IDENTIFIER_FOR_CONFIG_CERT == mle_cache_key_identifier: + warning_message_for_no_expiry_date = "Certificate for MLE Requests does not have expiry date from mleForRequestPublicCertPath in merchant configuration." + warning_message_for_certificate_expiring_soon = "Certificate for MLE Requests with alias {} is going to expire on {}. Please update the certificate provided in mleForRequestPublicCertPath in merchant configuration before then." + warning_message_for_expired_certificate = "Certificate for MLE Requests with alias {} is expired as of {}. Please update the certificate provided in mleForRequestPublicCertPath in merchant configuration." + + if GlobalLabelParameters.MLE_CACHE_IDENTIFIER_FOR_P12_CERT == mle_cache_key_identifier: + warning_message_for_no_expiry_date = "Certificate for MLE Requests does not have expiry date in the P12 file." + warning_message_for_certificate_expiring_soon = "Certificate for MLE Requests with alias {} is going to expire on {}. Please update the P12 file before then." + warning_message_for_expired_certificate = "Certificate for MLE Requests with alias {} is expired as of {}. Please update the P12 file." + + # cryptography.x509.Certificate uses .not_valid_after for the expiry date, returns a datetime object + not_after = certificate.not_valid_after_utc + if not_after is None: + logger.warning(warning_message_for_no_expiry_date) + else: + now = datetime.now(timezone.utc) + if not_after < now: + logger.warning(warning_message_for_expired_certificate.format(key_alias, not_after)) + else: + time_to_expire = not_after - now + if time_to_expire.days < GlobalLabelParameters.CERTIFICATE_EXPIRY_DATE_WARNING_DAYS: + logger.warning(warning_message_for_certificate_expiring_soon.format(key_alias, not_after)) + + @staticmethod + def load_certificates_from_pem_file(certificate_file_path): + """ + Load all certificates from a PEM file and return as a list of cryptography.x509.Certificate objects. + """ + certificates = [] + with open(certificate_file_path, 'rb') as f: + pem_data = f.read() + + # Split the file into individual PEM certificates + for cert in pem_data.split(b'-----END CERTIFICATE-----'): + if b'-----BEGIN CERTIFICATE-----' in cert: + cert_block = cert + b'-----END CERTIFICATE-----' + certificate = x509.load_pem_x509_certificate(cert_block, default_backend()) + certificates.append(certificate) + return certificates + + @staticmethod + def get_cert_based_on_key_alias(certs, key_alias): + for cert in certs: + # Extract the Common Name (CN) from the certificate subject + common_names = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME) + if common_names: + cn = common_names[0].value + if cn.lower() == key_alias.lower(): + return cert + raise Exception(f"ERROR : Certificate with alias {key_alias} not found.") + + @staticmethod + def fetch_certificate_collection_from_p12_file(p12_file_path, key_password): + """ + Loads all certificates from a PKCS#12 (.p12/.pfx) file. + + Returns: + List of cryptography.x509.Certificate objects. + """ + with open(p12_file_path, "rb") as f: + p12_data = f.read() + + # key_password should be bytes or None + if key_password is not None: + key_password_bytes = key_password.encode() + else: + key_password_bytes = None + + private_key, cert, additional_certs = pkcs12.load_key_and_certificates(p12_data, key_password_bytes) + certs = [] + if cert is not None: + certs.append(cert) + if additional_certs is not None: + certs.extend(additional_certs) + return certs diff --git a/authenticationsdk/util/GlobalLabelParameters.py b/authenticationsdk/util/GlobalLabelParameters.py index 568958b0..fd80faac 100644 --- a/authenticationsdk/util/GlobalLabelParameters.py +++ b/authenticationsdk/util/GlobalLabelParameters.py @@ -97,3 +97,5 @@ class GlobalLabelParameters: CERTIFICATE_EXPIRY_DATE_WARNING_DAYS = 90 MESSAGE_BEFORE_MLE_REQUEST = "Request before MLE: " MESSAGE_AFTER_MLE_REQUEST = "Request after MLE: " + MLE_CACHE_IDENTIFIER_FOR_CONFIG_CERT = "mleCertFromMerchantConfig" + MLE_CACHE_IDENTIFIER_FOR_P12_CERT = "mleCertFromP12" diff --git a/authenticationsdk/util/MLEUtility.py b/authenticationsdk/util/MLEUtility.py index 45e72a3e..10258640 100644 --- a/authenticationsdk/util/MLEUtility.py +++ b/authenticationsdk/util/MLEUtility.py @@ -1,15 +1,17 @@ import json import time -from datetime import datetime, timezone -from cryptography import x509 -from jwcrypto import jwk, jwe +from cryptography.hazmat.primitives import serialization +from cryptography.x509.oid import NameOID +from jwcrypto import jwe, jwk from authenticationsdk.util.Cache import FileCache from authenticationsdk.util.GlobalLabelParameters import GlobalLabelParameters import CyberSource.logging.log_factory as LogFactory class MLEUtility: + logger = None + class MLEException(Exception): def __init__(self, message, errors=None): super().__init__(message) @@ -17,7 +19,6 @@ def __init__(self, message, errors=None): @staticmethod def check_is_mle_for_api(merchant_config, is_mle_supported_by_cybs_for_api, operation_ids): - is_mle_for_api = False if is_mle_supported_by_cybs_for_api and merchant_config.get_useMLEGlobally(): is_mle_for_api = True @@ -32,101 +33,81 @@ def check_is_mle_for_api(merchant_config, is_mle_supported_by_cybs_for_api, oper @staticmethod def encrypt_request_payload(merchant_config, request_body): - if request_body is None or request_body == "": - return request_body + if MLEUtility.logger is None: + MLEUtility.logger = LogFactory.setup_logger(__name__, merchant_config.log_config) + logger = MLEUtility.logger - logger = LogFactory.setup_logger(__name__, merchant_config.log_config) - - if merchant_config.log_config.enable_log: - logger.debug(f"{GlobalLabelParameters.MESSAGE_BEFORE_MLE_REQUEST}{request_body}") + if request_body is None: + return None - cert = MLEUtility.get_mle_certificate(merchant_config, logger) + payload = str(request_body) + logger.debug(GlobalLabelParameters.MESSAGE_BEFORE_MLE_REQUEST + ' ' + payload) - try: - serialized_jwe_token = MLEUtility.generate_token(cert, request_body, merchant_config.log_config, logger) - mleRequest = MLEUtility.create_json_object(serialized_jwe_token) - if merchant_config.log_config.enable_log: - logger.debug(f"{GlobalLabelParameters.MESSAGE_AFTER_MLE_REQUEST}{mleRequest}") - return mleRequest + mle_certificate = MLEUtility.get_mle_certificate(merchant_config) - except Exception as e: - if merchant_config.log_config.enable_log: - logger.error(f"Error encrypting request payload.") - raise MLEUtility.MLEException(f"Error encrypting request payload.") + # Special case: MLE Certificate is not currently available for HTTP Signature + if (mle_certificate is None and merchant_config.authentication_type.lower() == GlobalLabelParameters.HTTP.lower()): + logger.debug("The certificate to use for MLE for requests is not provided in the merchant configuration. Please ensure that the certificate path is provided.") + logger.debug("Currently, MLE for requests using HTTP Signature as authentication is not supported by Cybersource. By default, the SDK will fall back to non-encrypted requests.") + return request_body - @staticmethod - def generate_token(cert, request_body, log_config, logger): - public_key = cert.public_key() - serial_number = MLEUtility.extract_serial_number(cert, log_config, logger) + serial_number = MLEUtility.get_serial_number_from_certificate(mle_certificate, merchant_config) + payload_bytes = payload.encode('utf-8') + + # Extract the public RSA key from the certificate + public_key = mle_certificate.public_key() # cryptography.x509.Certificate object - jwk_key = jwk.JWK.from_pyca(public_key) - payload = request_body.encode('utf-8') + # Convert public key to jwk + rsa_pem = public_key.public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo) + jwk_key = jwk.JWK.from_pem(rsa_pem) - header = { + # Prepare JWE headers + headers = { "alg": "RSA-OAEP-256", "enc": "A256GCM", - "cty": "JWT", "kid": serial_number, "iat": int(time.time()) } - jwe_token = jwe.JWE(plaintext=payload, protected=json.dumps(header)) + # Create the JWE object and encrypt + jwe_token = jwe.JWE(plaintext=payload_bytes, protected=json.dumps(headers)) jwe_token.add_recipient(jwk_key) - return jwe_token.serialize(compact=True) + mle_request = MLEUtility.create_json_object(jwe_token.serialize(compact=True)) + logger.debug(GlobalLabelParameters.MESSAGE_AFTER_MLE_REQUEST + ' ' + str(mle_request)) + + return mle_request @staticmethod - def get_mle_certificate(merchant_config, logger): + def get_mle_certificate(merchant_config): cache_obj = FileCache() - try: - cert_data = cache_obj.grab_file(merchant_config, merchant_config.key_file_path, merchant_config.key_file_name) - mle_certificate_x509 = cert_data[3] - if mle_certificate_x509 is not None: - MLEUtility.validate_certificate_expiry(mle_certificate_x509, merchant_config.get_mleKeyAlias(), merchant_config.log_config, logger) - return mle_certificate_x509 - else: - if merchant_config.log_config.enable_log: - logger.error(f"No certificate found for MLE for given mleKeyAlias {merchant_config.get_mleKeyAlias()} in p12 file {merchant_config.key_file_name}.p12") - raise MLEUtility.MLEException(f"No certificate found for MLE for given mleKeyAlias {merchant_config.get_mleKeyAlias()} in p12 file {merchant_config.key_file_name}.p12") - except KeyError: - if merchant_config.log_config.enable_log: - logger.error(f"No certificate found for MLE for given mleKeyAlias {merchant_config.get_mleKeyAlias()} in p12 file {merchant_config.key_file_name}.p12") - raise MLEUtility.MLEException(f"No certificate found for MLE for given mleKeyAlias {merchant_config.get_mleKeyAlias()} in p12 file {merchant_config.key_file_name}.p12") - except Exception as e: - if merchant_config.log_config.enable_log: - logger.error(f"Unable to load certificate: {str(e)}") - raise MLEUtility.MLEException(f"Unable to load PEM file: {str(e)}") + mle_certificate = cache_obj.get_request_mle_cert_from_cache(merchant_config) + return mle_certificate @staticmethod - def extract_serial_number(x509_certificate, log_config, logger): - serial_number = None + def get_serial_number_from_certificate(certificate, merchant_config): + if MLEUtility.logger is None: + MLEUtility.logger = LogFactory.setup_logger(__name__, merchant_config.log_config) + logger = MLEUtility.logger + + if certificate is None: + raise ValueError("MLE certificate is null") - for attribute in x509_certificate.subject: - if attribute.oid == x509.NameOID.SERIAL_NUMBER: + # Get subject and look for 'serialNumber' + subject = certificate.subject + serial_number = None + for attribute in subject: + if attribute.oid == NameOID.SERIAL_NUMBER: serial_number = attribute.value break - if serial_number is None: - if log_config.enable_log: - logger.warning("Serial number not found in MLE certificate for alias.") - serial_number = str(x509_certificate.serial_number) + + if not serial_number: + logger.warning(f"Serial number not found in MLE certificate for alias {merchant_config.mle_key_alias} in {merchant_config.p12_keyfilepath}.p12") + # Use the hex serial number from the certificate as fallback + return format(certificate.serial_number, 'x') + return serial_number @staticmethod def create_json_object(jwe_token): - return json.dumps({"encryptedRequest": jwe_token}) - - @staticmethod - def validate_certificate_expiry(certificate, key_alias, log_config, logger): - try: - if certificate.not_valid_after_utc < datetime.now(timezone.utc): - if log_config.enable_log: - logger.warning(f"Certificate with MLE alias {key_alias} is expired as of {certificate.not_valid_after_utc}. Please update p12 file.") - # raise Exception(f"Certificate with MLE alias {key_alias} is expired.") - else: - time_to_expire = (certificate.not_valid_after_utc - datetime.now(timezone.utc)).total_seconds() - if time_to_expire < GlobalLabelParameters.CERTIFICATE_EXPIRY_DATE_WARNING_DAYS * 24 * 60 * 60: - if log_config.enable_log: - logger.warning(f"Certificate for MLE with alias {key_alias} is going to expire on {certificate.not_valid_after_utc}. Please update p12 file before that.") - except Exception as e: - if log_config.enable_log: - logger.error(f"Error while checking certificate expiry: {str(e)}") \ No newline at end of file + return json.dumps({"encryptedRequest": jwe_token}) \ No newline at end of file From 92b053e54546e52720c840e778f884423fe97fb4 Mon Sep 17 00:00:00 2001 From: gnongsie Date: Tue, 22 Jul 2025 16:01:31 +0530 Subject: [PATCH 2/3] Fixes for thread safety --- authenticationsdk/core/Authorization.py | 1 - authenticationsdk/jwt/Token.py | 118 +++---------------- authenticationsdk/util/Cache.py | 106 ++++++++--------- authenticationsdk/util/CertificateUtility.py | 2 +- 4 files changed, 64 insertions(+), 163 deletions(-) diff --git a/authenticationsdk/core/Authorization.py b/authenticationsdk/core/Authorization.py index 1e49b1ab..e3ceb301 100644 --- a/authenticationsdk/core/Authorization.py +++ b/authenticationsdk/core/Authorization.py @@ -45,7 +45,6 @@ def get_token(self, mconfig, date_time, logger = None): return sig_token # JWT-Call elif authentication_type.upper() == GlobalLabelParameters.JWT.upper(): - jwt_sig_token = JwtSignatureToken() jwt_sig_token.jwt_signature_token(mconfig, date_time) sig_token_jwt = jwt_sig_token.get_token() diff --git a/authenticationsdk/jwt/Token.py b/authenticationsdk/jwt/Token.py index 070ac7a2..c4325d8e 100644 --- a/authenticationsdk/jwt/Token.py +++ b/authenticationsdk/jwt/Token.py @@ -23,33 +23,24 @@ def get_token(self): def set_token(self): token = "" - if self.jwt_method.upper() == GlobalLabelParameters.GET: - token = self.token_for_get() - elif self.jwt_method.upper() == GlobalLabelParameters.POST: - token = self.token_for_post() - elif self.jwt_method.upper() == GlobalLabelParameters.PUT: - token = self.token_for_put() - elif self.jwt_method.upper() == GlobalLabelParameters.DELETE: - token = self.token_for_delete() - elif self.jwt_method.upper() == GlobalLabelParameters.PATCH: - token = self.token_for_patch() + if self.jwt_method.upper() == GlobalLabelParameters.GET or self.jwt_method.upper() == GlobalLabelParameters.DELETE: + token = self.token_for_get_and_delete() + elif self.jwt_method.upper() == GlobalLabelParameters.POST or self.jwt_method.upper() == GlobalLabelParameters.PUT or self.jwt_method.upper() == GlobalLabelParameters.PATCH: + token = self.token_for_post_and_put_and_patch() return token # This function has the logic to generate token when the Jwt_method is get - def token_for_get(self): - + def token_for_get_and_delete(self): # Setting the jwt body for JWT-get jwt_body = {GlobalLabelParameters.JWT_TIME: self.date} # reading the p12 file from cache memory cache_obj = FileCache() - cache_memory = cache_obj.grab_file(self.merchant_config, self.merchant_config.key_file_path, - self.merchant_config.key_file_name) - der_cert_string = cache_memory[0] - private_key = cache_memory[1] + cache_memory = cache_obj.fetch_cached_certificate(self.merchant_config, self.merchant_config.p12KeyFilePath, self.merchant_config.key_password) + der_cert_string = cache_memory.certificate + private_key = cache_memory.private_key # setting the headers -merchant_id and the public key - headers_jwt = { - GlobalLabelParameters.MERCHANT_ID: str(self.merchant_config.key_alias)} + headers_jwt = {GlobalLabelParameters.MERCHANT_ID: str(self.merchant_config.key_alias)} public_key_list = ([]) public_key_list.append(der_cert_string.decode("utf-8")) public_key_headers = {GlobalLabelParameters.PUBLIC_KEY: public_key_list} @@ -61,99 +52,20 @@ def token_for_get(self): return encoded_jwt - def token_for_post(self): - + def token_for_post_and_put_and_patch(self): digest_payload_obj = DigestAndPayload() digest = digest_payload_obj.string_digest_generation( self.merchant_config.request_json_path_data) # Setting the jwt body for JWT-post - jwt_body = {GlobalLabelParameters.JWT_DIGEST: digest.decode("utf-8"), GlobalLabelParameters.JWT_ALGORITHM: "SHA-256", - GlobalLabelParameters.JWT_TIME: self.date} - # reading the p12 file from cache memory - cache_obj = FileCache() - cache_memory = cache_obj.grab_file(self.merchant_config, self.merchant_config.key_file_path, - self.merchant_config.key_file_name) - der_cert_string = cache_memory[0] - private_key = cache_memory[1] - # setting the headers -merchant_id and the public key - headers_jwt = { - GlobalLabelParameters.MERCHANT_ID: str(self.merchant_config.key_alias)} - public_key_list = ([]) - public_key_list.append(der_cert_string.decode("utf-8")) - public_key_headers = {GlobalLabelParameters.PUBLIC_KEY: public_key_list} - headers_jwt.update(public_key_headers) - # generating the token of jwt - encoded_jwt = jwt.encode(jwt_body, private_key, algorithm='RS256', headers=headers_jwt) - - return encoded_jwt - - def token_for_put(self): - - digest_payload_obj = DigestAndPayload() - digest = digest_payload_obj.string_digest_generation( - self.merchant_config.request_json_path_data) - # Setting the jwt body for JWT-post - jwt_body = {GlobalLabelParameters.JWT_DIGEST: digest.decode("utf-8"), GlobalLabelParameters.JWT_ALGORITHM: "SHA-256", - GlobalLabelParameters.JWT_TIME: self.date} - # reading the p12 file from cache memory - cache_obj = FileCache() - cache_memory = cache_obj.grab_file(self.merchant_config, self.merchant_config.key_file_path, - self.merchant_config.key_file_name) - der_cert_string = cache_memory[0] - private_key = cache_memory[1] - # setting the headers -merchant_id and the public key - headers_jwt = { - GlobalLabelParameters.MERCHANT_ID: str(self.merchant_config.key_alias)} - public_key_list = ([]) - public_key_list.append(der_cert_string.decode("utf-8")) - public_key_headers = {GlobalLabelParameters.PUBLIC_KEY: public_key_list} - headers_jwt.update(public_key_headers) - # generating the token of jwt - encoded_jwt = jwt.encode(jwt_body, private_key, algorithm='RS256', headers=headers_jwt) - - return encoded_jwt - - def token_for_patch(self): - - digest_payload_obj = DigestAndPayload() - digest = digest_payload_obj.string_digest_generation( - self.merchant_config.request_json_path_data) - # Setting the jwt body for JWT-post - jwt_body = {GlobalLabelParameters.JWT_DIGEST: digest.decode("utf-8"), - GlobalLabelParameters.JWT_ALGORITHM: "SHA-256", - GlobalLabelParameters.JWT_TIME: self.date} - # reading the p12 file from cache memory - cache_obj = FileCache() - cache_memory = cache_obj.grab_file(self.merchant_config, self.merchant_config.key_file_path, - self.merchant_config.key_file_name) - der_cert_string = cache_memory[0] - private_key = cache_memory[1] - # setting the headers -merchant_id and the public key - headers_jwt = { - GlobalLabelParameters.MERCHANT_ID: str(self.merchant_config.key_alias)} - public_key_list = ([]) - public_key_list.append(der_cert_string.decode("utf-8")) - public_key_headers = {GlobalLabelParameters.PUBLIC_KEY: public_key_list} - headers_jwt.update(public_key_headers) - # generating the token of jwt - encoded_jwt = jwt.encode(jwt_body, private_key, algorithm='RS256', headers=headers_jwt) - - return encoded_jwt - - def token_for_delete(self): - - # Setting the jwt body for JWT-get - jwt_body = {GlobalLabelParameters.JWT_TIME: self.date} + jwt_body = {GlobalLabelParameters.JWT_DIGEST: digest.decode("utf-8"), GlobalLabelParameters.JWT_ALGORITHM: "SHA-256", GlobalLabelParameters.JWT_TIME: self.date} # reading the p12 file from cache memory cache_obj = FileCache() - cache_memory = cache_obj.grab_file(self.merchant_config, self.merchant_config.key_file_path, - self.merchant_config.key_file_name) - der_cert_string = cache_memory[0] - private_key = cache_memory[1] + cache_memory = cache_obj.fetch_cached_certificate(self.merchant_config, self.merchant_config.p12KeyFilePath, self.merchant_config.key_password) + der_cert_string = cache_memory.certificate + private_key = cache_memory.private_key # setting the headers -merchant_id and the public key - headers_jwt = { - GlobalLabelParameters.MERCHANT_ID: str(self.merchant_config.key_alias)} + headers_jwt = {GlobalLabelParameters.MERCHANT_ID: str(self.merchant_config.key_alias)} public_key_list = ([]) public_key_list.append(der_cert_string.decode("utf-8")) public_key_headers = {GlobalLabelParameters.PUBLIC_KEY: public_key_list} diff --git a/authenticationsdk/util/Cache.py b/authenticationsdk/util/Cache.py index 6425e07c..4def6039 100644 --- a/authenticationsdk/util/Cache.py +++ b/authenticationsdk/util/Cache.py @@ -1,8 +1,10 @@ import base64 import ssl +import threading from jwcrypto import jwk +from cryptography import x509 from cryptography.hazmat.primitives import serialization from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.serialization import pkcs12 @@ -15,9 +17,10 @@ import threading class CertInfo: - def __init__(self, mle_certificate, timestamp): - self.MLECertificate = mle_certificate - self.Timestamp = timestamp + def __init__(self, certificate, timestamp, private_key): + self.certificate = certificate + self.timestamp = timestamp + self.private_key = private_key class FileCache: _instance = None @@ -33,8 +36,38 @@ def __new__(cls, *args, **kwargs): def _initialize_cache(self): # Your cache initialization code here self.filecache = {} + self._p12_cache_lock = threading.RLock() self.mlecache = {} - self._cache_lock = threading.RLock() + self._mle_cache_lock = threading.RLock() + + def fetch_cached_certificate(self, merchant_config, p12_file_path, key_password): + try: + file_last_modified_time = os.stat(p12_file_path).st_mtime + filename_without_ext = os.path.splitext(os.path.basename(p12_file_path))[0] + if filename_without_ext not in self.filecache or file_last_modified_time != self.filecache[filename_without_ext].timestamp: + self.update_cache(merchant_config, p12_file_path, key_password) + return self.filecache[filename_without_ext] + except Exception: + raise ValueError(f"ERROR : KeyPassword provided: {key_password} is incorrect.") + + def update_cache(self, merchant_config, p12_file_path, key_password): + file_last_modified_time = os.stat(p12_file_path).st_mtime + filename_without_ext = os.path.splitext(os.path.basename(p12_file_path))[0] + private_key, certificate_list = CertificateUtility.fetch_certificate_collection_from_p12_file(p12_file_path, key_password) + + jwt_certificate = CertificateUtility.get_cert_based_on_key_alias(certificate_list, merchant_config.key_alias) + jwt_certificate_pem = jwt_certificate.public_bytes(serialization.Encoding.PEM) + jwt_certificate_pem_unicode_str = jwt_certificate_pem.decode('utf-8') + jwt_certificate_der_format = base64.b64encode(ssl.PEM_cert_to_DER_cert(jwt_certificate_pem_unicode_str)) + + certificate_information = CertInfo( + jwt_certificate_der_format, + file_last_modified_time, + private_key + ) + + with self._p12_cache_lock: + self.filecache[str(filename_without_ext)] = certificate_information @deprecated("This method has been marked as Deprecated and will be removed in coming releases.") def get_private_key_from_pem(self, pem_file_path): @@ -50,50 +83,6 @@ def load_certificates(self, filepath, filename, password): backend=default_backend() ) - def get_cert_based_on_key_alias(self, certificate, additional_certificates, key_alias): - target_cert = None - - # Check the main certificate - for attribute in certificate.subject: - if attribute.oid.dotted_string == '2.5.4.3': # OID for CN - cn_value = attribute.value - if cn_value == key_alias: - target_cert = certificate - break - - # Check the additional certificates if not found in the main certificate - if not target_cert: - for cert in additional_certificates: - for attribute in cert.subject: - if attribute.oid.dotted_string == '2.5.4.3': # OID for CN - cn_value = attribute.value - if cn_value == key_alias: - target_cert = cert - break - if target_cert: - break - - return target_cert - - def update_cache(self, mconfig, filepath, filename): - file_mod_time = os.stat(os.path.join(filepath, filename) + GlobalLabelParameters.P12_PREFIX).st_mtime - private_key, certificate, additional_certificates = self.load_certificates(filepath, filename, mconfig.key_password) - - jwt_cert= self.get_cert_based_on_key_alias(certificate, additional_certificates, mconfig.key_alias) - jwt_cert_pem = jwt_cert.public_bytes(serialization.Encoding.PEM) - jwt_cert_pem_str = jwt_cert_pem.decode('utf-8') - jwt_der_cert_string = base64.b64encode(ssl.PEM_cert_to_DER_cert(jwt_cert_pem_str)) - - mle_cert = self.get_cert_based_on_key_alias(certificate, additional_certificates, mconfig.get_mleKeyAlias()) - - self.filecache[str(filename)] = [jwt_der_cert_string, private_key, file_mod_time, mle_cert] - - def grab_file(self, mconfig, filepath, filename): - file_mod_time = os.stat(os.path.join(filepath, filename) + GlobalLabelParameters.P12_PREFIX).st_mtime - if filename not in self.filecache or file_mod_time != self.filecache[filename][2]: - self.update_cache(mconfig, filepath, filename) - return self.filecache[filename] - @deprecated("This method has been marked as Deprecated and will be removed in coming releases.") def get_cached_private_key_from_pem(self, file_path, cache_key): file_mod_time = os.stat(file_path).st_mtime @@ -132,15 +121,14 @@ def get_request_mle_cert_from_cache(self, merchant_config): return mle_certificate def get_mle_cert_based_on_cache_key(self, merchant_config, cache_key, certificate_file_path): - with self._cache_lock: - cert_info = self.mlecache.get(cache_key) + cert_info = self.mlecache.get(cache_key) - file_timestamp = os.path.getmtime(certificate_file_path) - if cert_info is None or cert_info.Timestamp != file_timestamp: - self.setup_mle_cache(merchant_config, cache_key, certificate_file_path) - cert_info = self.mlecache.get(cache_key) + file_timestamp = os.path.getmtime(certificate_file_path) + if cert_info is None or cert_info.timestamp != file_timestamp: + self.setup_mle_cache(merchant_config, cache_key, certificate_file_path) + cert_info = self.mlecache.get(cache_key) - return cert_info.MLECertificate if cert_info else None + return cert_info.certificate if cert_info else None def setup_mle_cache(self, merchant_config, cache_key, certificate_file_path): if FileCache.logger is None: @@ -148,6 +136,7 @@ def setup_mle_cache(self, merchant_config, cache_key, certificate_file_path): logger = FileCache.logger mle_certificate = None + private_key = None # Handle PEM certificate case if cache_key.endswith(GlobalLabelParameters.MLE_CACHE_IDENTIFIER_FOR_CONFIG_CERT): @@ -163,7 +152,7 @@ def setup_mle_cache(self, merchant_config, cache_key, certificate_file_path): # Handle P12 certificate case if cache_key.endswith(GlobalLabelParameters.MLE_CACHE_IDENTIFIER_FOR_P12_CERT): try: - certificates = CertificateUtility.fetch_certificate_collection_from_p12_file(merchant_config.p12KeyFilePath, merchant_config.key_password) + private_key, certificates = CertificateUtility.fetch_certificate_collection_from_p12_file(merchant_config.p12KeyFilePath, merchant_config.key_password) mle_certificate = CertificateUtility.get_cert_based_on_key_alias(certificates, merchant_config.mleKeyAlias) except Exception: file_name = os.path.basename(merchant_config.p12KeyFilePath) @@ -173,7 +162,8 @@ def setup_mle_cache(self, merchant_config, cache_key, certificate_file_path): # Save to cache (thread-safe) cert_info = CertInfo( mle_certificate, - os.path.getmtime(certificate_file_path) + os.path.getmtime(certificate_file_path), + private_key ) - with self._cache_lock: + with self._mle_cache_lock: self.mlecache[cache_key] = cert_info \ No newline at end of file diff --git a/authenticationsdk/util/CertificateUtility.py b/authenticationsdk/util/CertificateUtility.py index 8934dff1..c2dbb5ca 100644 --- a/authenticationsdk/util/CertificateUtility.py +++ b/authenticationsdk/util/CertificateUtility.py @@ -128,4 +128,4 @@ def fetch_certificate_collection_from_p12_file(p12_file_path, key_password): certs.append(cert) if additional_certs is not None: certs.extend(additional_certs) - return certs + return private_key, certs From 8ee7d1ee48938180b16a5d38f716e9c97a6faa27 Mon Sep 17 00:00:00 2001 From: gnongsie Date: Wed, 23 Jul 2025 20:18:12 +0530 Subject: [PATCH 3/3] Fixed name for method to clarify functionality --- authenticationsdk/jwt/Token.py | 4 ++-- authenticationsdk/util/Cache.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/authenticationsdk/jwt/Token.py b/authenticationsdk/jwt/Token.py index c4325d8e..38fb87af 100644 --- a/authenticationsdk/jwt/Token.py +++ b/authenticationsdk/jwt/Token.py @@ -36,7 +36,7 @@ def token_for_get_and_delete(self): jwt_body = {GlobalLabelParameters.JWT_TIME: self.date} # reading the p12 file from cache memory cache_obj = FileCache() - cache_memory = cache_obj.fetch_cached_certificate(self.merchant_config, self.merchant_config.p12KeyFilePath, self.merchant_config.key_password) + cache_memory = cache_obj.fetch_cached_p12_certificate(self.merchant_config, self.merchant_config.p12KeyFilePath, self.merchant_config.key_password) der_cert_string = cache_memory.certificate private_key = cache_memory.private_key # setting the headers -merchant_id and the public key @@ -61,7 +61,7 @@ def token_for_post_and_put_and_patch(self): jwt_body = {GlobalLabelParameters.JWT_DIGEST: digest.decode("utf-8"), GlobalLabelParameters.JWT_ALGORITHM: "SHA-256", GlobalLabelParameters.JWT_TIME: self.date} # reading the p12 file from cache memory cache_obj = FileCache() - cache_memory = cache_obj.fetch_cached_certificate(self.merchant_config, self.merchant_config.p12KeyFilePath, self.merchant_config.key_password) + cache_memory = cache_obj.fetch_cached_p12_certificate(self.merchant_config, self.merchant_config.p12KeyFilePath, self.merchant_config.key_password) der_cert_string = cache_memory.certificate private_key = cache_memory.private_key # setting the headers -merchant_id and the public key diff --git a/authenticationsdk/util/Cache.py b/authenticationsdk/util/Cache.py index 4def6039..3320290b 100644 --- a/authenticationsdk/util/Cache.py +++ b/authenticationsdk/util/Cache.py @@ -40,7 +40,7 @@ def _initialize_cache(self): self.mlecache = {} self._mle_cache_lock = threading.RLock() - def fetch_cached_certificate(self, merchant_config, p12_file_path, key_password): + def fetch_cached_p12_certificate(self, merchant_config, p12_file_path, key_password): try: file_last_modified_time = os.stat(p12_file_path).st_mtime filename_without_ext = os.path.splitext(os.path.basename(p12_file_path))[0]