Skip to content

Commit fb69de8

Browse files
authored
Refactor web authentication (#223)
* Refactor web authentication - Replace regex with HTML parsers - Move doWebAuth to VWWebSession - Split doWebAuth into multiple methods - Add Terms and Conditions form handling - add `acceptTermsOnLogin` switch
1 parent 4fa17b5 commit fb69de8

File tree

6 files changed

+314
-540
lines changed

6 files changed

+314
-540
lines changed

weconnect/auth/auth_util.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,116 @@
1+
import json
2+
import re
3+
from html.parser import HTMLParser
4+
5+
16
def addBearerAuthHeader(token, headers=None):
    """Add an ``Authorization: Bearer <token>`` entry to a header mapping.

    Args:
        token: Access token placed after the ``Bearer`` prefix.
        headers: Optional mapping to update in place. A new dict is created
            only when this is ``None``.

    Returns:
        The mapping that was updated (the caller's object when one was given).
    """
    # Test for None explicitly: `headers or {}` would silently replace an
    # empty mapping passed by the caller with a fresh dict, so the caller's
    # object would be updated only when it already contained entries.
    if headers is None:
        headers = {}
    headers['Authorization'] = f'Bearer {token}'
    return headers
10+
11+
12+
class HTMLFormParser(HTMLParser):
    """Collect the action URL and input fields of one HTML form.

    After feeding a document, ``target`` holds the ``action`` attribute of the
    form whose ``id`` equals ``form_id`` (``None`` if no such form was seen)
    and ``data`` maps the ``name`` of every ``<input>`` inside that form to
    its ``value`` attribute (``None`` when the input has no value).
    """

    def __init__(self, form_id):
        super().__init__()
        self.data = {}
        self.target = None
        self._inside_form = False
        self._form_id = form_id

    def _get_attr(self, attrs, name):
        """Return the value of the first attribute called *name*, else None."""
        return next((value for key, value in attrs if key == name), None)

    def handle_starttag(self, tag, attrs):
        """Track entry into the wanted form and record inputs found inside it."""
        if tag == 'input' and self._inside_form:
            self.handle_input(attrs)
        elif tag == 'form' and self._get_attr(attrs, 'id') == self._form_id:
            self.target = self._get_attr(attrs, 'action')
            self._inside_form = True

    def handle_endtag(self, tag):
        """Leave the form when its closing tag is reached."""
        if self._inside_form and tag == 'form':
            self._inside_form = False

    def handle_input(self, attrs):
        """Store a named input of the current form; unnamed inputs are ignored."""
        if self._inside_form:
            field_name = self._get_attr(attrs, 'name')
            if field_name:
                self.data[field_name] = self._get_attr(attrs, 'value')
48+
49+
50+
class ScriptFormParser(HTMLParser):
    """Extract form data from a ``templateModel`` object embedded in a script.

    The parser looks inside ``<script>`` elements for text of the form
    ``templateModel: {...}`` where the value is a JSON object. From that
    object, ``target`` is set to the value stored under ``targetField`` and
    ``data`` receives every entry whose key is listed in ``fields``. If the
    same script text also contains ``csrf_token: '...'``, the token is stored
    in ``data`` under ``'_csrf'``.

    Subclasses configure the parser by overriding ``fields`` and
    ``targetField``.
    """

    # Keys of the template model that are copied into ``data``.
    fields = []
    # Key of the template model whose value becomes ``target``.
    targetField = ''

    # Position right after this match is where the JSON value starts.
    _TEMPLATE_MODEL_RE = re.compile(r'templateModel:\s*')
    _CSRF_TOKEN_RE = re.compile(r'csrf_token:\s*\'(.*?)\'')

    def __init__(self):
        super().__init__()
        self._inside_script = False
        self.data = {}
        self.target = None

    def handle_starttag(self, tag, attrs):
        """Remember that the following text belongs to a script element."""
        if not self._inside_script and tag == 'script':
            self._inside_script = True

    def handle_endtag(self, tag):
        """Stop treating text as script content once the script is closed."""
        if self._inside_script and tag == 'script':
            self._inside_script = False

    def handle_data(self, data):
        """Parse the template model (and CSRF token) out of script text.

        Text outside of script elements, script text without a
        ``templateModel`` entry, and a model that is not a decodable JSON
        object are ignored; ``target`` and ``data`` are then left untouched.
        """
        if not self._inside_script:
            return

        match = self._TEMPLATE_MODEL_RE.search(data)
        if not match:
            return

        # Decode the JSON value in place instead of cutting it out with a
        # regex that relies on a trailing ",\n": this also works with CRLF
        # line endings, a model spread over several lines, or a model that is
        # the last property and therefore has no trailing comma.
        try:
            result, _ = json.JSONDecoder().raw_decode(data, match.end())
        except ValueError:
            # Not JSON (json.JSONDecodeError is a ValueError): treat it like
            # a script that carries no template model.
            return
        if not isinstance(result, dict):
            return

        self.target = result.get(self.targetField, None)
        self.data = {k: v for k, v in result.items() if k in self.fields}

        csrf_match = self._CSRF_TOKEN_RE.search(data)
        if csrf_match:
            self.data['_csrf'] = csrf_match.group(1)
83+
84+
85+
class CredentialsFormParser(ScriptFormParser):
    """Script form parser configured for the credentials form.

    ``target`` is read from the template model's ``postAction`` entry; the
    entries listed in ``fields`` are copied into ``data``.
    """

    targetField = 'postAction'
    fields = [
        'relayState',
        'hmac',
        'registerCredentialsPath',
        'error',
        'errorCode',
    ]
88+
89+
90+
class TermsAndConditionsFormParser(ScriptFormParser):
    """Script form parser configured for the terms and conditions form.

    On top of the base class behaviour, the parsed data is reshaped:
    ``countryOfResidence`` is upper-cased and the first entry of
    ``legalDocuments`` is flattened into ``legalDocuments[0].<key>`` entries,
    with boolean values converted to ``'yes'`` / ``'no'``.
    """

    fields = ['relayState', 'hmac', 'countryOfResidence', 'legalDocuments']
    targetField = 'loginUrl'

    # Keys of a legal document entry that are not copied into the form data.
    _SKIPPED_DOCUMENT_KEYS = frozenset(
        ('skipLink', 'declineLink', 'majorVersion', 'minorVersion', 'changeSummary')
    )

    def handle_data(self, data):
        """Parse script text and reshape the terms and conditions fields."""
        if not self._inside_script:
            return

        super().handle_data(data)

        # The value comes from JSON and may be null; only strings can be
        # upper-cased.
        country = self.data.get('countryOfResidence')
        if isinstance(country, str):
            self.data['countryOfResidence'] = country.upper()

        if 'legalDocuments' not in self.data:
            return

        # Remove the original object; it is replaced by flattened entries.
        documents = self.data.pop('legalDocuments')
        if not documents:
            # Empty list or null: there is nothing to flatten.
            return

        # NOTE(review): only the first document is flattened, entries after
        # index 0 are dropped - confirm this is what the server expects.
        for key, value in documents[0].items():
            # Skip unnecessary keys
            if key in self._SKIPPED_DOCUMENT_KEYS:
                continue

            # Move values under a new key while converting boolean values to 'yes' or 'no'
            if isinstance(value, bool):
                value = 'yes' if value else 'no'
            self.data[f'legalDocuments[0].{key}'] = value

weconnect/auth/my_cupra_session.py

Lines changed: 2 additions & 175 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,18 @@
1-
from typing import Dict, Optional, Match
1+
from typing import Dict
22

3-
import re
43
import json
54
import logging
65
import requests
76

8-
from urllib.parse import parse_qsl, urlsplit
9-
10-
from urllib3.util.retry import Retry
11-
from requests.adapters import HTTPAdapter
12-
137
from oauthlib.common import to_unicode
148
from oauthlib.oauth2 import InsecureTransportError
159
from oauthlib.oauth2 import is_secure_transport
1610

1711
from requests.models import CaseInsensitiveDict
1812
from weconnect.auth.openid_session import AccessType
1913

20-
2114
from weconnect.auth.vw_web_session import VWWebSession
22-
from weconnect.errors import APICompatibilityError, AuthentificationError, RetrievalError, TemporaryAuthentificationError
15+
from weconnect.errors import AuthentificationError, RetrievalError, TemporaryAuthentificationError
2316

2417

2518
LOG = logging.getLogger("weconnect")
@@ -55,172 +48,6 @@ def refresh(self):
5548
'https://identity.vwgroup.io/oidc/v1/token',
5649
)
5750

