Aller au contenu

Client Registry Asynchrone

AsyncApicurioRegistryClient est l'équivalent asynchrone d'ApicurioRegistryClient. Il utilise httpx.AsyncClient pour une communication HTTP non bloquante avec l'API Apicurio Registry v3.

Utilisation de base

from apicurio_serdes import AsyncApicurioRegistryClient

client = AsyncApicurioRegistryClient(
    url="http://registry:8080/apis/registry/v3",
    group_id="my-group",
)

cached = await client.get_schema("UserEvent")
print(cached.schema)      # Dict de schema Avro parsé
print(cached.global_id)   # globalId du registry
print(cached.content_id)  # contentId du registry

Gestionnaire de contexte

Utilisez async with pour vous assurer que le pool de connexions HTTP sous-jacent est fermé lorsque vous avez terminé :

async with AsyncApicurioRegistryClient(
    url="http://registry:8080/apis/registry/v3",
    group_id="my-group",
) as client:
    cached = await client.get_schema("UserEvent")
# Le pool de connexions est fermé ici

Si vous n'utilisez pas de gestionnaire de contexte, appelez explicitement aclose() :

client = AsyncApicurioRegistryClient(
    url="http://registry:8080/apis/registry/v3",
    group_id="my-group",
)
try:
    cached = await client.get_schema("UserEvent")
finally:
    await client.aclose()

Intégration avec le lifespan FastAPI

from contextlib import asynccontextmanager

from fastapi import FastAPI

from apicurio_serdes import AsyncApicurioRegistryClient


@asynccontextmanager
async def lifespan(app: FastAPI):
    async with AsyncApicurioRegistryClient(
        url="http://registry:8080/apis/registry/v3",
        group_id="my-group",
    ) as client:
        app.state.registry = client
        yield


app = FastAPI(lifespan=lifespan)


@app.post("/produce")
async def produce(request):
    client = request.app.state.registry
    cached = await client.get_schema("UserEvent")
    # Utiliser cached.schema pour la sérialisation...

Comparaison Sync vs Async

Fonctionnalité Sync Async
Classe ApicurioRegistryClient AsyncApicurioRegistryClient
Récupération de schema client.get_schema(id) await client.get_schema(id)
Enregistrement de schema client.register_schema(id, schema) await client.register_schema(id, schema)
Type de retour CachedSchema CachedSchema (même classe)
Paramètres du constructeur url, group_id, max_retries, retry_backoff_ms, retry_max_backoff_ms, http_client, auth identiques
Sécurité du cache threading.RLock asyncio.Lock
Nettoyage (GC automatique) async with ou await client.aclose()
Erreurs SchemaNotFoundError, RegistryConnectionError Mêmes types d'erreurs

Enregistrement de schemas

register_schema publie un nouvel artifact de schema dans le registry. Le résultat est mis en cache, de sorte qu'un appel ultérieur à get_schema pour le même artifact_id est toujours un succès de cache sans requête HTTP supplémentaire :

cached = await client.register_schema(
    "UserEvent",
    {
        "type": "record",
        "name": "UserEvent",
        "namespace": "com.example",
        "fields": [
            {"name": "userId", "type": "string"},
            {"name": "country", "type": "string"},
        ],
    },
    if_exists="FIND_OR_CREATE_VERSION",  # défaut — retourne la version existante ou en crée une nouvelle
)
print(cached.global_id)   # globalId assigné par le registry
print(cached.content_id)  # contentId assigné par le registry

Le paramètre if_exists accepte "FAIL", "FIND_OR_CREATE_VERSION" (défaut), ou "CREATE_VERSION". SchemaRegistrationError est levée quand le registry retourne une réponse 4xx ou 5xx.

Gestion des erreurs

Le client asynchrone lève les mêmes types d'erreurs que le client synchrone :

from apicurio_serdes._errors import (
    RegistryConnectionError,
    SchemaNotFoundError,
)

try:
    cached = await client.get_schema("NonExistent")
except SchemaNotFoundError as e:
    print(e.group_id)     # "my-group"
    print(e.artifact_id)  # "NonExistent"
except RegistryConnectionError as e:
    print(e.url)          # URL du registry injoignable

Mise en cache

Les schemas sont mis en cache après la première récupération. Les appels suivants pour le même artifact_id retournent le résultat en cache sans contacter le registry. Les coroutines concurrentes demandant le même schema non mis en cache produiront exactement une seule requête HTTP (prévention des stampedes via asyncio.Lock).