Skip to content

Commit 0fafec3

Browse files
committed
Add test
1 parent 390c8a1 commit 0fafec3

File tree

2 files changed

+102
-0
lines changed

2 files changed

+102
-0
lines changed

oras/tests/auth/__init__.py

Whitespace-only changes.

oras/tests/auth/test_auth.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
from types import SimpleNamespace
2+
3+
import boto3
4+
5+
import oras.auth.utils as auth_utils
6+
from oras.auth.ecr import EcrAuth
7+
from oras.auth.token import TokenAuth
8+
9+
10+
class DummyResponse:
11+
"""Bare-minimum imitation of requests.Response for testing."""
12+
13+
def __init__(self, headers):
14+
self.headers = headers
15+
16+
17+
def test_no_auth_header(monkeypatch):
18+
auth = EcrAuth()
19+
original = DummyResponse(headers={})
20+
21+
out_headers, retry = auth.authenticate_request(original, {})
22+
23+
assert retry is False
24+
assert out_headers == {}
25+
assert auth._tokens == {}
26+
27+
28+
def test_fallback_to_super(monkeypatch):
29+
monkeypatch.setattr(
30+
auth_utils,
31+
"parse_auth_header",
32+
lambda _: SimpleNamespace(service="not-ecr", realm="https://foo/"),
33+
)
34+
35+
called = {}
36+
37+
def fake_super(self, original, headers, refresh):
38+
called["yes"] = True
39+
return {}, True
40+
41+
monkeypatch.setattr(TokenAuth, "authenticate_request", fake_super)
42+
43+
auth = EcrAuth()
44+
resp = DummyResponse({"Www-Authenticate": 'Bearer realm="https://foo/"'})
45+
result = auth.authenticate_request(resp, {})
46+
47+
assert called == {"yes": True}
48+
assert result == ({}, True)
49+
50+
51+
def test_ecr_token_flow(monkeypatch):
52+
region = "eu-west-1"
53+
account = "123456789012"
54+
realm = f"https://{account}.dkr.ecr.{region}.amazonaws.com/"
55+
ww_auth = f'Bearer realm="{realm}",service="ecr.amazonaws.com"'
56+
57+
monkeypatch.setattr(
58+
auth_utils,
59+
"parse_auth_header",
60+
lambda _: SimpleNamespace(service="ecr.amazonaws.com", realm=realm),
61+
)
62+
63+
tokens_given = []
64+
65+
def fake_boto3_client(service, region_name):
66+
assert service == "ecr"
67+
assert region_name == region
68+
69+
token = f"token{len(tokens_given)+1}"
70+
tokens_given.append(token)
71+
72+
class FakeEcr:
73+
def get_authorization_token(self):
74+
return {"authorizationData": [{"authorizationToken": token}]}
75+
76+
return FakeEcr()
77+
78+
monkeypatch.setattr(boto3, "client", fake_boto3_client)
79+
80+
auth = EcrAuth()
81+
resp = DummyResponse({"Www-Authenticate": ww_auth})
82+
83+
# 1st call – always triggers AWS client
84+
hdrs1, retry1 = auth.authenticate_request(resp, {})
85+
assert retry1 is True
86+
assert hdrs1["Authorization"] == "Basic token1"
87+
assert tokens_given == ["token1"]
88+
89+
# 2nd call
90+
hdrs2, retry2 = auth.authenticate_request(resp, {}, refresh=False)
91+
92+
assert len(tokens_given) == 1
93+
assert hdrs2["Authorization"] == "Basic token1"
94+
95+
# 3rd call
96+
hdrs3, retry3 = auth.authenticate_request(resp, {}, refresh=True)
97+
98+
assert len(tokens_given) == 2
99+
assert hdrs3["Authorization"] == "Basic token2"
100+
101+
# Cache should contain the most recent token for the realm
102+
assert auth._tokens[realm] == "token2"

0 commit comments

Comments
 (0)