Aller au contenu

Référence API

Référence complète de toutes les classes, méthodes et exceptions publiques d'apicurio-serdes.

Noyau

ApicurioRegistryClient

apicurio_serdes._client.ApicurioRegistryClient

Bases: _RegistryClientBase

HTTP client for the Apicurio Registry v3 native API.

Handles schema retrieval by group_id / artifact_id with built-in caching and automatic retry on transient failures. Thread-safe for read operations.

Parameters:

Name Type Description Default
url str

Base URL of the Apicurio Registry v3 API. Example: "http://registry:8080/apis/registry/v3".

required
group_id str

Schema group identifier. Applied to every schema lookup made by this client instance.

required
max_retries int

Maximum number of retry attempts on transient failures (transport errors and HTTP 429/502/503/504). Defaults to 3. Set to 0 to disable retries.

3
retry_backoff_ms int

Base backoff delay in milliseconds for the first retry. Subsequent retries use exponential backoff with full jitter. Defaults to 1000.

1000
retry_max_backoff_ms int

Maximum backoff delay cap in milliseconds. Defaults to 20000.

20000
http_client Client | None

Optional pre-configured httpx.Client to use for all HTTP requests. When provided, the client is used as-is and will not be closed by :meth:close. Use this to configure custom timeouts, proxies, mTLS, or transport-level retry. When None (default), a new httpx.Client is created and managed internally.

None
auth Any

Optional httpx-compatible authentication handler (e.g. BearerAuth). Ignored when http_client is provided.

None
cache_max_size int

Maximum number of entries in each cache (LRU eviction). Applies to both the schema cache and the ID cache. Defaults to 1000.

1000
cache_ttl_seconds float | None

Optional TTL in seconds for artifact-based schema cache entries (get_schema, register_schema). ID-based lookups (get_schema_by_global_id, get_schema_by_content_id) are content-addressed and never expire. Defaults to None (no expiry).

None

Raises:

Type Description
ValueError

If url or group_id is empty, max_retries < 0, cache_max_size < 1, or cache_ttl_seconds <= 0.

Example
from apicurio_serdes import ApicurioRegistryClient

client = ApicurioRegistryClient(
    url="http://localhost:8080/apis/registry/v3",
    group_id="com.example.schemas",
)
schema = client.get_schema("UserEvent")

get_schema

get_schema(artifact_id)

Retrieve an Avro schema by artifact ID.

Returns a cached result on subsequent calls for the same artifact_id (FR-006).

Parameters:

Name Type Description Default
artifact_id str

The artifact identifier within the configured group.

required

Returns:

Type Description
CachedSchema

CachedSchema with parsed schema and content_id.

Raises:

Type Description
SchemaNotFoundError

If the artifact does not exist (HTTP 404).

RegistryConnectionError

If the registry is unreachable or returns a persistent error after all retries are exhausted.

RuntimeError

If the client has been closed.

get_schema_by_global_id

get_schema_by_global_id(global_id)

Retrieve an Avro schema by its globalId.

Returns a cached result on subsequent calls for the same globalId (FR-007).

Parameters:

Name Type Description Default
global_id int

The globalId from the wire format header.

required

Returns:

Type Description
dict[str, Any]

Parsed Avro schema as a Python dict.

Raises:

Type Description
SchemaNotFoundError

If no schema exists for this globalId (FR-010).

RegistryConnectionError

If the registry is unreachable (FR-012).

RuntimeError

If the client has been closed.

get_schema_by_content_id

get_schema_by_content_id(content_id)

Retrieve an Avro schema by its contentId.

Returns a cached result on subsequent calls for the same contentId (FR-007).

Parameters:

Name Type Description Default
content_id int

The contentId from the wire format header.

required

Returns:

Type Description
dict[str, Any]

Parsed Avro schema as a Python dict.

Raises:

Type Description
SchemaNotFoundError

If no schema exists for this contentId (FR-010).

RegistryConnectionError

If the registry is unreachable (FR-012).

RuntimeError

If the client has been closed.

register_schema

register_schema(
    artifact_id, schema, if_exists="FIND_OR_CREATE_VERSION"
)

Register a schema artifact with the registry.

Posts the schema to the registry under the configured group. On success, populates the internal cache so subsequent get_schema calls are cache hits with no additional HTTP request.

Parameters:

Name Type Description Default
artifact_id str

