DEV_희쨔응

NLP 감성 분석 및 키워드 연관어 분석 파이프라인 본문

Messaging System/Kafka

NLP 감성 분석 및 키워드 연관어 분석 파이프라인

희쨔응 2023. 9. 13. 11:12

해당 구성도는 Kafka source connector가 PK 컬럼기준으로 Source 테이블의 변경을 감지하여 실시간으로 Kafka Topic에 데이터를 적재 합니다 그후 Ksql를 통해 데이터 전처리 및 클렌징 작업을 진행후 Pyspark kafkastream 을 통해 Topic의 Data를 consume 한 후 NLP API 서버에 전송 후 반환된 학습 데이터를 TargetDB에 적재하는 PipeLine 데모 시나리오 입니다. 해당 시나리오에서 사용한 NLP는 감성분석,키워드분석,연관어분석,키센텐스 분석,비속어 분석,문장요약 등이 사용 되었습니다.

 

Kafka Source Connector

{
  "name": "test",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://ip:5432/db",
    "connection.user": "user",
    "connection.password": "pw",
    "table.whitelist": "table",
    "mode": "incrementing",
    "incrementing.column.name": "column",
    "topic.prefix": "topicname",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://kafka-schema-registry:8081"

        }
}

ksql (join stream)

CREATE STREAM table
WITH (KAFKA_TOPIC= 'topic',VALUE_FORMAT='AVRO',KEY_FORMAT='AVRO')  AS 
SELECT
    l.column1, 
    l.column2,
    l.column3,
    u.column1,
    u.column2
FROM table1 l
LEFT JOIN table2 u WITHIN 2 hours ON l.column1 = u.column1
PARTITION BY column1;

2시간 내에 들어온 데이터를 대상으로 join 하는 조건의 stream 예시 입니다.

 

pyspark kafka stream 예제

# 필요한 패키지와 함수 import
from pyspark.sql.functions import col, udf, from_json, to_json, to_timestamp
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, FloatType, TimestampType
from pyspark.sql import SparkSession
import json
import requests
import re
from pyspark.sql import functions as F
# Spark Session 생성
spark = SparkSession.builder \
    .appName("KafkaStreamingExample-hadadadj") \
    .master("spark://spark-master:7077") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.4,org.postgresql:postgresql:42.6.0") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.memory", "8g") \
    .config("spark.cores.max", "4") \
    .config("spark.sql.session.timeZone", "Asia/Seoul") \
    .getOrCreate()

# 스키마 정의
schema = StructType([
    StructField("column1", IntegerType()),
    StructField("column2", StringType()),
    StructField("column3", StringType()),
])

# StreamingContext 생성
spark.sparkContext.setLogLevel('ERROR')

# Kafka에서 데이터를 읽어와 DataFrame 생성
df_raw = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "ip:9092") \
    .option("subscribe", "topic") \
    .option("startingOffsets", "earliest") \
    .load() \
    .selectExpr("CAST(value AS STRING)")
#데이터 필터링 조건
df = df_raw.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("parsed_value")) \
    .select("parsed_value.*") \
    .filter((col("column1") == 0) | (col("column1") == 1)) \
    .filter(col("column2") == "서울")

completion_session_ids = df \
    .filter(col("query_text").contains("completion")) \
    .select("session_id", col("timestamp").alias("comp_timestamp")) \
    .distinct()

# 데이터 필터링 조건2 데이터 특수 문자 및 html 태그 제거 해당 요소로인해 nlp분석결과에 영향이감
def clean_text(text):
    cleaned_text = re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '[url주소]', text)
    cleaned_text = re.sub('<.*?>', '', cleaned_text)
    cleaned_text = re.sub(r'[^\w\s]', ' ', cleaned_text)
    cleaned_text = ' '.join(cleaned_text.split())
    return cleaned_text

# DataFrame에 clean_text 함수 적용하여 정리된 텍스트 컬럼 생성
udf_clean_text = udf(lambda text: clean_text(text), StringType())
df_with_cleaned_text = df.withColumn('cleaned_query_text_text', udf_clean_text(col("column3")))
# NLP API 호출 함수 정의
def get_sentiment_v2_result(text):
    url = "http://ip:8000/sentiment_v2"
    data = {
        "document": [text]
    }
    headers = {'Content-Type': 'application/json'}
    response = requests.post(url, data=json.dumps(data), headers=headers)
    if response.status_code == 200:
        result = response.json()
        sentiment = result['sentiment']
        score = result['score']
        return sentiment, score
    else:
        return None

# UDF 함수 정의 (sentiment 체크를 위해 추가)
nlp_udf = udf(lambda text: get_sentiment_v2_result(text), StructType([
    StructField("sentiment", StringType()),
    StructField("score", FloatType())
]))
#버전1
def get_sentiment_v1_result(text):
    url = "http://ip:8000/sentiment"
    data = {
        "document": [text]
    }
    headers = {'Content-Type': 'application/json'}
    response = requests.post(url, data=json.dumps(data), headers=headers)
    if response.status_code == 200:
        result = response.json()
        score = result['score']
        return score
    else:
        return None

