Dataproc의 자연어 처리를 위한 PySpark

1. 개요

자연어 처리 (NLP)는 텍스트 데이터에서 유용한 정보를 도출하고 분석하는 연구입니다. 인터넷에서 생성되는 글의 양이 계속 증가함에 따라 이제 그 어느 때보다 조직에서 텍스트를 활용하여 비즈니스와 관련된 정보를 얻고자 합니다.

NLP는 언어 번역부터 감정 분석, 처음부터 문장 생성 등 다양한 작업에 사용할 수 있습니다. 텍스트를 사용하는 방식을 변화시키는 활발한 연구 분야입니다.

대규모로 대량의 텍스트 데이터에 NLP를 사용하는 방법을 살펴봅니다. 쉽지 않은 일일 수 있습니다. 다행히 Spark MLlibspark-nlp와 같은 라이브러리를 활용하면 이 작업을 더 쉽게 할 수 있습니다.

2. 사용 사례

가상의 조직인 'FoodCorp'의 수석 데이터 과학자가 식품 업계의 동향에 대해 자세히 알아보고자 합니다. Reddit의 하위 갤러리인 r/food의 게시물 형식의 텍스트 데이터 코퍼스에 액세스할 수 있으며, 이를 통해 사람들이 무엇에 관해 이야기하는지 살펴볼 수 있습니다.

이를 위한 한 가지 접근 방식은 '주제 모델링'이라는 NLP 메서드를 사용하는 것입니다. 주제 모델링은 문서 그룹의 의미론적 의미에서 추세를 파악할 수 있는 통계적 방법입니다. 즉, Reddit '게시물' 자료에 주제 모델을 구축하여 동향을 설명하는 '주제' 또는 단어 그룹 목록을 생성할 수 있습니다.

모델을 빌드하기 위해 텍스트를 클러스터링하는 데 자주 사용되는 잠재 디리히레히트 분포 (LDA)라는 알고리즘을 사용합니다. LDA에 대한 훌륭한 소개는 여기에서 확인할 수 있습니다.

3. 프로젝트 만들기

아직 Google 계정(Gmail 또는 Google Apps)이 없으면 계정을 만들어야 합니다. Google Cloud Platform Console ( console.cloud.google.com)에 로그인하고 새 프로젝트를 만듭니다.

7e541d932b20c074.png

2deefc9295d114ea.png

Screenshot from 2016-02-10 12:45:26.png

그런 다음 Google Cloud 리소스를 사용할 수 있도록 Cloud 콘솔에서 결제를 사용 설정해야 합니다.

이 Codelab을 실행하는 데는 몇 달러 정도의 비용이 들지만 더 많은 리소스를 사용하려고 하거나 실행 중일 경우 비용이 더 들 수 있습니다. PySpark-BigQuerySpark-NLP Codelab의 끝부분에서 각각 '정리'를 설명합니다.

Google Cloud Platform 신규 사용자는 $300 상당의 무료 체험판을 사용할 수 있습니다.

4. 환경 설정

먼저 Dataproc 및 Compute Engine API를 사용 설정해야 합니다.

화면 왼쪽 상단의 메뉴 아이콘을 클릭합니다.

2bfc27ef9ba2ec7d.png

드롭다운에서 API 관리자를 선택합니다.

408af5f32c4b7c25.png

API 및 서비스 사용 설정을 클릭합니다.

a9c0e84296a7ba5b.png

검색창에서 'Compute Engine'을 검색합니다. 표시되는 결과 목록에서 'Google Compute Engine API'를 클릭합니다.

b6adf859758d76b3.png

Google Compute Engine 페이지에서 사용 설정을 클릭합니다.

da5584a1cbc77104.png

사용 설정되면 왼쪽 화살표를 클릭하여 뒤로 이동합니다.

이제 'Google Dataproc API'를 검색하여 사용 설정합니다.

f782195d8e3d732a.png

그런 다음 Cloud 콘솔의 오른쪽 상단에 있는 버튼을 클릭하여 Cloud Shell을 엽니다.

a10c47ee6ca41c54.png

Codelab을 진행하면서 참조할 수 있는 몇 가지 환경 변수를 설정합니다. 먼저 만들려는 Dataproc 클러스터의 이름(예: 'my-cluster')을 선택하고 환경에 설정합니다. 원하는 이름을 사용해도 됩니다.

CLUSTER_NAME=my-cluster

그런 다음 여기에서 사용 가능한 영역 중 하나를 선택합니다. 예를 들면 us-east1-b.

REGION=us-east1

마지막으로 작업에서 데이터를 읽을 소스 버킷을 설정해야 합니다. 버킷 bm_reddit에 샘플 데이터가 있지만, 이 섹션 전에 BigQuery 데이터 전처리를 위한 PySpark를 완료했다면 해당 섹션에서 생성한 데이터를 사용해도 됩니다.

BUCKET_NAME=bm_reddit

환경 변수를 구성했으므로 다음 명령어를 실행하여 Dataproc 클러스터를 만들어 보겠습니다.

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

