DEV_희쨔응
Spark Producer(Hadoop to Kafka) 본문
하둡에 저장된 데이터를 카프카에 전송하는 배치성 프로듀서 Demo를 구성하였습니다.
Hadoop to kafka Source
from pyspark.sql import SparkSession
appName = "readj"
master = "local"
# Create Spark session
spark = SparkSession.builder \
.appName(appName) \
.master(master) \
.getOrCreate()
# Create DF and save as JSON
df = spark.read.format('json').load(
'hdfs://10.65.41.145:9000/test/json/*.json')
df.show()
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "10.65.41.147:9092,10.65.41.148:9092,10.65.41.149:9092") \
.option("topic", "topic_sparkt") \
.save()
현재 트리거를 통해 실행해주어야 기능이 가능합니다.
추후 배치 작업 또는 스트림을 구성하여 자동화 시킬 예정입니다.
실행 명령어
/root/spark-3.2.2-bin-hadoop3/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.2,org.apache.kafka:kafka-clients:3.2.2,org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.2 sparkpy.py
> Kafka에 전송할 토픽 데이터 조회
> Kafka에서 Consume한 토픽 데이터 조회
'Hadoop > Spark' 카테고리의 다른 글
pySpark ( MariaDB to Hadoop ) (0) | 2023.03.14 |
---|---|
pySpark( File(csv) to Postgresql ) (0) | 2023.03.14 |
pySpark ( Postgresql to MariaDB ) (0) | 2023.03.14 |
Spark consumer(Kafka to Hadoop) (1) | 2023.02.16 |
Spark & Zeppelin (0) | 2022.09.30 |
Comments