58-
def doWebAuth(self, authorizationUrl):  # noqa: C901
    """Log in through the web forms and return the final redirect URL.

    The flow is: fetch the login page, post the email form, post the
    password form, then follow the forwarding chain until the location
    starts with ``self.redirect_uri``.

    Args:
        authorizationUrl: URL at which the login flow starts.

    Returns:
        The last forwarding URL. When it starts with ``self.redirect_uri``
        followed by ``#``, that prefix is replaced by ``https://egal?`` so
        the parameters can be read as a query string.

    Raises:
        APICompatibilityError: A page, form or redirect does not look as
            expected, or a redirect chain exceeds the session's redirect
            limit.
        RetrievalError: The server answered with an internal server error.
        AuthentificationError: The login was rejected, or terms and
            conditions need to be accepted first.
    """
    websession: requests.Session = requests.Session()
    retries = Retry(total=self.retries,
                    backoff_factor=0.1,
                    status_forcelist=[500],
                    raise_on_status=False)
    websession.proxies.update(self.proxies)
    websession.mount('https://', HTTPAdapter(max_retries=retries))
    websession.headers = CaseInsensitiveDict({
        'user-agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148',
        'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'accept-language': 'en-US,en;q=0.9',
        'accept-encoding': 'gzip, deflate, br'
    })

    # Follow forwardings manually until the login page is delivered. The
    # number of hops is capped so that a redirect cycle cannot hang forever.
    for _ in range(websession.max_redirects):
        loginFormResponse: requests.Response = websession.get(authorizationUrl, allow_redirects=False)
        if loginFormResponse.status_code == requests.codes['ok']:
            break
        elif loginFormResponse.status_code == requests.codes['found']:
            if 'Location' in loginFormResponse.headers:
                authorizationUrl = loginFormResponse.headers['Location']
            else:
                raise APICompatibilityError('Forwarding without Location in Header')
        elif loginFormResponse.status_code == requests.codes['internal_server_error']:
            raise RetrievalError('Temporary server error during login')
        else:
            raise APICompatibilityError('Retrieving credentials page was not successfull,'
                                        f' status code: {loginFormResponse.status_code}')
    else:
        raise APICompatibilityError('Too many forwardings while retrieving login page')

    # Find login form on page to obtain inputs
    emailFormRegex = r'<form.+id=\"emailPasswordForm\".*action=\"(?P<formAction>[^\"]+)\"[^>]*>' \
        r'(?P<formContent>.+?(?=</form>))</form>'
    formMatch: Optional[Match[str]] = re.search(emailFormRegex, loginFormResponse.text, flags=re.DOTALL)
    if formMatch is None:
        raise APICompatibilityError('No login email form found')
    # retrieve target url from form
    target: str = formMatch.groupdict()['formAction']

    # Find all inputs and put those in formData dictionary
    inputRegex = r'<input[\\n\\r\s][^/]*name=\"(?P<name>[^\"]+)\"([\\n\\r\s]value=\"(?P<value>[^\"]+)\")?[^/]*/>'
    formData: Dict[str, str] = {}
    for inputMatch in re.finditer(inputRegex, formMatch.groupdict()['formContent']):
        if inputMatch.groupdict()['name']:
            formData[inputMatch.groupdict()['name']] = inputMatch.groupdict()['value']
    # NOTE(review): this verifies that no unexpected field is present, not
    # that every listed field was found - confirm which one is intended.
    if not all(x in ['_csrf', 'relayState', 'hmac', 'email'] for x in formData):
        raise APICompatibilityError('Could not find all required input fields in login page')

    # Set email to the provided username
    formData['email'] = self.sessionuser.username

    # build url from form action
    login2Url: str = 'https://identity.vwgroup.io' + target

    loginHeadersForm: CaseInsensitiveDict = websession.headers.copy()
    loginHeadersForm['Content-Type'] = 'application/x-www-form-urlencoded'

    # Post form content and retrieve credentials page
    login2Response: requests.Response = websession.post(login2Url, headers=loginHeadersForm, data=formData, allow_redirects=True)

    if login2Response.status_code != requests.codes['ok']:  # pylint: disable=E1101
        if login2Response.status_code == requests.codes['internal_server_error']:
            raise RetrievalError('Temporary server error during login')
        raise APICompatibilityError('Retrieving credentials page was not successfull,'
                                    f' status code: {login2Response.status_code}')

    credentialsTemplateRegex = r'<script>\s+window\._IDK\s+=\s+\{\s' \
        r'(?P<templateModel>.+?(?=\s+\};?\s+</script>))\s+\};?\s+</script>'
    scriptMatch = re.search(credentialsTemplateRegex, login2Response.text, flags=re.DOTALL)
    if scriptMatch is None:
        raise APICompatibilityError('No credentials form found')
    form2Data: Dict[str, str] = {}
    if scriptMatch.groupdict()['templateModel']:
        lineRegex = r'\s*(?P<name>[^\:]+)\:\s+[\'\{]?(?P<value>.+)[\'\}][,]?'
        for lineMatch in re.finditer(lineRegex, scriptMatch.groupdict()['templateModel']):
            if lineMatch.groupdict()['name'] == 'templateModel':
                # The regex strips the surrounding braces from the value, so
                # they are put back before decoding the JSON object.
                templateModel = json.loads('{' + lineMatch.groupdict()['value'] + '}')
                if 'relayState' in templateModel:
                    form2Data['relayState'] = templateModel['relayState']
                if 'hmac' in templateModel:
                    form2Data['hmac'] = templateModel['hmac']
                if 'emailPasswordForm' in templateModel and 'email' in templateModel['emailPasswordForm']:
                    form2Data['email'] = templateModel['emailPasswordForm']['email']
                if 'error' in templateModel and templateModel['error'] is not None:
                    if templateModel['error'] == 'validator.email.invalid':
                        raise AuthentificationError('Error during login, email invalid')
                    raise AuthentificationError(f'Error during login: {templateModel["error"]}')
                if 'registerCredentialsPath' in templateModel and templateModel['registerCredentialsPath'] == 'register':
                    raise AuthentificationError(f'Error during login, account {self.sessionuser.username} does not exist')
                if 'errorCode' in templateModel:
                    raise AuthentificationError('Error during login, is the username correct?')
                if 'postAction' in templateModel:
                    target = templateModel['postAction']
                else:
                    raise APICompatibilityError('Form does not contain postAction')
            elif lineMatch.groupdict()['name'] == 'csrf_token':
                form2Data['_csrf'] = lineMatch.groupdict()['value']
    form2Data['password'] = self.sessionuser.password
    # NOTE(review): same direction of check as for the email form above.
    if not all(x in ['_csrf', 'relayState', 'hmac', 'email', 'password'] for x in form2Data):
        raise APICompatibilityError('Could not find all required input fields in login page')

    login3Url = f'https://identity.vwgroup.io/signin-service/v1/{self.client_id}/{target}'

    # Post form content and retrieve userId in forwarding Location
    login3Response: requests.Response = websession.post(login3Url, headers=loginHeadersForm, data=form2Data, allow_redirects=False)
    if login3Response.status_code not in (requests.codes['found'], requests.codes['see_other']):
        if login3Response.status_code == requests.codes['internal_server_error']:
            raise RetrievalError('Temporary server error during login')
        raise APICompatibilityError('Forwarding expected (status code 302),'
                                    f' but got status code {login3Response.status_code}')
    if 'Location' not in login3Response.headers:
        raise APICompatibilityError('No url for forwarding in response headers')

    # Parse parameters from forwarding url
    params: Dict[str, str] = dict(parse_qsl(urlsplit(login3Response.headers['Location']).query))

    # Check if error
    if 'error' in params and params['error']:
        errorMessages: Dict[str, str] = {
            'login.errors.password_invalid': 'Password is invalid',
            'login.error.throttled': 'Login throttled, probably too many wrong logins. You have to wait some'
                                     ' minutes until a new login attempt is possible'
        }
        if params['error'] in errorMessages:
            error = errorMessages[params['error']]
        else:
            error = params['error']
        raise AuthentificationError(error)

    # Check for user id
    if 'userId' not in params or not params['userId']:
        if 'updated' in params and params['updated'] == 'dataprivacy':
            raise AuthentificationError('You have to login at myvolkswagen.de and accept the terms and conditions')
        raise APICompatibilityError('No user id provided')
    self.userId = params['userId']  # pylint: disable=unused-private-member

    # Now follow the forwarding until the forwarding URL starts with the
    # redirect URI. As above, the number of hops is capped.
    afterLoginUrl: str = login3Response.headers['Location']

    consentURL = None
    for _ in range(websession.max_redirects):
        if 'consent' in afterLoginUrl:
            consentURL = afterLoginUrl
        afterLoginResponse = self.get(afterLoginUrl, allow_redirects=False, access_type=AccessType.NONE)
        if afterLoginResponse.status_code == requests.codes['internal_server_error']:
            raise RetrievalError('Temporary server error during login')

        if 'Location' not in afterLoginResponse.headers:
            if consentURL is not None:
                raise AuthentificationError('It seems like you need to accept the terms and conditions for the MyCupra service.'
                                            f' Try to visit the URL "{consentURL}" or log into the MyCupra smartphone app')
            raise APICompatibilityError('No Location for forwarding in response headers')

        afterLoginUrl = afterLoginResponse.headers['Location']

        if afterLoginUrl.startswith(self.redirect_uri):
            break
    else:
        raise APICompatibilityError('Too many forwardings after login')

    if afterLoginUrl.startswith(self.redirect_uri + '#'):
        queryurl = afterLoginUrl.replace(self.redirect_uri + '#', 'https://egal?')
    else:
        queryurl = afterLoginUrl
    return queryurl
223-
22451
def fetchTokens(
22552
self,
22653
token_url,

0 commit comments

Comments
 (0)