목록Hadoop/Spark (6)
DEV_희쨔응

하둡에 저장된 데이터를 카프카에 전송하는 배치성 프로듀서 Demo를 구성하였습니다. Hadoop to kafka Source from pyspark.sql import SparkSession appName = "readj" master = "local" # Create Spark session spark = SparkSession.builder \ .appName(appName) \ .master(master) \ .getOrCreate() # Create DF and save as JSON df = spark.read.format('json').load( 'hdfs://10.65.41.145:9000/test/json/*.json') df.show() # Write key-value data from ..

개발 서버에 스파크(pySpark)를 활용하여 위와 같은 파이프라인을 구성하였습니다. Rdb의 테이블 데이터를 Hadoop에 append 형식으로 저장합니다. 스케줄 코드를 입력하면 원하는 시간마다 데이터를 전송할 수 있습니다. MariaDB_to_Hadoop from pyspark.sql import SparkSession appName = "PySpark Example - MariaDB Example" master = "local" # Create Spark session spark = SparkSession.builder \ .appName(appName) \ .master(master) \ .getOrCreate() spark.sparkContext.setLogLevel('ERROR') # sql..

개발 서버에 Spark를 사용한 위와 같은 파이프라인 demo를 구성하였습니다. 특정한 파일 확장자(CSV, JSON 등)을 스파크(PySpark)를 통해 RDB에 저장합니다. > 예제 파일 file_to_postgres import pyspark from pyspark.sql import SparkSession from pyspark.sql.types import StructType,StructField, StringType, IntegerType from pyspark.sql.types import ArrayType, DoubleType, BooleanType from pyspark.sql.functions import col,array_contains ip = "10.65.41.141" port =..

개발 서버에 스파크(Pyspark)를 이용하여 다른 RDB 간의 테이블 데이터를 옮기는 파이프라인을 구성하였습니다. Pyspark 에서는 해당 테이블의 View를 구성하여 select, join, group by 등의 SQL을 사용하여 데이터를 필터링 할 수 있습니다. Postgresql_to_MariaDB import pyspark from pyspark.sql import SparkSession ip = "10.65.41.141" port = 5432 user = "isharkk" passwd = "rplinux" db = "testt" sp = pyspark.sql.SparkSession \ .builder \ .config("spark.driver.extraClassPath", "/root/spa..