DEV_희쨔응

Kafkaconnector(MysqlCDC To Impala&Kudu)-Confluent 본문

Kafka

Kafkaconnector(MysqlCDC To Impala&Kudu)-Confluent

희쨔응 2023. 3. 2. 11:15

Debizium Mysql CDC Source 커넥터가 Avro 형식으로 데이터를 추출해 Kafka Topic에 Pub해주고 Confluent Kudu sink 커넥터가 해당 Topic을 Consume 하여 Impala Table에 적재하는 시나리오 입니다.

데모 시나리오 테스트 결과

  • Debizium Mysql CDC Source 커넥터 동작 이상 없음 (Insert / Delete / Update / Upsert)
  • Schema Resister 동작 이상 없음 (Source 테이블 컬럼명 데이터 타입 매핑 동작)
  • Confluent Kudu sink 제한 사항 Topic에 적재된 CDC 데이터 Impala 적용시 Delete, Update, Upsert 지원 하지 않음 오로지 Insert 동작만 지원

Mysql CDC source connector 소스

{
  "name": "source-mysql-kudu-hj0",
  "config": {
    "timestamp.column.name": "user",
    "topic.creation.default.partitions": "1",
    "topic.creation.default.delete.retention.ms": "60000",
    "topic.creation.default.replication.factor": "1",
    "database.allowPublicKeyRetrieval": "true",
    "topic.creation.default.compression.type": "lz4",
    "topic.creation.default.cleanup.policy": "compact",
    "value.converter.schema.registry.url": "http://IP:8081",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true",
    "key.converter.schema.registry.url": "http://IP:8081",
    "name": "source-mysql-kudu-hj0",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "transforms": "unwrap, route",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.delete.handling.mode": "drop",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$1.$2.$3",
    "database.hostname": "IP",
    "database.port": "3306",
    "database.user": "ishark",
    "database.password": "ishark2020",
    "database.server.name": "mysql111",
    "snapshot.mode": "initial",
    "time.precision.mode": "connect",
    "database.history.kafka.bootstrap.servers": "IP:9092",
    "database.history.kafka.topic": "history.mysql111.kudu_testh03",
    "table.include.list": "testdb.kudu_testh03",
    "include.schema.changes": "true",
    "database.include.list": "testdb"
  }
}

Kudu Sink Connector 소스

{
  "name": "sink-mysql-kudu-hj0",
  "config": {
    "value.converter.schema.registry.url": "http://IP:8081",
    "key.converter.schema.registry.url": "http://IP:8081",
    "name": "sink-mysql-kudu-hj0",
    "connector.class": "io.confluent.connect.kudu.KuduSinkConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "transforms": "route",
    "topics": "mysql110.testdb.kudu_testh02",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3",
    "impala.server": "IP",
    "impala.port": "21050",
    "kudu.database": "test",
    "impala.ldap.user": "",
    "impala.ldap.password": "",
    "kudu.tablet.replicas": "1",
    "confluent.topic.bootstrap.servers": "IP:9092",
    "confluent.topic.replication.factor": "1",
    "table.name.format": "${topic}",
    "pk.mode": "record_value",
    "pk.fields": "id",
    "auto.create": "true"
  }
}

'Kafka' 카테고리의 다른 글

API to PostgreSQL 연동 (AirFlow, Kafka)  (0) 2023.06.09
Apache Flume(Kafka To File)  (0) 2023.03.06
Kafkaconnector(MysqlCDC To Impala&Kudu)  (0) 2023.02.22
Kcat(Kafkacat)  (3) 2023.02.20
MariaDB to Elasticsearch  (0) 2023.01.30
Comments