API to PostgreSQL Integration (Airflow, Kafka)

희쨔응 2023. 6. 9. 14:10

Scenario Diagram

First, a Python job requests an authentication token for extracting infrastructure statistics, includes the issued token in its HTTP requests, extracts the statistics data, and loads it into Kafka Topic A in JSON format. This job is managed through Airflow's UI and registered to run as a batch job.

Afterwards, a stream is created over Topic A with KSQL, which writes the data into Topic B. The PostgreSQL Sink Connector then consumes Topic B and loads the data into the target DB.

 

Registering the Airflow DAG

from datetime import date, timedelta, datetime
from kafka import KafkaProducer
import requests
import json
import time
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
import logging

default_args = {
    'start_date': days_ago(1)
}

test = 'http://url/key/webservice/api'
headers = {'Content-Type': 'application/json', 'charset': 'UTF-8', 'Accept': '*/*'}

def get_token(duration_minutes):
    # Call the token API
    headers = {
        'Content-Type': 'application/json',
        'charset': 'UTF-8',
        'Accept': '*/*'
    }
    response = requests.get(f'{test}/auth/token.do?user=callbot02&durationMinutes={duration_minutes}', headers=headers)
    token = response.json().get('token')

    log_message = f'Token API call result: {response.status_code}'
    logging.info(log_message)

    return token

def get_data(token, id): 
    # Call the data API
    headers = {
        'Content-Type': 'application/json',
        'charset': 'UTF-8',
        'Accept': '*/*'
    }
    yesterday = (date.today() - timedelta(1)).strftime("%Y-%m-%d")
    today = date.today().strftime("%Y-%m-%d")

    response = requests.get(f'{test}/acs/call/list.do?token={token}&acsId={id}&startDate={yesterday}&endDate={today}&status=3', headers=headers)
    data = response.json()

    log_message = f'Data API call result: {response.status_code}'
    logging.info(log_message)

    if data['message'] == 'success':
        total_count = data['total']
        log_message = f'Total record count from data API: {total_count}'
        logging.info(log_message)

        start = time.time()  # current time

        # Connect to Kafka
        producer = KafkaProducer(acks=0,
                            bootstrap_servers=['kafka:9092'],
                            value_serializer=lambda x: json.dumps(x).encode('utf-8')
                            )
        for x in range(total_count):
            doc = data['data'][x]
            producer.send(topic='test', value=doc)

        producer.flush()
        proc_time = time.time()-start

        log_message = f'Time taken to send data to Kafka: {proc_time}'
        logging.info(log_message)
            
    else:
        log_message = f'Data API call returned fail status: {yesterday} ~ {today}'
        logging.info(log_message)

def get_stat_data(token):
    yesterdayy = (date.today() - timedelta(1)).strftime("%Y%m%d")
    todayy = date.today().strftime("%Y%m%d")
    
    response = requests.get(f'{test}/stat/list.do?token={token}&reportId=2020000026&searchUnit=MI&fromDate={yesterdayy}&toDate={todayy}&exclLunch=false', headers=headers)
    response_dict = json.loads(response.content.decode('utf-8'))
    
    data = response_dict["data"]

    log_message = f'Data API call result: {response.status_code}'
    logging.info(log_message)
    
    if response_dict['message'] == 'success':
        start = time.time()

        producer = KafkaProducer(
            bootstrap_servers=["kafka:9092"],
            value_serializer=lambda x: json.dumps(x, ensure_ascii=False).encode('utf-8'),
            max_request_size=5000*1024*1024
        )

        for item in data:
            producer.send(topic='test1', value=item)

        producer.flush()
        proc_time = time.time() - start
        
        log_message = f'Time taken to send data to Kafka: {proc_time}'
        logging.info(log_message)
        
    else:
        log_message = f'Data API call returned fail status: {yesterdayy} ~ {todayy}'
        logging.info(log_message)

with DAG('token_data_api_dag_ver2', default_args=default_args, schedule_interval=timedelta(minutes=5)) as dag:
    token_task = PythonOperator(
        task_id='get_token',
        python_callable=get_token,
        op_kwargs={'duration_minutes':60}
    )
    stat_data_task = PythonOperator(
        task_id='get_data_stat',
        python_callable=get_stat_data,
        op_kwargs={'token': '{{ ti.xcom_pull(task_ids="get_token") }}'}
    )
    token_task >> stat_data_task

    ids = ['test1', 'test2', 'test3']
    data_tasks = []
    for id in ids:
        task_id = f'get_data_{id}'
        data_task = PythonOperator(
            task_id=task_id,
            python_callable=get_data,
            op_kwargs={'token': '{{ ti.xcom_pull(task_ids="get_token") }}', 'id': id}
        )
        token_task >> data_task
        data_tasks.append(data_task)
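
For a quick sanity check outside of Airflow, the task callables above can also be run directly from a Python shell. The snippet below is a minimal sketch, assuming the API endpoint and the kafka:9092 broker defined in the DAG file are reachable from where it runs; the acsId value 'test1' is only a placeholder.

# Minimal local dry run of the DAG callables (no Airflow scheduler involved).
if __name__ == '__main__':
    token = get_token(duration_minutes=60)   # request a 60-minute token
    get_stat_data(token)                     # statistics rows go to the 'test1' topic
    get_data(token, 'test1')                 # call records go to the 'test' topic; 'test1' is a placeholder acsId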

Converting the Kafka topics to Avro topics with KSQL

- Create a JSON stream over each topic

############################################
CREATE STREAM test_json
(col1 VARCHAR,
col2 VARCHAR,
col3 VARCHAR
)
WITH (KAFKA_TOPIC='test', VALUE_FORMAT='JSON');

############################################
CREATE STREAM test1_json
(
col1 VARCHAR,
col2 VARCHAR,
col3 VARCHAR
)
WITH (KAFKA_TOPIC='test1', VALUE_FORMAT='JSON');
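
Before creating the streams, it can help to confirm that JSON records actually landed in the source topics. The snippet below is a minimal sketch using kafka-python's KafkaConsumer, assuming the same kafka:9092 broker as in the DAG:

from kafka import KafkaConsumer
import json

# Read a few records from the 'test' topic to verify the JSON payloads
consumer = KafkaConsumer(
    'test',
    bootstrap_servers=['kafka:9092'],
    auto_offset_reset='earliest',        # start from the beginning of the topic
    consumer_timeout_ms=10000,           # stop iterating after 10s without new messages
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for i, message in enumerate(consumer):
    print(message.value)
    if i >= 4:                           # print only the first five records
        break

consumer.close()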

Create an Avro stream from each JSON stream and write it to a Kafka topic (data transform)

#############################################
CREATE STREAM test_avro
WITH (KAFKA_TOPIC= 'test_avro',VALUE_FORMAT='AVRO') AS 
SELECT col1,
col2,
col3
FROM test_json WHERE col1 = 'test'
PARTITION BY col1;

#############################################
CREATE STREAM test1_avro
WITH (KAFKA_TOPIC= 'test1_avro',VALUE_FORMAT='AVRO') AS 
SELECT col1,
col2,
col3
FROM test1_json WHERE col1 = 'test'
PARTITION BY col1;
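
The statements above can be run in the ksql CLI or submitted to the ksqlDB server's /ksql REST endpoint. The snippet below is a minimal sketch, assuming the server address http://ksqldb-server:8088 (adjust to your environment):

import requests

KSQL_URL = 'http://ksqldb-server:8088/ksql'   # assumed ksqlDB server address

statement = """
CREATE STREAM test_avro
WITH (KAFKA_TOPIC='test_avro', VALUE_FORMAT='AVRO') AS
SELECT col1, col2, col3
FROM test_json WHERE col1 = 'test'
PARTITION BY col1;
"""

# Submit the DDL statement to the ksqlDB REST API
response = requests.post(
    KSQL_URL,
    headers={'Content-Type': 'application/vnd.ksql.v1+json; charset=utf-8'},
    json={'ksql': statement, 'streamsProperties': {}}
)
print(response.status_code, response.json())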

Loading into PostgreSQL over JDBC using Kafka Connect

- upsert mode

#############################################
{
        "name": "test_sink",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
                "tasks.max": "1",
                "topics": "test_avro",
                "table.name.format": "tablename",
                "connection.url": "jdbc:postgresql://ip:5432/postgres",
                "connection.user" :"postgres",
                "connection.password" : "pw",
                "dialect.name": "PostgreSqlDatabaseDialect",
                "transforms": "unwrap",
                "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
                "transforms.unwrap.drop.tombstones": "false",
                "transforms.unwrap.delete.handling.mode":"rewrite",
                "transforms.unwrap.add.fields":"op,schema",
                "insert.mode": "upsert",
                "delete.enabled": "true",
                "auto.create": "true",
                "auto.evolve": "true",
                "pk.fields": "col1",
                "pk.mode": "record_key",
                "delete.retention.ms":"100",
                "value.converter": "io.confluent.connect.avro.AvroConverter",
                "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
                "value.converter.schemas.enable":"true",
                "key.converter": "org.apache.kafka.connect.storage.StringConverter",
                "key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
                "key.converter.schemas.enable":"false"
        }
}

#############################################
{
        "name": "test1_sink",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
                "tasks.max": "1",
                "topics": "test1_avro",
                "table.name.format": "tablename",
                "connection.url": "jdbc:postgresql://ip:5432/postgres",
                "connection.user" :"postgres",
                "connection.password" : "pw",
                "dialect.name": "PostgreSqlDatabaseDialect",
                "transforms": "unwrap",
                "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
                "transforms.unwrap.drop.tombstones": "false",
                "transforms.unwrap.delete.handling.mode":"rewrite",
                "transforms.unwrap.add.fields":"op,schema",
                "insert.mode": "upsert",
                "delete.enabled": "true",
                "auto.create": "true",
                "auto.evolve": "true",
                "pk.fields": "col1",
                "pk.mode": "record_key",
                "delete.retention.ms":"100",
                "value.converter": "io.confluent.connect.avro.AvroConverter",
                "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
                "value.converter.schemas.enable":"true",
                "key.converter": "org.apache.kafka.connect.storage.StringConverter",
                "key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
                "key.converter.schemas.enable":"false"
        }
}
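
Each connector definition is registered by POSTing it to the Kafka Connect REST API. A minimal sketch, assuming the Connect worker listens at http://kafka-connect:8083 and the JSON above is saved locally as test_sink.json:

import json
import requests

CONNECT_URL = 'http://kafka-connect:8083/connectors'   # assumed Connect worker address

# Load the connector definition saved above as test_sink.json
with open('test_sink.json') as f:
    connector = json.load(f)

# Register the connector
response = requests.post(
    CONNECT_URL,
    headers={'Content-Type': 'application/json'},
    json=connector
)
print(response.status_code, response.json())

# Check the connector and task status afterwards
status = requests.get(f"{CONNECT_URL}/{connector['name']}/status")
print(status.json())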
