DEV_희쨔응
pySpark( File(csv) to Postgresql ) 본문
개발 서버에 Spark를 사용한 위와 같은 파이프라인 demo를 구성하였습니다.
특정한 파일 확장자(CSV, JSON 등)을 스파크(PySpark)를 통해 RDB에 저장합니다.
> 예제 파일
file_to_postgres
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains
ip = "10.65.41.141"
port = 5432
user = "isharkk"
passwd = "rplinux"
db = "testt"
spark = SparkSession \
.builder \
.appName('SparkByExamples.com') \
.config("spark.driver.extraClassPath", "/root/spark-3.2.2-bin-hadoop3/jars/postgresql-42.5.4.jar") \
.getOrCreate()
df = spark.read.csv("file:///root/self.csv")
df2 = spark.read.option("header",True) \
.csv("file:///root/self.csv")
df_with_schema.printSchema()
df2.write.option("header",True) \
.csv("/tmp/spark_output/self.csv")
query1 = df2.write.format("jdbc")\
.option("url","jdbc:postgresql://{0}:{1}/{2}".format(ip, port, db)) \
.option("driver", "org.postgresql.Driver") \
.option("dbtable", "ishark.self") \
.option("user", user) \
.option("password", passwd) \
.save()
> 해당 CSV 파일 스키마 속성
> 해당 CSV 파일 해더 형식 추가
> Postgresql에 적재된 CSV 데이터
'Hadoop > Spark' 카테고리의 다른 글
Spark Producer(Hadoop to Kafka) (0) | 2023.03.14 |
---|---|
pySpark ( MariaDB to Hadoop ) (0) | 2023.03.14 |
pySpark ( Postgresql to MariaDB ) (0) | 2023.03.14 |
Spark consumer(Kafka to Hadoop) (1) | 2023.02.16 |
Spark & Zeppelin (0) | 2022.09.30 |
Comments