Skip to content
149 changes: 149 additions & 0 deletions anta/tests/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# Mypy does not understand AntaTest.Input typing
# mypy: disable-error-code=attr-defined
from datetime import datetime, timezone
from ipaddress import IPv4Address
from typing import ClassVar

from pydantic import BaseModel, Field, model_validator
Expand Down Expand Up @@ -654,3 +655,151 @@ def test(self) -> None:

if failed_log != f"{acl_name}:\n":
self.result.is_failure(f"{failed_log}")


class VerifyIPSecConnHealth(AntaTest):
"""
Verifies all IPv4 security connections.

Expected Results
----------------
* Success: The test will pass if all the IPv4 security connections are established in all vrf.
* Failure: The test will fail if IPv4 security is not configured or any of IPv4 security connections are not established in any vrf.

Examples
--------
```yaml
anta.tests.security:
- VerifyIPSecConnHealth:
```
"""

name = "VerifyIPSecConnHealth"
description = "Verifies all IPv4 security connections."
categories: ClassVar[list[str]] = ["security"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show ip security connection vrf all")]

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyIPSecConnHealth."""
self.result.is_success()
failure_conn = []
command_output = self.instance_commands[0].json_output["connections"]

# Check if IP security connection is configured
if not command_output:
self.result.is_failure("IPv4 security connection are not configured.")
return

# Iterate over all ip sec connection
for connection, conn_data in command_output.items():
state = next(iter(conn_data["pathDict"].values()))
if state != "Established":
failure_conn.append(connection)
if failure_conn:
failure_msg = "\n".join(failure_conn)
self.result.is_failure(f"Following IPv4 security connections are not establised:\n{failure_msg}.")


class VerifySpecificIPSecConn(AntaTest):
"""
Verifies IPv4 security connections state for a peer.

Expected Results
----------------
* Success: The test passes if the IPv4 security connection for a peer is established in the specified VRF.
* Failure: The test fails if IPv4 security is not configured, a connection is not found for a peer, or the connection is not established in the specified VRF.

Examples
--------
```yaml
anta.tests.security:
- VerifySpecificIPSecConn:
ip_security_connections:
- peer: 10.255.0.1
- peer: 10.255.0.2
vrf: default
connections:
- source_address: 100.64.3.2
destination_address: 100.64.2.2
- source_address: 172.18.3.2
destination_address: 172.18.2.2
```
"""

name = "VerifySpecificIPSecConn"
description = "Verifies IPv4 security connections for a peer."
categories: ClassVar[list[str]] = ["security"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaTemplate(template="show ip security connection vrf {vrf} path peer {peer}")]

class Input(AntaTest.Input):
"""Input model for the VerifySpecificIPSecConn test."""

ip_security_connections: list[IPSecPeers]
"""List of IP4v security peers."""

class IPSecPeers(BaseModel):
"""Details of IPv4 security peers."""

peer: IPv4Address
"""IPv4 address of the peer."""

vrf: str = "default"
"""This is the optional VRF for the IP security peer. It defaults to `default` if not provided."""

connections: list[IPSecConn] | None = None
"""Optional list of IPv4 security connections of a peer."""

class IPSecConn(BaseModel):
"""Details of IPv4 security connections for a peer."""

source_address: IPv4Address
"""Source address of the connection."""
destination_address: IPv4Address
"""Destination address of the connection."""

def render(self, template: AntaTemplate) -> list[AntaCommand]:
"""Render the template for each input IP Sec connection."""
return [template.render(peer=conn.peer, vrf=conn.vrf) for conn in self.inputs.ip_security_connections]

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifySpecificIPSecConn."""
self.result.is_success()
for command_output, input_peer in zip(self.instance_commands, self.inputs.ip_security_connections):
conn_output = command_output.json_output["connections"]
peer = command_output.params["peer"]
connections = input_peer.connections

# Check if IPv4 security connection is configured
if not conn_output:
self.result.is_failure(f"IPv4 security connections are not configured for peer `{peer}`.")
return

# If connection details are not provided then check all connections of a peer
if connections is None:
for connection, conn_data in conn_output.items():
state = next(iter(conn_data["pathDict"].values()))
if state != "Established":
self.result.is_failure(
f"Expected state of IPv4 security connection `{connection}` for peer `{peer}` is `Established` " f"but found `{state}` instead."
)
continue

# Create a dictionary of existing connections for faster lookup
existing_connections = {
(conn_data.get("saddr"), conn_data.get("daddr")): next(iter(conn_data["pathDict"].values())) for conn_data in conn_output.values()
}
for connection in connections:
source = str(connection.source_address)
destination = str(connection.destination_address)

if (source, destination) in existing_connections:
existing_state = existing_connections[(source, destination)]
if existing_state != "Established":
self.result.is_failure(
f"Expected state of IPv4 security connection `{source}-{destination}` for peer `{peer}` is `Established` "
f"but found `{existing_state}` instead."
)
else:
self.result.is_failure(f"IPv4 security connection `{source}-{destination}` for peer `{peer}` is not found.")
11 changes: 11 additions & 0 deletions examples/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,17 @@ anta.tests.security:
action: permit icmp any any
- sequence: 20
action: permit tcp any any range 5900 5910
- VerifyIPSecConnHealth:
- VerifySpecificIPSecConn:
ip_security_connections:
- peer: 10.255.0.1
- peer: 10.255.0.2
vrf: default
connections:
- source_address: 100.64.3.2
destination_address: 100.64.2.2
- source_address: 172.18.3.2
destination_address: 172.18.2.2

anta.tests.services:
- VerifyHostname:
Expand Down
Loading