각 명령어를 단계별로 살펴보겠습니다.

gcloud beta dataproc clusters create ${CLUSTER_NAME}: 앞에서 입력한 이름으로 Dataproc 클러스터 생성을 시작합니다. 아래에서 설명하는 구성요소 게이트웨이와 같은 Dataproc의 베타 기능을 사용 설정하기 위해 여기에 beta를 포함합니다.

--zone=${ZONE}: 클러스터의 위치를 설정합니다.

--worker-machine-type n1-standard-8: 작업자에 사용할 머신 유형입니다.

--num-workers 4: 클러스터에 4개의 작업자가 있습니다.

--image-version 1.4-debian9: 사용할 Dataproc의 이미지 버전을 나타냅니다.

--initialization-actions ...: 초기화 작업은 클러스터와 작업자를 만들 때 실행되는 맞춤 스크립트입니다. 사용자는 키를 만들어 GCS 버킷에 저장하거나 공개 버킷 dataproc-initialization-actions에서 참조할 수 있습니다. 여기에 포함된 초기화 작업을 사용하면 --metadata 플래그와 함께 제공된 Pip를 사용하여 Python 패키지를 설치할 수 있습니다.

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': Dataproc에 설치할 패키지 목록으로, 공백으로 구분됩니다. 이 경우 google-cloud-storage Python 클라이언트 라이브러리와 spark-nlp를 설치합니다.

--optional-components=ANACONDA: 선택적 구성요소는 Dataproc와 함께 사용되는 일반적인 패키지로, 생성 중에 Dataproc 클러스터에 자동으로 설치됩니다. 초기화 작업보다 선택적 구성요소를 사용하는 이점은 시작 시간이 더 빠르고 특정 Dataproc 버전에서 테스트된다는 점입니다. 전반적으로 더 안정적입니다.

--enable-component-gateway: 이 플래그를 사용하면 Zeppelin, Jupyter, Spark 기록과 같은 일반적인 UI를 보려는 경우 Dataproc의 구성요소 게이트웨이를 활용할 수 있습니다. 참고: 일부에는 연결된 선택적 구성요소가 필요합니다.

Dataproc에 대해 자세히 알아보려면 이 Codelab을 확인하세요.

그런 다음 Cloud Shell에서 다음 명령어를 실행하여 샘플 코드가 포함된 저장소를 클론하고 올바른 디렉터리로 이동합니다.

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib는 Apache Spark로 작성된 확장형 머신러닝 라이브러리입니다. MLlib은 미세 조정된 머신러닝 알고리즘 모음과 함께 Spark의 효율성을 활용하여 대량의 데이터를 분석할 수 있습니다. Java, Scala, Python, R API가 있습니다. 이 Codelab에서는 특히 Python에 중점을 둡니다.

MLlib에는 다양한 변환기와 추정기가 포함되어 있습니다. 변환기는 일반적으로 transform() 함수를 사용하여 데이터를 변경하거나 변형할 수 있는 도구인 반면, 추정기는 일반적으로 fit() 함수를 사용하여 데이터를 학습할 수 있는 사전 빌드된 알고리즘입니다.

트랜스포머의 예는 다음과 같습니다.

  • 토큰화 (단어 문자열에서 숫자 벡터 만들기)
  • 원-핫 인코딩 (문자열에 있는 단어를 나타내는 숫자의 희소 벡터 생성)
  • 불용어 삭제 도구 (문자열에 의미론적 가치를 더하지 않는 단어 삭제)