The artifact identifier to register under.

required
schema dict[str, Any]

The Avro schema dict to register.

required
if_exists Literal['FAIL', 'CREATE_VERSION', 'FIND_OR_CREATE_VERSION']

Behaviour when the artifact already exists. "FIND_OR_CREATE_VERSION" (default) returns the existing version if the content matches, otherwise creates a new version. "FAIL" raises on conflict. "CREATE_VERSION" always creates a new version.

'FIND_OR_CREATE_VERSION'

Returns:

Type Description
CachedSchema

CachedSchema with the registered schema and registry-assigned IDs.

Raises:

Type Description
SchemaRegistrationError

If the registry returns a 4xx or 5xx response.

RegistryConnectionError

If the registry is unreachable.

RuntimeError

If the client has been closed.

close

close()

Close the underlying HTTP connection pool.

Call this when the client is no longer needed and you are not using it as a context manager. Safe to call multiple times. When a custom http_client was provided at construction, it is not closed — the caller retains ownership.

__enter__

__enter__()

Enter the context manager. Returns self.

__exit__

__exit__(exc_type, exc_val, exc_tb)

Exit the context manager. Calls close().

AsyncApicurioRegistryClient

apicurio_serdes._async_client.AsyncApicurioRegistryClient

Bases: _RegistryClientBase

Async HTTP client for the Apicurio Registry v3 native API.

Non-blocking counterpart to ApicurioRegistryClient. Uses httpx.AsyncClient for async I/O. Safe for concurrent use from multiple coroutines within the same event loop. Includes automatic retry on transient failures.

Parameters:

Name Type Description Default
url str

Base URL of the Apicurio Registry v3 API. Example: "http://registry:8080/apis/registry/v3"

required
group_id str

Schema group identifier. Applied to every schema lookup made by this client instance.

required
max_retries int

Maximum number of retry attempts on transient failures (transport errors and HTTP 429/502/503/504). Defaults to 3. Set to 0 to disable retries.

3
retry_backoff_ms int

Base backoff delay in milliseconds for the first retry. Subsequent retries use exponential backoff with full jitter. Defaults to 1000.

1000
retry_max_backoff_ms int

Maximum backoff delay cap in milliseconds. Defaults to 20000.

20000
http_client AsyncClient | None

Optional pre-configured httpx.AsyncClient to use for all HTTP requests. When provided, the client is used as-is and will not be closed by :meth:aclose. When None (default), a new httpx.AsyncClient is created and managed internally.

None
auth Any

Optional httpx-compatible authentication handler. Ignored when http_client is provided.

None
cache_max_size int

Maximum number of entries in each cache (LRU eviction). Applies to both the schema cache and the ID cache. Defaults to 1000.

1000
cache_ttl_seconds float | None

Optional TTL in seconds for artifact-based schema cache entries (get_schema, register_schema). ID-based lookups (get_schema_by_global_id, get_schema_by_content_id) are content-addressed and never expire. Defaults to None (no expiry).

None

Raises:

Type Description
ValueError

If url or group_id is empty, max_retries < 0, cache_max_size < 1, or cache_ttl_seconds <= 0.

get_schema async

get_schema(artifact_id)

Retrieve an Avro schema by artifact ID (async).

Returns a cached result on subsequent calls for the same artifact_id. Safe for concurrent invocation: concurrent first-time fetches for the same artifact_id result in exactly one HTTP request (NFR-001).

Parameters:

Name Type Description Default
artifact_id str

The artifact identifier within the configured group.

required

Returns:

Type Description
CachedSchema

CachedSchema with parsed schema dict, global_id, and content_id.

Raises:

Type Description
SchemaNotFoundError

If the artifact does not exist (HTTP 404).

RegistryConnectionError

If the registry is unreachable or returns a persistent error after all retries are exhausted.

RuntimeError

If the client has been closed.

get_schema_by_global_id async

get_schema_by_global_id(global_id)

Retrieve an Avro schema by its globalId (async).

Returns a cached result on subsequent calls for the same globalId.

Parameters:

Name Type Description Default
global_id int

The globalId from the wire format header.

required

Returns:

Type Description
dict[str, Any]

Parsed Avro schema as a Python dict.

Raises:

Type Description
SchemaNotFoundError

If no schema exists for this globalId.

RegistryConnectionError

If the registry is unreachable.

RuntimeError

If the client has been closed.

