Preprocesamiento en análisis de sentimientos con dataset de Twitter [Parte 2]

Autor: | Última modificación: 12 de abril de 2024 | Tiempo de Lectura: 3 minutos
Temas en este post: ,

Algunos de nuestros reconocimientos:

Premios KeepCoding

En este post haremos el preprocesamiento en análisis de sentimientos con dataset de Twitter. En una primera entrega hicimos la descarga del dataset y la carga de datos dentro del mismo ejercicio.

Una vez tenemos todo configurado y entendemos los principios de Apache Beam, vamos a comenzar con uno de los pilares principales es la automatización; por ello, vamos a automatizar las tres fases que vamos a tener: el preprocesamiento, el entrenamiento y la inferencia.

Preprocesamiento en análisis de sentimientos con dataset de Twitter

Previo al preprocesamiento

Para el preprocesamiento en análisis de sentimientos con dataset de Twitter vamos a utilizar Apache Beam sobre el dataset en bruto para paralelizar el procesamiento y que sea lo más eficiente posible:

#Preprocesamiento en análisis de sentimientos con dataset de Twitter
import os

#Set the working directory to the sample code directory
%cd ./twitter-sentiment-analysis-batch/

WORK_DIR = os.getcwd ()
Preprocesamiento en análisis de sentimientos con dataset de Twitter
#Preprocesamiento en análisis de sentimientos con dataset de Twitter
! python3 preprocess.py \
--work -dir $WORK_DIR \
--runner DirectRunner \
--input $WORK_DIR/data/training*.csv \
--output $WORK_DIR/data/transformed_data \
--mode train
#Preprocesamiento en análisis de sentimientos con dataset de Twitter
! python3 preprocess.py \
--work -dir $WORK_DIR \
--runner DirectRunner \
--input $WORK_DIR/data/test*.csv \
--output $WORK_DIR/data/transformed_data \
--mode test

Preprocesamiento

#Preprocesamiento en análisis de sentimientos con dataset de Twitter
#Importamos librerías
from __future__ import absolute_import

import argparse
import logging
import re
import os
import csv
import random

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.coders.coders import Coder
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions, DirectOptions

import nltk
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer

nltk.download ("stopwords")

#CLEANING
STOP_WORDS = stopwords.words ("english")
STEMMER = SnowballStemmer ("english")
TEXT_CLEANING_RE = "@\S+ | https?: \S+ | http?: \S | [^A-Za-z0-9] + "


#Preprocesamiento en análisis de sentimientos con dataset de Twitter
class ExtractColumnsDoFn (beam.DoFn):
       def process (self, element):
              #space removal
              element_split = list (csv.reader (element, delimiter = " , "))
              element_split = [x for x in element_split if x != [" " , " "]]
#text, sentiment
yield element_split [5] [-1], element_split [0] [-1]


class PreprocessColumnsTrainFn (beam.DoFn):
       def process_sentiment (self, sentiment):
              sentiment = int (sentiment)
              if sentiment == 4:
                     return "POSITIVE"
              elif sentiment == 0:
                     return "NEGATIVE"
              else:
                     return "NEUTRAL"
       
       def process_text (self, text):
              #Remove link, user and special characters
              stem = False
              text = re.sub (TEXT_CLEANING_RE, "  ", str (text).lower ()).strip ()
              tokens = [  ]
              for token in text.split ():
                     if token not in STOP_WORDS:
                            if stem:
                                   tokens.append (STEMMER.stem (token))
                            else:
                                   tokens.append (token)
              return "  ".join (tokens)

def process (self, element):
       processed_text = self.process_text (element [0])
       processed_sentiment = self.process_sentiment (element [1])
       tield f "{processed_text}, {processed_sentiment}"



class CustomCoder (Coder):
       " " " A custom coder used for reading and writing strings " " "

       def __init__ (self, encoding: str):
              #latin - 1
              #iso - 8859 - 1
              self.enconding = encoding

       def encode (self, value):
              return value.encode (self.enconding)
#Preprocesamiento en análisis de sentimientos con dataset de Twitter
def run (argv = None, save_main_session = True):

       " " " Main entry point; defines and runs the wordcount pipeline. " " "

       parser = argparse.ArgumentParser ()

       parser.add_argument (
              "--work-dir", dest = "work_dir", required = True, help = "Working directory", 
       )

       parser.add_argument (
              "--input", dest = "input", required = True, help = "Input dataset in work dir", 
       )

       parser.add_argument (
              "--output", dest = "output", required = True, help = "Output path to store transformed data in work dir", 
       )

parser.add_argument (
              "--mode", dest = "mode", required = True, help = "Type of output to store transformed data", 
       )

       known_args, pipeline_args = parser.parse_known_args (argv)

#Preprocesamiento en análisis de sentimientos con dataset de Twitter
#We use the save_main_session option because one or more DoFn's in this workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions (pipeline_args)
pipeline_options.view_as (SetupOptions).save_main_session = save_main_session
pipeline_options.view_as (DirectOptions).direct_num_workers = 0

#The pipeline variable will be run on exiting the with block.
with beam.Pipeline (options = pipeline_options) as p:
       #Read the text file [pattern] into a PCollection.
       raw_data = p | "ReadTwitterData" >> ReadFromText (
              known_args.input, coder = CustomCoder ("latin-1")
       )


if known_args.mode == "train":
       transformed_data = (
              raw_data
                   |  "ExtractColumns" >> beam.ParDo (ExtractColumnsDoFn ())
                   |  "Preprocess" >> beam.ParDo (PreprocessColumnsTrainFn ())
)


eval_percent = 20
       assert 0 < eval_perept < 100, "eval_percent must in the range (0 - 100)"
       train_dataset, eval_dataset = (
              transformed_data
                     "Split dataset"
                >> beam.Partition (
                        lambda elem, _: int (random.uniform (0, 100) < eval_percent), 2
              )    
       )

       train_dataset | "TrainWriteToCSV" >> WriteToText (
              os.path.join (known_args.output, "train", "part")
          )
       eval_dataset | "EvalWriteToCSV" >> WriteToText (
              os.path.join (known_args.output, "eval", "part")
       )
#Preprocesamiento en análisis de sentimientos con dataset de Twitter
if __name__ == "__main__":
       logging.getLogger ().setLevel (logging.INFO)
       run ()

¿Qué sigue?

Si quieres seguir aprendiendo, en KeepCoding te ofrecemos la posibilidad de formarte con grandes expertos, que te guiarán a través de los conocimientos teóricos y prácticos que, en unos pocos meses, te permitirán convertirte en un gran profesional del amplio y demandado sector IT. Échale un vistazo al temario de nuestro Big Data, Inteligencia Artificial & Machine Learning Full Stack Bootcamp y descubre esta formación de alta calidad e intensidad. ¡Solicita ahora mismo más información y da el salto que cambiará tu futuro!

¡CONVOCATORIA ABIERTA!

Big Data, IA & Machine Learning

Full Stack Bootcamp

Clases en Directo | Profesores en Activo | Temario 100% actualizado