추정기의 예는 다음과 같습니다.

  • 분류 (사과인가요, 오렌지인가요?)
  • 회귀 (이 사과는 얼마여야 하나요?)
  • 클러스터링 (모든 사과가 서로 얼마나 유사한가요?)
  • 결정 트리 (if color == orange, then it's an orange. 그렇지 않으면 사과입니다.)
  • 차원 축소 (데이터 세트에서 특성을 삭제해도 사과와 오렌지를 구분할 수 있나요?)

MLlib에는 교차 검증뿐만 아니라 초매개변수 조정 및 선택과 같은 머신러닝의 다른 일반적인 방법을 위한 도구도 포함되어 있습니다.

또한 MLlib에는 재실행할 수 있는 다양한 변환기를 사용하여 데이터 변환 파이프라인을 빌드할 수 있는 Pipelines API가 포함되어 있습니다.

6. Spark-NLP

Spark-nlp는 Spark를 사용하여 효율적인 자연어 처리 작업을 실행하기 위해 John Snow Labs에서 만든 라이브러리입니다. 여기에는 다음과 같은 일반적인 작업을 위한 주석 작성 도구라는 내장 도구가 포함되어 있습니다.

  • 토큰화 (단어 문자열에서 숫자 벡터 만들기)
  • 단어 임베딩 생성 (벡터를 통해 단어 간의 관계 정의)
  • 품사 태그 (어떤 단어가 명사인가요? 동사인가요?)

spark-nlp는 이 Codelab의 범위에 해당하지 않지만 TensorFlow와도 잘 통합됩니다.

무엇보다도 Spark-NLP는 MLlib 파이프라인에 쉽게 삽입할 수 있는 구성요소를 제공하여 Spark MLlib의 기능을 확장합니다.

7. 자연어 처리 권장사항

데이터에서 유용한 정보를 추출하기 전에 몇 가지 준비를 해야 합니다. 사전 처리 단계는 다음과 같습니다.

토큰화

일반적으로 가장 먼저 해야 할 일은 데이터를 '토큰화'하는 것입니다. 이를 위해서는 데이터를 가져와 '토큰' 또는 단어를 기준으로 분할해야 합니다. 일반적으로 이 단계에서는 구두점을 삭제하고 모든 단어를 소문자로 설정합니다. 예를 들어 다음과 같은 문자열이 있다고 가정해 보겠습니다. What time is it? 토큰화 후 이 문장은 4개의 토큰으로 구성됩니다. "what" , "time", "is", "it". 모델이 what라는 단어를 두 가지 다른 대소문자로 된 두 개의 다른 단어로 취급하지 않도록 합니다. 또한 구두점은 일반적으로 단어에서 추론을 더 잘 학습하는 데 도움이 되지 않으므로 구두점도 삭제합니다.

정규화

데이터를 '정규화'하는 경우가 많습니다. 이렇게 하면 의미가 비슷한 단어가 동일한 단어로 대체됩니다. 예를 들어 텍스트에서 'fought', 'battled', 'dueled'라는 단어가 식별되면 정규화 과정에서 'battled'와 'dueled'가 'fought'로 대체될 수 있습니다.

Stemming

어간 추출은 단어를 어근 의미로 대체합니다. 예를 들어 '자동차', '자동차들', '자동차의'는 모두 '자동차'라는 단어로 바뀝니다. 이러한 단어는 모두 근본적으로 동일한 의미를 내포하기 때문입니다.

검색되지 않는 단어 삭제하기

불용어는 일반적으로 문장의 의미적 의미에 가치를 더하지 않는 'and', 'the'와 같은 단어입니다. 일반적으로 텍스트 데이터 세트의 노이즈를 줄이기 위해 이러한 단어를 삭제합니다.

8. 작업 실행

실행할 작업을 살펴보겠습니다. 코드는 cloud-dataproc/codelabs/spark-nlp/topic_model.py에서 확인할 수 있습니다. 스크립트와 관련 주석을 꼼꼼히 살펴보고 진행 상황을 파악하는 데 몇 분 정도 시간을 할애하세요. 또한 아래에서 몇 가지 섹션을 강조 표시합니다.

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

작업 실행

이제 작업을 실행해 보겠습니다. 다음 명령어를 실행합니다.

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

이 명령어를 사용하면 Dataproc Jobs API를 활용할 수 있습니다. pyspark 명령어를 포함하면 클러스터에 이 작업이 PySpark 작업임을 나타냅니다. 클러스터 이름, 여기에서 사용 가능한 매개변수(선택사항), 작업이 포함된 파일의 이름을 제공합니다. 여기서는 Spark, Yarn 또는 Dataproc의 다양한 속성을 변경할 수 있는 매개변수 --properties를 제공합니다. Spark에 spark-nlp를 작업과 함께 패키징하여 포함하겠다고 알리는 Spark 속성 packages를 변경합니다. 또한 오류를 제외한 PySpark의 로그 출력 대부분을 억제하는 매개변수 --driver-log-levels root=FATAL도 제공합니다. 일반적으로 Spark 로그는 노이즈가 많습니다.

마지막으로 -- ${BUCKET}는 버킷 이름을 제공하는 Python 스크립트 자체의 명령줄 인수입니다. --${BUCKET} 사이의 공백에 유의하세요.

작업을 실행한 후 몇 분 후에 모델이 포함된 출력이 표시됩니다.

167f4c839385dcf0.png

대단해!! 모델의 출력을 보고 추세를 추론할 수 있나요? YouTube는 어떠신가요?

위의 출력에서 주제 8의 아침 식사 관련 동향과 주제 9의 디저트 관련 동향을 추론할 수 있습니다.

9. 삭제

이 빠른 시작을 완료한 후 GCP 계정에 불필요한 비용이 청구되지 않도록 하려면 다음 단계를 따르세요.

  1. 내가 만든 환경의 Cloud Storage 버킷 삭제
  2. Dataproc 환경을 삭제합니다.

이 Codelab용으로 프로젝트를 만든 경우 원하는 경우 프로젝트를 삭제할 수도 있습니다.

  1. GCP Console에서 프로젝트 페이지로 이동합니다.
  2. 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제를 클릭합니다.
  3. 상자에 프로젝트 ID를 입력하고 종료를 클릭하여 프로젝트를 삭제합니다.

라이선스

이 작업물은 Creative Commons Attribution 3.0 일반 라이선스 및 Apache 2.0 라이선스에 따라 이용할 수 있습니다.