API Reference
Complete reference for all public classes, methods, and exceptions in apicurio-serdes.
Core
ApicurioRegistryClient
apicurio_serdes._client.ApicurioRegistryClient
Bases: _RegistryClientBase
HTTP client for the Apicurio Registry v3 native API.
Handles schema retrieval by group_id / artifact_id with built-in caching and automatic retry on transient failures. Thread-safe for read operations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| url | str | Base URL of the Apicurio Registry v3 API. Example: "http://registry:8080/apis/registry/v3" | required |
| group_id | str | Schema group identifier. Applied to every schema lookup made by this client instance. | required |
| max_retries | int | Maximum number of retry attempts on transient failures (transport errors and HTTP 429/502/503/504). Defaults to 3. Set to 0 to disable retries. | 3 |
| retry_backoff_ms | int | Base backoff delay in milliseconds for the first retry. Subsequent retries use exponential backoff with full jitter. Defaults to 1000. | 1000 |
| retry_max_backoff_ms | int | Maximum backoff delay cap in milliseconds. Defaults to 20000. | 20000 |
| http_client | Client \| None | Optional pre-configured httpx.Client. A client passed here is not closed by close(); the caller retains ownership. | None |
| auth | Any | Optional httpx-compatible authentication handler (e.g. BearerAuth or KeycloakAuth). | None |
| cache_max_size | int | Maximum number of entries in each cache (LRU eviction). Applies to both the schema cache and the ID cache. Defaults to 1000. | 1000 |
| cache_ttl_seconds | float \| None | Optional TTL in seconds for artifact-based schema cache entries. | None |
Raises:
| Type | Description |
|---|---|
| ValueError | If url or group_id is empty, max_retries < 0, cache_max_size < 1, or cache_ttl_seconds <= 0. |
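Example
```python
from apicurio_serdes import ApicurioRegistryClient

# The context manager closes the HTTP connection pool on exit.
with ApicurioRegistryClient(
    url="http://localhost:8080/apis/registry/v3",
    group_id="com.example.schemas",
) as client:
    cached = client.get_schema("UserEvent")  # returns a CachedSchema
    print(cached.global_id, cached.content_id)
```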
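The retry parameters describe exponential backoff with full jitter, capped at retry_max_backoff_ms. The stdlib-only sketch below shows one common way to compute such delays. It assumes that the ceiling doubles per attempt starting from retry_backoff_ms and that the delay is drawn uniformly from [0, ceiling]. This is an illustration of the general technique and was not taken from the library's source; full_jitter_delay_ms is a hypothetical name.

```python
from __future__ import annotations

import random


def full_jitter_delay_ms(
    attempt: int,
    base_ms: int = 1000,
    max_ms: int = 20000,
    rng: random.Random | None = None,
) -> float:
    """Delay before retry number `attempt` (0 for the first retry).

    Assumed scheme: the ceiling doubles per attempt starting at base_ms,
    is capped at max_ms, and the actual delay is drawn uniformly from
    [0, ceiling] ("full jitter").
    """
    if attempt < 0:
        raise ValueError("attempt must be >= 0")
    if rng is None:
        rng = random.Random()
    ceiling = min(max_ms, base_ms * 2**attempt)
    return rng.uniform(0, ceiling)


# With the documented defaults and max_retries=3, the ceilings are
# 1000 ms, 2000 ms and 4000 ms; the 20000 ms cap applies from attempt 5.
rng = random.Random(42)
delays = [full_jitter_delay_ms(n, rng=rng) for n in range(3)]
```

Full jitter spreads retries from many clients across the whole window, which helps avoid synchronized retry bursts against a registry that is already returning 429 or 503.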
get_schema
get_schema(artifact_id)
Retrieve an Avro schema by artifact ID.
Returns a cached result on subsequent calls for the same artifact_id (FR-006).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| artifact_id | str | The artifact identifier within the configured group. | required |
Returns:
| Type | Description |
|---|---|
| CachedSchema | CachedSchema with the parsed schema dict, global_id, and content_id. |
Raises:
| Type | Description |
|---|---|
| SchemaNotFoundError | If the artifact does not exist (HTTP 404). |
| RegistryConnectionError | If the registry is unreachable or returns a persistent error after all retries are exhausted. |
| RuntimeError | If the client has been closed. |
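Both caches are bounded by cache_max_size with LRU eviction. Artifact-based entries can also expire after cache_ttl_seconds. The sketch below combines the two behaviours using an OrderedDict. TTLLRUCache and its injectable clock are hypothetical names for illustration only, not the library's internal cache.

```python
from __future__ import annotations

import time
from collections import OrderedDict
from typing import Any, Callable, Hashable


class TTLLRUCache:
    """Hypothetical LRU cache with an optional per-entry TTL."""

    def __init__(
        self,
        max_size: int = 1000,
        ttl_seconds: float | None = None,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        if max_size < 1:
            raise ValueError("max_size must be >= 1")
        if ttl_seconds is not None and ttl_seconds <= 0:
            raise ValueError("ttl_seconds must be > 0")
        self._max_size = max_size
        self._ttl = ttl_seconds
        self._clock = clock
        # key -> (value, timestamp when it was stored)
        self._data: OrderedDict[Hashable, tuple[Any, float]] = OrderedDict()

    def get(self, key: Hashable) -> Any | None:
        item = self._data.get(key)
        if item is None:
            return None
        value, stored_at = item
        if self._ttl is not None and self._clock() - stored_at >= self._ttl:
            del self._data[key]  # expired: the caller must refetch
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return value

    def put(self, key: Hashable, value: Any) -> None:
        self._data[key] = (value, self._clock())
        self._data.move_to_end(key)
        if len(self._data) > self._max_size:
            self._data.popitem(last=False)  # evict the least recently used


now = [0.0]  # fake clock so the example is deterministic
cache = TTLLRUCache(max_size=2, ttl_seconds=60, clock=lambda: now[0])
cache.put("UserEvent", {"type": "record"})
cache.put("OrderEvent", {"type": "record"})
cache.get("UserEvent")          # UserEvent becomes most recently used
cache.put("PaymentEvent", {})   # exceeds max_size, so OrderEvent is evicted
evicted = cache.get("OrderEvent")
now[0] = 61.0                   # move past the 60 s TTL
expired = cache.get("UserEvent")
```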
get_schema_by_global_id
get_schema_by_global_id(global_id)
Retrieve an Avro schema by its globalId.
Returns a cached result on subsequent calls for the same globalId (FR-007).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| global_id | int | The globalId from the wire format header. | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Parsed Avro schema as a Python dict. |
Raises:
| Type | Description |
|---|---|
| SchemaNotFoundError | If no schema exists for this globalId (FR-010). |
| RegistryConnectionError | If the registry is unreachable (FR-012). |
| RuntimeError | If the client has been closed. |
get_schema_by_content_id
get_schema_by_content_id(content_id)
Retrieve an Avro schema by its contentId.
Returns a cached result on subsequent calls for the same contentId (FR-007).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| content_id | int | The contentId from the wire format header. | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Parsed Avro schema as a Python dict. |
Raises:
| Type | Description |
|---|---|
| SchemaNotFoundError | If no schema exists for this contentId (FR-010). |
| RegistryConnectionError | If the registry is unreachable (FR-012). |
| RuntimeError | If the client has been closed. |
register_schema
register_schema(
artifact_id, schema, if_exists="FIND_OR_CREATE_VERSION"
)
Register a schema artifact with the registry.
Posts the schema to the registry under the configured group. On success,
populates the internal cache so subsequent get_schema calls are cache
hits with no additional HTTP request.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| artifact_id | str | The artifact identifier to register under. | required |
| schema | dict[str, Any] | The Avro schema dict to register. | required |
| if_exists | Literal['FAIL', 'CREATE_VERSION', 'FIND_OR_CREATE_VERSION'] | Behaviour when the artifact already exists. | 'FIND_OR_CREATE_VERSION' |
Returns:
| Type | Description |
|---|---|
| CachedSchema | CachedSchema with the registered schema and registry-assigned IDs. |
Raises:
| Type | Description |
|---|---|
| SchemaRegistrationError | If the registry returns a 4xx or 5xx response. |
| RegistryConnectionError | If the registry is unreachable. |
| RuntimeError | If the client has been closed. |
close
close()
Close the underlying HTTP connection pool.
Call this when the client is no longer needed and you are not
using it as a context manager. Safe to call multiple times.
When a custom http_client was provided at construction, it
is not closed — the caller retains ownership.
__enter__
__enter__()
Enter the context manager. Returns self.
__exit__
__exit__(exc_type, exc_val, exc_tb)
Exit the context manager. Calls close().
AsyncApicurioRegistryClient
apicurio_serdes._async_client.AsyncApicurioRegistryClient
Bases: _RegistryClientBase
Async HTTP client for the Apicurio Registry v3 native API.
Non-blocking counterpart to ApicurioRegistryClient. Uses httpx.AsyncClient for async I/O. Safe for concurrent use from multiple coroutines within the same event loop. Includes automatic retry on transient failures.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| url | str | Base URL of the Apicurio Registry v3 API. Example: "http://registry:8080/apis/registry/v3" | required |
| group_id | str | Schema group identifier. Applied to every schema lookup made by this client instance. | required |
| max_retries | int | Maximum number of retry attempts on transient failures (transport errors and HTTP 429/502/503/504). Defaults to 3. Set to 0 to disable retries. | 3 |
| retry_backoff_ms | int | Base backoff delay in milliseconds for the first retry. Subsequent retries use exponential backoff with full jitter. Defaults to 1000. | 1000 |
| retry_max_backoff_ms | int | Maximum backoff delay cap in milliseconds. Defaults to 20000. | 20000 |
| http_client | AsyncClient \| None | Optional pre-configured httpx.AsyncClient. A client passed here is not closed by aclose(); the caller retains ownership. | None |
| auth | Any | Optional httpx-compatible authentication handler. Ignored when | None |
| cache_max_size | int | Maximum number of entries in each cache (LRU eviction). Applies to both the schema cache and the ID cache. Defaults to 1000. | 1000 |
| cache_ttl_seconds | float \| None | Optional TTL in seconds for artifact-based schema cache entries. | None |
Raises:
| Type | Description |
|---|---|
| ValueError | If url or group_id is empty, max_retries < 0, cache_max_size < 1, or cache_ttl_seconds <= 0. |
get_schema
async
get_schema(artifact_id)
Retrieve an Avro schema by artifact ID (async).
Returns a cached result on subsequent calls for the same artifact_id. Safe for concurrent invocation: concurrent first-time fetches for the same artifact_id result in exactly one HTTP request (NFR-001).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| artifact_id | str | The artifact identifier within the configured group. | required |
Returns:
| Type | Description |
|---|---|
| CachedSchema | CachedSchema with parsed schema dict, global_id, and content_id. |
Raises:
| Type | Description |
|---|---|
| SchemaNotFoundError | If the artifact does not exist (HTTP 404). |
| RegistryConnectionError | If the registry is unreachable or returns a persistent error after all retries are exhausted. |
| RuntimeError | If the client has been closed. |
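The guarantee that concurrent first-time fetches for the same artifact_id produce exactly one HTTP request is the classic "single-flight" pattern. The asyncio sketch below assumes that concurrent callers share one in-flight task per key. SingleFlightCache and fake_fetch are hypothetical stand-ins and are not part of apicurio-serdes.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


class SingleFlightCache:
    """Hypothetical cache in which concurrent misses for one key share a fetch."""

    def __init__(self, fetch: Callable[[str], Awaitable[Any]]) -> None:
        self._fetch = fetch
        self._results: dict[str, Any] = {}
        self._inflight: dict[str, asyncio.Task[Any]] = {}

    async def get(self, key: str) -> Any:
        if key in self._results:
            return self._results[key]  # cache hit: no fetch at all
        task = self._inflight.get(key)
        if task is None:
            # The first caller starts the fetch; later callers reuse its task.
            task = asyncio.create_task(self._load(key))
            self._inflight[key] = task
        # shield() stops one cancelled caller from cancelling the shared fetch.
        return await asyncio.shield(task)

    async def _load(self, key: str) -> Any:
        try:
            value = await self._fetch(key)
            self._results[key] = value
            return value
        finally:
            self._inflight.pop(key, None)  # a failed fetch can be retried later


calls = {"count": 0}


async def fake_fetch(artifact_id: str) -> dict[str, Any]:
    """Stand-in for an HTTP request to the registry."""
    calls["count"] += 1
    await asyncio.sleep(0.01)  # simulated network latency
    return {"artifact_id": artifact_id}


async def main() -> list[Any]:
    cache = SingleFlightCache(fake_fetch)
    # Ten concurrent first-time lookups for the same key.
    return await asyncio.gather(*(cache.get("UserEvent") for _ in range(10)))


results = asyncio.run(main())
```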
get_schema_by_global_id
async
get_schema_by_global_id(global_id)
Retrieve an Avro schema by its globalId (async).
Returns a cached result on subsequent calls for the same globalId.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| global_id | int | The globalId from the wire format header. | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Parsed Avro schema as a Python dict. |
Raises:
| Type | Description |
|---|---|
| SchemaNotFoundError | If no schema exists for this globalId. |
| RegistryConnectionError | If the registry is unreachable. |
| RuntimeError | If the client has been closed. |
get_schema_by_content_id
async
get_schema_by_content_id(content_id)
Retrieve an Avro schema by its contentId (async).
Returns a cached result on subsequent calls for the same contentId.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| content_id | int | The contentId from the wire format header. | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Parsed Avro schema as a Python dict. |
Raises:
| Type | Description |
|---|---|
| SchemaNotFoundError | If no schema exists for this contentId. |
| RegistryConnectionError | If the registry is unreachable. |
| RuntimeError | If the client has been closed. |
register_schema
async
register_schema(
artifact_id, schema, if_exists="FIND_OR_CREATE_VERSION"
)
Register a schema artifact with the registry (async).
Posts the schema to the registry under the configured group. On success,
populates the internal cache so subsequent get_schema calls are cache
hits with no additional HTTP request.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| artifact_id | str | The artifact identifier to register under. | required |
| schema | dict[str, Any] | The Avro schema dict to register. | required |
| if_exists | Literal['FAIL', 'CREATE_VERSION', 'FIND_OR_CREATE_VERSION'] | Behaviour when the artifact already exists. | 'FIND_OR_CREATE_VERSION' |
Returns:
| Type | Description |
|---|---|
| CachedSchema | CachedSchema with the registered schema and registry-assigned IDs. |
Raises:
| Type | Description |
|---|---|
| SchemaRegistrationError | If the registry returns a 4xx or 5xx response. |
| RegistryConnectionError | If the registry is unreachable. |
| RuntimeError | If the client has been closed. |
aclose
async
aclose()
Close the underlying HTTP connection pool.
Call this when the client is no longer needed and you are not
using it as an async context manager. Safe to call multiple times.
When a custom http_client was provided at construction, it
is not closed — the caller retains ownership.
__aenter__
async
__aenter__()
Enter the async context manager. Returns self.
__aexit__
async
__aexit__(exc_type, exc_val, exc_tb)
Exit the async context manager. Calls aclose().
CachedSchema
apicurio_serdes._base.CachedSchema
dataclass
Internal value object holding a resolved schema and registry metadata.
Attributes:
| Name | Type | Description |
|---|---|---|
| schema | dict[str, Any] | Parsed Avro schema (Python dict, fastavro-ready). |
| global_id | int | Apicurio globalId from the X-Registry-GlobalId response header. |
| content_id | int | Apicurio contentId from the X-Registry-ContentId response header. |
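CachedSchema pairs the parsed schema with the two identifiers that the registry reports in the X-Registry-GlobalId and X-Registry-ContentId response headers. The sketch below builds an equivalent frozen dataclass from a raw JSON body and a header mapping. SchemaRecord and from_response are hypothetical stand-ins. HTTP header values arrive as strings, so the IDs are converted with int().

```python
from __future__ import annotations

import json
from dataclasses import dataclass
from typing import Any, Mapping


@dataclass(frozen=True)
class SchemaRecord:
    """Hypothetical stand-in mirroring CachedSchema's three attributes."""

    schema: dict[str, Any]
    global_id: int
    content_id: int


def from_response(body: str, headers: Mapping[str, str]) -> SchemaRecord:
    # Real HTTP headers are case-insensitive; this plain dict is not,
    # so the exact header names are used here.
    return SchemaRecord(
        schema=json.loads(body),
        global_id=int(headers["X-Registry-GlobalId"]),
        content_id=int(headers["X-Registry-ContentId"]),
    )


record = from_response(
    '{"type": "record", "name": "UserEvent", "fields": []}',
    {"X-Registry-GlobalId": "42", "X-Registry-ContentId": "7"},
)
```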
SerializationContext
apicurio_serdes.serialization.SerializationContext
dataclass
Carries Kafka metadata at serialization time.
Mirrors confluent-kafka's SerializationContext interface (SC-004).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic | str | The target Kafka topic name. | required |
| field | MessageField | Whether this datum is a message key or value. | required |
MessageField
apicurio_serdes.serialization.MessageField
Bases: Enum
Identifies whether serialized data is a Kafka key or value.
Mirrors confluent-kafka's MessageField enum (SC-004).
WireFormat
apicurio_serdes.serialization.WireFormat
Bases: Enum
Selects the wire format framing for an AvroSerializer.
Members:
- CONFLUENT_PAYLOAD: Default. The schema identifier is embedded in the message bytes as a magic byte (0x00) followed by a 4-byte big-endian uint32 prefix.
- KAFKA_HEADERS: The schema identifier is communicated as a Kafka message header. The message bytes contain only the raw Avro binary payload.
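For CONFLUENT_PAYLOAD, the 5-byte prefix can be built with the standard struct module, as in the sketch below. frame_confluent is a hypothetical helper, and the Avro body is a placeholder for bytes that have already been encoded. Whether the 4-byte value is a globalId or a contentId depends on the serializer's use_id setting.

```python
import struct

MAGIC_BYTE = 0x00
# ">BI": big-endian, one unsigned byte plus one 4-byte unsigned int (5 bytes).
HEADER = struct.Struct(">BI")


def frame_confluent(schema_id: int, avro_body: bytes) -> bytes:
    """Prefix an already-encoded Avro body with the magic byte and schema ID."""
    if not 0 <= schema_id <= 0xFFFFFFFF:
        raise ValueError("schema_id must fit in an unsigned 32-bit integer")
    return HEADER.pack(MAGIC_BYTE, schema_id) + avro_body


placeholder_body = b"\x02\x06abc"  # stands in for real Avro binary data
framed = frame_confluent(42, placeholder_body)
```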
Authentication
BearerAuth
apicurio_serdes._auth.BearerAuth
Bases: Auth
Bearer token authentication for Apicurio registry clients.
Exactly one of token or token_provider must be supplied.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token | str \| None | Static Bearer token string. | None |
| token_provider | Callable[[], str] \| None | Zero-argument callable that returns a token string. Called on every request, so it can return fresh tokens (e.g. GCP OIDC identity tokens). | None |
Raises:
| Type | Description |
|---|---|
| ValueError | If neither or both of token and token_provider are given, or if token is an empty string. |
Example
```python
# Static token
auth = BearerAuth(token="my-static-token")

# Dynamic token (GCP OIDC). get_google_id_token is a placeholder for
# your own function that returns a fresh token string.
auth = BearerAuth(token_provider=lambda: get_google_id_token())

client = ApicurioRegistryClient(url="...", group_id="...", auth=auth)
```
auth_flow
auth_flow(request)
Inject Authorization header; works for both sync and async clients.
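BearerAuth enforces that exactly one of token or token_provider is set, and it calls the provider on every request. The stdlib-only sketch below reproduces those documented rules by building a plain header dict. BearerHeader is a hypothetical class. In the library, the header is injected through httpx's auth_flow instead.

```python
from __future__ import annotations

from typing import Callable


class BearerHeader:
    """Hypothetical stand-in for BearerAuth's validation and header logic."""

    def __init__(
        self,
        token: str | None = None,
        token_provider: Callable[[], str] | None = None,
    ) -> None:
        # Exactly one token source must be configured.
        if (token is None) == (token_provider is None):
            raise ValueError("Provide exactly one of token or token_provider")
        if token == "":
            raise ValueError("token must not be empty")
        self._token = token
        self._provider = token_provider

    def headers(self) -> dict[str, str]:
        # The provider runs once per request, so rotated tokens are picked up.
        token = self._token if self._token is not None else self._provider()
        return {"Authorization": f"Bearer {token}"}


issued = iter(["tok-1", "tok-2"])
dynamic = BearerHeader(token_provider=lambda: next(issued))
first = dynamic.headers()
second = dynamic.headers()
```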
KeycloakAuth
apicurio_serdes._auth.KeycloakAuth
Bases: Auth
OAuth2 client credentials auth against a Keycloak token endpoint.
Fetches a token on first use and automatically refreshes it when less than 20% of its TTL remains (threshold-based refresh).
Implements separate sync_auth_flow / async_auth_flow so that
async callers never block the event loop.
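The refresh rule is threshold-based: a cached token is reused until less than 20% of its lifetime remains. The sketch below expresses that decision as a function. It assumes that the lifetime comes from the token response's expires_in field, in seconds, and that time is read from a monotonic clock. needs_refresh is a hypothetical name.

```python
from __future__ import annotations

REFRESH_THRESHOLD = 0.2  # refresh once less than 20% of the TTL remains


def needs_refresh(issued_at: float | None, expires_in: float, now: float) -> bool:
    """Return True when no token is cached or too little lifetime is left."""
    if issued_at is None:
        return True  # first use: no token has been fetched yet
    remaining = issued_at + expires_in - now
    return remaining < REFRESH_THRESHOLD * expires_in


# A 300 s token is reused until 240 s have elapsed (60 s = 20% left).
checks = [needs_refresh(0.0, 300, t) for t in (200.0, 250.0)]
```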
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token_url | str | Full URL of the Keycloak token endpoint, e.g. https://keycloak.example.com/realms/myrealm/protocol/openid-connect/token | required |
| client_id | str | OAuth2 client ID. | required |
| client_secret | str | OAuth2 client secret. | required |
| scope | str \| None | Optional OAuth2 scope string. | None |
Raises:
| Type | Description |
|---|---|
| AuthenticationError | If the token endpoint is unreachable, returns a non-200 response, or returns a 200 response with a malformed body (missing or empty access_token, missing or non-positive expires_in, or non-JSON). |
Example
```python
auth = KeycloakAuth(
    token_url="https://keycloak.example.com/realms/myrealm/protocol/openid-connect/token",
    client_id="my-client",
    client_secret="secret",
)
client = ApicurioRegistryClient(url="...", group_id="...", auth=auth)
```
sync_auth_flow
sync_auth_flow(request)
Sync auth flow: ensure token, inject header.
async_auth_flow
async
async_auth_flow(request)
Async auth flow: ensure token without blocking, inject header.
Avro
AvroSerializer
apicurio_serdes.avro._serializer.AvroSerializer
Serializes Python data to Avro bytes with configurable wire format framing.
Fetches the Avro schema from the registry on first call and caches it via the underlying ApicurioRegistryClient.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| registry_client | ApicurioRegistryClient | An ApicurioRegistryClient instance. | required |
| artifact_id | str \| None | The static artifact identifier for the target schema. Mutually exclusive with | None |
| artifact_resolver | ArtifactResolver \| None | A callable | None |
| schema | dict[str, Any] \| None | The Avro schema dict to register. Required when | None |
| auto_register | bool | When | False |
| if_exists | Literal['FAIL', 'CREATE_VERSION', 'FIND_OR_CREATE_VERSION'] | Behaviour when the artifact already exists during auto-registration. One of 'FAIL', 'CREATE_VERSION', or 'FIND_OR_CREATE_VERSION'. | 'FIND_OR_CREATE_VERSION' |
| to_dict | Callable[[Any, SerializationContext], dict[str, Any]] \| None | Optional callable that converts input data to a dict before Avro encoding. Signature: (data, ctx) -> dict[str, Any]. | None |
| use_id | Literal['globalId', 'contentId'] | Which registry-assigned identifier to use as the schema ID. | 'globalId' |
| strict | bool | When | False |
| wire_format | WireFormat | The wire format framing mode. Defaults to WireFormat.CONFLUENT_PAYLOAD (FR-003). | CONFLUENT_PAYLOAD |
| use_latest_version | bool | Reserved flag for API consistency with | False |
Raises:
| Type | Description |
|---|---|
| ValueError | If both or neither of |
Example
```python
from apicurio_serdes import ApicurioRegistryClient
from apicurio_serdes.avro import AvroSerializer, TopicIdStrategy
from apicurio_serdes.serialization import (
    SerializationContext,
    MessageField,
)

client = ApicurioRegistryClient(
    url="http://localhost:8080/apis/registry/v3",
    group_id="com.example.schemas",
)
serializer = AvroSerializer(
    registry_client=client,
    artifact_resolver=TopicIdStrategy(),
)
ctx = SerializationContext(
    topic="user-events", field=MessageField.VALUE,
)
payload: bytes = serializer(
    {"userId": "abc-123", "country": "FR"}, ctx,
)
```
serialize
serialize(data, ctx)
Serialize data and return payload bytes plus any Kafka headers.
For CONFLUENT_PAYLOAD, returns the framed bytes (identical to what __call__ returns) with an empty headers dict. For KAFKA_HEADERS, returns the raw Avro binary as the payload and a one-entry headers dict carrying the schema ID, encoded per Apicurio's native convention.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | Any | The data to serialize. Must be a dict (or convertible via to_dict) conforming to the Avro schema. | required |
| ctx | SerializationContext | Serialization context with topic and field metadata. | required |
Returns:
| Type | Description |
|---|---|
| SerializedMessage | SerializedMessage with payload bytes and headers dict. |
Raises:
| Type | Description |
|---|---|
| ResolverError | If the artifact_resolver raises or returns something other than a non-empty str. |
| SchemaNotFoundError | If the resolved artifact does not exist in the registry and |
| SchemaRegistrationError | If |
| RegistryConnectionError | If the registry is unreachable. |
| SerializationError | If the to_dict callable raises an exception. |
| ValueError | If data does not conform to the Avro schema. |
__call__
__call__(data, ctx)
Serialize data to bytes. Delegates to serialize() and returns payload.
Only supported for CONFLUENT_PAYLOAD wire format. For KAFKA_HEADERS, use serialize() which returns a SerializedMessage with both payload and headers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | Any | The data to serialize. Must be a dict (or convertible via to_dict) conforming to the Avro schema. | required |
| ctx | SerializationContext | Serialization context with topic and field metadata. | required |
Returns:
| Type | Description |
|---|---|
| bytes | Serialized bytes (CONFLUENT_PAYLOAD framed). |
Raises:
| Type | Description |
|---|---|
| TypeError | If wire_format is KAFKA_HEADERS (use serialize() instead). |
| ResolverError | If the artifact_resolver raises or returns something other than a non-empty str. |
| SchemaNotFoundError | If the resolved artifact does not exist in the registry and |
| SchemaRegistrationError | If |
| RegistryConnectionError | If the registry is unreachable. |
| SerializationError | If the to_dict callable raises an exception. |
| ValueError | If data does not conform to the Avro schema. |
AvroDeserializer
apicurio_serdes.avro._deserializer.AvroDeserializer
Bases: _BaseAvroDeserializer
Deserializes Confluent-framed Avro bytes to Python dicts.
Reads the wire format header to extract the schema identifier, resolves the schema from the registry, and decodes the Avro payload. Optionally applies a from_dict transformation hook.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| registry_client | ApicurioRegistryClient | An ApicurioRegistryClient instance. | required |
| from_dict | Callable[[dict[str, Any], SerializationContext], Any] \| None | Optional callable that converts the decoded dict to a domain object. Signature: (data, ctx) -> Any. When None, the decoded dict is returned directly (FR-008). | None |
| use_id | Literal['globalId', 'contentId'] | Which registry identifier type the 4-byte wire format field represents. Must match the serializer's use_id setting. Defaults to "globalId". | 'globalId' |
| artifact_id | str \| None | Artifact identifier used to fetch the latest registry schema as the reader schema. Requires | None |
| artifact_resolver | ArtifactResolver \| None | Callable | None |
| use_latest_version | bool | When | False |
| reader_schema | dict[str, Any] \| None | Optional Avro schema dict used as the reader schema during deserialization. When provided, fastavro performs schema resolution between the writer schema (identified by the ID in the message header and fetched from the registry) and this reader schema. This lets field defaults fill gaps for added fields and enables type promotions and other Avro evolution rules. When None (default), the writer schema is used for both roles (no evolution). Parsed once at construction time. Mutually exclusive with | None |
Raises:
| Type | Description |
|---|---|
| ValueError | If |
Example
```python
from apicurio_serdes import ApicurioRegistryClient
from apicurio_serdes.avro import AvroDeserializer
from apicurio_serdes.serialization import SerializationContext, MessageField

client = ApicurioRegistryClient(
    url="http://localhost:8080/apis/registry/v3",
    group_id="com.example.schemas",
)
# Dynamically use the latest registry schema as the reader schema:
deserializer = AvroDeserializer(
    registry_client=client,
    artifact_id="UserEvent",
    use_latest_version=True,
)
ctx = SerializationContext(topic="user-events", field=MessageField.VALUE)


def decode(raw_bytes: bytes) -> object:
    # raw_bytes: a Confluent-framed message value, e.g. from a Kafka consumer.
    return deserializer(raw_bytes, ctx)
```
__call__
__call__(data, ctx)
Deserialize Confluent-framed Avro bytes.
Input format (FR-003):
- Byte 0: magic byte 0x00
- Bytes 1-4: schema identifier as a 4-byte big-endian unsigned int
- Bytes 5+: Avro binary payload (schemaless encoding)
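The input checks described for this method (at least 5 bytes, magic byte 0x00) can be expressed with struct.unpack_from. In the sketch below, FramingError is a hypothetical stand-in for the library's DeserializationError. The returned integer is read as a globalId or a contentId depending on use_id.

```python
import struct


class FramingError(ValueError):
    """Hypothetical stand-in for apicurio_serdes' DeserializationError."""


def parse_confluent_frame(data: bytes) -> tuple[int, bytes]:
    """Split framed bytes into (schema_id, avro_payload)."""
    if len(data) < 5:
        raise FramingError(f"expected at least 5 bytes, got {len(data)}")
    # ">BI": big-endian, one unsigned byte then a 4-byte unsigned int.
    magic, schema_id = struct.unpack_from(">BI", data, 0)
    if magic != 0x00:
        raise FramingError(f"unexpected magic byte 0x{magic:02x}")
    return schema_id, data[5:]


schema_id, payload = parse_confluent_frame(b"\x00\x00\x00\x00\x2a\x02\x06abc")
```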
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | bytes | Confluent wire format bytes to deserialize. | required |
| ctx | SerializationContext | Serialization context with topic and field metadata. | required |
Returns:
| Type | Description |
|---|---|
| Any | A Python dict (or the result of from_dict if configured). |
Raises:
| Type | Description |
|---|---|
| DeserializationError | If the input is fewer than 5 bytes (FR-004), the magic byte is not 0x00 (FR-003), the Avro payload cannot be decoded (FR-011), or the from_dict callable raises an exception (FR-009). |
| SchemaNotFoundError | If the schema identifier does not correspond to any schema in the registry (FR-010). |
| RegistryConnectionError | If the registry is unreachable during schema resolution (FR-012). |
| ResolverError | If |
AsyncAvroDeserializer
apicurio_serdes.avro._async_deserializer.AsyncAvroDeserializer
Bases: _BaseAvroDeserializer
Deserializes Confluent-framed Avro bytes to Python dicts (async).
Non-blocking counterpart to AvroDeserializer. Uses AsyncApicurioRegistryClient for schema resolution, making it suitable for use in fully async services without blocking the event loop.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| registry_client | AsyncApicurioRegistryClient | An AsyncApicurioRegistryClient instance. | required |
| from_dict | Callable[[dict[str, Any], SerializationContext], Any] \| None | Optional callable that converts the decoded dict to a domain object. Signature: (data, ctx) -> Any. When None, the decoded dict is returned directly. | None |
| use_id | Literal['globalId', 'contentId'] | Which registry identifier type the 4-byte wire format field represents. Must match the serializer's use_id setting. Defaults to "globalId". | 'globalId' |
| artifact_id | str \| None | Artifact identifier used to fetch the latest registry schema as the reader schema. Requires | None |
| artifact_resolver | ArtifactResolver \| None | Callable | None |
| use_latest_version | bool | When | False |
| reader_schema | dict[str, Any] \| None | Optional Avro schema dict used as the reader schema during deserialization. When provided, fastavro performs schema resolution between the writer schema (identified by the ID in the message header and fetched from the registry) and this reader schema. This lets field defaults fill gaps for added fields and enables type promotions and other Avro evolution rules. When None (default), the writer schema is used for both roles (no evolution). Parsed once at construction time. Mutually exclusive with | None |
Raises:
| Type | Description |
|---|---|
| ValueError | If |
Example
```python
from apicurio_serdes import AsyncApicurioRegistryClient
from apicurio_serdes.avro import AsyncAvroDeserializer
from apicurio_serdes.serialization import SerializationContext, MessageField


async def decode(raw_bytes: bytes) -> object:
    # raw_bytes: a Confluent-framed message value, e.g. from a Kafka consumer.
    client = AsyncApicurioRegistryClient(
        url="http://localhost:8080/apis/registry/v3",
        group_id="com.example.schemas",
    )
    deserializer = AsyncAvroDeserializer(registry_client=client)
    ctx = SerializationContext(topic="user-events", field=MessageField.VALUE)
    # The async context manager calls aclose() on exit.
    async with client:
        return await deserializer(raw_bytes, ctx)

# Run from synchronous code with: asyncio.run(decode(raw_bytes))
```
__call__
async
__call__(data, ctx)
Deserialize Confluent-framed Avro bytes (async).
Input format:
- Byte 0: magic byte 0x00
- Bytes 1-4: schema identifier as a 4-byte big-endian unsigned int
- Bytes 5+: Avro binary payload (schemaless encoding)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | bytes | Confluent wire format bytes to deserialize. | required |
| ctx | SerializationContext | Serialization context with topic and field metadata. | required |
Returns:
| Type | Description |
|---|---|
| Any | A Python dict (or the result of from_dict if configured). |
Raises:
| Type | Description |
|---|---|
| DeserializationError | If the input is fewer than 5 bytes, the magic byte is not 0x00, the Avro payload cannot be decoded, or the from_dict callable raises an exception. |
| SchemaNotFoundError | If the schema identifier does not correspond to any schema in the registry. |
| RegistryConnectionError | If the registry is unreachable during schema resolution. |
| ResolverError | If |
TopicIdStrategy
apicurio_serdes.avro._strategies.TopicIdStrategy
Derives the artifact ID from the topic name and message field.
Returns "{topic}-{field}" (e.g. "orders-value" or
"orders-key"), matching the Apicurio Java reference implementation's
TopicIdStrategy.
Example
```python
from apicurio_serdes.avro import TopicIdStrategy
from apicurio_serdes.serialization import MessageField, SerializationContext

strategy = TopicIdStrategy()
ctx = SerializationContext(topic="orders", field=MessageField.VALUE)
artifact_id = strategy(ctx)  # "orders-value"
```
__call__
__call__(ctx)
Return "{topic}-{field}" for the given context.
SimpleTopicIdStrategy
apicurio_serdes.avro._strategies.SimpleTopicIdStrategy
Derives the artifact ID from the topic name only.
Returns "{topic}" (e.g. "orders"), matching the Apicurio Java
reference implementation's SimpleTopicIdStrategy.
Example
```python
from apicurio_serdes.avro import SimpleTopicIdStrategy
from apicurio_serdes.serialization import MessageField, SerializationContext

strategy = SimpleTopicIdStrategy()
ctx = SerializationContext(topic="orders", field=MessageField.VALUE)
artifact_id = strategy(ctx)  # "orders"
```
__call__
__call__(ctx)
Return the topic name for the given context.
QualifiedRecordIdStrategy
apicurio_serdes.avro._strategies.QualifiedRecordIdStrategy
Derives the artifact ID from the Avro schema's record name and namespace.
Returns "{namespace}.{name}" when namespace is present, "{name}"
otherwise. Matches the Confluent RecordNameStrategy and Apicurio's
qualified-record convention.
The topic and message field are ignored — the artifact ID is fixed at construction time from the schema.
Note
The Java RecordIdStrategy (groupId=namespace routing) is not
implemented. Use the group_id parameter on the Apicurio client for
that routing behaviour.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| schema | dict[str, Any] | Avro schema dict. Must contain a non-empty | required |
Raises:
| Type | Description |
|---|---|
| ValueError | If |
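Example
```python
from apicurio_serdes.avro import QualifiedRecordIdStrategy
from apicurio_serdes.serialization import MessageField, SerializationContext

schema = {
    "type": "record", "name": "Order",
    "namespace": "com.example", "fields": [],
}
strategy = QualifiedRecordIdStrategy(schema)
ctx = SerializationContext(topic="orders", field=MessageField.VALUE)
artifact_id = strategy(ctx)  # "com.example.Order" (topic and field are ignored)
```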
__call__
__call__(ctx)
Return the qualified record name, ignoring context.
TopicRecordIdStrategy
apicurio_serdes.avro._strategies.TopicRecordIdStrategy
Derives the artifact ID from the topic and Avro schema's record name.
Returns "{topic}-{namespace}.{name}" when namespace is present,
"{topic}-{name}" otherwise. Matches the Confluent
TopicRecordNameStrategy.
The artifact ID is partially fixed at construction time (record part) and partially resolved at call time (topic).
Note
The Java RecordIdStrategy (groupId=namespace routing) is not
implemented. Use the group_id parameter on the Apicurio client for
that routing behaviour.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| schema | dict[str, Any] | Avro schema dict. Must contain a non-empty | required |
Raises:
| Type | Description |
|---|---|
| ValueError | If |
Example
```python
from apicurio_serdes.avro import TopicRecordIdStrategy
from apicurio_serdes.serialization import MessageField, SerializationContext

schema = {
    "type": "record", "name": "Order",
    "namespace": "com.example", "fields": [],
}
strategy = TopicRecordIdStrategy(schema)
ctx = SerializationContext(topic="orders", field=MessageField.VALUE)
artifact_id = strategy(ctx)  # "orders-com.example.Order"
```
__call__
__call__(ctx)
Return "{topic}-{record}" for the given context.
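The four strategies differ only in which inputs feed the artifact ID. The stdlib-only sketch below restates their documented naming rules as plain functions so the outputs can be compared side by side. Several details are assumptions. The Field enum's lowercase values "key" and "value" are inferred from the "orders-key" and "orders-value" examples. The "non-empty name" check is inferred from the record-based strategies' validation note. None of these functions are part of apicurio-serdes.

```python
from __future__ import annotations

from enum import Enum
from typing import Any


class Field(Enum):
    # Assumed values, inferred from "orders-key" / "orders-value".
    KEY = "key"
    VALUE = "value"


def topic_id(topic: str, field: Field) -> str:
    return f"{topic}-{field.value}"  # TopicIdStrategy rule


def simple_topic_id(topic: str, field: Field) -> str:
    return topic  # SimpleTopicIdStrategy rule: field is ignored


def qualified_record_id(schema: dict[str, Any]) -> str:
    name = schema.get("name")
    if not name:
        raise ValueError("schema must contain a non-empty 'name'")
    namespace = schema.get("namespace")
    # QualifiedRecordIdStrategy rule: "{namespace}.{name}" or just "{name}".
    return f"{namespace}.{name}" if namespace else name


def topic_record_id(topic: str, schema: dict[str, Any]) -> str:
    return f"{topic}-{qualified_record_id(schema)}"  # TopicRecordIdStrategy rule


order = {"type": "record", "name": "Order", "namespace": "com.example", "fields": []}
ids = {
    "TopicIdStrategy": topic_id("orders", Field.VALUE),
    "SimpleTopicIdStrategy": simple_topic_id("orders", Field.VALUE),
    "QualifiedRecordIdStrategy": qualified_record_id(order),
    "TopicRecordIdStrategy": topic_record_id("orders", order),
}
```

The topic-based strategies give one artifact per topic (or per topic and field). The record-based strategies let several topics share an artifact when they carry the same record type.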
Exceptions
SchemaNotFoundError
apicurio_serdes._errors.SchemaNotFoundError
Bases: Exception
Raised when an artifact or schema ID does not exist in the registry.
Two construction paths set different attributes:
- Artifact-based lookups (SchemaNotFoundError(group_id, artifact_id)):
  - group_id: The group that was searched.
  - artifact_id: The artifact that was not found.
- ID-based lookups (SchemaNotFoundError.from_id(id_type, id_value)):
  - id_type: The ID type that was searched ("globalId" or "contentId").
  - id_value: The numeric ID that was not found.
from_id
classmethod
from_id(id_type, id_value)
Create a SchemaNotFoundError for an ID-based lookup failure.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| id_type | str | "globalId" or "contentId". | required |
| id_value | int | The numeric identifier that was not found. | required |
Returns:
| Type | Description |
|---|---|
| SchemaNotFoundError | A SchemaNotFoundError with id_type and id_value attributes set. |
RegistryConnectionError
apicurio_serdes._errors.RegistryConnectionError
Bases: Exception
Raised when the Apicurio Registry is unreachable.
Wraps the underlying network exception as __cause__. Includes
the registry URL in the error message so callers can identify which
endpoint failed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| url | str | The registry base URL that could not be reached. | required |
| cause | Exception | The underlying network exception. | required |
Attributes:
| Name | Type | Description |
|---|---|---|
| url | str | The registry base URL that could not be reached. |
SchemaRegistrationError
apicurio_serdes._errors.SchemaRegistrationError
Bases: Exception
Raised when the registry rejects a schema registration request.
Covers 4xx and 5xx responses from the artifact creation endpoint and
missing JSON fields in the response body. Transport errors (network
failures) raise RegistryConnectionError instead.
Note
The exception message includes str(cause), which may contain the
full HTTP response body. Sanitise before logging in environments where
registry error responses may contain sensitive information.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| artifact_id | str | The artifact identifier that failed to register. | required |
| cause | Exception | The underlying exception (HTTP error or missing JSON field). | required |
Attributes:
| Name | Type | Description |
|---|---|---|
| artifact_id | str | The artifact identifier that failed to register. |
| cause | Exception | The underlying exception. |
SerializationError
apicurio_serdes._errors.SerializationError
Bases: Exception
Raised when the to_dict callable fails during serialization.
Wraps the original exception as __cause__ so the full traceback is
preserved. Distinguishes hook failure from Avro schema validation errors.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| cause | Exception | The original exception raised by the to_dict callable. | required |
Attributes:
| Name | Type | Description |
|---|---|---|
| cause | Exception | The original exception raised by the to_dict callable. |
ResolverError
apicurio_serdes._errors.ResolverError
Bases: Exception
Raised when the artifact_resolver callable fails during serialization.
Covers two failure modes:
- The resolver raised an exception (the original is set as __cause__).
- The resolver returned a value that is not a non-empty string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | str | Human-readable description of the failure. | required |
| cause | Exception \| None | The original exception raised by the resolver, if any. | None |
Attributes:
| Name | Type | Description |
|---|---|---|
| cause | Exception \| None | The original exception, if any. |
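The two ResolverError failure modes correspond to a small wrapper around the resolver call. Exceptions are re-raised with the original attached as __cause__ via `raise ... from`, and any return value that is not a non-empty str is rejected. In the sketch below, ResolverFailure and resolve_artifact_id are hypothetical stand-ins for the library's internals.

```python
from __future__ import annotations

from typing import Any, Callable


class ResolverFailure(Exception):
    """Hypothetical stand-in for apicurio_serdes' ResolverError."""

    def __init__(self, message: str, cause: Exception | None = None) -> None:
        super().__init__(message)
        self.cause = cause


def resolve_artifact_id(resolver: Callable[[Any], Any], ctx: Any) -> str:
    try:
        result = resolver(ctx)
    except Exception as exc:
        # Failure mode 1: the resolver raised; keep the original as __cause__.
        raise ResolverFailure(f"artifact_resolver raised {exc!r}", exc) from exc
    if not isinstance(result, str) or not result:
        # Failure mode 2: the resolver returned something unusable.
        raise ResolverFailure(
            f"artifact_resolver must return a non-empty str, got {result!r}"
        )
    return result


def broken_resolver(ctx: Any) -> str:
    raise KeyError("topic")


try:
    resolve_artifact_id(broken_resolver, ctx=None)
except ResolverFailure as err:
    caught = err  # keep a reference; `err` is unbound after the except block
```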
DeserializationError
apicurio_serdes._errors.DeserializationError
Bases: Exception
Raised when deserialization fails (FR-003, FR-004, FR-009, FR-011).
Covers: invalid magic byte, input too short, Avro decode failure, and from_dict hook failure. When wrapping an underlying exception, the original is set as __cause__ to preserve the full traceback.
Attributes:
| Name | Type | Description |
|---|---|---|
| cause | | The original exception, if any. |
AuthenticationError
apicurio_serdes._errors.AuthenticationError
Bases: Exception
Raised when authentication with the token endpoint fails.
Covers: token endpoint unreachable, non-200 response, or a 200 response
with a malformed body (missing or empty access_token, missing or
non-positive expires_in, or non-JSON).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | str | Description of the authentication failure. | required |
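The malformed-body cases listed for AuthenticationError translate directly into validation of the token endpoint's response. The sketch below covers the non-200 and body checks only; an unreachable endpoint is a transport error and is not handled here. It assumes the standard OAuth2 field names access_token and expires_in mentioned above. TokenError and parse_token_response are hypothetical names.

```python
from __future__ import annotations

import json


class TokenError(Exception):
    """Hypothetical stand-in for apicurio_serdes' AuthenticationError."""


def parse_token_response(status: int, body: str) -> tuple[str, float]:
    """Return (access_token, expires_in) or raise TokenError."""
    if status != 200:
        raise TokenError(f"token endpoint returned HTTP {status}")
    try:
        payload = json.loads(body)
    except json.JSONDecodeError as exc:
        raise TokenError("token response is not JSON") from exc
    if not isinstance(payload, dict):
        raise TokenError("token response is not a JSON object")
    token = payload.get("access_token")
    if not isinstance(token, str) or not token:
        raise TokenError("missing or empty access_token")
    expires_in = payload.get("expires_in")
    # bool is a subclass of int, so exclude it explicitly.
    valid_number = isinstance(expires_in, (int, float)) and not isinstance(
        expires_in, bool
    )
    if not valid_number or expires_in <= 0:
        raise TokenError("missing or non-positive expires_in")
    return token, float(expires_in)


token, ttl = parse_token_response(200, '{"access_token": "abc123", "expires_in": 300}')
```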