Kafka Schema Registry Practice / Orange Pi 5 Max Cluster Environment
Practice managing schemas using Kafka Schema Registry.
1. Practice Environment Setup
1.1. Overall Practice Environment
![[Figure 1] Kafka Schema Registry Practice Environment](/blog-software/docs/record/kafka-schema-registry-orangepi5-cluster/images/environment.png)
[Figure 1] Kafka Schema Registry Practice Environment
- Kafka Schema Registry : Manages schemas for Kafka messages.
- Kafka : Transmits messages between the Producer and the Consumer.
- schema Topic : Topic that stores the schemas registered in Schema Registry.
- user-events Topic : Topic that stores the messages sent by the Producer.
- Producer : Retrieves the schema from Kafka Schema Registry, serializes messages in Avro format, and sends them to the Kafka topic.
- Consumer : Receives Avro-serialized messages from the Kafka topic and deserializes them.
For the overall practice environment setup, refer to the following links:
- Orange Pi 5 Max based Kubernetes Cluster Setup : https://ssup2.github.io/blog-software/docs/record/orangepi5-cluster-build/
- Orange Pi 5 Max based Kubernetes Data Platform Setup : https://ssup2.github.io/blog-software/docs/record/kubernetes-data-platform-orangepi5-cluster/
1.2. Python Package Installation for Producer and Consumer
```shell
mkdir -p ~/kafka-schema-registry
cd ~/kafka-schema-registry
uv init
uv add "confluent-kafka[avro,registry]" avro-python3
source .venv/bin/activate
```

Install the Python packages required for running the Producer and Consumer using uv. Note that `uv add` is what creates the `.venv` virtual environment, so it must run before the environment is activated.
2. Using Kafka Schema Registry
Manage Avro schemas using Kafka Schema Registry and send/receive records with schemas applied through Python Producer and Consumer.
2.1. Schema Registration
```shell
SCHEMA_REGISTRY_EXTERNAL_IP=$(kubectl get service -n kafka schema-registry -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
SCHEMA_REGISTRY_PORT=$(kubectl get service -n kafka schema-registry -o jsonpath='{.spec.ports[0].port}')
SCHEMA_REGISTRY_URL="http://${SCHEMA_REGISTRY_EXTERNAL_IP}:${SCHEMA_REGISTRY_PORT}"
echo "Schema Registry URL: ${SCHEMA_REGISTRY_URL}"
```

```
Schema Registry URL: http://192.168.1.99:8081
```

Set the Kafka Schema Registry endpoint as an environment variable and verify it.
```shell
SCHEMA='{
  "type": "record",
  "name": "User",
  "namespace": "ssup2.com",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"}
  ]
}'

curl -X POST ${SCHEMA_REGISTRY_URL}/subjects/user/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d "{
    \"schema\": $(echo "$SCHEMA" | jq -c tojson)
  }"
```

```
{"id":3,"version":1,"guid":"e06655b8-8d49-9800-f924-ea691503f834","schemaType":"AVRO","schema":"{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"ssup2.com\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"}]}"}
```

Register an Avro schema in Schema Registry under the `user` subject. The schema defines a User record containing user information.
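Once registered, the schema can be queried back through Schema Registry's REST API. A minimal check, assuming `SCHEMA_REGISTRY_URL` is still set from the previous step:

```shell
# Fall back to localhost when SCHEMA_REGISTRY_URL is not exported, so the
# sketch can be pasted anywhere (the fallback address is an assumption).
SCHEMA_REGISTRY_URL="${SCHEMA_REGISTRY_URL:-http://localhost:8081}"

# List all subjects known to the registry
curl -s "${SCHEMA_REGISTRY_URL}/subjects" || true

# Fetch the latest schema version registered under the "user" subject
curl -s "${SCHEMA_REGISTRY_URL}/subjects/user/versions/latest" || true
```

The `|| true` guards only keep the sketch harmless when no registry is reachable; they can be dropped in an interactive shell.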
2.2. Producer Execution
The Producer, written in Python, retrieves the schema from Schema Registry, serializes messages in Avro format, and sends them to the Kafka topic. When a schema is provided, the Producer automatically registers it in Schema Registry and uses its schema ID.
```shell
SCHEMA_REGISTRY_EXTERNAL_IP=$(kubectl get service -n kafka schema-registry -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
SCHEMA_REGISTRY_PORT=$(kubectl get service -n kafka schema-registry -o jsonpath='{.spec.ports[0].port}')
export SCHEMA_REGISTRY_URL="http://${SCHEMA_REGISTRY_EXTERNAL_IP}:${SCHEMA_REGISTRY_PORT}"
python producer.py
```

```
Retrieved schema from Schema Registry: user
Message delivered: user-events [0]
Message delivered: user-events [0]
Message delivered: user-events [0]
All messages sent
```

Execute the Producer and verify that messages are sent to the Kafka topic.
2.3. Consumer Execution
The Consumer, written in Python, receives Avro-serialized records from the Kafka topic and deserializes them. The Consumer automatically retrieves the schema from Schema Registry using the schema ID included in each message.
Execute the Consumer.

```shell
SCHEMA_REGISTRY_EXTERNAL_IP=$(kubectl get service -n kafka schema-registry -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
SCHEMA_REGISTRY_PORT=$(kubectl get service -n kafka schema-registry -o jsonpath='{.spec.ports[0].port}')
export SCHEMA_REGISTRY_URL="http://${SCHEMA_REGISTRY_EXTERNAL_IP}:${SCHEMA_REGISTRY_PORT}"
python consumer.py
```

```
Waiting for messages from topic: user-events
Received message: {'id': 1, 'name': 'Alice', 'email': 'alice@example.com'}
Received message: {'id': 2, 'name': 'Bob', 'email': 'bob@example.com'}
Received message: {'id': 3, 'name': 'Charlie', 'email': 'charlie@example.com'}
```

When the Consumer runs, you can verify that the messages sent by the Producer are deserialized from Avro format and printed.
3. References
- Kafka Schema Producer : https://suwani.tistory.com/154
- Kafka Schema Consumer : https://suwani.tistory.com/155