DEV_희쨔응

Spark Producer(Hadoop to Kafka) 본문

Hadoop/Spark

Spark Producer(Hadoop to Kafka)

희쨔응 2023. 3. 14. 16:08

하둡에 저장된 데이터를 카프카에 전송하는 배치성 프로듀서 Demo를 구성하였습니다.

 

 

Hadoop to kafka Source

from pyspark.sql import SparkSession
 
appName = "readj"
master = "local"
 
# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()
 
# Create DF and save as JSON
df = spark.read.format('json').load(
    'hdfs://10.65.41.145:9000/test/json/*.json')
df.show()
 
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "10.65.41.147:9092,10.65.41.148:9092,10.65.41.149:9092") \
  .option("topic", "topic_sparkt") \
  .save()

현재 트리거를 통해 실행해주어야 기능이 가능합니다.
추후 배치 작업 또는 스트림을 구성하여 자동화 시킬 예정입니다.

 

실행 명령어

/root/spark-3.2.2-bin-hadoop3/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.2,org.apache.kafka:kafka-clients:3.2.2,org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.2 sparkpy.py

 

> Kafka에 전송할 토픽 데이터 조회

 

> Kafka에서 Consume한 토픽 데이터 조회

 

'Hadoop > Spark' 카테고리의 다른 글

pySpark ( MariaDB to Hadoop )  (0) 2023.03.14
pySpark( File(csv) to Postgresql )  (0) 2023.03.14
pySpark ( Postgresql to MariaDB )  (0) 2023.03.14
Spark consumer(Kafka to Hadoop)  (1) 2023.02.16
Spark & Zeppelin  (0) 2022.09.30
Comments