Skip to content

Dedup

RFC 5080 §2.2.2 duplicate detection and response cache.

A RADIUS server MUST detect duplicate requests and resend the original response without re-running the handler. The duplicate key is (src IP, src UDP port, code, Identifier, Request Authenticator).

The cache here is in-memory only (RFC 5080 permits dropping state on restart). Entries are evicted on TTL expiry or when the LRU cap is hit.

DedupKey dataclass

RFC 5080 §2.2.2 duplicate-detection tuple.

Source code in pyrad2/dedup.py
@dataclass(frozen=True, slots=True)
class DedupKey:
    """RFC 5080 §2.2.2 duplicate-detection tuple."""

    src_ip: Any
    src_port: Any
    code: int
    identifier: int
    request_authenticator: bytes

DispatchAction

Bases: IntEnum

Outcome of consulting the response cache for an incoming request.

Source code in pyrad2/dedup.py
class DispatchAction(IntEnum):
    """Outcome of consulting the response cache for an incoming request."""

    PROCESS = 0  # No cache hit. Caller should run the handler.
    DROP = 1  # Duplicate of an in-flight request. Drop silently.
    RESENT = 2  # Cached reply was found and replayed by ``consult_cache``.

ResponseCache

LRU+TTL cache of reply bytes keyed by DedupKey.

Thread-safe so it can be shared between the sync server's main loop and any worker threads a subclass may use. The async server reuses the same class without contention.

