DEV_희쨔응

pySpark( File(csv) to Postgresql ) 본문

Hadoop/Spark

pySpark( File(csv) to Postgresql )

희쨔응 2023. 3. 14. 16:03

개발 서버에 Spark를 사용한 위와 같은 파이프라인 demo를 구성하였습니다. 

 

특정한 파일 확장자(CSV, JSON 등)을 스파크(PySpark)를 통해 RDB에 저장합니다. 

 

> 예제 파일 

 

file_to_postgres

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains
ip = "10.65.41.141"
port = 5432
user = "isharkk"
passwd = "rplinux"
db = "testt"
spark = SparkSession \
        .builder \
        .appName('SparkByExamples.com') \
        .config("spark.driver.extraClassPath", "/root/spark-3.2.2-bin-hadoop3/jars/postgresql-42.5.4.jar") \
        .getOrCreate()
df = spark.read.csv("file:///root/self.csv")
df2 = spark.read.option("header",True) \
     .csv("file:///root/self.csv")
df_with_schema.printSchema()
df2.write.option("header",True) \
 .csv("/tmp/spark_output/self.csv")
query1 = df2.write.format("jdbc")\
          .option("url","jdbc:postgresql://{0}:{1}/{2}".format(ip, port, db)) \
          .option("driver", "org.postgresql.Driver") \
          .option("dbtable", "ishark.self") \
          .option("user", user) \
          .option("password", passwd) \
          .save()

 

> 해당 CSV 파일 스키마 속성 

 

> 해당 CSV 파일 해더 형식 추가 

> Postgresql에 적재된 CSV 데이터

 

'Hadoop > Spark' 카테고리의 다른 글

Spark Producer(Hadoop to Kafka)  (0) 2023.03.14
pySpark ( MariaDB to Hadoop )  (0) 2023.03.14
pySpark ( Postgresql to MariaDB )  (0) 2023.03.14
Spark consumer(Kafka to Hadoop)  (1) 2023.02.16
Spark & Zeppelin  (0) 2022.09.30
Comments