Mode Wire Format KAFKA_HEADERS
Le mode WireFormat.KAFKA_HEADERS permet de produire des messages Avro où
l'identifiant de schema est transporté dans les en-têtes du message Kafka plutôt
qu'intégré dans les octets du message. C'est un pattern de déploiement important
dans les configurations Apicurio Registry qui préfèrent des messages au contenu
propre avec une identification de schema hors-bande.
Contrairement au mode CONFLUENT_PAYLOAD par défaut (qui préfixe un octet magique
et un identifiant de schema sur 4 octets au payload), KAFKA_HEADERS produit du
binaire Avro brut comme corps du message et communique l'identifiant de schema via
un en-tête Kafka dédié.
Configuration
Aucune dépendance supplémentaire n'est requise. Toutes les classes nécessaires font partie de la bibliothèque principale.
from apicurio_serdes import ApicurioRegistryClient, WireFormat
from apicurio_serdes.avro import AvroSerializer
from apicurio_serdes.serialization import SerializationContext, MessageField
Créez un client registry comme d'habitude :
client = ApicurioRegistryClient(
url="http://registry:8080/apis/registry/v3",
group_id="com.example.schemas",
)
Utilisation
Sérialisation avec KAFKA_HEADERS
Passez wire_format=WireFormat.KAFKA_HEADERS lors de la construction du sérialiseur,
puis utilisez la méthode serialize() pour obtenir à la fois les octets du payload et
les en-têtes Kafka :
serializer = AvroSerializer(
registry_client=client,
artifact_id="UserEvent",
wire_format=WireFormat.KAFKA_HEADERS,
)
ctx = SerializationContext(topic="user-events", field=MessageField.VALUE)
result = serializer.serialize({"userId": "abc", "country": "FR"}, ctx)
# result.payload -> binaire Avro brut (sans octet magique, sans préfixe d'ID de schema)
# result.headers -> {"apicurio.value.globalId": b"\x00\x00\x00\x00\x00\x00\x00\x01"}
# Passez les deux à votre producteur Kafka :
producer.produce(
topic=ctx.topic,
value=result.payload,
headers=list(result.headers.items()),
)
La méthode serialize() retourne un dataclass SerializedMessage avec deux champs :
payload(bytes) : Les données Avro binaires brutes — sans préfixe de cadrage.headers(dict[str, bytes]) : Un dict à une seule entrée avec l'en-tête d'identifiant de schema.
Sérialisation d'une CLE Kafka
Lors de la sérialisation d'une clé de message, utilisez MessageField.KEY dans le
contexte de sérialisation. Le nom de l'en-tête utilise automatiquement le préfixe
key :
serializer = AvroSerializer(
registry_client=client,
artifact_id="UserKey",
wire_format=WireFormat.KAFKA_HEADERS,
)
ctx = SerializationContext(topic="user-events", field=MessageField.KEY)
result = serializer.serialize({"userId": "abc"}, ctx)
# La clé de result.headers est "apicurio.key.globalId"
Utiliser contentId au lieu de globalId
Par défaut, l'en-tête transporte le globalId. Passez use_id="contentId" pour
utiliser l'identifiant de contenu à la place :
serializer = AvroSerializer(
registry_client=client,
artifact_id="UserEvent",
use_id="contentId",
wire_format=WireFormat.KAFKA_HEADERS,
)
ctx = SerializationContext(topic="user-events", field=MessageField.VALUE)
result = serializer.serialize({"userId": "abc", "country": "FR"}, ctx)
# La clé de result.headers est "apicurio.value.contentId"
Le défaut CONFLUENT_PAYLOAD reste inchangé
Le code existant continue de fonctionner sans modification. Le wire format par défaut
reste WireFormat.CONFLUENT_PAYLOAD :
# Sans argument wire_format -- utilise CONFLUENT_PAYLOAD par défaut
serializer = AvroSerializer(
registry_client=client,
artifact_id="UserEvent",
)
ctx = SerializationContext(topic="user-events", field=MessageField.VALUE)
payload = serializer({"userId": "abc", "country": "FR"}, ctx)
# payload[0:1] == b"\x00" (octet magique)
# payload[1:5] (ID de schema big-endian sur 4 octets)
# payload[5:] (données Avro binaires)
Passer wire_format=WireFormat.CONFLUENT_PAYLOAD explicitement produit une sortie
identique. La méthode serialize() fonctionne pour les deux modes — pour
CONFLUENT_PAYLOAD, result.headers est toujours un dict vide :
result = serializer.serialize({"userId": "abc", "country": "FR"}, ctx)
# result.payload == serializer({"userId": "abc", "country": "FR"}, ctx)
# result.headers == {}
Référence du format d'en-tête
Le nom de l'en-tête suit la convention de nommage native d'Apicurio Registry, combinant le type de champ du message et le type d'identifiant :
| MessageField | use_id | Nom de l'en-tête | Encodage de la valeur de l'en-tête |
|---|---|---|---|
| VALUE | "globalId" |
apicurio.value.globalId |
struct.pack(">q", global_id) — 8 octets |
| VALUE | "contentId" |
apicurio.value.contentId |
struct.pack(">q", content_id) — 8 octets |
| KEY | "globalId" |
apicurio.key.globalId |
struct.pack(">q", global_id) — 8 octets |
| KEY | "contentId" |
apicurio.key.contentId |
struct.pack(">q", content_id) — 8 octets |
Encodage de la valeur de l'en-tête
L'identifiant de schema est encodé comme un entier signé big-endian sur 8 octets
(struct.pack(">q", schema_id)). Cela correspond à l'encodage utilisé par le serde
Java KAFKA_HEADERS natif d'Apicurio Registry, assurant l'interopérabilité au niveau
des octets.
import struct
# Encodage (Python -> valeur d'en-tête Kafka)
header_value = struct.pack(">q", schema_id) # 8 octets
# Décodage (valeur d'en-tête Kafka -> Python)
(schema_id,) = struct.unpack(">q", header_value)
# Équivalent Java : ByteBuffer.wrap(headerBytes).getLong()
Mise en cache des schemas
La mise en cache des schemas est entièrement préservée en mode KAFKA_HEADERS. La clé
de cache est (group_id, artifact_id), identique au mode CONFLUENT_PAYLOAD. Quel que
soit le nombre de messages ou le paramètre de wire format, seul un appel HTTP est
effectué vers le registry par artifact unique :
# 1 appel HTTP quel que soit le nombre de messages ou le wire_format
for record in records:
result = serializer.serialize(record, ctx)
Pour aller plus loin
- Exemples de démarrage rapide :
specs/004-kafka-headers-wire-format/quickstart.md - Spécification de la fonctionnalité :
specs/004-kafka-headers-wire-format/spec.md - Référence API : générée automatiquement depuis les docstrings dans
apicurio_serdes.serializationetapicurio_serdes.avro