Kafka Schema Registry 실습 / Orange Pi 5 Max Cluster 환경
Kafka Schema Registry를 활용해서 Schema를 관리하는 실습을 수행한다.
1. 실습 환경 구성
1.1. 전체 실습 환경
![[Figure 1] Kafka Schema Registry 실습 환경](/blog-software/docs/record/kafka-schema-registry-orangepi5-cluster/images/environment.png)
[Figure 1] Kafka Schema Registry 실습 환경
- Kafka Schema Registry : Kafka Message를 위한 Schema를 관리하는 역할을 수행한다.
- Kafka : Producer와 Consumer 사이에서 Message를 전송하는 역할을 수행한다.
- schema Topic : Schema Registry에 등록된 Schema를 저장하는 Topic.
- user-events Topic : Producer가 전송한 Message를 저장하는 Topic.
- Producer : Kafka Schema Registry에서 스키마를 가져와 Avro 형식으로 직렬화하여 Kafka Topic에 전송하는 역할을 수행한다.
- Consumer : Kafka Topic에서 Avro 형식으로 직렬화된 Message를 수신하여 역직렬화하는 역할을 수행한다.
전체 실슴 환경 구성은 다음의 링크를 참조한다.
- Orange Pi 5 Max 기반 Kubernetes Cluster 구축 : https://ssup2.github.io/blog-software/docs/record/orangepi5-cluster-build/
- Orange Pi 5 Max 기반 Kubernetes Data Platform 구축 : https://ssup2.github.io/blog-software/docs/record/kubernetes-data-platform-orangepi5-cluster/
1.2. Producer, Consumer 구동을 위한 Python 패키지 설치
mkdir -p ~/kafka-schema-registry
cd ~/kafka-schema-registry
uv init
source .venv/bin/activate
uv add "confluent-kafka[avro,registry]" avro-python3uv를 통해서 Producer, Consumer 실행을 위한 Python 패키지를 설치한다.
2. Kafka Schema Registry 이용
Kafka Schema Registry를 활용하여 Avro 스키마를 관리하고, Python Producer와 Consumer를 통해 스키마가 적용된 Record를 전송 및 수신한다.
2.1. Schema 등록
SCHEMA_REGISTRY_EXTERNAL_IP=$(kubectl get service -n kafka schema-registry -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
SCHEMA_REGISTRY_PORT=$(kubectl get service -n kafka schema-registry -o jsonpath='{.spec.ports[0].port}')
SCHEMA_REGISTRY_URL="http://${SCHEMA_REGISTRY_EXTERNAL_IP}:${SCHEMA_REGISTRY_PORT}"
echo "Schema Registry URL: ${SCHEMA_REGISTRY_URL}"Schema Registry URL: http://192.168.1.99:8081Kafka Schema의 Endpoint를 환경변수로 설정하고, 확인한다.
SCHEMA='{
"type": "record",
"name": "User",
"namespace": "ssup2.com",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"}
]
}'
curl -X POST ${SCHEMA_REGISTRY_URL}/subjects/user/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d "{
\"schema\": $(echo "$SCHEMA" | jq -c tojson)
}"{"id":3,"version":1,"guid":"e06655b8-8d49-9800-f924-ea691503f834","schemaType":"AVRO","schema":"{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"ssup2.com\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"}]}"}Schema Registry에 Avro 스키마를 등록한다. 사용자 정보를 담는 User 스키마를 등록한다.
2.2. Producer 실행
| |
Python으로 작성된 Producer는 Schema Registry에서 스키마를 가져와 Avro 형식으로 직렬화하여 Kafka Topic에 전송한다. Producer는 스키마를 전달하면 Schema Registry에 자동으로 등록하고 스키마 ID를 사용한다.
SCHEMA_REGISTRY_EXTERNAL_IP=$(kubectl get service -n kafka schema-registry -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
SCHEMA_REGISTRY_PORT=$(kubectl get service -n kafka schema-registry -o jsonpath='{.spec.ports[0].port}')
export SCHEMA_REGISTRY_URL="http://${SCHEMA_REGISTRY_EXTERNAL_IP}:${SCHEMA_REGISTRY_PORT}"
python producer.pyRetrieved schema from Schema Registry: user
Message delivered: user-events [0]
Message delivered: user-events [0]
Message delivered: user-events [0]
All messages sentProducer를 실행하고, Message가 Kafka Topic에 전송되는 것을 확인한다.
2.3. Consumer 실행
Python으로 작성된 Consumer는 Kafka Topic에서 Avro 형식으로 직렬화된 Record를 수신하여 역직렬화한다. Consumer는 메시지에 포함된 스키마 ID를 사용하여 Schema Registry에서 자동으로 스키마를 가져온다.
| |
Consumer를 실행한다.
SCHEMA_REGISTRY_EXTERNAL_IP=$(kubectl get service -n kafka schema-registry -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
SCHEMA_REGISTRY_PORT=$(kubectl get service -n kafka schema-registry -o jsonpath='{.spec.ports[0].port}')
export SCHEMA_REGISTRY_URL="http://${SCHEMA_REGISTRY_EXTERNAL_IP}:${SCHEMA_REGISTRY_PORT}"
python consumer.pyWaiting for messages from topic: user-events
Received message: {'id': 1, 'name': 'Alice', 'email': 'alice@example.com'}
Received message: {'id': 2, 'name': 'Bob', 'email': 'bob@example.com'}
Received message: {'id': 3, 'name': 'Charlie', 'email': 'charlie@example.com'}Consumer 실행 시 Producer에서 전송한 메시지가 Avro 형식으로 역직렬화되어 출력되는 것을 확인할 수 있다.
3. 참조
- Kafka Schema Producer : https://suwani.tistory.com/154
- Kafka Schema Consumer : https://suwani.tistory.com/155