This directory contains examples demonstrating various features of the Danube messaging platform using the Python client library.
Purpose: Demonstrates the simplest way to send and receive raw byte messages without schema validation.
Key Features:
- Basic producer/consumer setup
- Raw byte message passing
- Message acknowledgment
- Single producer-consumer pair
Use Case: Quick prototyping, simple message passing, when schema validation is not needed.

```bash
python examples/simple_producer_consumer.py
```

Purpose: Shows how to use topic partitioning for horizontal scaling and parallel processing.
Key Features:
- Creating partitioned topics
- Automatic message distribution across partitions
- Parallel message consumption
Use Case: High-throughput scenarios where messages can be processed independently in parallel.
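
To make "automatic message distribution" concrete, here is a minimal, self-contained sketch that assigns messages to three partitions in round-robin order. The round-robin policy and the `distribute` helper are assumptions for illustration only; this README does not state how Danube actually routes messages to partitions.

```python
from collections import defaultdict
from itertools import cycle


def distribute(messages, num_partitions):
    """Assign each message to a partition in round-robin order (assumed policy)."""
    partitions = defaultdict(list)
    # cycle() yields 0, 1, ..., num_partitions - 1 and then starts over.
    for message, index in zip(messages, cycle(range(num_partitions))):
        partitions[index].append(message)
    return dict(partitions)


messages = [f"msg-{i}".encode() for i in range(7)]
assignment = distribute(messages, num_partitions=3)
for index, batch in sorted(assignment.items()):
    print(f"partition {index}: {[m.decode() for m in batch]}")
```

Because each partition holds an independent subset of the messages, separate consumers can process the partitions in parallel.
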

```bash
# Terminal 1 - Start consumer
python examples/partitions_consumer.py

# Terminal 2 - Start producer
python examples/partitions_producer.py
```

Purpose: Demonstrates reliable message delivery with acknowledgments and retry mechanisms.
Key Features:
- Reliable dispatch mode for guaranteed delivery
- Automatic retry on failures
- Message acknowledgment tracking
Use Case: Critical messages that must be delivered (e.g., payment notifications, order processing).
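
The acknowledge-and-retry idea behind reliable delivery can be sketched without a broker. The snippet below is a conceptual illustration only: `send_with_retry` and `make_flaky_deliver` are hypothetical helpers, not part of the Danube client, and Danube's own retry logic may work differently.

```python
def send_with_retry(payload, deliver, max_attempts=5):
    """Call deliver(payload) until it returns True (an ack) or attempts run out."""
    for attempt in range(1, max_attempts + 1):
        if deliver(payload):
            return attempt  # number of attempts needed to get the ack
    raise RuntimeError(f"no acknowledgment after {max_attempts} attempts")


def make_flaky_deliver(failures_before_success):
    """Hypothetical transport that drops the first N deliveries, then acks."""
    state = {"remaining": failures_before_success}

    def deliver(payload):
        if state["remaining"] > 0:
            state["remaining"] -= 1
            return False  # simulated lost message: no ack
        return True  # simulated ack from the consumer

    return deliver


attempts = send_with_retry(b"order-42", make_flaky_deliver(2))
print(f"acknowledged after {attempts} attempts")
```

A message counts as delivered only once an acknowledgment comes back; until then the sender keeps trying, up to a bounded number of attempts.
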

```bash
# Terminal 1 - Start consumer
python examples/reliable_dispatch_consumer.py

# Terminal 2 - Start producer
python examples/reliable_dispatch_producer.py
```

Purpose: Shows how to use JSON Schema for message validation with typed data structures.
Key Features:
- JSON Schema registration in Schema Registry
- Automatic serialization/deserialization
- Type-safe message passing
- Schema validation on both producer and consumer sides
Use Case: Applications using JSON for structured data with schema evolution needs.
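
As a rough picture of what serialization and deserialization of typed data involve, the sketch below round-trips a dataclass through UTF-8 JSON bytes using only the standard library. The `UserEvent` type and the `serialize`/`deserialize` helpers are hypothetical and are not taken from the example scripts. Note that dataclasses do not check field types at runtime, which is the gap that schema validation fills.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class UserEvent:  # hypothetical message type, for illustration only
    user_id: str
    action: str
    timestamp: int


def serialize(event: UserEvent) -> bytes:
    """Encode the dataclass as UTF-8 JSON bytes."""
    return json.dumps(asdict(event)).encode("utf-8")


def deserialize(payload: bytes) -> UserEvent:
    """Decode JSON bytes into the dataclass; unknown or missing keys raise TypeError."""
    return UserEvent(**json.loads(payload.decode("utf-8")))


original = UserEvent(user_id="u-1", action="login", timestamp=1700000000)
payload = serialize(original)
restored = deserialize(payload)
print(payload)
print(restored == original)
```
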

```bash
# Terminal 1 - Start consumer
python examples/json_consumer.py

# Terminal 2 - Start producer
python examples/json_producer.py
```

Schema Type: `json_schema`

Purpose: Demonstrates consumer-side schema validation against the Schema Registry at startup.
Key Features:
- Fetches schema from registry before consuming
- Validates a sample dict against JSON Schema definition
- Fails at startup if sample doesn't match schema
- Prevents runtime deserialization errors
- Schema version tracking and logging
Use Case: Production consumers that need to ensure their data definitions match the producer's schema, preventing silent data loss or deserialization failures.

```bash
# Run the producer first to register schema
python examples/json_producer.py

# Then run the validated consumer
python examples/json_consumer_validated.py
```

What it validates:
- Field names match schema properties
- Field types are compatible (string, integer, etc.)
- Required fields are present
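
The three checks above can be sketched with the standard library alone. This is a simplified stand-in, not the code in `json_consumer_validated.py`, which relies on the `jsonschema` package; the `validate_sample` helper is hypothetical and covers only flat objects. It also rejects fields that are absent from the schema, which is stricter than JSON Schema's default behavior.

```python
# Maps JSON Schema type names to the Python types accepted for them.
JSON_TYPES = {
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
    "object": dict,
    "array": list,
}


def validate_sample(sample, schema):
    """Return a list of problems; an empty list means the sample matches."""
    problems = []
    properties = schema.get("properties", {})
    for name in schema.get("required", []):
        if name not in sample:
            problems.append(f"missing required field: {name}")
    for name, value in sample.items():
        if name not in properties:
            problems.append(f"unknown field: {name}")
            continue
        expected = properties[name].get("type")
        python_type = JSON_TYPES.get(expected)
        # bool is a subclass of int in Python, so reject it for numeric types explicitly.
        is_bool_as_number = isinstance(value, bool) and expected in ("integer", "number")
        if python_type and (not isinstance(value, python_type) or is_bool_as_number):
            problems.append(f"field {name}: expected {expected}, got {type(value).__name__}")
    return problems


schema = {
    "type": "object",
    "properties": {"user_id": {"type": "string"}, "count": {"type": "integer"}},
    "required": ["user_id", "count"],
}
good = {"user_id": "u-1", "count": 3}
bad = {"user_id": 7, "extra": True}
print(validate_sample(good, schema))
print(validate_sample(bad, schema))
```

Running a check like this at startup, and exiting when the list is non-empty, turns a schema mismatch into an immediate failure rather than a deserialization error later on.
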

Dependencies: Requires `jsonschema` package:

```bash
pip install jsonschema
```

Purpose: Demonstrates Apache Avro schema usage for structured event serialization.
Key Features:
- Avro schema registration
- Structured event serialization
- Schema evolution support
- Strongly-typed data structures
Use Case: High-performance applications requiring efficient serialization and schema evolution.

```bash
# Terminal 1 - Start consumer
python examples/avro_consumer.py

# Terminal 2 - Start producer
python examples/avro_producer.py
```

Schema Type: `avro`

Example Schema:

```json
{
  "type": "record",
  "name": "UserEvent",
  "namespace": "com.example.events",
  "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "action", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "metadata", "type": ["null", "string"], "default": null}
  ]
}
```

Purpose: Comprehensive demonstration of schema evolution and compatibility checking.
Key Features:
- Schema version management
- Compatibility checking (backward/forward/full)
- Safe schema evolution
- Listing all schema versions
- Retrieving latest schema
Use Case: Understanding how to evolve data schemas safely over time without breaking existing consumers.

```bash
python examples/schema_evolution.py
```

Demonstrates:
- Compatible change: Adding optional fields with defaults
- Incompatible change: Adding required fields without defaults
- Schema version history tracking
- Compatibility mode enforcement

Partitioning a topic spreads its messages across multiple partitions, which allows horizontal scaling:

```python
producer = (
    client.new_producer()
    .with_topic("/default/my_topic")
    .with_name("my_producer")
    .with_partitions(3)  # Create 3 partitions
    .build()
)
```

Reliable dispatch ensures message delivery with automatic retries:

```python
from danube import DispatchStrategy

producer = (
    client.new_producer()
    .with_topic("/default/my_topic")
    .with_name("my_producer")
    .with_dispatch_strategy(DispatchStrategy.RELIABLE)
    .build()
)
```

Register schemas and enable validation:

```python
from danube import SchemaType, CompatibilityMode

# Note: the `await` calls below must run inside an async function.
# `client` is an existing DanubeClient; `schema_bytes` and
# `new_schema_bytes` hold the schema definitions as bytes.

# 1. Get schema client from DanubeClient
schema_client = client.schema()

# 2. Register schema
schema_id = await (
    schema_client.register_schema("my-subject")
    .with_type(SchemaType.AVRO)
    .with_schema_data(schema_bytes)
    .execute()
)

# 3. Check compatibility before evolution
compat_result = await schema_client.check_compatibility(
    "my-subject",
    new_schema_bytes,
    SchemaType.AVRO,
    None,  # Use subject's default mode
)

# 4. Set compatibility mode for a subject
await schema_client.set_compatibility_mode("critical-subject", CompatibilityMode.FULL)

# 5. Create producer with schema subject
producer = (
    client.new_producer()
    .with_topic("/default/my_topic")
    .with_name("my_producer")
    .with_schema_subject("my-subject")
    .build()
)
```

| Type | Description | Use Case |
|---|---|---|
| `SchemaType.BYTES` | Raw binary data (no validation) | Simple messaging, custom formats |
| `SchemaType.STRING` | UTF-8 text data | Plain text messages |
| `SchemaType.NUMBER` | Numeric data (int, float, double) | Simple numeric values |
| `SchemaType.JSON_SCHEMA` | JSON with schema validation | Structured JSON data |
| `SchemaType.AVRO` | Apache Avro binary format | High-performance, schema evolution |
| `SchemaType.PROTOBUF` | Protocol Buffers | Cross-language compatibility |

Schema evolution is controlled by compatibility modes. Set these per-subject to define evolution rules:

| Mode | Description | Allows | Use Case |
|---|---|---|---|
| `CompatibilityMode.BACKWARD` | New schema reads old data | Add optional fields, remove fields | Default. Consumers upgrade before producers |
| `CompatibilityMode.FORWARD` | Old schema reads new data | Add required fields, remove optional fields | Producers upgrade before consumers |
| `CompatibilityMode.FULL` | Both directions | Only safe changes (add optional) | Strictest. Critical schemas |
| `CompatibilityMode.NONE` | No validation | Any change | Development/testing only |

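
The direction each mode checks can be illustrated with a small standalone model. This sketch is an assumption-laden simplification, not the Schema Registry's implementation: schemas are reduced to field names with optional defaults, type changes are ignored, modes are plain strings rather than `CompatibilityMode` values, and `can_read` and `is_compatible` are hypothetical helpers.

```python
def can_read(reader, writer):
    """True if data written with `writer` can be decoded with `reader`.

    Each schema is a dict mapping field name -> {"default": ...} or {}.
    Simplified Avro-style rule: every reader field must either exist in
    the writer schema or carry a default. Type changes are not modelled.
    """
    return all(
        name in writer or "default" in spec
        for name, spec in reader.items()
    )


def is_compatible(old, new, mode):
    """Check a proposed schema `new` against the current schema `old`."""
    backward = can_read(reader=new, writer=old)  # new schema reads old data
    forward = can_read(reader=old, writer=new)   # old schema reads new data
    if mode == "BACKWARD":
        return backward
    if mode == "FORWARD":
        return forward
    if mode == "FULL":
        return backward and forward
    if mode == "NONE":
        return True
    raise ValueError(f"unknown mode: {mode}")


v1 = {"user_id": {}, "action": {}}
add_optional = {**v1, "metadata": {"default": None}}  # optional field with default
add_required = {**v1, "session_id": {}}               # required field, no default

modes = ("BACKWARD", "FORWARD", "FULL", "NONE")
for label, candidate in [("add optional", add_optional), ("add required", add_required)]:
    results = {mode: is_compatible(v1, candidate, mode) for mode in modes}
    print(label, results)
```

In this model, adding an optional field with a default passes every mode, while adding a required field without a default passes only `FORWARD` and `NONE`, which matches the "Allows" column above.
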
- Start the Danube broker (from the `docker/` directory):

  ```bash
  cd docker/
  docker compose up -d
  ```

- Install the danube-client package:

  ```bash
  pip install -e .
  ```

- For the validated consumer example, install `jsonschema`:

  ```bash
  pip install jsonschema
  ```

```bash
# Basic examples
python examples/simple_producer_consumer.py
python examples/partitions_producer.py
python examples/partitions_consumer.py

# Schema registry examples
python examples/json_producer.py
python examples/json_consumer.py
python examples/json_consumer_validated.py  # Requires jsonschema
python examples/avro_producer.py
python examples/avro_consumer.py
python examples/schema_evolution.py

# Reliable dispatch
python examples/reliable_dispatch_producer.py
python examples/reliable_dispatch_consumer.py
```

- Start simple: `simple_producer_consumer.py` (understand basic messaging)
- Add schemas: `json_producer.py` + `json_consumer.py` (learn schema registration)
- Validate schemas: `json_consumer_validated.py` (production-ready validation)
- Schema evolution: `schema_evolution.py` (understand compatibility rules)
- High performance: `avro_producer.py` + `avro_consumer.py` (Avro serialization)
- Scale up: `partitions_producer.py` + `partitions_consumer.py` (horizontal scaling)
- Reliability: `reliable_dispatch_*.py` examples (guaranteed delivery)