Preprocesamiento en análisis de sentimientos con dataset de Twitter [Parte 2]

Contenido del Bootcamp dirigido por:

Preprocesamiento en análisis de sentimientos con dataset de Twitter
¿Qué encontrarás en este post?

En este post haremos el preprocesamiento en análisis de sentimientos con dataset de Twitter. En una primera entrega hicimos la descarga del dataset y la carga de datos dentro del mismo ejercicio.

Una vez tenemos todo configurado y entendemos los principios de Apache Beam, vamos a comenzar con uno de los pilares principales es la automatización; por ello, vamos a automatizar las tres fases que vamos a tener: el preprocesamiento, el entrenamiento y la inferencia.

Preprocesamiento en análisis de sentimientos con dataset de Twitter

Previo al preprocesamiento

Para el preprocesamiento en análisis de sentimientos con dataset de Twitter vamos a utilizar Apache Beam sobre el dataset en bruto para paralelizar el procesamiento y que sea lo más eficiente posible:

#Preprocesamiento en análisis de sentimientos con dataset de Twitter
import os

#Set the working directory to the sample code directory
%cd ./twitter-sentiment-analysis-batch/

WORK_DIR = os.getcwd ()
Preprocesamiento en análisis de sentimientos con dataset de Twitter
#Preprocesamiento en análisis de sentimientos con dataset de Twitter
! python3 preprocess.py \
--work -dir $WORK_DIR \
--runner DirectRunner \
--input $WORK_DIR/data/training*.csv \
--output $WORK_DIR/data/transformed_data \
--mode train
#Preprocesamiento en análisis de sentimientos con dataset de Twitter
! python3 preprocess.py \
--work -dir $WORK_DIR \
--runner DirectRunner \
--input $WORK_DIR/data/test*.csv \
--output $WORK_DIR/data/transformed_data \
--mode test

Preprocesamiento

#Preprocesamiento en análisis de sentimientos con dataset de Twitter
#Importamos librerías
from __future__ import absolute_import

import argparse
import logging
import re
import os
import csv
import random

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.coders.coders import Coder
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions, DirectOptions

import nltk
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer

nltk.download ("stopwords")

#CLEANING
STOP_WORDS = stopwords.words ("english")
STEMMER = SnowballStemmer ("english")
TEXT_CLEANING_RE = "@\S+ | https?: \S+ | http?: \S | [^A-Za-z0-9] + "


#Preprocesamiento en análisis de sentimientos con dataset de Twitter
class ExtractColumnsDoFn (beam.DoFn):
       def process (self, element):
              #space removal
              element_split = list (csv.reader (element, delimiter = " , "))
              element_split = [x for x in element_split if x != [" " , " "]]
#text, sentiment
yield element_split [5] [-1], element_split [0] [-1]


class PreprocessColumnsTrainFn (beam.DoFn):
       def process_sentiment (self, sentiment):
              sentiment = int (sentiment)
              if sentiment == 4:
                     return "POSITIVE"
              elif sentiment == 0:
                     return "NEGATIVE"
              else:
                     return "NEUTRAL"
       
       def process_text (self, text):
              #Remove link, user and special characters
              stem = False
              text = re.sub (TEXT_CLEANING_RE, "  ", str (text).lower ()).strip ()
              tokens = [  ]
              for token in text.split ():
                     if token not in STOP_WORDS:
                            if stem:
                                   tokens.append (STEMMER.stem (token))
                            else:
                                   tokens.append (token)
              return "  ".join (tokens)

def process (self, element):
       processed_text = self.process_text (element [0])
       processed_sentiment = self.process_sentiment (element [1])
       tield f "{processed_text}, {processed_sentiment}"



class CustomCoder (Coder):
       " " " A custom coder used for reading and writing strings " " "

       def __init__ (self, encoding: str):
              #latin - 1
              #iso - 8859 - 1
              self.enconding = encoding

       def encode (self, value):
              return value.encode (self.enconding)
#Preprocesamiento en análisis de sentimientos con dataset de Twitter
def run (argv = None, save_main_session = True):

       " " " Main entry point; defines and runs the wordcount pipeline. " " "

       parser = argparse.ArgumentParser ()

       parser.add_argument (
              "--work-dir", dest = "work_dir", required = True, help = "Working directory", 
       )

       parser.add_argument (
              "--input", dest = "input", required = True, help = "Input dataset in work dir", 
       )

       parser.add_argument (
              "--output", dest = "output", required = True, help = "Output path to store transformed data in work dir", 
       )

parser.add_argument (
              "--mode", dest = "mode", required = True, help = "Type of output to store transformed data", 
       )

       known_args, pipeline_args = parser.parse_known_args (argv)

#Preprocesamiento en análisis de sentimientos con dataset de Twitter
#We use the save_main_session option because one or more DoFn's in this workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions (pipeline_args)
pipeline_options.view_as (SetupOptions).save_main_session = save_main_session
pipeline_options.view_as (DirectOptions).direct_num_workers = 0

#The pipeline variable will be run on exiting the with block.
with beam.Pipeline (options = pipeline_options) as p:
       #Read the text file [pattern] into a PCollection.
       raw_data = p | "ReadTwitterData" >> ReadFromText (
              known_args.input, coder = CustomCoder ("latin-1")
       )


if known_args.mode == "train":
       transformed_data = (
              raw_data
                   |  "ExtractColumns" >> beam.ParDo (ExtractColumnsDoFn ())
                   |  "Preprocess" >> beam.ParDo (PreprocessColumnsTrainFn ())
)


eval_percent = 20
       assert 0 < eval_perept < 100, "eval_percent must in the range (0 - 100)"
       train_dataset, eval_dataset = (
              transformed_data
                     "Split dataset"
                >> beam.Partition (
                        lambda elem, _: int (random.uniform (0, 100) < eval_percent), 2
              )    
       )

       train_dataset | "TrainWriteToCSV" >> WriteToText (
              os.path.join (known_args.output, "train", "part")
          )
       eval_dataset | "EvalWriteToCSV" >> WriteToText (
              os.path.join (known_args.output, "eval", "part")
       )
#Preprocesamiento en análisis de sentimientos con dataset de Twitter
if __name__ == "__main__":
       logging.getLogger ().setLevel (logging.INFO)
       run ()

¿Qué sigue?

Si quieres seguir aprendiendo, en KeepCoding te ofrecemos la posibilidad de formarte con grandes expertos, que te guiarán a través de los conocimientos teóricos y prácticos que, en unos pocos meses, te permitirán convertirte en un gran profesional del amplio y demandado sector IT. Échale un vistazo al temario de nuestro Big Data, Inteligencia Artificial & Machine Learning Full Stack Bootcamp y descubre esta formación de alta calidad e intensidad. ¡Solicita ahora mismo más información y da el salto que cambiará tu futuro!

¡CONVOCATORIA ABIERTA!

Big Data, IA & Machine Learning

Full Stack Bootcamp

Clases en Directo | Profesores en Activo | Temario 100% actualizado