# UDF 함수 정의 (sentiment 체크를 위해 추가)
nlp_udf1 = udf(lambda text: get_sentiment_v1_result(text), StructType([
    StructField("score", FloatType())
]))
# badword1를 체크하는 API 호출 함수 정의
def check_badword1(text):
    url = "http://ip:8000/badword"
    data = {
        "document": [text]
    }
    headers = {'Content-Type': 'application/json'}
    response = requests.post(url, json=data, headers=headers)
    if response.status_code == 200:
        result = response.json()
        badword_scores = result['badword_scores']
        if badword_scores and len(badword_scores) > 0:
            return badword_scores[0]
        else:
            return None
    else:
        return None

# UDF 함수 정의 (badword 체크를 위해 추가)
badword_check_udf1 = udf(lambda text: check_badword1(text), FloatType())

# badword를 체크하는 API 호출 함수 정의
def check_badword(text):
    url = "http://ip:8000/badword_v2"
    data = {
        "document": [text]
    }
    headers = {'Content-Type': 'application/json'}
    response = requests.post(url, json=data, headers=headers)
    if response.status_code == 200:
        result = response.json()
        badword_scores_v2 = result['badword_scores_v2']
        if badword_scores_v2 and len(badword_scores_v2) > 0:
            return badword_scores_v2[0]
        else:
            return None
    else:
        return None

# UDF 함수 정의 (badword 체크를 위해 추가)
badword_check_udf = udf(lambda text: check_badword(text), FloatType())

# DataFrame에 UDF 적용 (badword 체크를 위해 추가)
df_with_nlp_badword = df_with_cleaned_text.withColumn('nlp_result', nlp_udf(col("cleaned_query_text_text"))) \
                       .withColumn("badword_scores_v2", badword_check_udf(col("cleaned_query_text_text"))) \
                       .withColumn("badword_scores", badword_check_udf1(col("cleaned_query_text_text"))) \
                       .withColumn("nlp_result1", nlp_udf1(col("cleaned_query_text_text")))



# DataFrame 변환 및 필요한 컬럼 선택
df_transformed_keywords = df_with_nlp_badword.select(
    col("column1"),
    col("column2"),
    col("column3"),
    col("nlp_result.sentiment").alias("sentiment_v2"),
    col("nlp_result.score").alias("sentiment_v2_score"),
    col("nlp_result1.score").alias("sentiment_score"),
    col("badword_scores_v2").alias("badword_score_v2"),
    col("badword_scores").alias("badword_score_v1"),
)


# PostgreSQL 연결 정보 설정
jdbc_url = "jdbc:postgresql://ip:5432/postgres"
connection_properties = {
    "user": "user",
    "password": "pw",
    "driver": "org.postgresql.Driver",
    "timezone": "Asia/Seoul"
}

# DataFrame을 DB에 저장
def save_to_postgresql(batch_df, batch_id):
    batch_df.write \
        .jdbc(jdbc_url, "test.table", mode="append", properties=connection_properties)

# DataFrame을 DB에 저장 (스트리밍으로)
query = df_transformed_keywords.writeStream \
    .outputMode("append") \
    .foreachBatch(save_to_postgresql) \
    .start() \
    .awaitTermination()

해당 코드에서 사용하는 nlp는 감성분석 버전 1,2 와 비속어 분석 버전 1,2 입니다.

아래는 각 NLP 반환 값의 예시 입니다.

#감성분석 버전1
#전송
curl -X 'POST' \
  'http://IP:8000/sentiment' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "document": [
    "정말 짜증나는 하루에요"
  ]
}'
#결과
{
  "score": [
    0.0023321229964494705
  ],
  "time": 0.0029921531677246094
}
#감성분석 버전2
#전송
curl -X 'POST' \
  'http://IP:8000/sentiment_v2' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "document": [
    "정말 짜증나는 하루에요"
  ]
}'
#결과
{
  "sentiment": "Negative",
  "score": 0.9992272853851318,
  "time": 0.019309043884277344
}
#비속어 분석 버전1
#전송
curl -X 'POST' \
  'http://IP:8000/badword' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "document": [
    "당신은 바보 멍청이 입니다"
  ]
}'
#결과
{
  "badword_scores": [
    0.9700000286102295
  ],
  "time": 0.05283522605895996
}
#비속어 분석 버전2
#전송
curl -X 'POST' \
  'http://IP:8000/badword_v2' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "document": [
    "당신은 병신 입니다"
  ]
}'
#결과
{
  "badword_scores_v2": [
    1
  ],
  "time": 0.05513644218444824
}

 

'Messaging System > Kafka' 카테고리의 다른 글

Kafka 관련 용어 정리  (1) 2022.09.30
Comments