Crear un suscriptor con Dataflow Streaming

| Última modificación: 9 de abril de 2024 | Tiempo de Lectura: 4 minutos

Algunos de nuestros reconocimientos:

Premios KeepCoding

En este artículo te explicaremos cómo crear un suscriptor con Dataflow Streaming.

En un ejercicio anterior vimos cómo desarrollar un modelo en streaming. El paso siguiente para seguir con el ejercicio de análisis de sentimientos en Twitter es crear un suscriptor con Dataflow Streaming. Veamos cómo hacerlo.

Crear un suscriptor con Dataflow Streaming

Una vez tenemos nuestro publicador recogiendo tweets y hemos creado el topic en el que leerlos, pasaremos a la siguiente fase, que es consumirlos para generar predicciones.

Esta tarea la vamos a subdividir en dos fases:

  1. Job de DataFlow en streaming para recoger los mensajes, mandarlos a inferir, recoger la respuesta y mandarla a un destino. En este caso, la mandaremos a otro tópico de Pub/Sub.
  2. Desplegar el servicio de inferencia online para realizar predicciones en tiempo real solicitadas por el job de streaming de Dataflow.

Para la creación del job nos basaremos en un código que ya habíamos escrito. Cambiaremos la fuente y entrada para que sea capaz de leer del topic y la fuente de salida.

Entonces, lo que haremos será crear un pipeline de predicción, en específico, vamos a crear un suscriptor con Dataflow Streaming.

Por un lado, tendremos un job de Dataflow. Para la predicción, el job lo que hará es invocar un microservicio que debe estar previamente hecho en Cloud Run.

#Crear un suscriptor con Dataflow Streaming
$mkdir /content/subscriber
#Crear un suscriptor con Dataflow Streaming
%cd /content/subscriber
Crear un suscriptor con DataFlow Streaming
#Crear un suscriptor con Dataflow Streaming
%%writefile requirements.txt

apache-beam [gcp]
fsspec
gcsfs
loguru
#Crear un suscriptor con Dataflow Streaming
!pip install --force - reinstall -r requirements.txt

La estructura es simple: tenemos nuestro main, también tenemos la función de run y el predict, que es el transformador para hacer las predicciones:

#Crear un suscriptor con Dataflow Streaming
## %%writefile predict.py

from __future__ import absolute_import
from __future__ import print_function

import argparse
import requests
import json
import sys

import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import (
      GoogleCloudOptions,
      StandardOptions,
      PipelineOptions,
      SetupOptions 
)
from loguru import logger

class Predict (beam.DoFn):
      def __init__ (self, predict_server) -> None:
            self.url = predict_server

      def _predict (self, text) -> str:
            payload = {"text": text}
            headers = {"accept": "application / json", "Content - Type": "applicatoin / json"}
            try:
                  response = requests.post (
                        self.url, data = json.dumps (payload), headers = headers
                  )
                  response = json.loads (response.text)
            except Exception:
                  response = {"label", "undefined", "score": 0, "elapsed_time" : 0}

            return response

      def process (self, element, window = beam.DoFn.WindowParam):
            logger.info (f"Text to predict: {element}")
            result = self._predict (element)
            resiñt ["text"] = element
      yield json.dumps (result)

def run (predict_server, source, sink, beam_options = None):
      with beam.Pipeline (options = beam_options) as p:
            _ = (
                  P
                  |  "Read data from PubSub" >> source
                  |  "decode" >> beam.Map (lambda x: x.decode ("utf - 8"))
                  |  "window" >> beam.WindowInto (window.FixedWindows (15))
                  |  "Predict" >> beam.ParDo (Predict (predict_server))
                  |  "encode" >> beam.Map (lambda x: x.encode ("utf - 8")).with_output_types (bytes)
                  |  "Write predictions" >> sink
            )
if __name__ == "__main__":
      " " " Main function " " "
      parser = argparse.ArgumentParser (
            formatter_class = argparse.ArgumentDefaultsHelpFormatter
      )

      parser.add_argument (
            "-- inputs_topic",
            dest = "inputs_topic",
            required = True,
            help = "Directory for temporary files and preprocessed datasets to."
            "This can be a Google Cloud Storage path"
      )

      parser.add_argument (
            "-- outputs_topic",
            dest = "outputs_topic",
            required = True,
            help = "Directory for temporary files and preprocessed datasets to."
            "This can be a Google Cloud Storage path"
      )

      parser.add_argument (
            "-- predict_server",
            dest = "inputs_topic",
            required = True,
            help = "Directory for temporary files and preprocessed datasets to."
            "This can be a Google Cloud Storage path"
      )

      args, pipeline_args = parser.parse_known_args ()
      logger.info (args)
      beam_options = PipelineOptions (pipeline_args ()
      beam_options.view_as (SetupOptions).save_main_session = True
      #beam_options.view_as (DirectOptions). direct_num_workers = 0

      project = beam_options.view_as (GoogleCloudOptions).project

      if not project:
            parser.print_usage ()
            print ("error: argument --project is required for streaming")
            sys.exit (1)

      beam_options.view.as (StandardOptions).streaming = True

      source = beam.io.ReadFromPubSub (
            topic = "projects/{ }/topics/{ }".format (project, args.inputs_topic)
      ).with_output_types (bytes)

      sink = beam.ioWriteToPubSub (
            topic = "projects/{ }/topics/{ }".format (project, args.outputs_topic)
      )

run (args.predict_server, source, sink, beam_options)
#Crear un suscriptor con Dataflow Streaming
%%writefile setup.py

import setuptools

REQUIRED_PACKAGES = [
      "apache-beam [gcp]",
      "fsspec",
      "gcsfs",
      "loguru",
]

setuptools.setup (
      name = "twitterstreaming",
      version = "0.0.1",
      install_requires = REQUIRED_PACKAGES,
      packages = setuptools.find_packages (),
      include_package_data = True,
      description = "Twitter Sentiment Analysis Streaming",
)

Finalmente, una vez tenemos nuestro código listo y el servicio de inferencia desplegado, procedemos a crear el job:

#Crear un suscriptor con DataFlow Streaming
! python3 predict.py \
--project $PROJECT_ID \
--region $REGION \
--runner DataflowRunner \
--temp_location gs: //$BUCKET_NAME/twitter-sentiment-batch/beam-temp \
--setup_file ./setup.py \
--inputs_topic tweets \
--outputs_topic tweets - predictions \
--predict_server https://sentiment-analysis-server-5i3whdmtog-ew.a.run.app/api/model/predict \

¿Qué sigue?

Si tu meta es seguir aprendiendo acerca de cómo crear un suscriptor con Dataflow Streaming y quieres seguir formándote en alguna de las numerosas temáticas del mundo del Big Data y aprender sobre despliegue de un modelo en streaming y muchos aspectos más para destacar en este demandado mercado laboral, no puedes perderte el Big Data, Inteligencia Artificial & Machine Learning Full Stack Bootcamp. Aquí podrás, en pocos meses, transformarte en un gran profesional IT. ¡Solicita ahora mismo más información y da el salto que impulsará tu futuro!

Sandra Navarro

Business Intelligence & Big Data Advisor & Coordinadora del Bootcamp en Data Science, Big Data & Machine Learning.

Posts más leídos

¡CONVOCATORIA ABIERTA!

Big Data, IA & Machine Learning

Full Stack Bootcamp

Clases en Directo | Profesores en Activo | Temario 100% actualizado