schema_registry

import "github.com/Aleph-Alpha/std/v1/schema_registry"

Package schema_registry provides integration with Confluent Schema Registry.

This package enables schema management, validation, and evolution for messages on Apache Kafka and other streaming platforms. It supports three serialization formats: Avro, Protobuf, and JSON Schema.

Architecture

This package follows the "accept interfaces, return structs" design pattern:

  • Registry interface: Defines the contract for schema registry operations
  • Client struct: Concrete implementation of the Registry interface
  • NewClient constructor: Returns *Client (concrete type)
  • FX module: Provides both *Client and Registry interface for dependency injection
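The practical benefit of this pattern is that consumer code can depend on a narrow interface and swap in a fake during tests. The following is a minimal sketch using only the standard library; `schemaGetter`, `fakeRegistry`, and `describeSchema` are hypothetical names introduced for illustration and are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// schemaGetter is a narrow, consumer-defined interface (hypothetical).
// A type with a GetSchemaByID(int) (string, error) method, such as the
// *Client documented here, would satisfy it.
type schemaGetter interface {
	GetSchemaByID(id int) (string, error)
}

// fakeRegistry is an in-memory stand-in intended for tests.
type fakeRegistry struct {
	schemas map[int]string
}

func (f *fakeRegistry) GetSchemaByID(id int) (string, error) {
	schema, ok := f.schemas[id]
	if !ok {
		return "", errors.New("schema not found")
	}
	return schema, nil
}

// describeSchema accepts the interface, so it works with a real client or a fake.
func describeSchema(r schemaGetter, id int) string {
	schema, err := r.GetSchemaByID(id)
	if err != nil {
		return fmt.Sprintf("schema %d: %v", id, err)
	}
	return fmt.Sprintf("schema %d: %s", id, schema)
}

func main() {
	fake := &fakeRegistry{schemas: map[int]string{1: `"string"`}}
	fmt.Println(describeSchema(fake, 1)) // prints: schema 1: "string"
	fmt.Println(describeSchema(fake, 2)) // prints: schema 2: schema not found
}
```

In application code the same function would receive the value returned by NewClient, since the constructor returns the concrete type and the caller decides which interface to narrow it to.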

Core Features:

  • HTTP client for Confluent Schema Registry
  • Schema registration and retrieval with caching
  • Compatibility checking for schema evolution
  • Confluent wire format encoding/decoding
  • Serializers for Avro, Protobuf, and JSON Schema
  • Generic wrapper for custom serializers

Direct Usage (Without FX)

For simple applications or tests, create a client directly:

import (
    "log"
    "time"

    "github.com/Aleph-Alpha/std/v1/schema_registry"
)

// Create schema registry client (returns concrete *Client)
client, err := schema_registry.NewClient(schema_registry.Config{
    URL:      "http://localhost:8081",
    Username: "user",     // Optional
    Password: "password", // Optional
    Timeout:  10 * time.Second,
})
if err != nil {
    log.Fatal(err)
}

// Register a schema
avroSchema := `{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"}
    ]
}`

schemaID, err := client.RegisterSchema("users-value", avroSchema, "AVRO")
if err != nil {
    log.Fatal(err)
}
log.Printf("registered schema with ID %d", schemaID)

FX Module Integration

For production applications using Uber's fx, use the FXModule which provides both the concrete type and interface:

import (
    "log"
    "os"
    "time"

    "go.uber.org/fx"

    "github.com/Aleph-Alpha/std/v1/schema_registry"
)

app := fx.New(
    schema_registry.FXModule, // Provides *Client and Registry interface
    fx.Provide(
        func() schema_registry.Config {
            return schema_registry.Config{
                URL:      os.Getenv("SCHEMA_REGISTRY_URL"),
                Username: os.Getenv("SCHEMA_REGISTRY_USER"),
                Password: os.Getenv("SCHEMA_REGISTRY_PASSWORD"),
                Timeout:  30 * time.Second,
            }
        },
    ),
    fx.Invoke(func(client *schema_registry.Client) error {
        // Use the concrete type directly. avroSchema is the schema string
        // from the direct-usage example above.
        schemaID, err := client.RegisterSchema("subject", avroSchema, "AVRO")
        if err != nil {
            return err
        }
        log.Printf("registered schema with ID %d", schemaID)
        return nil
    }),
)

Observability (Observer Hook)

The client supports optional observability through the Observer interface from the observability package. External systems can track schema operations without coupling this package to a specific metrics or tracing implementation.

Using WithObserver (non-FX usage):

client, err := schema_registry.NewClient(config)
if err != nil {
    return err
}
client = client.WithObserver(myObserver).WithLogger(myLogger)

Using FX (automatic injection):

app := fx.New(
    schema_registry.FXModule,
    logger.FXModule,  // Optional: provides logger
    fx.Provide(
        func() schema_registry.Config { return loadConfig() },
        func() observability.Observer { return myObserver },  // Optional
    ),
)

The observer receives events for all schema operations:

  • Component: "schema_registry"
  • Operations: "get_schema_by_id", "get_latest_schema", "register_schema", "check_compatibility"
  • Resource: subject name (or "registry" for ID lookups)
  • SubResource: schema ID or version
  • Duration: operation duration
  • Error: any error that occurred
  • Metadata: operation-specific details (e.g., cache_hit, schema_type, schema_id, is_compatible)

Type Aliases in Consumer Code

To simplify your code and make it registry-agnostic, use type aliases:

package myapp

import stdRegistry "github.com/Aleph-Alpha/std/v1/schema_registry"

// Use type alias to reference std's interface
type SchemaRegistry = stdRegistry.Registry

// Now use SchemaRegistry throughout your codebase
func MyFunction(registry SchemaRegistry, schemaID int) (string, error) {
    return registry.GetSchemaByID(schemaID)
}

This eliminates the need for adapters and allows you to switch implementations by only changing the alias definition.

Schema Operations

import "github.com/linkedin/goavro/v2"

// Create Avro codec
codec, err := goavro.NewCodec(avroSchema)
if err != nil {
    log.Fatal(err)
}

// Create Avro serializer
serializer, err := schema_registry.NewAvroSerializer(
    schema_registry.AvroSerializerConfig{
        Registry: client,
        Subject:  "users-value",
        Schema:   avroSchema,
        MarshalFunc: func(data interface{}) ([]byte, error) {
            return codec.BinaryFromNative(nil, data)
        },
    },
)

// Serialize data
user := map[string]interface{}{
    "name": "John Doe",
    "age":  30,
}
encoded, err := serializer.Serialize(user)
// encoded contains: [magic_byte][schema_id][avro_payload]

// Create Avro deserializer
deserializer, err := schema_registry.NewAvroDeserializer(
    schema_registry.AvroDeserializerConfig{
        Registry: client,
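        UnmarshalFunc: func(data []byte, target interface{}) error {
            native, _, err := codec.NativeFromBinary(data)
            if err != nil {
                return err
            }
            // Copy the decoded record into target. This example only
            // supports *map[string]interface{} targets (requires "fmt").
            record, ok := native.(map[string]interface{})
            if !ok {
                return fmt.Errorf("unexpected Avro value type %T", native)
            }
            out, ok := target.(*map[string]interface{})
            if !ok {
                return fmt.Errorf("unsupported target type %T", target)
            }
            *out = record
            return nil
        },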
    },
)

// Deserialize data
var result map[string]interface{}
err = deserializer.Deserialize(encoded, &result)

Using with Protobuf

import "google.golang.org/protobuf/proto"

// Create Protobuf serializer
serializer, err := schema_registry.NewProtobufSerializer(
    schema_registry.ProtobufSerializerConfig{
        Registry:    client,
        Subject:     "users-value",
        Schema:      protoSchema, // .proto file content as string
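        // proto.Marshal takes a proto.Message, so adapt it to the
        // func(interface{}) signature (requires "fmt").
        MarshalFunc: func(data interface{}) ([]byte, error) {
            msg, ok := data.(proto.Message)
            if !ok {
                return nil, fmt.Errorf("expected proto.Message, got %T", data)
            }
            return proto.Marshal(msg)
        },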
    },
)

// Serialize protobuf message
protoMsg := &pb.User{Name: "Jane", Age: 25}
encoded, err := serializer.Serialize(protoMsg)

// Create Protobuf deserializer
deserializer, err := schema_registry.NewProtobufDeserializer(
    schema_registry.ProtobufDeserializerConfig{
        Registry:      client,
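        // proto.Unmarshal takes a proto.Message, so adapt it to the
        // func([]byte, interface{}) signature (requires "fmt").
        UnmarshalFunc: func(data []byte, target interface{}) error {
            msg, ok := target.(proto.Message)
            if !ok {
                return fmt.Errorf("expected proto.Message, got %T", target)
            }
            return proto.Unmarshal(data, msg)
        },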
    },
)

// Deserialize
var user pb.User
err = deserializer.Deserialize(encoded, &user)

Using with JSON Schema

// Create JSON serializer
serializer, err := schema_registry.NewJSONSerializer(
    schema_registry.JSONSerializerConfig{
        Registry: client,
        Subject:  "users-value",
        Schema: `{
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "age": {"type": "integer"}
            }
        }`,
    },
)

// Serialize JSON
user := struct {
    Name string `json:"name"`
    Age  int    `json:"age"`
}{Name: "Alice", Age: 28}
encoded, err := serializer.Serialize(user)

// Deserialize
deserializer, err := schema_registry.NewJSONDeserializer(
    schema_registry.JSONDeserializerConfig{
        Registry: client,
    },
)
var result struct {
    Name string `json:"name"`
    Age  int    `json:"age"`
}
err = deserializer.Deserialize(encoded, &result)

Wire Format

All serializers produce messages in Confluent wire format:

[magic_byte (1 byte)] [schema_id (4 bytes, big-endian)] [payload]

The magic byte is always 0x0, followed by the 4-byte schema ID and then the serialized payload. Confluent's own serializers and deserializers use the same 5-byte header, so messages can be exchanged with Confluent tooling.
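To make the framing concrete, here is a minimal sketch that builds and parses the 5-byte header with the standard library only. The helpers `encodeHeader` and `decodeHeader` are illustrative stand-ins written for this example; they are not the package's EncodeSchemaID and DecodeSchemaID implementations, whose error handling may differ.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

const magicByte byte = 0x0

// encodeHeader returns the 5-byte header: magic byte plus big-endian schema ID.
func encodeHeader(schemaID int) []byte {
	header := make([]byte, 5)
	header[0] = magicByte
	binary.BigEndian.PutUint32(header[1:], uint32(schemaID))
	return header
}

// decodeHeader splits a framed message into its schema ID and payload.
func decodeHeader(data []byte) (int, []byte, error) {
	if len(data) < 5 {
		return 0, nil, errors.New("message shorter than 5-byte header")
	}
	if data[0] != magicByte {
		return 0, nil, fmt.Errorf("unexpected magic byte 0x%x", data[0])
	}
	return int(binary.BigEndian.Uint32(data[1:5])), data[5:], nil
}

func main() {
	msg := append(encodeHeader(42), []byte("payload")...)
	id, payload, err := decodeHeader(msg)
	fmt.Println(id, string(payload), err) // prints: 42 payload <nil>
}
```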

Schema Caching

The client automatically caches schemas by ID and subject to minimize network calls to the Schema Registry. Caches are thread-safe and maintained in-memory for the lifetime of the client.
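One common way to build such a cache is a map guarded by a read-write mutex. The sketch below shows that general technique for ID lookups; `schemaCache` and `getOrFetch` are hypothetical names, and nothing here is taken from the client's actual implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// schemaCache is a hypothetical in-memory cache keyed by schema ID.
type schemaCache struct {
	mu      sync.RWMutex
	schemas map[int]string
}

func newSchemaCache() *schemaCache {
	return &schemaCache{schemas: make(map[int]string)}
}

// getOrFetch returns the cached schema and calls fetch only on a miss.
func (c *schemaCache) getOrFetch(id int, fetch func(int) (string, error)) (string, error) {
	c.mu.RLock()
	schema, ok := c.schemas[id]
	c.mu.RUnlock()
	if ok {
		return schema, nil
	}

	schema, err := fetch(id)
	if err != nil {
		return "", err // failures are not cached
	}

	c.mu.Lock()
	c.schemas[id] = schema
	c.mu.Unlock()
	return schema, nil
}

func main() {
	calls := 0
	fetch := func(id int) (string, error) {
		calls++
		return `{"type":"string"}`, nil
	}

	cache := newSchemaCache()
	for i := 0; i < 3; i++ {
		if _, err := cache.getOrFetch(7, fetch); err != nil {
			panic(err)
		}
	}
	fmt.Println("registry calls:", calls) // prints: registry calls: 1
}
```

In this sketch two goroutines that miss at the same moment may both call fetch; that is harmless when a schema ID always maps to the same schema text, which is why a simple map without request coalescing is often sufficient.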

For more information, see the SCHEMA_REGISTRY.md documentation file.

Variables

FXModule is an fx.Module that provides and configures the Schema Registry client. This module registers the Schema Registry client with the Fx dependency injection framework, making it available to other components in the application.

The module provides:

  1. *Client (concrete type) for direct use
  2. Registry interface for dependency injection
  3. Lifecycle management for proper initialization

Usage:

app := fx.New(
    schema_registry.FXModule,
    fx.Provide(
        func() schema_registry.Config {
            return schema_registry.Config{
                URL:      "http://localhost:8081",
                Username: "user",
                Password: "pass",
            }
        },
    ),
)
var FXModule = fx.Module("schema_registry",
    fx.Provide(
        NewClientWithDI,

        fx.Annotate(
            func(c *Client) Registry { return c },
            fx.As(new(Registry)),
        ),
    ),
    fx.Invoke(RegisterSchemaRegistryLifecycle),
)

func DecodeSchemaID(data []byte) (int, []byte, error)

DecodeSchemaID decodes a schema ID from the Confluent wire format. It returns the schema ID and the remaining payload (after the 5-byte header).

func EncodeSchemaID(schemaID int) []byte

EncodeSchemaID encodes a schema ID in the Confluent wire format.

Format: [magic_byte][schema_id]

  • magic_byte: 0x0 (1 byte)
  • schema_id: 4 bytes (big-endian)

func RegisterSchemaRegistryLifecycle(params SchemaRegistryLifecycleParams)

RegisterSchemaRegistryLifecycle registers the Schema Registry client with the fx lifecycle system. This function sets up proper initialization and graceful shutdown of the Schema Registry client.

Parameters:

  • params: The lifecycle parameters containing the Schema Registry client

The function:

  1. On application start: Logs that the registry client is ready
  2. On application stop: Currently no cleanup needed (HTTP client is stateless)

This ensures that the Schema Registry client remains available throughout the application's lifetime and any future cleanup logic can be added here.

AvroDeserializer is a convenience wrapper for Avro with schema registry

type AvroDeserializer struct {
    *WrapperDeserializer
}

func NewAvroDeserializer(config AvroDeserializerConfig) (*AvroDeserializer, error)

NewAvroDeserializer creates an Avro deserializer with schema registry support

AvroDeserializerConfig holds configuration for Avro deserializer

type AvroDeserializerConfig struct {
    Registry      Registry
    UnmarshalFunc func([]byte, interface{}) error // Avro decoding function
}

AvroSerializer is a convenience wrapper for Avro with schema registry

type AvroSerializer struct {
    *WrapperSerializer
}

func NewAvroSerializer(config AvroSerializerConfig) (*AvroSerializer, error)

NewAvroSerializer creates an Avro serializer with schema registry support

AvroSerializerConfig holds configuration for Avro serializer

type AvroSerializerConfig struct {
    Registry    Registry
    Subject     string
    Schema      string
    MarshalFunc func(interface{}) ([]byte, error) // Avro encoding function
}

type Client

Client is the default implementation of Registry that communicates with Confluent Schema Registry over HTTP.

type Client struct {
    // contains filtered or unexported fields
}

func NewClient(config Config) (*Client, error)

NewClient creates a new schema registry client. It returns the concrete *Client type.

func NewClientWithDI(params SchemaRegistryParams) (*Client, error)

NewClientWithDI creates a new Schema Registry client using dependency injection. This function is designed to be used with Uber's fx dependency injection framework where dependencies are automatically provided via the SchemaRegistryParams struct.

Returns the concrete *Client type.

Parameters:

  • params: A SchemaRegistryParams struct that contains the Config instance required to initialize the Schema Registry client. This struct embeds fx.In to enable automatic injection of these dependencies.

Returns:

  • *Client: A fully initialized Schema Registry client ready for use.

Example usage with fx:

app := fx.New(
    schema_registry.FXModule,
    fx.Provide(
        func() schema_registry.Config {
            return schema_registry.Config{
                URL:      os.Getenv("SCHEMA_REGISTRY_URL"),
                Username: os.Getenv("SCHEMA_REGISTRY_USER"),
                Password: os.Getenv("SCHEMA_REGISTRY_PASSWORD"),
                Timeout:  30 * time.Second,
            }
        },
    ),
)

func (*Client) CheckCompatibility

func (c *Client) CheckCompatibility(subject, schema, schemaType string) (bool, error)

CheckCompatibility checks if a schema is compatible with the existing schema for a subject

func (*Client) GetLatestSchema

func (c *Client) GetLatestSchema(subject string) (*Metadata, error)

GetLatestSchema retrieves the latest version of a schema for a subject

func (*Client) GetSchemaByID

func (c *Client) GetSchemaByID(id int) (string, error)

GetSchemaByID retrieves a schema from the registry by its ID

func (*Client) RegisterSchema

func (c *Client) RegisterSchema(subject, schema, schemaType string) (int, error)

RegisterSchema registers a new schema with the schema registry

func (*Client) WithLogger

func (c *Client) WithLogger(logger Logger) *Client

WithLogger sets the logger for this client and returns the client for method chaining. The logger is used for structured logging of client operations and errors.

Example:

client = client.WithObserver(myObserver).WithLogger(myLogger)

func (*Client) WithObserver

func (c *Client) WithObserver(observer observability.Observer) *Client

WithObserver sets the observer for this client and returns the client for method chaining. The observer receives events about schema registry operations (e.g., register, get, check compatibility).

Example:

client = client.WithObserver(myObserver).WithLogger(myLogger)

type Config

Config holds configuration for schema registry client

type Config struct {
    // URL is the schema registry endpoint (e.g., "http://localhost:8081")
    URL string

    // Username for basic auth (optional)
    Username string

    // Password for basic auth (optional)
    Password string

    // Timeout for HTTP requests
    Timeout time.Duration
}

Deserializer is the interface for decoding data

type Deserializer interface {
    Deserialize(data []byte, target interface{}) error
}

JSONDeserializer is a convenience wrapper for JSON Schema with schema registry

type JSONDeserializer struct {
    *WrapperDeserializer
}

func NewJSONDeserializer(config JSONDeserializerConfig) (*JSONDeserializer, error)

NewJSONDeserializer creates a JSON deserializer with schema registry support

JSONDeserializerConfig holds configuration for JSON deserializer

type JSONDeserializerConfig struct {
    Registry Registry
}

JSONSerializer is a convenience wrapper for JSON Schema with schema registry

type JSONSerializer struct {
    *WrapperSerializer
}

func NewJSONSerializer(config JSONSerializerConfig) (*JSONSerializer, error)

NewJSONSerializer creates a JSON serializer with schema registry support

JSONSerializerConfig holds configuration for JSON serializer

type JSONSerializerConfig struct {
    Registry Registry
    Subject  string
    Schema   string
}

type Logger

Logger is an interface that matches the std/v1/logger.Logger interface. It provides context-aware structured logging with optional error and field parameters.

type Logger interface {
    // InfoWithContext logs an informational message with trace context.
    InfoWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})

    // WarnWithContext logs a warning message with trace context.
    WarnWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})

    // ErrorWithContext logs an error message with trace context.
    ErrorWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
}

Metadata contains metadata about a registered schema

type Metadata struct {
    ID      int    `json:"id"`
    Version int    `json:"version"`
    Schema  string `json:"schema"`
    Subject string `json:"subject"`
    Type    string `json:"schemaType,omitempty"`
}
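The JSON tags line up with the fields the registry returns for a subject version. As a rough illustration, the sketch below decodes a sample response body into a local copy of the struct. The `metadata` type, `parseMetadata`, and `effectiveType` are hypothetical helpers, and treating an absent schemaType as "AVRO" reflects the registry's documented default rather than confirmed behavior of this package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// metadata is a local copy of the Metadata struct above, for illustration only.
type metadata struct {
	ID      int    `json:"id"`
	Version int    `json:"version"`
	Schema  string `json:"schema"`
	Subject string `json:"subject"`
	Type    string `json:"schemaType,omitempty"`
}

// parseMetadata decodes a registry response body.
func parseMetadata(body []byte) (*metadata, error) {
	var m metadata
	if err := json.Unmarshal(body, &m); err != nil {
		return nil, err
	}
	return &m, nil
}

// effectiveType applies the assumed default when schemaType is absent.
func (m *metadata) effectiveType() string {
	if m.Type == "" {
		return "AVRO"
	}
	return m.Type
}

func main() {
	body := []byte(`{"subject":"users-value","version":3,"id":42,"schema":"{\"type\":\"string\"}"}`)
	m, err := parseMetadata(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(m.Subject, m.Version, m.ID, m.effectiveType()) // prints: users-value 3 42 AVRO
}
```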

ProtobufDeserializer is a convenience wrapper for Protobuf with schema registry

type ProtobufDeserializer struct {
    *WrapperDeserializer
}

func NewProtobufDeserializer(config ProtobufDeserializerConfig) (*ProtobufDeserializer, error)

NewProtobufDeserializer creates a Protobuf deserializer with schema registry support

ProtobufDeserializerConfig holds configuration for Protobuf deserializer

type ProtobufDeserializerConfig struct {
    Registry      Registry
    UnmarshalFunc func([]byte, interface{}) error // Protobuf decoding function
}

ProtobufSerializer is a convenience wrapper for Protobuf with schema registry

type ProtobufSerializer struct {
    *WrapperSerializer
}

func NewProtobufSerializer(config ProtobufSerializerConfig) (*ProtobufSerializer, error)

NewProtobufSerializer creates a Protobuf serializer with schema registry support

ProtobufSerializerConfig holds configuration for Protobuf serializer

type ProtobufSerializerConfig struct {
    Registry    Registry
    Subject     string
    Schema      string
    MarshalFunc func(interface{}) ([]byte, error) // Protobuf encoding function
}

Registry provides an interface for interacting with a Confluent Schema Registry. It handles schema registration, retrieval, and caching for efficient serialization.

type Registry interface {
    // GetSchemaByID retrieves a schema by its ID
    GetSchemaByID(id int) (string, error)

    // GetLatestSchema retrieves the latest version of a schema for a subject
    GetLatestSchema(subject string) (*Metadata, error)

    // RegisterSchema registers a new schema for a subject
    RegisterSchema(subject, schema, schemaType string) (int, error)

    // CheckCompatibility checks if a schema is compatible with the latest version
    CheckCompatibility(subject, schema, schemaType string) (bool, error)
}

SchemaRegistryLifecycleParams groups the dependencies needed for Schema Registry lifecycle management

type SchemaRegistryLifecycleParams struct {
    fx.In

    Lifecycle fx.Lifecycle
    Client    *Client
}

SchemaRegistryParams groups the dependencies needed to create a Schema Registry client

type SchemaRegistryParams struct {
    fx.In

    Config   Config
    Logger   Logger                 `optional:"true"`
    Observer observability.Observer `optional:"true"`
}

Serializer is the interface for encoding data

type Serializer interface {
    Serialize(data interface{}) ([]byte, error)
}

WrapperDeserializer wraps any deserializer with schema registry support. It automatically retrieves schemas and strips the Confluent wire format header.

type WrapperDeserializer struct {
    // contains filtered or unexported fields
}

func NewWrapperDeserializer(config WrapperDeserializerConfig) (*WrapperDeserializer, error)

NewWrapperDeserializer creates a new schema registry-aware deserializer

func (*WrapperDeserializer) Deserialize

func (d *WrapperDeserializer) Deserialize(data []byte, target interface{}) error

Deserialize strips the schema registry header and decodes data

WrapperDeserializerConfig holds configuration for schema registry deserializer

type WrapperDeserializerConfig struct {
    Registry          Registry
    InnerDeserializer Deserializer // Optional: for decoding data after schema registry unwrapping
}

WrapperSerializer wraps any serializer with schema registry support. It automatically registers schemas and prepends the Confluent wire format header.

type WrapperSerializer struct {
    // contains filtered or unexported fields
}

func NewWrapperSerializer(config WrapperSerializerConfig) (*WrapperSerializer, error)

NewWrapperSerializer creates a new schema registry-aware serializer

func (*WrapperSerializer) Serialize

func (s *WrapperSerializer) Serialize(data interface{}) ([]byte, error)

Serialize encodes data and prepends the schema registry header

WrapperSerializerConfig holds configuration for schema registry serializer

type WrapperSerializerConfig struct {
    Registry        Registry
    Subject         string
    SchemaType      string // "AVRO", "PROTOBUF", "JSON"
    Schema          string
    InnerSerializer Serializer // Optional: for encoding data before schema registry wrapping
}
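The InnerSerializer and InnerDeserializer fields accept any type that satisfies the Serializer and Deserializer interfaces documented earlier. The sketch below shows one possible custom codec, a JSON codec that rejects unknown fields on decode. The interfaces are redeclared locally so the example runs on its own, and `strictJSON` and `user` are hypothetical names introduced for illustration.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// Local copies of the two interfaces documented in this package.
type Serializer interface {
	Serialize(data interface{}) ([]byte, error)
}

type Deserializer interface {
	Deserialize(data []byte, target interface{}) error
}

// strictJSON writes plain JSON and rejects unknown fields when reading.
type strictJSON struct{}

func (strictJSON) Serialize(data interface{}) ([]byte, error) {
	return json.Marshal(data)
}

func (strictJSON) Deserialize(data []byte, target interface{}) error {
	dec := json.NewDecoder(bytes.NewReader(data))
	dec.DisallowUnknownFields()
	return dec.Decode(target)
}

// Compile-time checks that strictJSON satisfies both interfaces.
var (
	_ Serializer   = strictJSON{}
	_ Deserializer = strictJSON{}
)

type user struct {
	Name string `json:"name"`
	Age  int    `json:"age"`
}

func main() {
	codec := strictJSON{}

	raw, err := codec.Serialize(user{Name: "Alice", Age: 28})
	if err != nil {
		panic(err)
	}

	var out user
	if err := codec.Deserialize(raw, &out); err != nil {
		panic(err)
	}
	fmt.Println(string(raw), out.Name, out.Age) // prints: {"name":"Alice","age":28} Alice 28
}
```

With the real package, a codec like this would be set as InnerSerializer in WrapperSerializerConfig (with SchemaType "JSON") and as InnerDeserializer in WrapperDeserializerConfig, leaving schema registration and the wire-format header to the wrappers.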

Generated by gomarkdoc