get_schema_by_content_id async

get_schema_by_content_id(content_id)

Retrieve an Avro schema by its contentId (async).

Returns a cached result on subsequent calls for the same contentId.

Parameters:

Name Type Description Default
content_id int

The contentId from the wire format header.

required

Returns:

Type Description
dict[str, Any]

Parsed Avro schema as a Python dict.

Raises:

Type Description
SchemaNotFoundError

If no schema exists for this contentId.

RegistryConnectionError

If the registry is unreachable.

RuntimeError

If the client has been closed.

register_schema async

register_schema(
    artifact_id, schema, if_exists="FIND_OR_CREATE_VERSION"
)

Register a schema artifact with the registry (async).

Posts the schema to the registry under the configured group. On success, populates the internal cache so subsequent get_schema calls are cache hits with no additional HTTP request.

Parameters:

Name Type Description Default
artifact_id str

The artifact identifier to register under.

required
schema dict[str, Any]

The Avro schema dict to register.

required
if_exists Literal['FAIL', 'CREATE_VERSION', 'FIND_OR_CREATE_VERSION']

Behaviour when the artifact already exists. "FIND_OR_CREATE_VERSION" (default) returns the existing version if the content matches, otherwise creates a new version. "FAIL" raises on conflict. "CREATE_VERSION" always creates a new version.

'FIND_OR_CREATE_VERSION'

Returns:

Type Description
CachedSchema

CachedSchema with the registered schema and registry-assigned IDs.

Raises:

Type Description
SchemaRegistrationError

If the registry returns a 4xx or 5xx response.

RegistryConnectionError

If the registry is unreachable.

RuntimeError

If the client has been closed.

aclose async

aclose()

Close the underlying HTTP connection pool.

Call this when the client is no longer needed and you are not using it as an async context manager. Safe to call multiple times. When a custom http_client was provided at construction, it is not closed — the caller retains ownership.

__aenter__ async

__aenter__()

Enter the async context manager. Returns self.

__aexit__ async

__aexit__(exc_type, exc_val, exc_tb)

Exit the async context manager. Calls aclose().

CachedSchema

apicurio_serdes._base.CachedSchema dataclass

Internal value object holding a resolved schema and registry metadata.

Attributes:

Name Type Description
schema dict[str, Any]

Parsed Avro schema (Python dict, fastavro-ready).

global_id int

Apicurio globalId from X-Registry-GlobalId header.

content_id int

Apicurio contentId from X-Registry-ContentId header.

SerializationContext

apicurio_serdes.serialization.SerializationContext dataclass

Carries Kafka metadata at serialization time.

Mirrors confluent-kafka's SerializationContext interface (SC-004).

Parameters:

Name Type Description Default
topic str

The target Kafka topic name.

required
field MessageField

Whether this datum is a message key or value.

required

MessageField

apicurio_serdes.serialization.MessageField

Bases: Enum

Identifies whether serialized data is a Kafka key or value.

Mirrors confluent-kafka's MessageField enum (SC-004).

WireFormat

apicurio_serdes.serialization.WireFormat

Bases: Enum

Selects the wire format framing for an AvroSerializer.

Members

CONFLUENT_PAYLOAD: Default. Schema identifier embedded in message bytes as a magic byte (0x00) + 4-byte big-endian uint32 prefix. KAFKA_HEADERS: Schema identifier communicated as a Kafka message header. Message bytes contain only the raw Avro binary payload.

Authentification

BearerAuth

apicurio_serdes._auth.BearerAuth

Bases: Auth

Bearer token authentication for Apicurio registry clients.

Exactly one of token or token_provider must be supplied.

Parameters:

Name Type Description Default
token str | None

Static Bearer token string.

None
token_provider Callable[[], str] | None

Zero-argument callable that returns a token string. Called on every request, so it can return fresh tokens (e.g. GCP OIDC identity tokens retrieved via google-auth).

None

Raises:

Type Description
ValueError

If neither or both of token and token_provider are given, or if token is an empty string.

Example
# Static token
auth = BearerAuth(token="my-static-token")

# Dynamic token (GCP OIDC)
auth = BearerAuth(token_provider=lambda: get_google_id_token())

client = ApicurioRegistryClient(url="...", group_id="...", auth=auth)

auth_flow

auth_flow(request)

Inject Authorization header; works for both sync and async clients.

KeycloakAuth

