DEV_희쨔응

Apache Beam 본문

Kafka

Apache Beam

희쨔응 2023. 1. 26. 10:31

Apache Beam은 ETL, batch, streaming 파이프라인을 처리하기 위한 unified programming model 입니다. Beam의 가장 큰 특징은 다양한 랭귀지와 다양한 runner를 지원한다는 것 이고 Beam SDK를 통해 다양한 runner에서 데이터를 처리할 수 있게 합니다.

 

Beam SDK 를 사용하여 테스트 서버에 구축되어 있는 Kafka 에 간단한 데모를 Python으로 구현 하였습니다.

 

 

Producer 소스

from __future__ import print_function
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import kafkaio

with beam.Pipeline(options=PipelineOptions()) as p:
    notifications = (p
                     | "Creating data" >> beam.Create([('dev_1', '{"device": "0001", status": "healthy"}')])
                     | "Pushing messages to Kafka" >> kafkaio.KafkaProduce(
                                                                            topic='토픽네임',
                                                                            servers="브로커IP:9092"
                                                                        )
                    )
    notifications | 'Writing to stdout' >> beam.Map(print)

구동 결과

Kafdrop으로 해당 Topic에 PUB한 Data를 확인

 

 

 

Consumer 소스

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import kafkaio

consumer_config = {"topic": "토픽네임",
                   "bootstrap_servers": "브로커IP:9092",
                   "group_id": "heejae_beam_test"}

with beam.Pipeline(options=PipelineOptions()) as p:
    notifications = p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(
        consumer_config=consumer_config,
        value_decoder=bytes.decode,  # optional
    )
    notifications | 'Writing to stdout' >> beam.Map(print)

구동 결과

해당 Topic에 적재된 Data를 Print한다

 

'Kafka' 카테고리의 다른 글

Kcat(Kafkacat)  (3) 2023.02.20
MariaDB to Elasticsearch  (0) 2023.01.30
Apache Flume(Kafka To Hadoop)  (0) 2022.10.21
Supervisor  (0) 2022.10.20
PrestoDB to Kafka  (0) 2022.10.17
Comments