Skip to content

Inbound mle for http signature #158

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion authenticationsdk/core/Authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ def get_token(self, mconfig, date_time, logger = None):
return sig_token
# JWT-Call
elif authentication_type.upper() == GlobalLabelParameters.JWT.upper():

jwt_sig_token = JwtSignatureToken()
jwt_sig_token.jwt_signature_token(mconfig, date_time)
sig_token_jwt = jwt_sig_token.get_token()
Expand Down
117 changes: 95 additions & 22 deletions authenticationsdk/core/MerchantConfiguration.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import copy
from CyberSource.logging.log_configuration import LogConfiguration
from authenticationsdk.util.CertificateUtility import CertificateUtility
from authenticationsdk.util.GlobalLabelParameters import *
from wsgiref.handlers import format_date_time
from datetime import datetime
Expand Down Expand Up @@ -52,9 +54,12 @@ def __init__(self):
self.__jwePEMFileDirectory = None
self.useMLEGlobally = None
self.mapToControlMLEonAPI = None
self.mleKeyAlias = None
self.mleKeyAlias = None
self.mleForRequestPublicCertPath = None
self.p12KeyFilePath = None
self.logger = LogFactory.setup_logger(self.__class__.__name__)

#region Getters and Setters
def set_merchant_keyid(self, value):
if not (value.get('merchant_keyid') is None):
self.merchant_keyid = value['merchant_keyid']
Expand Down Expand Up @@ -88,7 +93,7 @@ def set_use_metakey(self, value):
self.use_metakey = value['use_metakey']
else:
self.use_metakey = False

def set_portfolio_id(self, value):
if not (value.get('portfolio_id') is None):
self.portfolio_id = value['portfolio_id']
Expand Down Expand Up @@ -138,7 +143,7 @@ def set_enable_client_cert(self, value):
self.enable_client_cert = value['enable_client_cert']
else:
self.enable_client_cert = False

def set_client_cert_dir(self, value):
if not (value.get('client_cert_dir') is None):
self.client_cert_dir = value['client_cert_dir']
Expand Down Expand Up @@ -183,7 +188,7 @@ def set_jwePEMFileDirectory(self, value):

def get_jwePEMFileDirectory(self):
return self.__jwePEMFileDirectory

def set_useMLEGlobally(self, value):
if not (value.get('useMLEGlobally') is None):
self.useMLEGlobally = value['useMLEGlobally']
Expand All @@ -192,7 +197,7 @@ def set_useMLEGlobally(self, value):

def get_useMLEGlobally(self):
return self.useMLEGlobally

def set_mapToControlMLEonAPI(self, value):
map_to_control_mle_on_api = value.get('mapToControlMLEonAPI')
if map_to_control_mle_on_api is not None:
Expand All @@ -203,7 +208,7 @@ def set_mapToControlMLEonAPI(self, value):

def get_mapToControlMLEonAPI(self):
return self.mapToControlMLEonAPI

def set_mleKeyAlias(self, value):
if value.get('mleKeyAlias') is not None and value.get('mleKeyAlias').strip():
self.mleKeyAlias = value['mleKeyAlias'].strip()
Expand All @@ -213,6 +218,25 @@ def set_mleKeyAlias(self, value):
def get_mleKeyAlias(self):
return self.mleKeyAlias

def set_mleForRequestPublicCertPath(self, value):
if value.get('mleForRequestPublicCertPath') is not None and value.get('mleForRequestPublicCertPath').strip():
self.mleForRequestPublicCertPath = value['mleForRequestPublicCertPath'].strip()
else:
self.mleForRequestPublicCertPath = None

def get_mleForRequestPublicCertPath(self):
return self.mleForRequestPublicCertPath

def set_p12KeyFilePath(self, value):
if value.get('p12KeyFilePath') is not None and value.get('p12KeyFilePath').strip():
self.p12KeyFilePath = value['p12KeyFilePath'].strip()
else:
self.p12KeyFilePath = None

def get_p12KeyFilePath(self):
return self.p12KeyFilePath

#endregion

# This method sets the Merchant Configuration Variables to its respective values after reading from cybs.properties
def set_merchantconfig(self, val):
Expand Down Expand Up @@ -247,6 +271,7 @@ def set_merchantconfig(self, val):
self.set_jwePEMFileDirectory(val)
self.set_useMLEGlobally(val)
self.set_mapToControlMLEonAPI(val)
self.set_mleForRequestPublicCertPath(val)
self.set_mleKeyAlias(val)

# Returns the time in format as defined by RFC7231
Expand Down Expand Up @@ -281,12 +306,12 @@ def validate_merchant_details(self, details, mconfig = None):
authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger,
GlobalLabelParameters.CLIENT_CERT_DIR_EMPTY,
self.log_config)

if self.ssl_client_cert is None or self.ssl_client_cert == "":
authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger,
GlobalLabelParameters.SSL_CLIENT_CERT_EMPTY,
self.log_config)

if self.private_key is None or self.private_key == "":
authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger,
GlobalLabelParameters.PRIVATE_KEY_EMPTY,
Expand All @@ -302,7 +327,7 @@ def validate_merchant_details(self, details, mconfig = None):
authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger,
GlobalLabelParameters.MERCHANTID_REQ,
self.log_config)