apicurio_serdes._auth.KeycloakAuth

Bases: Auth

OAuth2 client credentials auth against a Keycloak token endpoint.

Fetches a token on first use and automatically refreshes it when less than 20% of its TTL remains (threshold-based refresh).

Implements separate sync_auth_flow / async_auth_flow so that async callers never block the event loop.

Parameters:

Name Type Description Default
token_url str

Full URL of the Keycloak token endpoint, e.g. "https://keycloak.example.com/realms/myrealm/protocol/openid-connect/token".

required
client_id str

OAuth2 client ID.

required
client_secret str

OAuth2 client secret.

required
scope str | None

Optional OAuth2 scope string (e.g. "openid").

None

Raises:

Type Description
AuthenticationError

If the token endpoint is unreachable, returns a non-200 response, or returns a 200 response with a malformed body (missing or empty access_token, missing or non-positive expires_in, or non-JSON).

Example
auth = KeycloakAuth(
    token_url="https://keycloak.example.com/realms/myrealm/protocol/openid-connect/token",
    client_id="my-client",
    client_secret="secret",
)
client = ApicurioRegistryClient(url="...", group_id="...", auth=auth)

sync_auth_flow

sync_auth_flow(request)

Sync auth flow: ensure token, inject header.

async_auth_flow async

async_auth_flow(request)

Async auth flow: ensure token without blocking, inject header.

Avro

AvroSerializer

apicurio_serdes.avro._serializer.AvroSerializer

Serializes Python data to Avro bytes with configurable wire format framing.

Fetches the Avro schema from the registry on first call and caches it via the underlying ApicurioRegistryClient.

Parameters:

Name Type Description Default
registry_client ApicurioRegistryClient

An ApicurioRegistryClient instance.

required
artifact_id str | None

The static artifact identifier for the target schema. Mutually exclusive with artifact_resolver.

None
artifact_resolver ArtifactResolver | None

A callable (ctx) -> str that derives the artifact ID from the serialization context at first serialize. Mutually exclusive with artifact_id. Built-in strategies: :class:~apicurio_serdes.avro.TopicIdStrategy and :class:~apicurio_serdes.avro.SimpleTopicIdStrategy.

None
schema dict[str, Any] | None

The Avro schema dict to register. Required when auto_register=True; ignored otherwise.

None
auto_register bool

When True, register schema with the registry on first serialize if the artifact is not found (HTTP 404). Disabled by default.

False
if_exists Literal['FAIL', 'CREATE_VERSION', 'FIND_OR_CREATE_VERSION']

Behaviour when the artifact already exists during auto-registration. One of "FAIL", "FIND_OR_CREATE_VERSION" (default), or "CREATE_VERSION". Only consulted when auto_register=True.

'FIND_OR_CREATE_VERSION'
to_dict Callable[[Any, SerializationContext], dict[str, Any]] | None

Optional callable that converts input data to a dict before Avro encoding. Signature: (data, ctx) -> dict. When None, input is passed directly to the encoder.

None
use_id Literal['globalId', 'contentId']

Which registry-assigned identifier to use as the schema ID. "globalId" (default) or "contentId". Applies to both wire format modes.

'globalId'
strict bool

When True, reject extra fields not in the schema.

False
wire_format WireFormat

The wire format framing mode. Defaults to WireFormat.CONFLUENT_PAYLOAD (FR-003).

CONFLUENT_PAYLOAD
use_latest_version bool

Reserved flag for API consistency with AvroDeserializer. Must not be combined with auto_register=True (they are mutually exclusive). Defaults to False.

False

Raises:

Type Description
ValueError

If both or neither of artifact_id / artifact_resolver are provided; if wire_format is not a WireFormat enum member; if use_id is not "globalId" or "contentId"; if if_exists is not one of the four allowed values; if auto_register=True and schema is not provided; or if use_latest_version=True and auto_register=True.

Example
from apicurio_serdes import ApicurioRegistryClient
from apicurio_serdes.avro import AvroSerializer, TopicIdStrategy
from apicurio_serdes.serialization import (
    SerializationContext,
    MessageField,
)

client = ApicurioRegistryClient(
    url="http://localhost:8080/apis/registry/v3",
    group_id="com.example.schemas",
)
serializer = AvroSerializer(
    registry_client=client,
    artifact_resolver=TopicIdStrategy(),
)
ctx = SerializationContext(
    topic="user-events", field=MessageField.VALUE,
)
payload: bytes = serializer(
    {"userId": "abc-123", "country": "FR"}, ctx,
)

