DEV_희쨔응
KSQL & Schema Registry 통한 KSETL 본문
개요
Kafka 를 통한 다양한 작업들이 늘어남에 따라 Topic에 적재되는 실시간성 Data를 조건에 맞게 변환 및 가공 하기 위한 방안을 생각 하
던 중 과거 KSQL 과 Schema Registry 통해 해당 형식의 작업을 처리 했었던 경험을 바탕으로 현재 사용중인 BETA Kafka 에서 해당 환
경을 구축 후 간단한 테스트를 진행 하였습니다.
KSETL (Kafka Stream ETL) ??
KSETL은 Kafka 스트림 ETL(extract, transform, load)의 약자로 Kafka의 데이터를 추출하고 변환해서 적재하는 시스템이라는 의미입니다.
LINE을 비롯한 여러 회사에서 채택되어 사용되고 있으며 다양한 레퍼런스를 가지고 있다는 장점이 있습니다. ( KSETL로 Kafka 스
트림 ETL 시스템을 빠르게 구성하기 )
1. KSQL 이란?
KSQL은 Apache Kafka 플랫폼에 있는 실 시간 스 트 리 밍 데 이 터 를 처리하 고 분석 할 수 있 는 SQL 엔 진 입니다.
KSQL을 사용하여 쿼리를 실행하는 것은 관계형 데이터베이스에서 SQL 쿼리를 실행하는 것과 유사합니다. SELECT, LIMIT, JOIN 및
WHERE과 같은 KSQL의 기본 명령은 SQL에 있는 명령과 동일합니다. Kafka는 지속적인 모니터링, 실시간 스트리밍 분석, 온라인 데이터
통합, 이상 탐지, 데이터 탐색 및 임의 필터링을 포함하여 스트리밍 데이터를 처리하거나 분석하는 다양한 애플리케이션에서 사용됩니다.
KSQl 사용 예시
ksql 스트림 생성
2 --stream 테이블 생성
3 CREATE stream heejae(
4 id int
5 ,cik VARCHAR
6 ,ric VARCHAR
7 ) WITH (
8 KAFKA_TOPIC = 'heejae_topic', --stream 테이블을 생성할 대상 Topic
9 VALUE_FORMAT = 'avro' --schema registry에 등록할 데이터 형식 (json,avro)
10 );
11
12 -- ksql 스트림 테이블간 실시간성 데이터 join STREAM 생성
13 CREATE STREAM heejae
14 WITH (KAFKA_TOPIC= 'heejae_test_join1',VALUE_FORMAT='AVRO',KEY_FORMAT='AVRO') AS
15 SELECT
16 l.seq AS "seq",
17 l.completion_code AS "completion_code",
18 u.agent_code AS "agent_code",
19 u.session_id AS "session_id"
20 FROM heejae_test_join1 l
21 LEFT JOIN heejae_test_join2 u WITHIN 2 hours ON l.id = u.id
22 PARTITION BY id;
23
24 - **stream 간의 join stream을 생성 할 때 반드시 WITHIN 절을 사용하여 시간 범위를 지정 해야한다.**
25
26 각 stream이 참조 하는 topic의 오프셋 시간 범위를 정하여 해당 범위의 오프셋 데이터만 join 합니다.
27
28 EX)WITHIN 2 hours ON (2시간 내에 들어온 데이터 끼리만 Join)
Ui for apache kafka ksql 쿼리 실행 화면
2. Schema Registry 란?
Schema Registry는 Kafka 를 위한 스키마 관리 도구입니다. 스 키 마 는 데 이 터 의 구 조 와 형식을 정 의 하는 것으로, 데이터의 유효성 검사, 직렬화, 역직렬화에 중요한 역할을 합니다.
Kafka의 메시지는 일반적으로 직렬화된 형태로 전송됩니다. 직렬화된 데이터는 바이트 스트림 형태로 저장되며, 컨슈머가 데이터를 읽을 때
는 이진 형식을 다시 원래의 데이터 형식으로 변환해야 합니다. 이때 스키마는 데이터의 형식과 구조를 정의하고, 직렬화 및 역직렬화 과정
에서 필요한 정보를 제공합니다.
Schema Registry는 Kafka 클러 스 터 의 메타 데 이 터 저 장 소 로 사 용 되며, 다음과 같은 기능을 제공합니다.
스키마 등록
스키마 버전 관리
스키마 유효성 검사
결론적으로 Schema Registry를 사용하면 Kafka에서 생성된 데이터의 일관성과 호환성을 유지할 수 있습니다. 서로 다른 언어나 플랫폼에
서도 동일한 스키마를 공유하고 사용함으로써 데이터 통합을 용이하게 할 수 있습니다. 또한 Schema Registry는 데이터의 직렬화와 역직렬
화 과정에서 오류를 방지하고 데이터의 일관성을 보장하는 데 도움을 줍니다.
직렬화, 역직렬화란?
**직렬화(serialization)**는 데이터를 바이트 스트림 또는 이진 형태로 변환하는 과정을 말합니다. 직렬화는 데이터를 저장하거나 네트워
크를 통해 전송할 때 사용됩니다. 직렬화된 데이터는 원래의 데이터 형식과는 다른 형태로 표현되며, 주로 바이트 배열로 표현됩니다.
**역직렬화(deserialization)**는 직렬화된 데이터를 원래의 데이터 형식으로 변환하는 과정을 말합니다. 역직렬화는 직렬화된 데이터를
읽거나 전송받은 후에 원래의 데이터 형식으로 다시 변환하는 작업을 의미합니다.
Kafka에서는 메시지를 직렬화하여 전송하고, 컨슈머는 직렬화된 메시지를 역직렬화하여 원래의 데이터 형식으로 읽습니다. 직렬화와 역
직렬화는 데이터의 유지 및 전송을 위해 필요한 과정이며, 스키마는 이러한 직렬화 및 역직렬화 과정에서 필요한 정보를 제공합니다.
스키마는 데이터의 구조와 형식을 정의하는데, 직렬화 과정에서는 데이터를 스키마에 맞게 변환하여 직렬화된 형태로 저장하고, 역직렬
화 과정에서는 직렬화된 데이터를 스키마를 기반으로 원래의 데이터 형식으로 변환합니다. 스키마는 데이터의 필드, 유형, 구조 등을 정
의하므로, 직렬화 및 역직렬화 과정에서 데이터의 일관성과 호환성을 보장하는 역할을 합니다.
Ui for apache kafka SchemaRegistry 조회 화면
구축 메뉴얼 (Docker compose)
version: '3.8'
services:
kafka-schema-registry:
image: confluentinc/cp-schema-registry:7.5.0
hostname: kafka-schema-registry
container_name: kafka-schema-registry
extra_hosts:
- kafka-01:브로커ip
restart: always
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka-01:9092
SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
ports:
- 8081:8081
ksqldb-server:
image: confluentinc/cp-ksqldb-server:7.8.0
hostname: ksqldb-server
container_name: ksqldb-server
extra_hosts:
- kafka-01:브로커ip
restart: always
environment:
KSQL_BOOTSTRAP_SERVERS: kafka-01:9092
KSQL_LISTENERS: http://0.0.0.0:8085
KSQL_KSQL_SCHEMA_REGISTRY_URL: http://kafka-schema-registry:8081
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: true
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: true
KSQL_KSQL_EXTENSION_DIR: /opt/ksqldb-udfs
depends_on:
- kafka-schema-registry
ports:
- 8085:8085
volumes:
- ./logs:/usr/logs
- ./extensions/:/opt/ksqldb-udfs
networks:
default:
external:
name: aicc-net
Ui for apache kafka 등록 (application.yml )
kafka:
clusters:
- name: kafka-beta
bootstrapServers: ip:9092
#readonly: true
#kafkaConnect:
#- name: kafka-connect-01
# address: http://ip:8083
#스키마 레지스트리 등록
schemaRegistry: http://ip:8081
#ksql 등록
ksqldbServer: http://ip:8085
spring:
jmx:
enabled: true
auth:
type: "LOGIN_FORM"
security:
user:
name: admin
password: admin
server:
port: 8080 #- Port in which kafka-ui will run.