DEV_희쨔응
Kafkaconnector(MysqlCDC To Impala&Kudu) 본문
현재 테스트 서버에 아래와 같이 Kafka 데모 시나리오 개발/테스트 완료 하였습니다.
먼저 Debizium Mysql CDC Source 커넥터가 Avro 형식으로 데이터를 추출해 Kafka Topic에 Pub해주고 Lenses Kudu sink 커넥터가 해당 Topic을 Consume 하여 Impala Table에 적재하는 시나리오 입니다.
데모 시나리오 테스트 결과
- Debizium Mysql CDC Source 커넥터 동작 이상 없음 (Insert / Delete / Update / Upsert)
- Schema Resister 동작 이상 없음 (Source 테이블 컬럼명 데이터 타입 매핑 동작)
- Lenses Kudu sink 커넥터 제한 사항 Confluent Schema Resister 에 등록된 스키마 형식을 인식 하지 못함 수동으로 스키마 변경 해야함
- Lenses Kudu sink 커넥터 제한 사항 Topic에 적재된 CDC 데이터 Impala 적용시 Delete 지원 하지 않음 (Insert / Update / Upsert 지원)
- Lenses Kudu sink 플러그인 사용 시 날짜가 1970년도(유닉스 최초 설정 시간) 날짜로 찍히는 오류
Mysql CDC source connector 소스
{
"name": "source-mysql-kudu-g",
"config": {
"topic.creation.default.partitions": "1",
"topic.creation.default.delete.retention.ms": "60000",
"topic.creation.default.replication.factor": "1",
"database.allowPublicKeyRetrieval": "true",
"topic.creation.default.compression.type": "lz4",
"topic.creation.default.cleanup.policy": "compact",
"value.converter.schema.registry.url": "http://IP:8081",
"key.converter.schema.registry.url": "http://IP:8081",
"name": "source-mysql-kudu-g",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schemas.enable": "false",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": "false",
"transforms": "unwrap, route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.handling.mode": "drop",
"transforms.unwrap.drop.tombstones": "false",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$1.$2.$3",
"database.hostname": "IP",
"database.port": "3306",
"database.user": "ishark",
"database.password": "*******",
"database.server.name": "mysql009",
"snapshot.mode": "initial",
"time.precision.mode": "connect",
"database.history.kafka.bootstrap.servers": "IP:9092",
"database.history.kafka.topic": "history.mysql009.testdb",
"table.include.list": "testdb.kudu_test",
"include.schema.changes": "true",
"database.include.list": "testdb"
}
}
Kudu Sink Connector 소스
{
"name": "kudu-sink-3",
"config": {
"connector.class":"com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkConnector",
"tasks.max":"1",
"topics":"mysql009.testdb.kudu_test",
"connect.kudu.master":"IP:7051",
"connect.kudu.kcql":"UPSERT INTO impala::default.kudu_test1 SELECT * FROM mysql009.testdb.kudu_test",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schemas.enable": "true",
"key.converter.schema.registry.url": "http://IP:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://IP:8081"
}
}
구동 후 작동 화면
위의 dbeaver에서 mysql 테이블을 update 하였고 아래 임팔라 cli 화면에서 데이터를 조회해본 결과 991 행이 foo로 정상적으로 업데이트 된것을 확인할수있습니다.
'Kafka' 카테고리의 다른 글
Apache Flume(Kafka To File) (0) | 2023.03.06 |
---|---|
Kafkaconnector(MysqlCDC To Impala&Kudu)-Confluent (0) | 2023.03.02 |
Kcat(Kafkacat) (3) | 2023.02.20 |
MariaDB to Elasticsearch (0) | 2023.01.30 |
Apache Beam (1) | 2023.01.26 |
Comments