serialize

serialize(data, ctx)

Serialize data and return payload bytes plus any Kafka headers.

For CONFLUENT_PAYLOAD: returns framed bytes (unchanged from call) with an empty headers dict. For KAFKA_HEADERS: returns raw Avro binary as payload and a one-entry headers dict with the schema ID encoded per Apicurio's native convention.

Parameters:

Name Type Description Default
data Any

The data to serialize. Must be a dict (or convertible via to_dict) conforming to the Avro schema.

required
ctx SerializationContext

Serialization context with topic and field metadata.

required

Returns:

Type Description
SerializedMessage

SerializedMessage with payload bytes and headers dict.

Raises:

Type Description
ResolverError

If the artifact_resolver raises or returns something other than a non-empty str.

SchemaNotFoundError

If the resolved artifact does not exist in the registry and auto_register=False.

SchemaRegistrationError

If auto_register=True and the registry rejects the registration request (4xx/5xx).

RegistryConnectionError

If the registry is unreachable.

SerializationError

If the to_dict callable raises an exception.

ValueError

If data does not conform to the Avro schema.

__call__

__call__(data, ctx)

Serialize data to bytes. Delegates to serialize() and returns payload.

Only supported for CONFLUENT_PAYLOAD wire format. For KAFKA_HEADERS, use serialize() which returns a SerializedMessage with both payload and headers.

Parameters:

Name Type Description Default
data Any

The data to serialize. Must be a dict (or convertible via to_dict) conforming to the Avro schema.

required
ctx SerializationContext

Serialization context with topic and field metadata.

required

Returns:

Type Description
bytes

Serialized bytes (CONFLUENT_PAYLOAD framed).

Raises:

Type Description
TypeError

If wire_format is KAFKA_HEADERS (use serialize() instead).

ResolverError

If the artifact_resolver raises or returns something other than a non-empty str.

SchemaNotFoundError

If the resolved artifact does not exist in the registry and auto_register=False.

SchemaRegistrationError

If auto_register=True and the registry rejects the registration request (4xx/5xx).

RegistryConnectionError

If the registry is unreachable.

SerializationError

If the to_dict callable raises an exception.

ValueError

If data does not conform to the Avro schema.

AvroDeserializer

apicurio_serdes.avro._deserializer.AvroDeserializer

Bases: _BaseAvroDeserializer

Deserializes Confluent-framed Avro bytes to Python dicts.

Reads the wire format header to extract the schema identifier, resolves the schema from the registry, and decodes the Avro payload. Optionally applies a from_dict transformation hook.

Parameters:

Name Type Description Default
registry_client ApicurioRegistryClient

An ApicurioRegistryClient instance.

required
from_dict Callable[[dict[str, Any], SerializationContext], Any] | None

Optional callable that converts the decoded dict to a domain object. Signature: (data, ctx) -> Any. When None, the decoded dict is returned directly (FR-008).

None
use_id Literal['globalId', 'contentId']

Which registry identifier type the 4-byte wire format field represents. Must match the serializer's use_id setting. Defaults to "globalId".

'globalId'
artifact_id str | None

Artifact identifier used to fetch the latest registry schema as the reader schema. Requires use_latest_version=True. Mutually exclusive with artifact_resolver.

None
artifact_resolver ArtifactResolver | None

Callable (ctx) -> str that returns the artifact identifier at call time. Requires use_latest_version=True. Mutually exclusive with artifact_id. The resolved value is cached after the first successful call.

None
use_latest_version bool

When True, fetches the latest registry schema for the resolved artifact on the first call and uses it as the reader schema (cached per instance). Requires artifact_id or artifact_resolver. Mutually exclusive with reader_schema.

False
reader_schema dict[str, Any] | None

Optional Avro schema dict used as the reader schema during deserialization. When provided, fastavro performs schema resolution between the writer schema (embedded in the message) and this reader schema, enabling field defaults to fill gaps for added fields, type promotions, and other Avro evolution rules. When None (default), the writer schema is used for both roles (no evolution). Parsed once at construction time. Mutually exclusive with use_latest_version.

None

Raises:

Type Description
ValueError

