Source code for ua_parser.caching

from __future__ import annotations

import abc
import dataclasses
import threading
from collections import OrderedDict, deque
from contextvars import ContextVar
from typing import (
    Callable,
    Deque,
    Dict,
    Optional,
    Protocol,
    Union,
)

from .core import Domain, PartialResult, Resolver

__all__ = [
    "Cache",
    "CachingResolver",
    "Lru",
    "S3Fifo",
    "Sieve",
]


[docs] class Cache(Protocol): """Cache() Cache abstract protocol. The :class:`CachingResolver` will look values up, merge what was returned (possibly nothing) with what it got from its actual parser, and *re-set the result*. """
[docs] @abc.abstractmethod def __setitem__(self, key: str, value: PartialResult) -> None: """Adds or replace ``value`` to the cache at key ``key``.""" ...
[docs] @abc.abstractmethod def __getitem__(self, key: str) -> Optional[PartialResult]: """Returns a partial result for ``key`` if there is any.""" ...
[docs] class Lru: """Cache following a least-recently used replacement policy: when there is no more room in the cache, whichever entry was last seen the least recently is removed. Simple LRUs are generally outdated and to avoid as they have relatively low hit rates for modern caches (at lower sizes). The main use case here is if the workload can lead to the cache being full of popular items then all of them being replaced at once: :class:`S3Fifo` and :class:`Sieve` are FIFO-based caches and have worst-case O(n) eviction. """ def __init__(self, maxsize: int): self.maxsize = maxsize self.cache: OrderedDict[str, PartialResult] = OrderedDict() self.lock = threading.Lock() def __getitem__(self, key: str) -> Optional[PartialResult]: with self.lock: e = self.cache.get(key) if e: self.cache.move_to_end(key) return e def __setitem__(self, key: str, value: PartialResult) -> None: with self.lock: if len(self.cache) >= self.maxsize and key not in self.cache: self.cache.popitem(last=False) self.cache[key] = value
@dataclasses.dataclass class CacheEntry: __slots__ = ["freq", "key", "value"] key: str value: PartialResult freq: int
[docs] class S3Fifo: """FIFO-based quick-demotion lazy-promotion cache [S3-FIFO]_. Experimentally provides excellent hit rate at lower cache sizes, for a relatively simple and efficient implementation. Notably excellent at handling "one hit wonders", aka entries seen only once during a work-set (or reasonable work window). """ def __init__(self, maxsize: int): self.maxsize = maxsize self.index: Dict[str, Union[CacheEntry, str]] = {} self.small_target = max(1, int(maxsize / 10)) self.small: Deque[CacheEntry] = deque() self.main_target = maxsize - self.small_target self.main: Deque[CacheEntry] = deque() self.ghost: Deque[str] = deque() self.lock = threading.Lock() def __getitem__(self, key: str) -> Optional[PartialResult]: if (e := self.index.get(key)) and type(e) is CacheEntry: # small race here, we could miss an increment e.freq = min(e.freq + 1, 3) return e.value return None def __setitem__(self, key: str, r: PartialResult) -> None: with self.lock: if (e := self.index.get(key)) and type(e) is CacheEntry: e.value = r return if len(self.small) + len(self.main) >= self.maxsize: # if main is not overcapacity, resize small if len(self.main) < self.main_target: self._evict_small() # evict_small could have moved every entry to main, in # which case we now need to evict from main if len(self.small) + len(self.main) >= self.maxsize: self._evict_main() entry = CacheEntry(key, r, 0) if type(self.index.get(key)) is str: self.main.appendleft(entry) else: self.small.appendleft(entry) self.index[key] = entry def _evict_main(self) -> None: while True: e = self.main.pop() if e.freq: e.freq -= 1 self.main.appendleft(e) else: del self.index[e.key] return def _evict_small(self) -> None: while self.small: e = self.small.pop() if e.freq: e.freq = 0 self.main.appendleft(e) else: g = self.index[e.key] = e.key self.ghost.appendleft(g) while len(self.ghost) > self.main_target: g = self.ghost.pop() if self.index.get(g) is g: del self.index[g] return
@dataclasses.dataclass class SieveNode: __slots__ = ("key", "next", "value", "visited") key: str value: PartialResult visited: bool next: Optional[SieveNode]
[docs] class Sieve: """FIFO-based quick-demotion cache [SIEVE]_. Simpler FIFO-based cache, cousin of :class:`S3Fifo`. Experimentally slightly lower hit rates than :class:`S3Fifo` (if way superior to LRU still), but much more compact (~50% lower memory overhead at larger cache sizes, up to 100% at very small cache sizes). Can be an interesting candidate when trying to save on memory, although the contained entries will generally be much larger than the cache itself. """ def __init__(self, maxsize: int) -> None: self.maxsize = maxsize self.cache: Dict[str, SieveNode] = {} self.head: Optional[SieveNode] = None self.tail: Optional[SieveNode] = None self.hand: Optional[SieveNode] = None self.prev: Optional[SieveNode] = None self.lock = threading.Lock() def __getitem__(self, key: str) -> Optional[PartialResult]: if entry := self.cache.get(key): entry.visited = True return entry.value return None def __setitem__(self, key: str, value: PartialResult) -> None: with self.lock: if e := self.cache.get(key): e.value = value return if len(self.cache) >= self.maxsize: self._evict() node = self.cache[key] = SieveNode(key, value, False, None) if self.head: self.head.next = node self.head = node if self.tail is None: self.tail = node def _evict(self) -> None: obj: Optional[SieveNode] if self.hand: obj, pobj = self.hand, self.prev else: obj, pobj = self.tail, None while obj and obj.visited: obj.visited = False if obj.next: obj, pobj = obj.next, obj else: obj, pobj = self.tail, None if not obj: return self.hand = obj.next self.prev = pobj del self.cache[obj.key] if not obj.next: self.head = pobj if pobj: pobj.next = obj.next else: self.tail = obj.next
[docs] class Local: """Thread local cache decorator. Takes a cache factory and lazily instantiates a cache for each thread it's accessed from. This means the cache capacity and memory consumption is figuratively multiplied by however many threads the cache is used from, but those threads don't share their caching, and thus don't contend on cache use. """ def __init__(self, factory: Callable[[], Cache]) -> None: self.cv: ContextVar[Cache] = ContextVar("local-cache") self.factory = factory @property def cache(self) -> Cache: c = self.cv.get(None) if c is None: c = self.factory() self.cv.set(c) return c def __getitem__(self, key: str) -> Optional[PartialResult]: return self.cache[key] def __setitem__(self, key: str, value: PartialResult) -> None: self.cache[key] = value
[docs] class CachingResolver: """A wrapper resolver which takes an underlying concrete :class:`Cache` for the actual caching and cache strategy. This resolver only interacts with the :class:`Cache` and delegates to the wrapped resolver in case of lookup failure. :class:`CachingParser` will set entries back in the cache when filling them up, it does not update results in place (and can't really, they're immutable). """ def __init__(self, resolver: Resolver, cache: Cache): self.parser: Resolver = resolver self.cache: Cache = cache def __call__(self, ua: str, domains: Domain, /) -> PartialResult: entry = self.cache[ua] if entry: if domains in entry.domains: return entry domains &= ~entry.domains r = self.parser(ua, domains) if entry: r = PartialResult( string=ua, domains=entry.domains | r.domains, user_agent=entry.user_agent or r.user_agent, os=entry.os or r.os, device=entry.device or r.device, ) self.cache[ua] = r return r