DEV_희쨔응
Kafkaconnector(MysqlCDC To Impala&Kudu)-Confluent 본문
Debizium Mysql CDC Source 커넥터가 Avro 형식으로 데이터를 추출해 Kafka Topic에 Pub해주고 Confluent Kudu sink 커넥터가 해당 Topic을 Consume 하여 Impala Table에 적재하는 시나리오 입니다.
데모 시나리오 테스트 결과
- Debizium Mysql CDC Source 커넥터 동작 이상 없음 (Insert / Delete / Update / Upsert)
- Schema Resister 동작 이상 없음 (Source 테이블 컬럼명 데이터 타입 매핑 동작)
- Confluent Kudu sink 제한 사항 Topic에 적재된 CDC 데이터 Impala 적용시 Delete, Update, Upsert 지원 하지 않음 오로지 Insert 동작만 지원
Mysql CDC source connector 소스
{
"name": "source-mysql-kudu-hj0",
"config": {
"timestamp.column.name": "user",
"topic.creation.default.partitions": "1",
"topic.creation.default.delete.retention.ms": "60000",
"topic.creation.default.replication.factor": "1",
"database.allowPublicKeyRetrieval": "true",
"topic.creation.default.compression.type": "lz4",
"topic.creation.default.cleanup.policy": "compact",
"value.converter.schema.registry.url": "http://IP:8081",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true",
"key.converter.schema.registry.url": "http://IP:8081",
"name": "source-mysql-kudu-hj0",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"transforms": "unwrap, route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.handling.mode": "drop",
"transforms.unwrap.drop.tombstones": "false",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$1.$2.$3",
"database.hostname": "IP",
"database.port": "3306",
"database.user": "ishark",
"database.password": "ishark2020",
"database.server.name": "mysql111",
"snapshot.mode": "initial",
"time.precision.mode": "connect",
"database.history.kafka.bootstrap.servers": "IP:9092",
"database.history.kafka.topic": "history.mysql111.kudu_testh03",
"table.include.list": "testdb.kudu_testh03",
"include.schema.changes": "true",
"database.include.list": "testdb"
}
}
Kudu Sink Connector 소스
{
"name": "sink-mysql-kudu-hj0",
"config": {
"value.converter.schema.registry.url": "http://IP:8081",
"key.converter.schema.registry.url": "http://IP:8081",
"name": "sink-mysql-kudu-hj0",
"connector.class": "io.confluent.connect.kudu.KuduSinkConnector",
"tasks.max": "1",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"transforms": "route",
"topics": "mysql110.testdb.kudu_testh02",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3",
"impala.server": "IP",
"impala.port": "21050",
"kudu.database": "test",
"impala.ldap.user": "",
"impala.ldap.password": "",
"kudu.tablet.replicas": "1",
"confluent.topic.bootstrap.servers": "IP:9092",
"confluent.topic.replication.factor": "1",
"table.name.format": "${topic}",
"pk.mode": "record_value",
"pk.fields": "id",
"auto.create": "true"
}
}
'Kafka' 카테고리의 다른 글
API to PostgreSQL 연동 (AirFlow, Kafka) (0) | 2023.06.09 |
---|---|
Apache Flume(Kafka To File) (0) | 2023.03.06 |
Kafkaconnector(MysqlCDC To Impala&Kudu) (0) | 2023.02.22 |
Kcat(Kafkacat) (3) | 2023.02.20 |
MariaDB to Elasticsearch (0) | 2023.01.30 |
Comments