RadSec Client

RadSec is a TCP/TLS stream transport. RadSecClient reuses its TLS connection by default so multiple send_packet() calls can share the same connection. This is the recommended mode for normal RadSec use.
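Because several packets share one byte stream, the receiver has to find packet boundaries itself. The sketch below shows one way this could work, using the 16-bit Length field in the standard RADIUS header. It is an illustration only, not pyrad2's actual read_radius_packet implementation; the helper name read_one_radius_packet, the constants, and the demo are all hypothetical.

```python
import asyncio
import struct

RADIUS_HEADER_LEN = 4   # Code (1 byte) + Identifier (1 byte) + Length (2 bytes)
RADIUS_MIN_LEN = 20     # header plus the 16-byte Authenticator (RFC 2865)
RADIUS_MAX_LEN = 4096   # maximum packet size in RFC 2865


async def read_one_radius_packet(reader: asyncio.StreamReader) -> bytes:
    """Hypothetical helper: read exactly one RADIUS packet from a byte stream."""
    header = await reader.readexactly(RADIUS_HEADER_LEN)
    # Length is a big-endian 16-bit field that covers the whole packet.
    (length,) = struct.unpack("!H", header[2:4])
    if not RADIUS_MIN_LEN <= length <= RADIUS_MAX_LEN:
        raise ValueError(f"invalid RADIUS length field: {length}")
    body = await reader.readexactly(length - RADIUS_HEADER_LEN)
    return header + body


async def demo() -> list:
    # Two minimal 20-byte packets back to back, as they might arrive on one stream.
    first = bytes([1, 7]) + struct.pack("!H", 20) + bytes(16)
    second = bytes([2, 7]) + struct.pack("!H", 20) + bytes(16)
    reader = asyncio.StreamReader()
    reader.feed_data(first + second)
    reader.feed_eof()
    return [
        await read_one_radius_packet(reader),
        await read_one_radius_packet(reader),
    ]


packets = asyncio.run(demo())
print([len(p) for p in packets])
```

Reading the fixed-size header first and then exactly the remaining bytes keeps each reply aligned with its request, which is what makes connection reuse workable.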

Use reuse_connection=False only as a legacy/compatibility escape hatch when a deployment specifically needs one TLS connection per packet, such as for interoperability debugging, short-lived scripts, or a peer that cannot handle multiple RADIUS packets on one TLS stream:

client = RadSecClient(
    server="127.0.0.1",
    secret=b"radsec",
    dict=dictionary,
    certfile="certs/client/client.cert.pem",
    keyfile="certs/client/client.key.pem",
    certfile_server="certs/ca/ca.cert.pem",
    reuse_connection=False,
)

The timeout value applies separately to connection establishment, to writing, and to waiting for each response packet. If an attempt fails with a timeout or connection error, the client closes the connection, waits reconnect_backoff seconds, and tries again, making at most retries attempts in total:

client = RadSecClient(
    server="127.0.0.1",
    secret=b"radsec",
    dict=dictionary,
    certfile="certs/client/client.cert.pem",
    keyfile="certs/client/client.key.pem",
    certfile_server="certs/ca/ca.cert.pem",
    retries=3,
    timeout=5,
    reconnect_backoff=0.25,
)
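To get a feel for how these three settings combine, the sketch below estimates a rough upper bound on how long one request could block. It assumes each attempt can spend up to timeout seconds on each of three steps (connect, write, read) and that a pause follows every failed attempt except the last. The function worst_case_seconds is hypothetical and not part of pyrad2.

```python
def worst_case_seconds(retries: int, timeout: float, reconnect_backoff: float) -> float:
    """Rough upper bound on the time one request may take (illustrative only)."""
    attempts = max(1, retries)             # at least one attempt is always made
    per_attempt = 3 * timeout              # connect + write + read, each bounded by timeout
    pauses = (attempts - 1) * max(0.0, reconnect_backoff)  # no pause after the last attempt
    return attempts * per_attempt + pauses


# The values used in the example above: retries=3, timeout=5, reconnect_backoff=0.25
print(worst_case_seconds(3, 5, 0.25))
```

In practice a reused connection skips the connect step, so typical latency is well below this bound. An EAP-MD5 exchange sends two requests and may take up to twice as long.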

When you are done with a reusable client, close it explicitly or use it as an async context manager:

async with RadSecClient(...) as client:
    reply = await client.send_packet(request)
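The explicit-close alternative can be sketched with try/finally. To keep the example runnable without certificates or a server, it uses StubClient, a hypothetical stand-in that mimics only the lifecycle methods of RadSecClient (send_packet, close, __aenter__, __aexit__); it is not the real class.

```python
import asyncio


class StubClient:
    """Hypothetical stand-in that mimics only RadSecClient's lifecycle methods."""

    def __init__(self) -> None:
        self.closed = False

    async def send_packet(self, request: str) -> str:
        return f"reply to {request}"

    async def close(self) -> None:
        self.closed = True

    async def __aenter__(self) -> "StubClient":
        return self

    async def __aexit__(self, exc_type, exc, traceback) -> None:
        await self.close()


async def explicit_close() -> StubClient:
    client = StubClient()
    try:
        await client.send_packet("request")
    finally:
        # Runs even if send_packet raises, so the connection is not leaked.
        await client.close()
    return client


async def context_manager() -> StubClient:
    # Equivalent lifecycle: __aexit__ calls close() when the block ends.
    async with StubClient() as client:
        await client.send_packet("request")
    return client


print(asyncio.run(explicit_close()).closed, asyncio.run(context_manager()).closed)
```

Both forms end with the connection closed; the context manager is simply the shorter spelling of the try/finally pattern.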

RadSecClient automatically adds Message-Authenticator to outgoing Access-Request packets that contain EAP-Message.

Use create_status_packet() for RFC 5997 Status-Server health checks. The request automatically includes the mandatory Message-Authenticator:

request = client.create_status_packet()
reply = await client.send_packet(request)
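Since send_packet() returns None on failure and the client records the cause in last_error, a health check can report why a server looks down. The sketch below is one possible wrapper under that assumption. The names classify, check_status, and StubClient are hypothetical; the stub replaces a real RadSecClient only so the example runs without a server.

```python
import asyncio


def classify(reply, last_error) -> str:
    """Map a send_packet() result and the client's last_error to a coarse status."""
    if reply is not None:
        return "alive"
    if last_error is None:
        return "unknown failure"
    # Timeouts, socket errors, and truncated reads suggest a transport problem.
    if isinstance(last_error, (asyncio.TimeoutError, OSError, EOFError)):
        return "unreachable"
    # Anything else (for example a protocol-level error) counts as a rejection.
    return "rejected"


async def check_status(client) -> str:
    request = client.create_status_packet()
    reply = await client.send_packet(request)
    return classify(reply, client.last_error)


class StubClient:
    """Hypothetical stand-in exposing only what check_status() needs."""

    def __init__(self, reply, error=None) -> None:
        self._reply = reply
        self.last_error = error

    def create_status_packet(self) -> str:
        return "status-request"

    async def send_packet(self, request):
        return self._reply


print(asyncio.run(check_status(StubClient("status-reply"))))
print(asyncio.run(check_status(StubClient(None, TimeoutError()))))
```

With a real client, the same check_status() call would run inside an async with RadSecClient(...) block.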

The UDP Status-Server example (examples/status.py) talks to a normal RADIUS server on UDP/1812. To check a RadSec server such as examples/server_radsec.py, use the TLS/TCP example instead:

make status_radsec

RadSecClient

Source code in pyrad2/radsec/client.py
class RadSecClient:
    DEFAULT_MINIMUM_TLS_VERSION = ssl.TLSVersion.TLSv1_2

    def __init__(
        self,
        server: str = "127.0.0.1",
        port: int = 2083,
        secret: bytes = b"radsec",
        dict=None,
        retries: int = 3,
        timeout: int = 5,
        certfile: str = "certs/client/client.cert.pem",
        keyfile: str = "certs/client/client.key.pem",
        certfile_server: str = "certs//ca/ca.cert.pem",
        check_hostname: bool = True,
        minimum_tls_version: ssl.TLSVersion = DEFAULT_MINIMUM_TLS_VERSION,
        ciphers: Optional[str] = None,
        allowed_server_fingerprints: Optional[Iterable[str]] = None,
        reuse_connection: bool = True,
        reconnect_backoff: float = 0.25,
        radius_versions: Sequence[RadiusVersion] = (RadiusVersion.V1_0,),
    ):
        """Initializes a RadSec client.

        Args:
            server (str): IP address to connect to.
            port (int): RadSec port, defaults to 2083.
            secret (bytes): Secret. Defaults to radsec as per RFC 6614.
                Different implementations support setting an arbitrary
                shared secret but if you want to stick to the RFC,
                the shared secret must be `radsec`.
            dict (Dictionary): RADIUS dictionary to use.
            certfile (str): Path to the client SSL certificate.
            keyfile (str): Path to the client SSL private key.
            certfile_server (str): Path to the CA certificate file used to
                verify the server certificate.
            check_hostname (bool): Validate the server certificate name.
            minimum_tls_version (ssl.TLSVersion): Lowest TLS version to negotiate.
            ciphers (str): Optional OpenSSL cipher string override.
            allowed_server_fingerprints (Iterable[str]): Optional SHA-256 certificate
                fingerprint allowlist for the server certificate.
            reuse_connection (bool): Reuse the TLS connection for multiple packets.
            reconnect_backoff (float): Seconds to wait before retrying after a
                connection or read failure.
            radius_versions (Sequence[RadiusVersion]): RFC 9765 protocol
                versions to advertise via ALPN. Defaults to ``(V1_0,)`` —
                identical handshake behavior to historic RadSec. Pass
                ``(V1_0, V1_1)`` to advertise both; the server picks the
                highest mutually supported version. **Experimental.**

        """
        self.server = server
        self.port = port
        self.secret = secret
        self.retries = retries
        self.timeout = timeout
        self.dict = dict
        self.reuse_connection = reuse_connection
        self.reconnect_backoff = reconnect_backoff
        self._reader: asyncio.StreamReader | None = None
        self._writer: asyncio.StreamWriter | None = None
        self._io_lock = asyncio.Lock()

        self.allowed_server_fingerprints = {
            normalize_cert_fingerprint(fingerprint)
            for fingerprint in (allowed_server_fingerprints or [])
        }
        self.radius_versions: tuple[RadiusVersion, ...] = tuple(radius_versions)
        if not self.radius_versions:
            raise ValueError("radius_versions must contain at least one entry")
        # RFC 9765 §3.4: RADIUS/1.1 requires TLS 1.3+. Auto-promote the
        # configured floor when v1.1 is advertised.
        minimum_tls_version = enforce_tls_version_floor(
            minimum_tls_version, self.radius_versions
        )
        # Negotiated post-handshake. _token_counter is only meaningful for v1.1.
        self._negotiated_version: RadiusVersion = RadiusVersion.V1_0
        self._token_counter: TokenCounter | None = None
        # Last fatal error from send_packet, exposed so callers can tell a
        # strict-mode negotiation refusal apart from a normal timeout/no-reply
        # (both currently surface as ``send_packet`` returning ``None``).
        # Cleared at the start of each send_packet call.
        self.last_error: Exception | None = None

        self.setup_ssl(
            certfile,
            keyfile,
            certfile_server,
            check_hostname,
            minimum_tls_version,
            ciphers,
        )

    def setup_ssl(
        self,
        certfile: str,
        keyfile: str,
        certfile_server: str,
        check_hostname: bool,
        minimum_tls_version: ssl.TLSVersion,
        ciphers: Optional[str],
    ):
        try:
            self.ssl_ctx = ssl.create_default_context(
                ssl.Purpose.SERVER_AUTH, cafile=certfile_server
            )

            self.ssl_ctx.load_cert_chain(certfile=certfile, keyfile=keyfile)
        except FileNotFoundError as e:
            ssl_paths = ", ".join([certfile, keyfile, certfile_server])
            msg = "One or more SSL files could not be found. Current paths: {}"
            logger.error(msg, ssl_paths)
            raise FileNotFoundError(msg.format(ssl_paths)) from e

        self.ssl_ctx.check_hostname = check_hostname
        self.ssl_ctx.minimum_version = minimum_tls_version
        if ciphers is not None:
            self.ssl_ctx.set_ciphers(ciphers)

        # RFC 9765 §3.1: advertise the configured RADIUS protocol versions.
        # No-op when only V1_0 is configured.
        apply_alpn(self.ssl_ctx, self.radius_versions)

    def _verify_server_fingerprint(self, writer: asyncio.StreamWriter) -> bool:
        """Verify the connected server certificate against the fingerprint allowlist.

        If no fingerprints were configured, the certificate trust decision is
        left to Python's TLS verification.
        """
        if not self.allowed_server_fingerprints:
            return True

        ssl_object = writer.get_extra_info("ssl_object")
        if ssl_object is None:
            return False

        cert = ssl_object.getpeercert(binary_form=True)
        if cert is None:
            return False

        return cert_fingerprint_matches(cert, self.allowed_server_fingerprints)

    @staticmethod
    def _writer_is_closing(writer: asyncio.StreamWriter | None) -> bool:
        """Return whether a stream writer is absent or already closing."""
        if writer is None:
            return True
        is_closing = getattr(writer, "is_closing", None)
        if is_closing is None:
            return False
        return is_closing()

    async def _close_writer(self, writer: asyncio.StreamWriter | None) -> None:
        """Close a stream writer and wait until the close completes."""
        if writer is None:
            return
        writer.close()
        await writer.wait_closed()

    async def close(self) -> None:
        """Close any reusable RadSec connection held by the client."""
        writer = self._writer
        self._reader = None
        self._writer = None
        # Negotiated version + Token counter are per-connection; clear them.
        self._negotiated_version = RadiusVersion.V1_0
        self._token_counter = None
        await self._close_writer(writer)

    async def __aenter__(self) -> "RadSecClient":
        """Return this client for use as an async context manager."""
        return self

    async def __aexit__(self, exc_type, exc, traceback) -> None:
        """Close the reusable RadSec connection when leaving a context manager."""
        await self.close()

    async def _open_connection(
        self,
    ) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
        """Open and validate a TLS connection to the RadSec server."""
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(self.server, self.port, ssl=self.ssl_ctx),
            timeout=self.timeout,
        )

        ssl_object = writer.get_extra_info("ssl_object")
        selected_alpn = (
            ssl_object.selected_alpn_protocol() if ssl_object is not None else None
        )
        try:
            self._negotiated_version = negotiate(self.radius_versions, selected_alpn)
        except NoCommonRadiusVersion as exc:
            # RFC 9765 §3.3: a strict-mode client (no v1.0 in
            # radius_versions) must not silently downgrade. Close the
            # half-open connection and surface a clean failure.
            await self._close_writer(writer)
            raise PacketError(
                "No common RADIUS protocol version with RadSec server: " + str(exc)
            ) from exc
        self._token_counter = (
            TokenCounter()
            if self._negotiated_version == RadiusVersion.V1_1
            else None
        )

        logger.info(
            "Connected to RADSEC server on {}:{} (ALPN={}, RADIUS/{})",
            self.server,
            self.port,
            selected_alpn or "none",
            "1.1" if self._negotiated_version == RadiusVersion.V1_1 else "1.0",
        )

        if not self._verify_server_fingerprint(writer):
            await self._close_writer(writer)
            raise PacketError("Server certificate fingerprint is not allowed")

        return reader, writer

    async def _ensure_connection(
        self,
    ) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
        """Return an existing reusable connection or open a new one."""
        if (
            self.reuse_connection
            and self._reader is not None
            and not self._writer_is_closing(self._writer)
        ):
            assert self._writer is not None
            return self._reader, self._writer

        await self.close()
        self._reader, self._writer = await self._open_connection()
        return self._reader, self._writer

    async def _write_packet(
        self, writer: asyncio.StreamWriter, packet: PacketImplementation
    ) -> None:
        """Write one RADIUS packet to the RadSec stream within the client timeout."""
        self._stamp_radius_version(packet)
        self._prepare_outgoing_packet(packet)
        writer.write(packet.request_packet())
        await asyncio.wait_for(writer.drain(), timeout=self.timeout)

    def _stamp_radius_version(self, packet: PacketImplementation) -> None:
        """Tag an outgoing packet with the negotiated RADIUS version.

        Always overwrites ``packet.radius_version`` so a packet that was
        previously serialized under a different negotiated version (for
        example after a reconnect that dropped from v1.1 back to v1.0)
        gets re-serialized correctly on retry rather than carrying its
        prior v1.1 state — Token, zero Identifier, plaintext password —
        onto a v1.0 wire format.

        For v1.1 we also stamp a fresh Token — done exactly once per
        packet so a retry reuses the same Token (hitting the server's
        RFC 5080 dedup cache). The Token lives in its own slot,
        distinct from ``packet.authenticator``, so any prior v1.0 flow
        that populated authenticator (e.g. pw_crypt) can't leak random
        bytes into the v1.1 Reserved-2 region (RFC 9765 §4.1).
        """
        packet.radius_version = self._negotiated_version
        if self._negotiated_version == RadiusVersion.V1_1:
            if packet.token is None and self._token_counter is not None:
                packet.token = self._token_counter.next()
        else:
            # Clear any leftover v1.1 Token so the v1.0 serializer doesn't
            # see it; the v1.0 path computes its own authenticator anyway,
            # so the Token slot has no meaning here.
            packet.token = None

    def _prepare_outgoing_packet(self, packet: PacketImplementation) -> None:
        """Apply Message-Authenticator policy before a packet is sent."""
        prepare_request_message_authenticator(packet)

    async def _read_packet(self, reader: asyncio.StreamReader) -> bytes:
        """Read one RADIUS packet from the RadSec stream within the client timeout."""
        return await asyncio.wait_for(read_radius_packet(reader), timeout=self.timeout)

    async def _send_packet_once(self, packet: PacketImplementation) -> Optional[Packet]:
        """Send one RADIUS packet over the current connection strategy."""
        reader: asyncio.StreamReader
        writer: asyncio.StreamWriter | None = None

        if self.reuse_connection:
            reader, writer = await self._ensure_connection()
        else:
            reader, writer = await self._open_connection()

        try:
            await self._write_packet(writer, packet)
            response = await self._read_packet(reader)

            logger.info("Received {} bytes from server", len(response))
            logger.debug("Response: {}", response.hex())

            reply = packet.create_reply(packet=response)
            if packet.verify_reply(reply, response):
                return reply

            raise PacketError("Received invalid RADSEC reply")
        finally:
            if not self.reuse_connection:
                await self._close_writer(writer)

    def create_auth_packet(self, **kwargs) -> AuthPacket:
        """Create a new RADIUS packet.
        This utility function creates a new RADIUS packet which can
        be used to communicate with the RADIUS server this client
        talks to. This is initializing the new packet with the
        dictionary and secret used for the client.

        Returns:
            Packet: A new AuthPacket instance
        """
        id = kwargs.pop("id", Packet.create_id())
        return AuthPacket(
            dict=self.dict,
            id=id,
            secret=self.secret,
            **kwargs,
        )

    def create_acct_packet(self, **kwargs) -> AcctPacket:
        """Create a new RADIUS packet.
        This utility function creates a new RADIUS packet which can
        be used to communicate with the RADIUS server this client
        talks to. This is initializing the new packet with the
        dictionary and secret used for the client.

        Returns:
            Packet: A new AcctPacket instance
        """
        id = kwargs.pop("id", Packet.create_id())
        return AcctPacket(
            id=id,
            dict=self.dict,
            secret=self.secret,
            **kwargs,
        )

    def create_coa_packet(self, **kwargs) -> CoAPacket:
        """Create a new RADIUS packet.
        This utility function creates a new RADIUS packet which can
        be used to communicate with the RADIUS server this client
        talks to. This is initializing the new packet with the
        dictionary and secret used for the client.

        Returns:
            Packet: A new CoA packet instance
        """
        id = kwargs.pop("id", Packet.create_id())
        return CoAPacket(id=id, dict=self.dict, secret=self.secret, **kwargs)

    def create_status_packet(self, **kwargs) -> StatusPacket:
        """Create an RFC 5997 Status-Server health-check packet."""
        id = kwargs.pop("id", Packet.create_id())
        return StatusPacket(id=id, dict=self.dict, secret=self.secret, **kwargs)

    def create_packet(self, id, **kwargs) -> Packet:
        """Create a generic RADIUS packet with this client's dictionary and secret."""
        return Packet(id=id, dict=self.dict, secret=self.secret, **kwargs)

    async def _send_packet(self, packet: PacketImplementation) -> Optional[Packet]:
        """Send a packet to a RadSec server with timeout and reconnect handling.

        Args:
            packet (Packet): The packet to send
        """
        self.last_error = None
        attempts = max(1, self.retries)
        retryable_errors = (
            asyncio.IncompleteReadError,
            asyncio.TimeoutError,
            ConnectionError,
            EOFError,
            OSError,
        )

        async with self._io_lock:
            for attempt in range(attempts):
                try:
                    return await self._send_packet_once(packet)
                except PacketError as exc:
                    # Most PacketErrors here are non-retryable handshake-level
                    # failures: ALPN refused downgrade, certificate fingerprint
                    # mismatch, or a malformed server reply. Stash the cause
                    # so callers can distinguish them from "no reply received"
                    # (which leaves last_error as None).
                    self.last_error = exc
                    tag = (
                        "RADSEC negotiation failure"
                        if "No common RADIUS protocol" in str(exc)
                        else "RADSEC packet error"
                    )
                    logger.error("{}: {}", tag, exc)
                    await self.close()
                    return None
                except retryable_errors as exc:
                    self.last_error = exc
                    logger.warning(
                        "RADSEC request attempt {}/{} failed: {}",
                        attempt + 1,
                        attempts,
                        exc,
                    )
                    await self.close()

                if attempt + 1 < attempts and self.reconnect_backoff > 0:
                    await asyncio.sleep(self.reconnect_backoff)

        return None

    async def send_packet(self, packet: PacketImplementation) -> Optional[Packet]:
        """Send a packet to a RADIUS server.

        Args:
            packet (Packet): The packet to send
        """
        if isinstance(packet, AuthPacket):
            if packet.auth_type == "eap-md5":
                eap.inject_eap_identity(packet)
            reply = await self._send_packet(packet)
            if (
                reply
                and reply.code == PacketType.AccessChallenge
                and packet.auth_type == "eap-md5"
            ):
                eap.apply_eap_md5_challenge(packet, reply)
                reply = await self._send_packet(packet)
            return reply
        elif isinstance(packet, CoAPacket):
            return await self._send_packet(packet)
        else:
            return await self._send_packet(packet)

__init__(server='127.0.0.1', port=2083, secret=b'radsec', dict=None, retries=3, timeout=5, certfile='certs/client/client.cert.pem', keyfile='certs/client/client.key.pem', certfile_server='certs//ca/ca.cert.pem', check_hostname=True, minimum_tls_version=DEFAULT_MINIMUM_TLS_VERSION, ciphers=None, allowed_server_fingerprints=None, reuse_connection=True, reconnect_backoff=0.25, radius_versions=(RadiusVersion.V1_0,))

Initializes a RadSec client.

Parameters:

server (str): IP address to connect to. Default: '127.0.0.1'
port (int): RadSec port. Default: 2083
secret (bytes): Shared secret. RFC 6614 specifies radsec; some implementations accept an arbitrary shared secret, but staying RFC-compliant requires radsec. Default: b'radsec'
dict (Dictionary): RADIUS dictionary to use. Default: None
retries (int): Maximum number of send attempts per packet. Default: 3
timeout (int): Seconds allowed for each connect, write, and read step. Default: 5
certfile (str): Path to the client SSL certificate. Default: 'certs/client/client.cert.pem'
keyfile (str): Path to the client SSL private key. Default: 'certs/client/client.key.pem'
certfile_server (str): Path to the CA certificate file used to verify the server certificate. Default: 'certs//ca/ca.cert.pem'
check_hostname (bool): Validate the server certificate name. Default: True
minimum_tls_version (TLSVersion): Lowest TLS version to negotiate. Default: DEFAULT_MINIMUM_TLS_VERSION
ciphers (str): Optional OpenSSL cipher string override. Default: None
allowed_server_fingerprints (Iterable[str]): Optional SHA-256 certificate fingerprint allowlist for the server certificate. Default: None
reuse_connection (bool): Reuse the TLS connection for multiple packets. Default: True
reconnect_backoff (float): Seconds to wait before retrying after a connection or read failure. Default: 0.25
radius_versions (Sequence[RadiusVersion]): RFC 9765 protocol versions to advertise via ALPN. Defaults to (V1_0,), which keeps the handshake identical to historic RadSec. Pass (V1_0, V1_1) to advertise both; the server picks the highest mutually supported version. Experimental. Default: (V1_0,)

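For allowed_server_fingerprints, a SHA-256 fingerprint is conventionally the SHA-256 digest of the certificate's DER encoding. The sketch below computes one from PEM text using only the standard library. The helper name sha256_fingerprint_from_pem is hypothetical, and the exact string format the client accepts (case, colon separators) is decided by its normalize_cert_fingerprint helper, which is not shown here, so the lowercase hex output is an assumption.

```python
import hashlib
import ssl


def sha256_fingerprint_from_pem(pem_text: str) -> str:
    """Return the SHA-256 digest of a PEM certificate's DER bytes as lowercase hex."""
    der_bytes = ssl.PEM_cert_to_DER_cert(pem_text)
    return hashlib.sha256(der_bytes).hexdigest()


# Demo with placeholder bytes wrapped in PEM armor; this is NOT a real certificate.
placeholder_pem = ssl.DER_cert_to_PEM_cert(b"placeholder certificate bytes")
print(sha256_fingerprint_from_pem(placeholder_pem))
```

With a real deployment you would read the server certificate's PEM file and pass the resulting string in the allowlist; pinning supplements the normal TLS chain validation rather than replacing it.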
Source code in pyrad2/radsec/client.py
def __init__(
    self,
    server: str = "127.0.0.1",
    port: int = 2083,
    secret: bytes = b"radsec",
    dict=None,
    retries: int = 3,
    timeout: int = 5,
    certfile: str = "certs/client/client.cert.pem",
    keyfile: str = "certs/client/client.key.pem",
    certfile_server: str = "certs//ca/ca.cert.pem",
    check_hostname: bool = True,
    minimum_tls_version: ssl.TLSVersion = DEFAULT_MINIMUM_TLS_VERSION,
    ciphers: Optional[str] = None,
    allowed_server_fingerprints: Optional[Iterable[str]] = None,
    reuse_connection: bool = True,
    reconnect_backoff: float = 0.25,
    radius_versions: Sequence[RadiusVersion] = (RadiusVersion.V1_0,),
):
    """Initializes a RadSec client.

    Args:
        server (str): IP address to connect to.
        port (int): RadSec port, defaults to 2083.
        secret (bytes): Secret. Defaults to radsec as per RFC 6614.
            Different implementations support setting an arbitrary
            shared secret but if you want to stick to the RFC,
            the shared secret must be `radsec`.
        dict (Dictionary): RADIUS dictionary to use.
        certfile (str): Path to the client SSL certificate.
        keyfile (str): Path to the client SSL private key.
        certfile_server (str): Path to the CA certificate file used to
            verify the server certificate.
        check_hostname (bool): Validate the server certificate name.
        minimum_tls_version (ssl.TLSVersion): Lowest TLS version to negotiate.
        ciphers (str): Optional OpenSSL cipher string override.
        allowed_server_fingerprints (Iterable[str]): Optional SHA-256 certificate
            fingerprint allowlist for the server certificate.
        reuse_connection (bool): Reuse the TLS connection for multiple packets.
        reconnect_backoff (float): Seconds to wait before retrying after a
            connection or read failure.
        radius_versions (Sequence[RadiusVersion]): RFC 9765 protocol
            versions to advertise via ALPN. Defaults to ``(V1_0,)`` —
            identical handshake behavior to historic RadSec. Pass
            ``(V1_0, V1_1)`` to advertise both; the server picks the
            highest mutually supported version. **Experimental.**

    """
    self.server = server
    self.port = port
    self.secret = secret
    self.retries = retries
    self.timeout = timeout
    self.dict = dict
    self.reuse_connection = reuse_connection
    self.reconnect_backoff = reconnect_backoff
    self._reader: asyncio.StreamReader | None = None
    self._writer: asyncio.StreamWriter | None = None
    self._io_lock = asyncio.Lock()

    self.allowed_server_fingerprints = {
        normalize_cert_fingerprint(fingerprint)
        for fingerprint in (allowed_server_fingerprints or [])
    }
    self.radius_versions: tuple[RadiusVersion, ...] = tuple(radius_versions)
    if not self.radius_versions:
        raise ValueError("radius_versions must contain at least one entry")
    # RFC 9765 §3.4: RADIUS/1.1 requires TLS 1.3+. Auto-promote the
    # configured floor when v1.1 is advertised.
    minimum_tls_version = enforce_tls_version_floor(
        minimum_tls_version, self.radius_versions
    )
    # Negotiated post-handshake. _token_counter is only meaningful for v1.1.
    self._negotiated_version: RadiusVersion = RadiusVersion.V1_0
    self._token_counter: TokenCounter | None = None
    # Last fatal error from send_packet, exposed so callers can tell a
    # strict-mode negotiation refusal apart from a normal timeout/no-reply
    # (both currently surface as ``send_packet`` returning ``None``).
    # Cleared at the start of each send_packet call.
    self.last_error: Exception | None = None

    self.setup_ssl(
        certfile,
        keyfile,
        certfile_server,
        check_hostname,
        minimum_tls_version,
        ciphers,
    )

close() async

Close any reusable RadSec connection held by the client.

Source code in pyrad2/radsec/client.py
async def close(self) -> None:
    """Close any reusable RadSec connection held by the client."""
    writer = self._writer
    self._reader = None
    self._writer = None
    # Negotiated version + Token counter are per-connection; clear them.
    self._negotiated_version = RadiusVersion.V1_0
    self._token_counter = None
    await self._close_writer(writer)

__aenter__() async

Return this client for use as an async context manager.

Source code in pyrad2/radsec/client.py
async def __aenter__(self) -> "RadSecClient":
    """Return this client for use as an async context manager."""
    return self

__aexit__(exc_type, exc, traceback) async

Close the reusable RadSec connection when leaving a context manager.

Source code in pyrad2/radsec/client.py
async def __aexit__(self, exc_type, exc, traceback) -> None:
    """Close the reusable RadSec connection when leaving a context manager."""
    await self.close()

create_auth_packet(**kwargs)

Create a new RADIUS authentication packet (AuthPacket), initialized with this client's dictionary and secret. The packet ID comes from Packet.create_id() unless an id keyword argument is supplied.

Returns:

AuthPacket: A new AuthPacket instance

Source code in pyrad2/radsec/client.py
def create_auth_packet(self, **kwargs) -> AuthPacket:
    """Create a new RADIUS packet.
    This utility function creates a new RADIUS packet which can
    be used to communicate with the RADIUS server this client
    talks to. This is initializing the new packet with the
    dictionary and secret used for the client.

    Returns:
        Packet: A new AuthPacket instance
    """
    id = kwargs.pop("id", Packet.create_id())
    return AuthPacket(
        dict=self.dict,
        id=id,
        secret=self.secret,
        **kwargs,
    )

create_acct_packet(**kwargs)

Create a new RADIUS accounting packet (AcctPacket), initialized with this client's dictionary and secret. The packet ID comes from Packet.create_id() unless an id keyword argument is supplied.

Returns:

AcctPacket: A new AcctPacket instance

Source code in pyrad2/radsec/client.py
def create_acct_packet(self, **kwargs) -> AcctPacket:
    """Create a new RADIUS packet.
    This utility function creates a new RADIUS packet which can
    be used to communicate with the RADIUS server this client
    talks to. This is initializing the new packet with the
    dictionary and secret used for the client.

    Returns:
        Packet: A new AcctPacket instance
    """
    id = kwargs.pop("id", Packet.create_id())
    return AcctPacket(
        id=id,
        dict=self.dict,
        secret=self.secret,
        **kwargs,
    )

create_coa_packet(**kwargs)

Create a new RADIUS CoA packet (CoAPacket), initialized with this client's dictionary and secret. The packet ID comes from Packet.create_id() unless an id keyword argument is supplied.

Returns:

CoAPacket: A new CoA packet instance

Source code in pyrad2/radsec/client.py
def create_coa_packet(self, **kwargs) -> CoAPacket:
    """Create a new RADIUS packet.
    This utility function creates a new RADIUS packet which can
    be used to communicate with the RADIUS server this client
    talks to. This is initializing the new packet with the
    dictionary and secret used for the client.

    Returns:
        Packet: A new CoA packet instance
    """
    id = kwargs.pop("id", Packet.create_id())
    return CoAPacket(id=id, dict=self.dict, secret=self.secret, **kwargs)

create_status_packet(**kwargs)

Create an RFC 5997 Status-Server health-check packet.

Source code in pyrad2/radsec/client.py
def create_status_packet(self, **kwargs) -> StatusPacket:
    """Create an RFC 5997 Status-Server health-check packet."""
    id = kwargs.pop("id", Packet.create_id())
    return StatusPacket(id=id, dict=self.dict, secret=self.secret, **kwargs)

create_packet(id, **kwargs)

Create a generic RADIUS packet with this client's dictionary and secret.

Source code in pyrad2/radsec/client.py
def create_packet(self, id, **kwargs) -> Packet:
    """Create a generic RADIUS packet with this client's dictionary and secret."""
    return Packet(id=id, dict=self.dict, secret=self.secret, **kwargs)

send_packet(packet) async

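Send a packet to the RadSec server and return the verified reply, or None if no valid reply was received. For an AuthPacket whose auth_type is "eap-md5", the client injects the EAP identity before sending and answers an Access-Challenge with a second request.

Parameters:

packet (Packet): The packet to send. Required.
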
Source code in pyrad2/radsec/client.py
async def send_packet(self, packet: PacketImplementation) -> Optional[Packet]:
    """Send a packet to a RADIUS server.

    Args:
        packet (Packet): The packet to send
    """
    if isinstance(packet, AuthPacket):
        if packet.auth_type == "eap-md5":
            eap.inject_eap_identity(packet)
        reply = await self._send_packet(packet)
        if (
            reply
            and reply.code == PacketType.AccessChallenge
            and packet.auth_type == "eap-md5"
        ):
            eap.apply_eap_md5_challenge(packet, reply)
            reply = await self._send_packet(packet)
        return reply
    elif isinstance(packet, CoAPacket):
        return await self._send_packet(packet)
    else:
        return await self._send_packet(packet)