Kafka JDBC Connector 실습 / Orange Pi 5 Max Cluster 환경
Kafka JDBC Connector를 활용해서 PostgreSQL의 Table 복제를 수행한다.
1. 실습 환경 구성
1.1. 전체 실습 환경
![[Figure 1] Kafka Connect JDBC Connector 실습 환경](/blog-software/docs/record/kafka-jdbc-connector-orangepi5-cluster/images/environment.png)
[Figure 1] Kafka Connect JDBC Connector 실습 환경
Spark를 통해서 MinIO에 저장되어 있는 데이터를 변환하는 환경은 [Figure 1]과 같다.
PostgreSQL : Data 저장소 역할을 수행한다.
- kafka_connect_src Database, Users Table : Data를 가져오기 위한 Source Table.
- kafka_connect_dst Database, Users Table : 가져온 Data를 저장하는 Destination Table.
Kafka Connect : Kafka와 PostgreSQL 사이에서 Data를 주고받는 역할을 수행한다.
- postgresql-src-connector Source JDBC Connector : Source Table의 Data를 Kafka로 보내는 JDBC Connector.
- postgresql-dst-connector Destination JDBC Connector : Kafka에서 가져온 Data를 Destination Table에 저장하는 JDBC Connector.
Kafka : JDBC Connector를 통해서 Data를 주고받는 역할을 수행한다. 또한 Kafka Connect의 작업 상태를 저장하는 역할도 수행한다.
- postgresql-users Topic : 복제된 Data를 저장하는 Topic.
- connect-cluster-configs : Kafka Connect의 설정 정보를 저장하는 Topic.
- connect-cluster-offsets : Kafka Connect의 오프셋 정보를 저장하는 Topic.
- connect-cluster-status : Kafka Connect의 상태 정보를 저장하는 Topic.
Strimzi Kafka Operator : Kafka와 Kafka Connect를 관리하는 Operator.
전체 실습 환경 구성은 다음의 링크를 참조한다.
- Orange Pi 5 Max 기반 Kubernetes Cluster 구축 : https://ssup2.github.io/blog-software/docs/record/orangepi5-cluster-build/
- Orange Pi 5 Max 기반 Kubernetes Data Platform 구축 : https://ssup2.github.io/blog-software/docs/record/kubernetes-data-platform-orangepi5-cluster/
1.2. Kafka Connect JDBC Connector 이미지 생성
| |
docker build -t ghcr.io/ssup2-playground/k8s-data-platform_kafka-connect-jdbc-connector:0.48.0-kafka-4.1.0 .
docker push ghcr.io/ssup2-playground/k8s-data-platform_kafka-connect-jdbc-connector:0.48.0-kafka-4.1.0[File 1]의 Dockerfile을 활용하여 Kafka JDBC Connector가 포함된 Container Image를 Build 및 Push한다.
1.3. PostgreSQL 구성
# Create Source Database
kubectl exec -it postgresql-0 -n postgresql -- psql -U postgres -c "CREATE DATABASE kafka_connect_src;"
# Create Destination Database
kubectl exec -it postgresql-0 -n postgresql -- psql -U postgres -c "CREATE DATABASE kafka_connect_dst;"
# Create users Table in Source Database
kubectl exec -it postgresql-0 -n postgresql -- psql -U postgres -d kafka_connect_src -c "
CREATE TABLE users (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(100) NOT NULL,
age INTEGER
);"PostgreSQL에서 kafka_connect_src Source Database 및 kafka_connect_dst Destination Database를 생성하고, kafka_connect_src Source Database에 users Table을 생성한다.
2. Kafka, Kafka Connect 구성
2.1. Kafka 구성
| |
kubectl apply -f kafka.yaml[File 2]의 Kafka Manifest를 적용하여, Strimzi Kafka Operator가 Kafka를 구성하도록 만든다.
2.1. Kafka Connect 구성
| |
kubectl apply -f kafka-connect.yaml[File 3]의 Kafka Connect Manifest를 적용하여, Strimzi Kafka Operator가 Kafka Connect를 구성하도록 만든다. 4.1.0 Version을 명시하고 있고, connect-cluster Group ID를 사용한다. connect-cluster-offsets, connect-cluster-configs, connect-cluster-status Topic을 통해서 Kafka Connect의 작업 상태를 저장한다.
2.3. Kafka Connect JDBC Connector 구성
| |
kubectl apply -f postgresql-src-connector.yaml[File 4]의 Kafka Connector Manifest를 적용하여, Kafka Connect가 kafka_connect_src Source Database의 users Table의 Data를 Kafka로 보내도록 만든다.
| |
kubectl apply -f postgresql-dst-connector.yaml[File 5]의 Kafka Connector Manifest를 적용하여, Kafka Connect가 Kafka의 postgresql-users Topic에서 가져온 Data를 kafka_connect_dst Destination Database의 users Table에 저장하도록 만든다.
2.5. Data 복제 실습
kubectl exec -it postgresql-0 -n postgresql -- psql -U postgres -d kafka_connect_src -c "
INSERT INTO users (name, email, age) VALUES
('John Doe', 'john@ssup2.com', 30),
('Jane Smith', 'jane@ssup2.com', 25),
('Bob Johnson', 'bob@ssup2.com', 35),
('Alice Brown', 'alice@ssup2.com', 28),
('Charlie Wilson', 'charlie@ssup2.com', 32);"kafka_connect_src Source Database의 users Table에 Data를 추가한다.
kubectl exec -it postgresql-0 -n postgresql -- psql -U postgres -d kafka_connect_dst -c "SELECT * FROM users;"id | name | email | age
----+----------------+-------------------+-----
1 | John Doe | john@ssup2.com | 30
2 | Jane Smith | jane@ssup2.com | 25
3 | Bob Johnson | bob@ssup2.com | 35
4 | Alice Brown | alice@ssup2.com | 28
5 | Charlie Wilson | charlie@ssup2.com | 32kafka_connect_dst Destination Database의 users Table에 Data가 저장되었는지 확인한다.
3. 참고
- Strimzi Kafka Operator : https://togomi.tistory.com/66