From 29abf332fb6d1a824a7b80283c27c8767a50834c Mon Sep 17 00:00:00 2001 From: zblurx Date: Tue, 27 May 2025 16:32:03 +0200 Subject: [PATCH 1/2] add custom cbt_value --- impacket/ldap/ldap.py | 52 +++++++++++++++++++++---------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/impacket/ldap/ldap.py b/impacket/ldap/ldap.py index ee598204ae..0ad789ad62 100644 --- a/impacket/ldap/ldap.py +++ b/impacket/ldap/ldap.py @@ -104,6 +104,7 @@ def __init__(self, url, baseDN='', dstIp=None, signing=True): raise LDAPSessionError(errorString="Unknown URL prefix: '%s'" % url) self.__binded = False + self.__channel_binding_value = None ### SASL Auth LDAP Signing arguments self.sequenceNumber = 0 @@ -141,26 +142,25 @@ def __init__(self, url, baseDN='', dstIp=None, signing=True): self._socket.connect(sa) self._socket.do_handshake() - def generateChannelBindingValue(self): - # From: https://github.com/ly4k/ldap3/commit/87f5760e5a68c2f91eac8ba375f4ea3928e2b9e0#diff-c782b790cfa0a948362bf47d72df8ddd6daac12e5757afd9d371d89385b27ef6R1383 - from hashlib import md5 - # Ugly but effective, to get the digest of the X509 DER in bytes - peer_cert_digest_str = self._socket.get_peer_certificate().digest('sha256').decode() - peer_cert_digest_bytes = bytes.fromhex(peer_cert_digest_str.replace(':', '')) - - channel_binding_struct = b'' - initiator_address = b'\x00'*8 - acceptor_address = b'\x00'*8 - - # https://datatracker.ietf.org/doc/html/rfc5929#section-4 - application_data_raw = b'tls-server-end-point:' + peer_cert_digest_bytes - len_application_data = len(application_data_raw).to_bytes(4, byteorder='little', signed = False) - application_data = len_application_data - application_data += application_data_raw - channel_binding_struct += initiator_address - channel_binding_struct += acceptor_address - channel_binding_struct += application_data - return md5(channel_binding_struct).digest() + # From: https://github.com/ly4k/ldap3/commit/87f5760e5a68c2f91eac8ba375f4ea3928e2b9e0#diff-c782b790cfa0a948362bf47d72df8ddd6daac12e5757afd9d371d89385b27ef6R1383 + from hashlib import md5 + # Ugly but effective, to get the digest of the X509 DER in bytes + peer_cert_digest_str = self._socket.get_peer_certificate().digest('sha256').decode() + peer_cert_digest_bytes = bytes.fromhex(peer_cert_digest_str.replace(':', '')) + + channel_binding_struct = b'' + initiator_address = b'\x00'*8 + acceptor_address = b'\x00'*8 + + # https://datatracker.ietf.org/doc/html/rfc5929#section-4 + application_data_raw = b'tls-server-end-point:' + peer_cert_digest_bytes + len_application_data = len(application_data_raw).to_bytes(4, byteorder='little', signed = False) + application_data = len_application_data + application_data += application_data_raw + channel_binding_struct += initiator_address + channel_binding_struct += acceptor_address + channel_binding_struct += application_data + self.__channel_binding_value = md5(channel_binding_struct).digest() def kerberosLogin(self, user, password, domain='', lmhash='', nthash='', aesKey='', kdcHost=None, TGT=None, TGS=None, useCache=True): @@ -268,8 +268,8 @@ def kerberosLogin(self, user, password, domain='', lmhash='', nthash='', aesKey= # If TLS is used, setup channel binding - if self._SSL: - chkField['Bnd'] = self.generateChannelBindingValue() + if self._SSL and self.__channel_binding_value is not None: + chkField['Bnd'] = self.__channel_binding_value if self.__signing: chkField['Flags'] |= GSS_C_CONF_FLAG chkField['Flags'] |= GSS_C_INTEG_FLAG @@ -372,8 +372,8 @@ def login(self, user='', password='', domain='', lmhash='', nthash='', authentic # If TLS is used, setup channel binding channel_binding_value = b'' - if self._SSL: - channel_binding_value = self.generateChannelBindingValue() + if self._SSL and self.__channel_binding_value is not None: + channel_binding_value = self.__channel_binding_value # NTLM Auth type3, exportedSessionKey = getNTLMSSPType3(negotiate, bytes(type2), user, password, domain, lmhash, nthash, channel_binding_value=channel_binding_value) @@ -418,8 +418,8 @@ def login(self, user='', password='', domain='', lmhash='', nthash='', authentic # channel binding channel_binding_value = b'' - if self._SSL: - channel_binding_value = self.generateChannelBindingValue() + if self._SSL and self.__channel_binding_value is not None: + channel_binding_value = self.__channel_binding_value # NTLM Auth type3, exportedSessionKey = getNTLMSSPType3(negotiate, type2, user, password, domain, lmhash, nthash, service='ldap', version=self.version, use_ntlmv2=True, channel_binding_value=channel_binding_value) From 1c1b9382f5a5b53679dc03e9248dce799bfc2f34 Mon Sep 17 00:00:00 2001 From: zblurx Date: Wed, 20 Aug 2025 18:02:40 +0200 Subject: [PATCH 2/2] add example script to check LDAP status --- examples/CheckLDAPStatus.py | 128 ++++++++++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) create mode 100644 examples/CheckLDAPStatus.py diff --git a/examples/CheckLDAPStatus.py b/examples/CheckLDAPStatus.py new file mode 100644 index 0000000000..64a79ce979 --- /dev/null +++ b/examples/CheckLDAPStatus.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python +# Impacket - Collection of Python classes for working with network protocols. +# +# Copyright Fortra, LLC and its affiliated companies +# +# All rights reserved. +# +# This software is provided under a slightly modified version +# of the Apache Software License. See the accompanying LICENSE file +# for more information. +# +# Description: +# Check LDAP signing status and LDAPS channel binding status. +# First, the script use the given domain controller IP and domain +# name to resolve all the domain controllers. Then the checks are +# performed against all domain controllers. +# +# Author: +# Thomas Seigneuret (@zblurx) + +import argparse +import logging +import sys +from dns.resolver import Resolver +from OpenSSL.SSL import SysCallError +from impacket import version +from impacket.examples import logger +from impacket.ldap.ldap import LDAPConnection, LDAPSessionError + +class CheckLDAP: + def __init__(self, domain, dc_ip, timeout): + self.domain = domain + self.dc_ip = dc_ip + self.timeout = timeout + + def list_dc(self): + dc_list = [] + resolver = Resolver() + resolver.timeout = self.timeout + resolver.nameservers = [self.dc_ip] + dc_query = resolver.resolve( + f"_ldap._tcp.dc._msdcs.{self.domain}", 'SRV', tcp=True) + for dc in dc_query: + dc_list.append(str(dc.target).rstrip(".")) + return dc_list + + def run(self): + dc_list = self.list_dc() + logging.info(f"Found {len(dc_list)} domain controller(s) in {self.domain}") + for dc in dc_list: + signing_required = self.check_ldap_signing(dc) + channel_binding_status = self.check_ldaps_cbt(dc) + print(f"Hostname: {dc}\n\t> LDAP Signing Required: {signing_required}\n\t> LDAPS Channel Binding Status: {channel_binding_status}") + + def check_ldaps_cbt(self, hostname): + cbt_status = "Never" + ldap_url = f"ldaps://{hostname}" + try: + ldap_connection = LDAPConnection(url=ldap_url) + ldap_connection._LDAPConnection__channel_binding_value = None + ldap_connection.login(user=" ", domain=self.domain) + except LDAPSessionError as e: + if str(e).find("data 80090346") >= 0: + cbt_status = "Always" # CBT is Required + # Login failed (wrong credentials). test if we get an error with an existing, but wrong CBT -> When supported + elif str(e).find("data 52e") >= 0: + ldap_connection = LDAPConnection(url=ldap_url) + new_cbv = bytearray(ldap_connection._LDAPConnection__channel_binding_value) + new_cbv[15] = (new_cbv[3] + 1) % 256 + ldap_connection._LDAPConnection__channel_binding_value = bytes(new_cbv) + try: + ldap_connection.login(user=" ", domain=self.domain) + except LDAPSessionError as e: + if str(e).find("data 80090346") >= 0: + logging.debug(f"LDAPS channel binding is set to 'When Supported' on host {hostname}") + cbt_status = "When Supported" # CBT is When Supported + else: + logging.debug(f"LDAPSessionError while checking for channel binding requirements (likely NTLM disabled): {e!s}") + except SysCallError as e: + logging.debug(f"Received SysCallError when trying to enumerate channel binding support: {e!s}") + if e.args[1] in ["ECONNRESET", "WSAECONNRESET", "Unexpected EOF"]: + cbt_status = "No TLS cert" + else: + raise + return cbt_status + + def check_ldap_signing(self, hostname): + signing_required = False + ldap_url = f"ldap://{hostname}" + try: + ldap_connection = LDAPConnection(url=ldap_url, signing=False) + ldap_connection.login(domain=self.domain) + logging.debug(f"LDAP signing is not enforced on {hostname}") + except LDAPSessionError as e: + if str(e).find("strongerAuthRequired") >= 0: + logging.debug(f"LDAP signing is enforced on {hostname}") + signing_required = True + else: + logging.debug(f"LDAPSessionError while checking for signing requirements (likely NTLM disabled): {e!s}") + return signing_required + +if __name__ == '__main__': + + print(version.BANNER) + + parser = argparse.ArgumentParser(add_help = True, description = "LDAP signing and channel binding enumeration utility.") + parser.add_argument('-dc-ip', required=True, action='store', metavar="ip address", help='IP Address of a domain controller or a DNS resolver for the domain.') + parser.add_argument('-domain', required=True, action='store', help='') + parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON') + parser.add_argument('-timeout', action='store', type=int, default=15, help='DNS timeout') + parser.add_argument('-ts', action='store_true', help='Adds timestamp to every logging output') + + if len(sys.argv) == 1: + parser.print_help() + sys.exit(1) + + options = parser.parse_args() + logger.init(options.ts, options.debug) + + try: + dumper = CheckLDAP(options.domain, options.dc_ip, options.timeout) + logging.info(f"Targeted domain: {options.domain}") + dumper.run() + except Exception as e: + if logging.getLogger().level == logging.DEBUG: + import traceback + traceback.print_exc() + logging.error(str(e)) \ No newline at end of file