If artifact_id and artifact_resolver are both provided, if use_latest_version=True is used without artifact_id or artifact_resolver, if use_latest_version=True is combined with reader_schema, or if artifact_id / artifact_resolver is provided without use_latest_version=True.

Example
from apicurio_serdes import ApicurioRegistryClient
from apicurio_serdes.avro import AvroDeserializer
from apicurio_serdes.serialization import SerializationContext, MessageField

client = ApicurioRegistryClient(
    url="http://localhost:8080/apis/registry/v3",
    group_id="com.example.schemas",
)
# Dynamically use the latest registry schema as the reader schema:
deserializer = AvroDeserializer(
    registry_client=client,
    artifact_id="UserEvent",
    use_latest_version=True,
)
ctx = SerializationContext(topic="user-events", field=MessageField.VALUE)
result = deserializer(raw_bytes, ctx)

__call__

__call__(data, ctx)

Deserialize Confluent-framed Avro bytes.

Input format (FR-003): Byte 0: Magic byte 0x00 Bytes 1-4: Schema identifier as 4-byte big-endian unsigned int Bytes 5+: Avro binary payload (schemaless encoding)

Parameters:

Name Type Description Default
data bytes

Confluent wire format bytes to deserialize.

required
ctx SerializationContext

Serialization context with topic and field metadata.

required

Returns:

Type Description
Any

A Python dict (or the result of from_dict if configured).

Raises:

Type Description
DeserializationError

If the input is fewer than 5 bytes (FR-004), the magic byte is not 0x00 (FR-003), the Avro payload cannot be decoded (FR-011), or the from_dict callable raises an exception (FR-009).

SchemaNotFoundError

If the schema identifier does not correspond to any schema in the registry (FR-010).

RegistryConnectionError

If the registry is unreachable during schema resolution (FR-012).

ResolverError

If use_latest_version=True with an artifact_resolver and the resolver raises or returns a non-string value.

AsyncAvroDeserializer

apicurio_serdes.avro._async_deserializer.AsyncAvroDeserializer

Bases: _BaseAvroDeserializer

Deserializes Confluent-framed Avro bytes to Python dicts (async).

Non-blocking counterpart to AvroDeserializer. Uses AsyncApicurioRegistryClient for schema resolution, making it suitable for use in fully async services without blocking the event loop.

Parameters:

Name Type Description Default
registry_client AsyncApicurioRegistryClient

An AsyncApicurioRegistryClient instance.

required
from_dict Callable[[dict[str, Any], SerializationContext], Any] | None

Optional callable that converts the decoded dict to a domain object. Signature: (data, ctx) -> Any. When None, the decoded dict is returned directly.

None
use_id Literal['globalId', 'contentId']

Which registry identifier type the 4-byte wire format field represents. Must match the serializer's use_id setting. Defaults to "globalId".

'globalId'
artifact_id str | None

Artifact identifier used to fetch the latest registry schema as the reader schema. Requires use_latest_version=True. Mutually exclusive with artifact_resolver.

None
artifact_resolver ArtifactResolver | None

Callable (ctx) -> str that returns the artifact identifier at call time. Requires use_latest_version=True. Mutually exclusive with artifact_id. The resolved value is cached after the first successful call.

None
use_latest_version bool

When True, fetches the latest registry schema for the resolved artifact on the first call and uses it as the reader schema (cached per instance). Requires artifact_id or artifact_resolver. Mutually exclusive with reader_schema.

False
reader_schema dict[str, Any] | None

Optional Avro schema dict used as the reader schema during deserialization. When provided, fastavro performs schema resolution between the writer schema (embedded in the message) and this reader schema, enabling field defaults to fill gaps for added fields, type promotions, and other Avro evolution rules. When None (default), the writer schema is used for both roles (no evolution). Parsed once at construction time. Mutually exclusive with use_latest_version.

None

Raises:

Type Description
ValueError

If artifact_id and artifact_resolver are both provided, if use_latest_version=True is used without artifact_id or artifact_resolver, if use_latest_version=True is combined with reader_schema, or if artifact_id / artifact_resolver is provided without use_latest_version=True.

Example
from apicurio_serdes import AsyncApicurioRegistryClient
from apicurio_serdes.avro import AsyncAvroDeserializer
from apicurio_serdes.serialization import SerializationContext, MessageField

