|
2 | 2 |
|
3 | 3 | from typing import Dict, Any
|
4 | 4 | import logging
|
| 5 | +from pathlib import Path |
5 | 6 |
|
6 | 7 |
|
7 | 8 | import pytest
|
8 | 9 | import cryptography.hazmat.primitives.serialization.pkcs12
|
9 | 10 | from async_asgi_testclient import TestClient # pylint: disable=import-error
|
| 11 | +from libpvarki.mtlshelp.csr import async_create_keypair, async_create_client_csr |
| 12 | +from libadvian.testhelpers import nice_tmpdir # pylint: disable=unused-import |
10 | 13 |
|
11 | 14 | from rasenmaeher_api.rmsettings import RMSettings
|
12 | 15 |
|
13 | 16 | LOGGER = logging.getLogger(__name__)
|
14 | 17 |
|
| 18 | +# pylint: disable=W0621 |
| 19 | + |
15 | 20 |
|
16 | 21 | # GENERATE VERIFICATEION CODE
|
17 | 22 | @pytest.mark.asyncio(loop_scope="session")
|
@@ -604,3 +609,55 @@ async def test_enroll_with_invite_code( # pylint: disable=R0915
|
604 | 609 | resp.raise_for_status()
|
605 | 610 |
|
606 | 611 | del unauth_client_session.headers["Authorization"]
|
| 612 | + |
| 613 | + |
| 614 | +# ENROLL WITH CSR (and invite-code |
| 615 | +@pytest.mark.asyncio(loop_scope="session") |
| 616 | +async def test_enroll_with_csr( # pylint: disable=R0915, R0914 |
| 617 | + tilauspalvelu_jwt_admin_client: TestClient, unauth_client_session: TestClient, nice_tmpdir: str |
| 618 | +) -> None: |
| 619 | + """test enrolling with CSR""" |
| 620 | + tempdir = Path(nice_tmpdir) |
| 621 | + resp = await tilauspalvelu_jwt_admin_client.post("/api/v1/enrollment/invitecode/create") |
| 622 | + resp_dict: Dict[Any, Any] = resp.json() |
| 623 | + LOGGER.debug(resp_dict) |
| 624 | + inv_code = resp_dict["invite_code"] |
| 625 | + assert resp.status_code == 200 |
| 626 | + assert inv_code != "" |
| 627 | + |
| 628 | + callsign = "csrroller" |
| 629 | + privkeyfile = Path(tempdir) / "user.key" |
| 630 | + pubkeyfile = Path(tempdir) / "user.pub" |
| 631 | + csrfile = Path(tempdir) / "user.csr" |
| 632 | + ckp = await async_create_keypair(privkeyfile, pubkeyfile) |
| 633 | + csrpem = await async_create_client_csr(ckp, csrfile, {"CN": callsign}) |
| 634 | + |
| 635 | + json_dict: Dict[Any, Any] = {"invite_code": inv_code, "callsign": callsign, "csr": csrpem} |
| 636 | + resp = await unauth_client_session.post("/api/v1/enrollment/invitecode/enroll", json=json_dict) |
| 637 | + resp_dict = resp.json() |
| 638 | + LOGGER.debug(resp_dict) |
| 639 | + assert resp.status_code == 200 |
| 640 | + assert resp_dict["jwt"] != "" |
| 641 | + assert resp_dict["approvecode"] != "" |
| 642 | + user_jwt = resp_dict["jwt"] |
| 643 | + user_ac = resp_dict["approvecode"] |
| 644 | + |
| 645 | + # Accept the enrollment |
| 646 | + json_dict = {"callsign": callsign, "approvecode": user_ac} |
| 647 | + resp = await tilauspalvelu_jwt_admin_client.post("/api/v1/enrollment/accept", json=json_dict) |
| 648 | + resp_dict = resp.json() |
| 649 | + LOGGER.debug(resp_dict) |
| 650 | + assert resp.status_code == 200 |
| 651 | + |
| 652 | + # Fetch the PFX |
| 653 | + unauth_client_session.headers.clear() |
| 654 | + unauth_client_session.headers.update({"Authorization": f"Bearer {user_jwt}"}) |
| 655 | + resp = await unauth_client_session.get(f"/api/v1/enduserpfx/{callsign}.pfx") |
| 656 | + resp.raise_for_status() |
| 657 | + pfxdata = cryptography.hazmat.primitives.serialization.pkcs12.load_pkcs12(resp.content, callsign.encode("utf-8")) |
| 658 | + assert not pfxdata.key |
| 659 | + assert pfxdata.additional_certs[0] |
| 660 | + cert = pfxdata.additional_certs[0] |
| 661 | + assert cert.friendly_name |
| 662 | + assert cert.friendly_name.decode("utf-8") == callsign |
| 663 | + # TODO: check extensions |
0 commit comments