En este tutorial, demostramos cómo construir un unificado Haz Apache canalización que funciona a la perfección tanto en modo por lotes como en modo secuencial utilizando DirectRunner. Generamos datos sintéticos que tienen en cuenta la hora del evento y aplicamos ventanas fijas con activadores y retrasos permitidos para demostrar cómo Apache Beam maneja consistentemente eventos puntuales y tardíos. Al cambiar solo la fuente de entrada, mantenemos idéntica la razonamiento de agregación central, lo que nos ayuda a comprender claramente cómo se comportan el maniquí de tiempo de eventos, las ventanas y los paneles de Beam sin servir de una infraestructura de transmisión externa. Mira el CÓDIGOS COMPLETOS aquí.
!pip -q install -U "grpcio>=1.71.2" "grpcio-status>=1.71.2"
!pip -q install -U apache-beam crcmod
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode
from apache_beam.testing.test_stream import TestStream
import json
from datetime import datetime, timezone
Instalamos las dependencias requeridas y aseguramos la compatibilidad de versiones para que Apache Beam. Importamos las API principales de Beam conexo con las ventanas, los activadores y las utilidades TestStream que se necesitarán más delante en el proceso. Todavía incorporamos módulos estereotipado de Python para el manejo del tiempo y el formato JSON. Mira el CÓDIGOS COMPLETOS aquí.
MODE = "stream"
WINDOW_SIZE_SECS = 60
ALLOWED_LATENESS_SECS = 120
def make_event(user_id, event_type, amount, event_time_epoch_s):
return {"user_id": user_id, "event_type": event_type, "amount": float(amount), "event_time": int(event_time_epoch_s)}
almohadilla = datetime.now(timezone.utc).replace(microsecond=0)
t0 = int(almohadilla.timestamp())
BATCH_EVENTS = (
make_event("u1", "purchase", 20, t0 + 5),
make_event("u1", "purchase", 15, t0 + 20),
make_event("u2", "purchase", 8, t0 + 35),
make_event("u1", "refund", -5, t0 + 62),
make_event("u2", "purchase", 12, t0 + 70),
make_event("u3", "purchase", 9, t0 + 75),
make_event("u2", "purchase", 3, t0 + 50),
)
Definimos la configuración general que controla el tamaño de la ventana, el retraso y el modo de ejecución. Creamos eventos sintéticos con marcas de tiempo explícitas para que el comportamiento de las ventanas sea determinista y acomodaticio de razonar. Preparamos un pequeño conjunto de datos que incluye intencionalmente eventos fuera de orden y tardíos para observar la semántica de tiempo de evento de Beam. Mira el CÓDIGOS COMPLETOS aquí.
def format_joined_record(kv):
user_id, d = kv
return {
"user_id": user_id,
"count": int(d("count")(0)) if d("count") else 0,
"sum_amount": float(d("sum_amount")(0)) if d("sum_amount") else 0.0,
}
class WindowedUserAgg(beam.PTransform):
def expand(self, pcoll):
stamped = pcoll | beam.Map(lambda e: beam.window.TimestampedValue(e, e("event_time")))
windowed = stamped | beam.WindowInto(
FixedWindows(WINDOW_SIZE_SECS),
allowed_lateness=ALLOWED_LATENESS_SECS,
trigger=AfterWatermark(
early=AfterProcessingTime(10),
late=AfterProcessingTime(10),
),
accumulation_mode=AccumulationMode.ACCUMULATING,
)
keyed = windowed | beam.Map(lambda e: (e("user_id"), e("amount")))
counts = keyed | beam.combiners.Count.PerKey()
sums = keyed | beam.CombinePerKey(sum)
return (
{"count": counts, "sum_amount": sums}
| beam.CoGroupByKey()
| beam.Map(format_joined_record)
)
Construimos un Beam PTransform reutilizable que encapsula toda la razonamiento de agregación en ventanas. Aplicamos ventanas fijas, activadores y reglas de acumulación, luego agrupamos eventos por adjudicatario y calculamos recuentos y sumas. Mantenemos esta transformación independiente de la fuente de datos, por lo que se aplica la misma razonamiento tanto a las entradas por lotes como a las de streaming. Mira el CÓDIGOS COMPLETOS aquí.
class AddWindowInfo(beam.DoFn):
def process(self, element, window=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam):
ws = float(window.start)
we = float(window.end)
yield {
**element,
"window_start_utc": datetime.fromtimestamp(ws, tz=timezone.utc).strftime("%H:%M:%S"),
"window_end_utc": datetime.fromtimestamp(we, tz=timezone.utc).strftime("%H:%M:%S"),
"pane_timing": str(pane_info.timing),
"pane_is_first": pane_info.is_first,
"pane_is_last": pane_info.is_last,
}
def build_test_stream():
return (
TestStream()
.advance_watermark_to(t0)
.add_elements((
beam.window.TimestampedValue(make_event("u1", "purchase", 20, t0 + 5), t0 + 5),
beam.window.TimestampedValue(make_event("u1", "purchase", 15, t0 + 20), t0 + 20),
beam.window.TimestampedValue(make_event("u2", "purchase", 8, t0 + 35), t0 + 35),
))
.advance_processing_time(5)
.advance_watermark_to(t0 + 61)
.add_elements((
beam.window.TimestampedValue(make_event("u1", "refund", -5, t0 + 62), t0 + 62),
beam.window.TimestampedValue(make_event("u2", "purchase", 12, t0 + 70), t0 + 70),
beam.window.TimestampedValue(make_event("u3", "purchase", 9, t0 + 75), t0 + 75),
))
.advance_processing_time(5)
.add_elements((
beam.window.TimestampedValue(make_event("u2", "purchase", 3, t0 + 50), t0 + 50),
))
.advance_watermark_to(t0 + 121)
.advance_watermark_to_infinity()
)
Enriquecemos cada registro asociado con metadatos de ventanas y paneles para que podamos ver claramente cuándo y por qué se emiten los resultados. Convertimos las marcas de tiempo internas de Beam en horas UTC legibles por humanos para veterano claridad. Todavía definimos un TestStream que simula el comportamiento de transmisión existente utilizando marcas de agua, avances en el tiempo de procesamiento y datos tardíos. Mira el CÓDIGOS COMPLETOS aquí.
def run_batch():
with beam.Pipeline(options=PipelineOptions(())) as p:
(
p
| beam.Create(BATCH_EVENTS)
| WindowedUserAgg()
| beam.ParDo(AddWindowInfo())
| beam.Map(json.dumps)
| beam.Map(print)
)
def run_stream():
opts = PipelineOptions(())
opts.view_as(StandardOptions).streaming = True
with beam.Pipeline(options=opts) as p:
(
p
| build_test_stream()
| WindowedUserAgg()
| beam.ParDo(AddWindowInfo())
| beam.Map(json.dumps)
| beam.Map(print)
)
run_stream() if MODE == "stream" else run_batch()
Conectamos todo en canalizaciones ejecutables en forma de lotes y flujos. Alternamos entre modos cambiando un solo indicador mientras reutilizamos la misma transformación de agregación. Ejecutamos el proceso e imprimimos los resultados en ventana directamente, lo que hace que el flujo de ejecución y los resultados sean fáciles de inspeccionar.
En conclusión, demostramos que la misma canalización de Beam puede procesar tanto datos por lotes limitados como datos ilimitados similares a flujos, al tiempo que conserva una semántica de agregación y ventanas idéntica. Observamos cómo las marcas de agua, los activadores y los modos de acumulación influyen en el momento en que se emiten los resultados y cómo las actualizaciones tardías de datos computaban previamente las ventanas. Adicionalmente, nos centramos en las bases conceptuales del maniquí unificado de Beam, proporcionando una almohadilla sólida para luego subir el mismo diseño a corredores de transmisión y entornos de producción reales.
Mira el CÓDIGOS COMPLETOS aquí. Adicionalmente, no dudes en seguirnos en Gorjeo y no olvides unirte a nuestro SubReddit de más de 100.000 ml y suscríbete a nuestro boletín. ¡Esperar! estas en telegrama? Ahora igualmente puedes unirte a nosotros en Telegram.
Consulte nuestra última traducción de ai2025.devuna plataforma de examen centrada en 2025 que convierte los lanzamientos de modelos, los puntos de remisión y la actividad del ecosistema en un conjunto de datos estructurado que puede filtrar, comparar y exportar.