client = AsyncApicurioRegistryClient(
    url="http://localhost:8080/apis/registry/v3",
    group_id="com.example.schemas",
)
deserializer = AsyncAvroDeserializer(registry_client=client)
ctx = SerializationContext(topic="user-events", field=MessageField.VALUE)

async with client:
    result = await deserializer(raw_bytes, ctx)

__call__ async

__call__(data, ctx)

Deserialize Confluent-framed Avro bytes (async).

Input format

Byte 0: Magic byte 0x00 Bytes 1-4: Schema identifier as 4-byte big-endian unsigned int Bytes 5+: Avro binary payload (schemaless encoding)

Parameters:

Name Type Description Default
data bytes

Confluent wire format bytes to deserialize.

required
ctx SerializationContext

Serialization context with topic and field metadata.

required

Returns:

Type Description
Any

A Python dict (or the result of from_dict if configured).

Raises:

Type Description
DeserializationError

If the input is fewer than 5 bytes, the magic byte is not 0x00, the Avro payload cannot be decoded, or the from_dict callable raises an exception.

SchemaNotFoundError

If the schema identifier does not correspond to any schema in the registry.

RegistryConnectionError

If the registry is unreachable during schema resolution.

ResolverError

If use_latest_version=True with an artifact_resolver and the resolver raises or returns a non-string value.

TopicIdStrategy

apicurio_serdes.avro._strategies.TopicIdStrategy

Derives the artifact ID from the topic name and message field.

Returns "{topic}-{field}" (e.g. "orders-value" or "orders-key"), matching the Apicurio Java reference implementation's TopicIdStrategy.

Example
from apicurio_serdes.avro import TopicIdStrategy
from apicurio_serdes.serialization import MessageField, SerializationContext

strategy = TopicIdStrategy()
ctx = SerializationContext(topic="orders", field=MessageField.VALUE)
artifact_id = strategy(ctx)  # "orders-value"

__call__

__call__(ctx)

Return "{topic}-{field}" for the given context.

SimpleTopicIdStrategy

apicurio_serdes.avro._strategies.SimpleTopicIdStrategy

Derives the artifact ID from the topic name only.

Returns "{topic}" (e.g. "orders"), matching the Apicurio Java reference implementation's SimpleTopicIdStrategy.

Example
from apicurio_serdes.avro import SimpleTopicIdStrategy
from apicurio_serdes.serialization import MessageField, SerializationContext

strategy = SimpleTopicIdStrategy()
ctx = SerializationContext(topic="orders", field=MessageField.VALUE)
artifact_id = strategy(ctx)  # "orders"

__call__

__call__(ctx)

Return the topic name for the given context.

QualifiedRecordIdStrategy

apicurio_serdes.avro._strategies.QualifiedRecordIdStrategy

Derives the artifact ID from the Avro schema's record name and namespace.

Returns "{namespace}.{name}" when namespace is present, "{name}" otherwise. Matches the Confluent RecordNameStrategy and Apicurio's qualified-record convention.

The topic and message field are ignored — the artifact ID is fixed at construction time from the schema.

Note

The Java RecordIdStrategy (groupId=namespace routing) is not implemented. Use the group_id parameter on the Apicurio client for that routing behaviour.

Parameters:

Name Type Description Default
schema dict[str, Any]

Avro schema dict. Must contain a non-empty "name" key.

required

Raises:

Type Description
ValueError

If schema has no "name" key or the name is empty.

Example
from apicurio_serdes.avro import QualifiedRecordIdStrategy

schema = {
    "type": "record", "name": "Order",
    "namespace": "com.example", "fields": [],
}
strategy = QualifiedRecordIdStrategy(schema)
# strategy(ctx) == "com.example.Order"

__call__

__call__(ctx)

Return the qualified record name, ignoring context.

TopicRecordIdStrategy

apicurio_serdes.avro._strategies.TopicRecordIdStrategy

Derives the artifact ID from the topic and Avro schema's record name.

Returns "{topic}-{namespace}.{name}" when namespace is present, "{topic}-{name}" otherwise. Matches the Confluent TopicRecordNameStrategy.

The artifact ID is partially fixed at construction time (record part) and partially resolved at call time (topic).

Note

The Java RecordIdStrategy (groupId=namespace routing) is not implemented. Use the group_id parameter on the Apicurio client for that routing behaviour.

Parameters:

Name Type Description Default
schema dict[str, Any]

Avro schema dict. Must contain a non-empty "name" key.