if self.merchant_keyid is None or self.merchant_keyid == "":
authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger,
GlobalLabelParameters.MERCHANT_KEY_ID_REQ,
Expand All @@ -318,7 +343,7 @@ def validate_merchant_details(self, details, mconfig = None):
authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger,
GlobalLabelParameters.MERCHANTID_REQ,
self.log_config)

if self.key_alias is None or self.key_alias == "":
self.key_alias = self.merchant_id
authenticationsdk.util.ExceptionAuth.validate_default_values(self.logger,
Expand Down Expand Up @@ -349,23 +374,26 @@ def validate_merchant_details(self, details, mconfig = None):
GlobalLabelParameters.KEY_FILE_EMPTY,
self.log_config)

elif self.authentication_type.lower() == GlobalLabelParameters.OAUTH.lower():
if not self.check_key_file():
authenticationsdk.util.ExceptionAuth.log_exception(self.logger, f"Error finding or accessing the Key Directory or Key File. Please review the values in the merchant configuration.", self.log_config)

elif self.authentication_type.lower() == GlobalLabelParameters.OAUTH.lower():
if self.access_token is None or self.access_token == "":
authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger,
GlobalLabelParameters.ACCESS_TOKEN_EMPTY,
self.log_config)

if self.ssl_client_cert is None or self.ssl_client_cert == "":
authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger,
GlobalLabelParameters.REFRESH_TOKEN_EMPTY,
self.log_config)
elif self.authentication_type.lower() == GlobalLabelParameters.MUTUAL_AUTH.lower():

elif self.authentication_type.lower() == GlobalLabelParameters.MUTUAL_AUTH.lower():
if self.client_id is None or self.client_id == "":
authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger,
GlobalLabelParameters.CLIENT_ID_EMPTY,
self.log_config)

if self.client_secret is None or self.client_secret == "":
authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger,
GlobalLabelParameters.CLIENT_SECRET_EMPTY,
Expand All @@ -374,6 +402,7 @@ def validate_merchant_details(self, details, mconfig = None):
authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger,
GlobalLabelParameters.AUTH_ERROR,
self.log_config)

# useMLEGlobally check for auth Type
if self.useMLEGlobally is True or self.mapToControlMLEonAPI is not None:
if self.useMLEGlobally is True and self.authentication_type.lower() != GlobalLabelParameters.JWT.lower():
Expand All @@ -388,14 +417,58 @@ def validate_merchant_details(self, details, mconfig = None):
GlobalLabelParameters.MLE_AUTH_ERROR,
self.log_config)

self.validate_MLE_configuration()
self.p12KeyFilePath = os.path.join(self.key_file_path, self.key_file_name) + GlobalLabelParameters.P12_PREFIX

log_items = GlobalLabelParameters.HIDE_MERCHANT_CONFIG_PROPS
# This displays the logic for logging all cybs.json values
details_copy = copy.deepcopy(details)
if self.log_config.enable_log is True:
for key, value in list(details.items()):
for key, value in list(details_copy.items()):
if key in log_items:
del details[key]

for keys, values in list(details.items()):
details[keys] = str(values)

self.logger.info("Mconfig > " + str(ast.literal_eval(json.dumps(details))))
del details_copy[key]

for keys, values in list(details_copy.items()):
details_copy[keys] = str(values)

self.logger.info("Mconfig > " + str(ast.literal_eval(json.dumps(details_copy))))

def check_key_file(self):
if not(self.key_file_name and self.key_file_name.strip()):
self.logger.error("Key Filename not provided. Assigning the value of Merchant ID")
if self.merchant_id and self.merchant_id.strip():
self.key_file_name = self.merchant_id

if not(self.key_file_path and self.key_file_path.strip()):
self.key_file_path = GlobalLabelParameters.DEFAULT_KEY_FILE_PATH
self.logger.error(f"Keys Directory not provided. Using Default Path: {self.key_file_path}")

# Directory exists?
if not os.path.isdir(self.key_file_path):
self.logger.error(f"Keys Directory not found. Entered directory : {self.key_file_path}")
return False

keyFilePath = os.path.join(self.key_file_path, self.key_file_name) + GlobalLabelParameters.P12_PREFIX

# File exists?
if not os.path.isfile(keyFilePath):
self.logger.error(f"Key File not found. Check path/filename entered. Entered path/filename : {keyFilePath}")
return False

self.logger.info(f"Entered value for Key File Path : {keyFilePath}")

# Can file be opened for reading?
try:
with open(keyFilePath, 'rb'):
return True
except Exception:
self.logger.info(f"File cannot be accessed. Permission denied : {keyFilePath}")
return False

def validate_MLE_configuration(self):
if self.mleForRequestPublicCertPath and self.mleForRequestPublicCertPath.strip():
try:
CertificateUtility.validate_path_and_file(self.mleForRequestPublicCertPath, "mleForRequestPublicCertPath", self.log_config)
except Exception as err:
self.logger.error(err)
raise err
Loading