DEV_희쨔응
NLP 감성 분석 및 키워드 연관어 분석 파이프라인 본문
해당 구성도는 Kafka source connector가 PK 컬럼기준으로 Source 테이블의 변경을 감지하여 실시간으로 Kafka Topic에 데이터를 적재 합니다 그후 Ksql를 통해 데이터 전처리 및 클렌징 작업을 진행후 Pyspark kafkastream 을 통해 Topic의 Data를 consume 한 후 NLP API 서버에 전송 후 반환된 학습 데이터를 TargetDB에 적재하는 PipeLine 데모 시나리오 입니다. 해당 시나리오에서 사용한 NLP는 감성분석,키워드분석,연관어분석,키센텐스 분석,비속어 분석,문장요약 등이 사용 되었습니다.
Kafka Source Connector
{
"name": "test",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://ip:5432/db",
"connection.user": "user",
"connection.password": "pw",
"table.whitelist": "table",
"mode": "incrementing",
"incrementing.column.name": "column",
"topic.prefix": "topicname",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://kafka-schema-registry:8081"
}
}
ksql (join stream)
CREATE STREAM table
WITH (KAFKA_TOPIC= 'topic',VALUE_FORMAT='AVRO',KEY_FORMAT='AVRO') AS
SELECT
l.column1,
l.column2,
l.column3,
u.column1,
u.column2
FROM table1 l
LEFT JOIN table2 u WITHIN 2 hours ON l.column1 = u.column1
PARTITION BY column1;
2시간 내에 들어온 데이터를 대상으로 join 하는 조건의 stream 예시 입니다.
pyspark kafka stream 예제
# 필요한 패키지와 함수 import
from pyspark.sql.functions import col, udf, from_json, to_json, to_timestamp
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, FloatType, TimestampType
from pyspark.sql import SparkSession
import json
import requests
import re
from pyspark.sql import functions as F
# Spark Session 생성
spark = SparkSession.builder \
.appName("KafkaStreamingExample-hadadadj") \
.master("spark://spark-master:7077") \
.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.4,org.postgresql:postgresql:42.6.0") \
.config("spark.executor.cores", "4") \
.config("spark.executor.memory", "8g") \
.config("spark.cores.max", "4") \
.config("spark.sql.session.timeZone", "Asia/Seoul") \
.getOrCreate()
# 스키마 정의
schema = StructType([
StructField("column1", IntegerType()),
StructField("column2", StringType()),
StructField("column3", StringType()),
])
# StreamingContext 생성
spark.sparkContext.setLogLevel('ERROR')
# Kafka에서 데이터를 읽어와 DataFrame 생성
df_raw = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "ip:9092") \
.option("subscribe", "topic") \
.option("startingOffsets", "earliest") \
.load() \
.selectExpr("CAST(value AS STRING)")
#데이터 필터링 조건
df = df_raw.selectExpr("CAST(value AS STRING)") \
.select(from_json(col("value"), schema).alias("parsed_value")) \
.select("parsed_value.*") \
.filter((col("column1") == 0) | (col("column1") == 1)) \
.filter(col("column2") == "서울")
completion_session_ids = df \
.filter(col("query_text").contains("completion")) \
.select("session_id", col("timestamp").alias("comp_timestamp")) \
.distinct()
# 데이터 필터링 조건2 데이터 특수 문자 및 html 태그 제거 해당 요소로인해 nlp분석결과에 영향이감
def clean_text(text):
cleaned_text = re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '[url주소]', text)
cleaned_text = re.sub('<.*?>', '', cleaned_text)
cleaned_text = re.sub(r'[^\w\s]', ' ', cleaned_text)
cleaned_text = ' '.join(cleaned_text.split())
return cleaned_text
# DataFrame에 clean_text 함수 적용하여 정리된 텍스트 컬럼 생성
udf_clean_text = udf(lambda text: clean_text(text), StringType())
df_with_cleaned_text = df.withColumn('cleaned_query_text_text', udf_clean_text(col("column3")))
# NLP API 호출 함수 정의
def get_sentiment_v2_result(text):
url = "http://ip:8000/sentiment_v2"
data = {
"document": [text]
}
headers = {'Content-Type': 'application/json'}
response = requests.post(url, data=json.dumps(data), headers=headers)
if response.status_code == 200:
result = response.json()
sentiment = result['sentiment']
score = result['score']
return sentiment, score
else:
return None
# UDF 함수 정의 (sentiment 체크를 위해 추가)
nlp_udf = udf(lambda text: get_sentiment_v2_result(text), StructType([
StructField("sentiment", StringType()),
StructField("score", FloatType())
]))
#버전1
def get_sentiment_v1_result(text):
url = "http://ip:8000/sentiment"
data = {
"document": [text]
}
headers = {'Content-Type': 'application/json'}
response = requests.post(url, data=json.dumps(data), headers=headers)
if response.status_code == 200:
result = response.json()
score = result['score']
return score
else:
return None
# UDF 함수 정의 (sentiment 체크를 위해 추가)
nlp_udf1 = udf(lambda text: get_sentiment_v1_result(text), StructType([
StructField("score", FloatType())
]))
# badword1를 체크하는 API 호출 함수 정의
def check_badword1(text):
url = "http://ip:8000/badword"
data = {
"document": [text]
}
headers = {'Content-Type': 'application/json'}
response = requests.post(url, json=data, headers=headers)
if response.status_code == 200:
result = response.json()
badword_scores = result['badword_scores']
if badword_scores and len(badword_scores) > 0:
return badword_scores[0]
else:
return None
else:
return None
# UDF 함수 정의 (badword 체크를 위해 추가)
badword_check_udf1 = udf(lambda text: check_badword1(text), FloatType())
# badword를 체크하는 API 호출 함수 정의
def check_badword(text):
url = "http://ip:8000/badword_v2"
data = {
"document": [text]
}
headers = {'Content-Type': 'application/json'}
response = requests.post(url, json=data, headers=headers)
if response.status_code == 200:
result = response.json()
badword_scores_v2 = result['badword_scores_v2']
if badword_scores_v2 and len(badword_scores_v2) > 0:
return badword_scores_v2[0]
else:
return None
else:
return None
# UDF 함수 정의 (badword 체크를 위해 추가)
badword_check_udf = udf(lambda text: check_badword(text), FloatType())
# DataFrame에 UDF 적용 (badword 체크를 위해 추가)
df_with_nlp_badword = df_with_cleaned_text.withColumn('nlp_result', nlp_udf(col("cleaned_query_text_text"))) \
.withColumn("badword_scores_v2", badword_check_udf(col("cleaned_query_text_text"))) \
.withColumn("badword_scores", badword_check_udf1(col("cleaned_query_text_text"))) \
.withColumn("nlp_result1", nlp_udf1(col("cleaned_query_text_text")))
# DataFrame 변환 및 필요한 컬럼 선택
df_transformed_keywords = df_with_nlp_badword.select(
col("column1"),
col("column2"),
col("column3"),
col("nlp_result.sentiment").alias("sentiment_v2"),
col("nlp_result.score").alias("sentiment_v2_score"),
col("nlp_result1.score").alias("sentiment_score"),
col("badword_scores_v2").alias("badword_score_v2"),
col("badword_scores").alias("badword_score_v1"),
)
# PostgreSQL 연결 정보 설정
jdbc_url = "jdbc:postgresql://ip:5432/postgres"
connection_properties = {
"user": "user",
"password": "pw",
"driver": "org.postgresql.Driver",
"timezone": "Asia/Seoul"
}
# DataFrame을 DB에 저장
def save_to_postgresql(batch_df, batch_id):
batch_df.write \
.jdbc(jdbc_url, "test.table", mode="append", properties=connection_properties)
# DataFrame을 DB에 저장 (스트리밍으로)
query = df_transformed_keywords.writeStream \
.outputMode("append") \
.foreachBatch(save_to_postgresql) \
.start() \
.awaitTermination()
해당 코드에서 사용하는 nlp는 감성분석 버전 1,2 와 비속어 분석 버전 1,2 입니다.
아래는 각 NLP 반환 값의 예시 입니다.
#감성분석 버전1
#전송
curl -X 'POST' \
'http://IP:8000/sentiment' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"document": [
"정말 짜증나는 하루에요"
]
}'
#결과
{
"score": [
0.0023321229964494705
],
"time": 0.0029921531677246094
}
#감성분석 버전2
#전송
curl -X 'POST' \
'http://IP:8000/sentiment_v2' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"document": [
"정말 짜증나는 하루에요"
]
}'
#결과
{
"sentiment": "Negative",
"score": 0.9992272853851318,
"time": 0.019309043884277344
}
#비속어 분석 버전1
#전송
curl -X 'POST' \
'http://IP:8000/badword' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"document": [
"당신은 바보 멍청이 입니다"
]
}'
#결과
{
"badword_scores": [
0.9700000286102295
],
"time": 0.05283522605895996
}
#비속어 분석 버전2
#전송
curl -X 'POST' \
'http://IP:8000/badword_v2' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"document": [
"당신은 병신 입니다"
]
}'
#결과
{
"badword_scores_v2": [
1
],
"time": 0.05513644218444824
}
'Messaging System > Kafka' 카테고리의 다른 글
Kafka 관련 용어 정리 (1) | 2022.09.30 |
---|
Comments