required

Raises:

Type Description
ValueError

If schema has no "name" key or the name is empty.

Example
from apicurio_serdes.avro import TopicRecordIdStrategy
from apicurio_serdes.serialization import MessageField, SerializationContext

schema = {
    "type": "record", "name": "Order",
    "namespace": "com.example", "fields": [],
}
strategy = TopicRecordIdStrategy(schema)
ctx = SerializationContext(topic="orders", field=MessageField.VALUE)
# strategy(ctx) == "orders-com.example.Order"

__call__

__call__(ctx)

Return "{topic}-{record}" for the given context.

Exceptions

SchemaNotFoundError

apicurio_serdes._errors.SchemaNotFoundError

Bases: Exception

Raised when an artifact or schema ID does not exist in the registry.

Two construction paths set different attributes:

Artifact-based lookups (SchemaNotFoundError(group_id, artifact_id)): group_id: The group that was searched. artifact_id: The artifact that was not found.

ID-based lookups (SchemaNotFoundError.from_id(id_type, id_value)): id_type: The ID type that was searched ("globalId" or "contentId"). id_value: The numeric ID that was not found.

from_id classmethod

from_id(id_type, id_value)

Create a SchemaNotFoundError for an ID-based lookup failure.

Parameters:

Name Type Description Default
id_type str

"globalId" or "contentId".

required
id_value int

The numeric identifier that was not found.

required

Returns:

Type Description
SchemaNotFoundError

A SchemaNotFoundError with id_type and id_value attributes set.

RegistryConnectionError

apicurio_serdes._errors.RegistryConnectionError

Bases: Exception

Raised when the Apicurio Registry is unreachable.

Wraps the underlying network exception as __cause__. Includes the registry URL in the error message so callers can identify which endpoint failed.

Parameters:

Name Type Description Default
url str

The registry base URL that could not be reached.

required
cause Exception

The underlying network exception.

required

Attributes:

Name Type Description
url

The registry base URL that could not be reached.

SchemaRegistrationError

apicurio_serdes._errors.SchemaRegistrationError

Bases: Exception

Raised when the registry rejects a schema registration request.

Covers 4xx and 5xx responses from the artifact creation endpoint and missing JSON fields in the response body. Transport errors (network failures) raise RegistryConnectionError instead.

Note

The exception message includes str(cause), which may contain the full HTTP response body. Sanitise before logging in environments where registry error responses may contain sensitive information.

Parameters:

Name Type Description Default
artifact_id str

The artifact identifier that failed to register.

required
cause Exception

The underlying exception (HTTP error or missing JSON field).

required

Attributes:

Name Type Description
artifact_id

The artifact identifier that failed to register.

cause

The underlying exception.

SerializationError

apicurio_serdes._errors.SerializationError

Bases: Exception

Raised when the to_dict callable fails during serialization.

Wraps the original exception as __cause__ so the full traceback is preserved. Distinguishes hook failure from Avro schema validation errors.

Parameters:

Name Type Description Default
cause Exception

The original exception raised by the to_dict callable.

required

Attributes:

Name Type Description
cause

The original exception raised by the to_dict callable.

ResolverError

apicurio_serdes._errors.ResolverError

Bases: Exception

Raised when the artifact_resolver callable fails during serialization.

Covers two failure modes:

  • The resolver raised an exception (original set as __cause__).
  • The resolver returned a value that is not a non-empty string.

Parameters:

Name Type Description Default
message str

Human-readable description of the failure.

required
cause Exception | None

The original exception raised by the resolver, if any.

None

Attributes:

Name Type Description
cause

The original exception, if any.

DeserializationError

apicurio_serdes._errors.DeserializationError

Bases: Exception

Raised when deserialization fails (FR-003, FR-004, FR-009, FR-011).

Covers: invalid magic byte, input too short, Avro decode failure, and from_dict hook failure. When wrapping an underlying exception, the original is set as cause to preserve the full traceback.

Attributes:

Name Type Description
cause

The original exception, if any.

AuthenticationError

apicurio_serdes._errors.AuthenticationError

Bases: Exception

Raised when authentication with the token endpoint fails.

Covers: token endpoint unreachable, non-200 response, or a 200 response with a malformed body (missing or empty access_token, missing or non-positive expires_in, or non-JSON).

Parameters:

Name Type Description Default
message str

Description of the authentication failure.

required