Avro Deserializer
AvroDeserializer lit des octets au format Confluent wire format, résout l'identifiant de schema intégré auprès d'Apicurio Registry, et retourne un dict Python (ou un objet du domaine si un hook from_dict est configuré).
Utilisation de base
from apicurio_serdes import ApicurioRegistryClient
from apicurio_serdes.avro import AvroDeserializer
from apicurio_serdes.serialization import SerializationContext, MessageField
client = ApicurioRegistryClient(
url="http://registry:8080/apis/registry/v3",
group_id="com.example.schemas",
)
deserializer = AvroDeserializer(client)
ctx = SerializationContext("my-topic", MessageField.VALUE)
record = deserializer(kafka_message.value(), ctx)
# record est un dict Python : {"userId": "abc123", "country": "FR"}
Paramètres
| Paramètre | Type | Défaut | Description |
|---|---|---|---|
registry_client |
ApicurioRegistryClient |
obligatoire | Le client registry utilisé pour résoudre les identifiants de schema. |
from_dict |
callable | None |
Transformation optionnelle (dict, ctx) -> Any appliquée après le décodage. |
use_id |
"contentId" ou "globalId" |
"globalId" |
Comment interpréter l'identifiant de schema de 4 octets dans l'en-tête du wire format. |
artifact_id |
str |
None |
Identifiant d'artifact statique utilisé pour récupérer le schema le plus récent du registry comme schema lecteur. Requiert use_latest_version=True. Mutuellement exclusif avec artifact_resolver. |
artifact_resolver |
callable | None |
Callable (ctx) -> str qui dérive l'identifiant d'artifact à l'appel. Requiert use_latest_version=True. Mutuellement exclusif avec artifact_id. |
use_latest_version |
bool |
False |
Quand True, récupère le schema le plus récent du registry pour l'artifact résolu lors du premier appel et l'utilise comme schema lecteur (mis en cache par instance). Requiert artifact_id ou artifact_resolver. Mutuellement exclusif avec reader_schema. |
reader_schema |
dict |
None |
Schema Avro optionnel utilisé comme schema lecteur. Quand il est fourni, fastavro effectue une résolution de schema entre le schema d'écriture (issu du message) et ce schema, permettant les ajouts de champs avec valeurs par défaut, les promotions de type et les renommages par alias. Parsé une seule fois à la construction. Mutuellement exclusif avec use_latest_version. |
Mode d'identifiant de schema (use_id)
Le format Confluent wire format stocke un identifiant de schema de 4 octets aux octets 1–4. Le paramètre use_id contrôle comment cet entier est résolu auprès du registry :
use_id |
Endpoint du registry |
|---|---|
"globalId" (défaut) |
GET /ids/globalIds/{id} |
"contentId" |
GET /ids/contentIds/{id} |
Important : use_id doit correspondre à la valeur utilisée par le producteur. Si le producteur a intégré un globalId, le désérialiseur doit utiliser use_id="globalId".
Aller-retour avec AvroSerializer
from apicurio_serdes.avro import AvroSerializer, AvroDeserializer
serializer = AvroSerializer(client, "UserEvent", use_id="contentId")
deserializer = AvroDeserializer(client, use_id="contentId")
ctx = SerializationContext("events", MessageField.VALUE)
data = {"userId": "abc123", "country": "FR"}
payload = serializer(data, ctx)
result = deserializer(payload, ctx)
assert result == data
Transformation personnalisée (from_dict)
Passez un callable from_dict pour convertir le dict décodé en objet du domaine :
from dataclasses import dataclass
@dataclass
class UserEvent:
userId: str
country: str
def from_dict(d: dict, ctx: SerializationContext) -> UserEvent:
return UserEvent(userId=d["userId"], country=d["country"])
deserializer = AvroDeserializer(client, from_dict=from_dict)
event = deserializer(payload, ctx)
# event est une instance de UserEvent, pas un dict brut
Quand from_dict est None (valeur par défaut), le dict décodé est retourné directement.
Évolution de schema (reader_schema)
Par défaut, le désérialiseur utilise le schema d'écriture pour les deux rôles — le schema
intégré dans le message. Si votre consommateur est sur une version de schema différente,
passez reader_schema et fastavro gère la résolution :
writer_schema = {
# C'est ce que le producteur a utilisé — il est intégré dans le message,
# pas passé au désérialiseur.
"type": "record",
"name": "UserEvent",
"namespace": "com.example",
"fields": [
{"name": "userId", "type": "string"},
{"name": "country", "type": "string"},
],
}
reader_schema = {
"type": "record",
"name": "UserEvent",
"namespace": "com.example",
"fields": [
{"name": "userId", "type": "string"},
{"name": "country", "type": "string"},
# Nouveau champ ajouté côté consommateur — la valeur par défaut comble l'écart
{"name": "region", "type": ["null", "string"], "default": None},
],
}
deserializer = AvroDeserializer(client, reader_schema=reader_schema)
# Les messages écrits avec l'ancien writer_schema décodent correctement ;
# "region" revient à None.
Quelques points à garder en tête lors de la résolution :
- Les champs attendus par le lecteur mais omis par l'écrivain sont remplis avec leur valeur par défaut ; sans valeur par défaut, le décodage échoue.
- Les champs de l'écrivain ignorés par le lecteur sont abandonnés silencieusement.
- Les promotions de type suivent les règles Avro :
int → long → float → double,string → bytes, etc. - Les alias de champs du schema lecteur résolvent les noms de champs de l'écrivain.
Si les schemas sont incompatibles — par exemple, un nouveau champ obligatoire sans valeur par
défaut — fastavro lève une ValueError encapsulée dans DeserializationError.
Schema lecteur dynamique (use_latest_version)
Lorsque le schema côté consommateur évolue fréquemment, maintenir un reader_schema
statique dans le code peut devenir contraignant. use_latest_version=True délègue la
résolution du schema au registry : le désérialiseur récupère la dernière version de
l'artifact donné lors du premier appel et l'utilise comme schema lecteur, mis en cache
pour toute la durée de vie de l'instance.
deserializer = AvroDeserializer(
client,
artifact_id="UserEvent",
use_latest_version=True,
)
ctx = SerializationContext("user-events", MessageField.VALUE)
# Au premier appel, le désérialiseur récupère le schema "UserEvent" le plus
# récent depuis le registry et l'utilise comme schema lecteur. Les appels
# suivants utilisent la version mise en cache — aucune requête HTTP supplémentaire.
record = deserializer(payload, ctx)
Vous pouvez également utiliser artifact_resolver à la place d'un artifact_id statique :
from apicurio_serdes.avro import SimpleTopicIdStrategy
deserializer = AvroDeserializer(
client,
artifact_resolver=SimpleTopicIdStrategy(),
use_latest_version=True,
)
Contraintes :
- Requiert exactement l'un de
artifact_idouartifact_resolver. - Mutuellement exclusif avec le paramètre statique
reader_schema. - Fournir
artifact_idouartifact_resolversansuse_latest_version=Truelève uneValueError.
Mise en cache du schema
Les schemas résolus lors de la désérialisation sont mis en cache après la première requête au registry. Les désérialisations répétées de messages partageant le même identifiant de schema n'entraînent qu'un seul appel HTTP au registry. Le cache est thread-safe et partagé entre tous les désérialiseurs utilisant la même instance d'ApicurioRegistryClient.
Gestion des erreurs
from apicurio_serdes._errors import (
DeserializationError,
RegistryConnectionError,
SchemaNotFoundError,
)
try:
result = deserializer(payload, ctx)
except DeserializationError as e:
# Magic byte invalide, trop peu d'octets, échec de décodage Avro, ou échec de from_dict
print(f"Désérialisation échouée : {e}")
if e.__cause__:
print(f"Causé par : {e.__cause__}")
except SchemaNotFoundError as e:
# L'identifiant de schema du wire format est introuvable dans le registry
print(f"Schema inconnu : {e.id_type}={e.id_value}")
except RegistryConnectionError as e:
# Registry inaccessible lors de la résolution du schema
print(f"Le registry à {e.url} est inaccessible : {e}")
Référence des exceptions
| Exception | Quand |
|---|---|
DeserializationError |
Entrée de moins de 5 octets, magic byte ≠ 0x00, payload Avro impossible à décoder, ou from_dict a levé une exception |
SchemaNotFoundError |
L'identifiant de schema du wire format est introuvable dans le registry (HTTP 404) |
RegistryConnectionError |
Registry inaccessible lors de la résolution du schema |