Source code in pyrad2/dedup.py
class ResponseCache:
    """LRU+TTL cache of reply bytes keyed by ``DedupKey``.

    Thread-safe so it can be shared between the sync server's main loop
    and any worker threads a subclass may use. The async server reuses
    the same class without contention.
    """

    def __init__(
        self,
        ttl: float = 30.0,
        max_entries: int = 4096,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        if ttl <= 0:
            raise ValueError("ttl must be positive")
        if max_entries <= 0:
            raise ValueError("max_entries must be positive")
        self.ttl = ttl
        self.max_entries = max_entries
        self._clock = clock
        self._lock = threading.RLock()
        self._in_flight: set[DedupKey] = set()
        # OrderedDict ordered by recency of insert/refresh — newest at end.
        self._cached: "OrderedDict[DedupKey, tuple[bytes, float]]" = OrderedDict()

    def lookup(self, key: DedupKey):
        """Return cached reply bytes, the IN_FLIGHT sentinel, or ``None``."""
        with self._lock:
            entry = self._cached.get(key)
            if entry is not None:
                raw, expires_at = entry
                if self._clock() < expires_at:
                    self._cached.move_to_end(key)
                    return raw
                del self._cached[key]
            if key in self._in_flight:
                return IN_FLIGHT
            return None

    def mark_in_flight(self, key: DedupKey) -> None:
        with self._lock:
            self._in_flight.add(key)

    def drop_in_flight(self, key: DedupKey) -> None:
        with self._lock:
            self._in_flight.discard(key)

    def record_reply(
        self, key: DedupKey, raw: bytes, ttl: Optional[float] = None
    ) -> None:
        """Atomically transition the entry from in-flight to cached."""
        if not isinstance(raw, (bytes, bytearray)):
            raise TypeError("raw must be bytes")
        expires_at = self._clock() + (self.ttl if ttl is None else ttl)
        with self._lock:
            self._in_flight.discard(key)
            self._cached[key] = (bytes(raw), expires_at)
            self._cached.move_to_end(key)
            self._evict_locked()

    def clear(self) -> None:
        with self._lock:
            self._cached.clear()
            self._in_flight.clear()

    def __len__(self) -> int:
        with self._lock:
            return len(self._cached)

    def _evict_locked(self) -> None:
        now = self._clock()
        # Drop expired entries from the front (oldest).
        while self._cached:
            key, (_, expires_at) = next(iter(self._cached.items()))
            if expires_at > now:
                break
            del self._cached[key]
        # Enforce the LRU cap.
        while len(self._cached) > self.max_entries:
            self._cached.popitem(last=False)

lookup(key)

Return cached reply bytes, the IN_FLIGHT sentinel, or None.

Source code in pyrad2/dedup.py
def lookup(self, key: DedupKey):
    """Return cached reply bytes, the IN_FLIGHT sentinel, or ``None``."""
    with self._lock:
        entry = self._cached.get(key)
        if entry is not None:
            raw, expires_at = entry
            if self._clock() < expires_at:
                self._cached.move_to_end(key)
                return raw
            del self._cached[key]
        if key in self._in_flight:
            return IN_FLIGHT
        return None

record_reply(key, raw, ttl=None)

Atomically transition the entry from in-flight to cached.

Source code in pyrad2/dedup.py
def record_reply(
    self, key: DedupKey, raw: bytes, ttl: Optional[float] = None
) -> None:
    """Atomically transition the entry from in-flight to cached."""
    if not isinstance(raw, (bytes, bytearray)):
        raise TypeError("raw must be bytes")
    expires_at = self._clock() + (self.ttl if ttl is None else ttl)
    with self._lock:
        self._in_flight.discard(key)
        self._cached[key] = (bytes(raw), expires_at)
        self._cached.move_to_end(key)
        self._evict_locked()

key_for(pkt, source=None)

Build the RFC 5080 dedup key for pkt or return None.

source defaults to pkt.source; pass it explicitly for the async server which keeps the address alongside the packet rather than on it.

Returns None for packet shapes that don't carry the fields we need (e.g. unit-test stand-ins) or for codes the spec excludes from dedup.

Source code in pyrad2/dedup.py
def key_for(pkt: Any, source: Any = None) -> Optional[DedupKey]:
    """Build the RFC 5080 dedup key for ``pkt`` or return ``None``.

    ``source`` defaults to ``pkt.source``; pass it explicitly for the
    async server which keeps the address alongside the packet rather
    than on it.

    Returns ``None`` for packet shapes that don't carry the fields we need
    (e.g. unit-test stand-ins) or for codes the spec excludes from dedup.
    """
    code = getattr(pkt, "code", None)
    if code is None or int(code) not in _DEDUPABLE_CODES:
        return None
    src = source if source is not None else getattr(pkt, "source", None)
    if not src or len(src) < 2:
        return None
    # RFC 9765 §4.1: in RADIUS/1.1 the Request Authenticator is replaced
    # by a 4-byte Token. Dedup keys on whichever field carries the
    # client-chosen correlator: token first (v1.1), authenticator
    # otherwise (v1.0).
    correlator = getattr(pkt, "token", None) or getattr(pkt, "authenticator", None)
    if not correlator:
        return None
    ident = getattr(pkt, "id", None)
    if ident is None:
        return None
    return DedupKey(src[0], src[1], int(code), int(ident), bytes(correlator))

consult_cache(cache, key, resend)

Single point of policy for the dedup state machine.

Returns one of:

  • PROCESS if the caller should run the handler. The key is marked in-flight before returning, so retries that arrive while the handler is still running are dropped.
  • DROP if a duplicate arrived while the original is in-flight.
  • RESENT if a cached reply was found; resend(raw_bytes) has already been invoked.
Source code in pyrad2/dedup.py
def consult_cache(
    cache: Optional[ResponseCache],
    key: Optional[DedupKey],
    resend: Callable[[bytes], None],
) -> DispatchAction:
    """Single point of policy for the dedup state machine.

    Returns one of:

    - ``PROCESS`` if the caller should run the handler. The key is marked
      in-flight before returning, so retries that arrive while the
      handler is still running are dropped.
    - ``DROP`` if a duplicate arrived while the original is in-flight.
    - ``RESENT`` if a cached reply was found; ``resend(raw_bytes)`` has
      already been invoked.
    """
    if cache is None or key is None:
        return DispatchAction.PROCESS
    entry = cache.lookup(key)
    if entry is IN_FLIGHT:
        return DispatchAction.DROP
    if entry is not None:
        resend(entry)  # type: ignore[arg-type]
        return DispatchAction.RESENT
    cache.mark_in_flight(key)
    return DispatchAction.PROCESS

record_if_keyed(cache, reply, raw)

Cache raw if the reply carries a dedup key from its request.

Source code in pyrad2/dedup.py
def record_if_keyed(
    cache: Optional[ResponseCache], reply: Any, raw: bytes
) -> None:
    """Cache ``raw`` if the reply carries a dedup key from its request."""
    key = getattr(reply, "_dedup_key", None)
    if key is not None and cache is not None:
        cache.record_reply(key, raw)