Athrun Data Intelligence


Probé AutoCDC desde Snapshots en Python y me sorprendió cómo 4 líneas de código podían reemplazar lo que estaba haciendo antaño en 1500 líneas de código. — Ingeniero de datos sénior, empresa aeroespacial y de defensa Fortune 500

Cambiar captura de datos (CDC) y dimensiones que cambian lentamente (SCD) son fundamentales para las cargas de trabajo modernas de observación e inteligencia químico. Los equipos confían en ellos para sostener las tablas posteriores precisas a medida que cambian los datos operativos, ya sea que eso signifique sostener una paisaje presente del negocio o preservar el contexto histórico completo.

Sin retención, en la ejercicio, los oleoductos de los CDC suelen ser algunos de los más dolorosos de construir y ejecutar. Los equipos rutinariamente realizan tareas complejas MERGE Método para manejar actualizaciones, eliminaciones y datos que llegan tarde: capas de tablas provisionales, funciones de ventana y suposiciones de secuenciación sobre las que es difícil razonar y aún más difíciles de sostener a medida que evolucionan las canalizaciones.

En esta publicación, analizaremos los patrones CDC y SCD que los ingenieros de datos y los profesionales de SQL encuentran todos los días, por qué estos patrones son difíciles de implementar a mano y cómo AutoCDC en Lakeflow Spark Declarative Pipelines los automatiza de forma declarativa y, al mismo tiempo, ofrece mejoras significativas en precio y rendimiento.

CDC y SCD siguen siendo difíciles para los ingenieros de datos

Incluso para los equipos que entienden adecuadamente estos patrones, corregirlos y mantenerlos correctos a lo dadivoso del tiempo es donde las cosas fallan. A medida que crecen los volúmenes de datos y se expanden los casos de uso, los canales se vuelven frágiles; los problemas de corrección surgen tarde; e incluso los cambios pequeños requieren reescrituras cuidadosas para evitar dañar las tablas posteriores.

Mantenimiento de tablas SCD tipo 1

Las tablas SCD Tipo 1 sobrescriben las filas existentes para reverberar el estado más fresco. Incluso este caso “simple” rápidamente se topa con desafíos:

  • Las actualizaciones llegan desordenadas
  • Los eventos duplicados deben deduplicarse constantemente
  • Las eliminaciones deben aplicarse correctamente
  • La dialéctica debe seguir siendo idempotente en todos los reintentos y reprocesamiento.

Lo que muchas veces empieza como una simple MERGE INTO evoluciona en dirección a una dialéctica profundamente anidada con tablas de preparación, funciones de ventana y suposiciones de secuenciación sobre las que es difícil razonar (o cambiar de forma segura). Con el tiempo, los equipos se vuelven reacios a tocar estos conductos.

Mantenimiento del historial de SCD tipo 2

SCD Tipo 2 introduce una complejidad adicional:

  • Seguimiento de versiones de filas y ventanas de validez
  • Manejar las actualizaciones que llegan tarde sin corromper el historial
  • Asegurar que exista exactamente una lectura «presente» en cualquier momento

Los errores aquí no siempre fallan estrepitosamente. A menudo surgen semanas a posteriori como una sutil desviación métrica o la pobreza de recobrar las tablas históricas por completo.

Extirpación de datos de cambios de diferentes fuentes

No todos los sistemas emiten registros CDC limpios. Algunos sistemas emiten feeds de datos de cambios nativos, mientras que otros no lo hacen (a menudo porque el equipo que consume los datos no controla la saco de datos ascendiente), lo que obliga a los equipos a recobrar los cambios comparando instantáneas sucesivas de una tabla de origen.

Convenir los dos normalmente significa dialéctica de ingesta y procesamiento separada; diferentes supuestos de corrección; y más rutas de código para sostener y depurar.

Operando oleoductos CDC a lo dadivoso del tiempo

Incluso una vez que una canalización de CDC es correcta, todavía tiene que sobrevivir al reprocesamiento y las reposiciones, la desarrollo del esquema, las fallas y los reinicios. La dialéctica CDC hecha a mano tiende a volverse más frágil con el tiempo a medida que estas realidades se acumulan, lo que aumenta el peligro eficaz y los costos de mantenimiento.

Automatización de patrones CDC complejos con ingeniería de datos declarativa

AutoCDC fue diseñado para estandarizar estos patrones CDC y SCD comunes detrás de una noción declarativa. En extensión de codificar a mano cómo Se deben aplicar cambios, declaran los equipos. que semántica quieren, y la plataforma gestiona los pedidos, el estado y el procesamiento incremental.

Carga de trabajo de los CDC AutoCDC MERGE / Método de instantáneas escritas a mano
Mantenimiento de tablas de estado presente (SCD tipo 1) La definición de canalización declarativa maneja automáticamente la secuenciación, la deduplicación y las eliminaciones. Método MERGE personalizada con funciones de ventana y reglas de secuenciación
Mantenimiento de tablas históricas (SCD Tipo 2) Dirección cibernética de versiones con seguimiento del historial integrado Método MERGE de varios pasos para cerrar e insertar versiones de registros
Inferir cambios a partir de fuentes de instantáneas Compatibilidad con CDC de instantáneas incorporada Canalizaciones de diferencias de instantáneas manuales con uniones y comparaciones
Intervenir tuberías de forma confiable a lo dadivoso del tiempo (datos tardíos, reintentos, reprocesamiento) Ordenamiento maquinal y ejecución idempotente. Requiere salvaguardias personalizadas y dialéctica adicional
Huella del código y complejidad operativa ~6 a 10 líneas de definición de canalización declarativa Entre 40 y 200 líneas de dialéctica de canalización personalizada

Esto brinda a los equipos una forma consistente y repetible de implementar CDC y SCD en todos los procesos, en extensión de reinventar el patrón cada vez (que es verdaderamente el valencia central de la programación declarativa en normal, y Spark Declarative Pipelines específicamente).

Al procesar registros de cambios de una fuente de datos de cambios (CDF), AutoCDC maneja automáticamente registros fuera de secuencia y aplica actualizaciones correctamente según una columna de secuencia declarada. Para mostrar cómo funciona esto en la ejercicio, consideremos el venidero feed de CDC de muestra:

ID de sucesor nombre ciudad operación número de secuencia
124 Raúl Oaxaca INSERTAR 1
123 Isabel Monterrey INSERTAR 1
125 mercedes Tijuana INSERTAR 2
126 Lirio Cancún INSERTAR 2
123 incapaz incapaz BORRAR 6
125 mercedes Guadalajara ACTUALIZAR 6
125 mercedes México ACTUALIZAR 5
123 Isabel chihuahua ACTUALIZAR 5

Recuerde, debe designar SCD Tipo 1 para conservar solo los datos más recientes, o designar SCD Tipo 2 para conservar datos históricos. Comencemos con el Tipo 1.

Automatización del mantenimiento de SCD tipo 1 (cambiar fuentes de provisiones de datos)

En este ejemplo, una fuente de datos modificados contiene inserciones, actualizaciones y eliminaciones para una tabla de usuarios. El objetivo es sostener un paisaje presente de cada registrodonde las nuevas actualizaciones sobrescriben los títulos más antiguos.

Tabla de salida para SCD Tipo 1

identificación nombre ciudad
124 Raúl Oaxaca
125 mercedes Guadalajara
126 Lirio Cancún

El sucesor 123 (Isabel) fue eliminado, por lo que no aparece en el resultado. El sucesor 125 (Mercedes) muestra solo la última ciudad (Guadalajara) porque el SCD Tipo 1 sobrescribe los títulos anteriores.

Con un enfoque tradicional, esto requiere personalización. MERGE dialéctica para deduplicar eventos, aplicar ordenamientos, aplicar eliminaciones y certificar que la canalización permanezca correcta en todos los reintentos o datos que lleguen tarde. AutoCDC reemplaza esta dialéctica frágil con una definición de canalización declarativa que maneja automáticamente la secuenciación, la deduplicación, los datos que llegan tarde y el procesamiento incremental, eliminando docenas de líneas de dialéctica de combinación personalizada.

Ver ejemplo de código completo en apéndice

Automatización del historial de SCD tipo 2 (cambiar fuentes de provisiones de datos)

En muchos sistemas analíticos, sostener sólo el estado más fresco no es suficiente: los equipos necesitan una completa Historia de cómo los registros cambian con el tiempo.. Este es el patrón SCD Tipo 2, donde cada lectura de un registro se almacena con ventanas de validez que indican cuándo estuvo activo.

Tabla de salida para SCD tipo 2:

identificación nombre ciudad __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel chihuahua 5 6
124 Raúl Oaxaca 1 NULO
125 mercedes Tijuana 2 5
125 mercedes México 5 6
125 mercedes Guadalajara 6 NULO
126 Lirio Cancún 2 NULO

La tabla conserva la historia completa. El sucesor 123 tiene dos versiones (terminó en la secuencia 6 cuando se eliminó). El sucesor 125 tiene tres versiones que muestran cambios de ciudad. Registros con __END_AT = NULL están actualmente activos.

Implementar esto manualmente requiere varios pasos MERGE dialéctica para cerrar registros anteriores, insertar nuevas versiones y certificar que solo una lectura permanezca activa a la vez. AutoCDC automatiza estas transiciones de forma declarativa, administrando automáticamente las columnas del historial y la dialéctica de versiones, al tiempo que garantiza la corrección incluso cuando las actualizaciones llegan desordenadas.

Ver ejemplo de código completo en apéndice

Inferir CDC a partir de fuentes de instantáneas

No todos los sistemas fuente emiten registros de cambios. En muchos casos, los equipos reciben instantáneas periódicas de una tabla fuente y debe inferir qué cambió entre ejecuciones.

Tradicionalmente, esto requiere comparar instantáneas manualmente para detectar inserciones, actualizaciones y eliminaciones antaño de aplicar esos cambios con la dialéctica MERGE. AutoCDC manejo el CDC basado en instantáneas como un patrón de primera clase, detecta automáticamente cambios a nivel de fila entre instantáneas y los aplica de forma incremental sin requerir dialéctica de diferencias personalizada o compañía de estado.

Implementar esto manualmente requiere detectar cambios a nivel de fila entre instantáneas, cerrar registros previamente activos e insertar nuevas versiones con ventanas de validez actualizadas. AutoCDC deriva automáticamente estos cambios y aplica la semántica SCD Tipo 2, manteniendo el historial de versiones sin requerir una dialéctica de combinación de varios pasos o un seguimiento personalizado del estado de las instantáneas.

Dirección de pedidos, estado y reprocesamiento.

Tuberías declarativas de Lakeflow Spark rastrea automáticamente el progreso incremental y maneja datos fuera de secuencia. Las canalizaciones pueden recuperarse de fallas, reprocesar datos históricos y cambiar con el tiempo sin aplicar dos veces ni perder cambios.

En la ejercicio, esto elimina la pobreza de que los equipos gestionen ellos mismos la dialéctica de secuenciación, la contabilidad de marcas de agua o la seguridad del reprocesamiento: la plataforma se encarga de ello.

Novedades: importantes mejoras en precio y rendimiento

Más allá de simplificar la dialéctica de la canalización, las mejoras recientes de Databricks Runtime han generado ganancias sustanciales tanto en el rendimiento como en la rentabilidad para las cargas de trabajo de AutoCDC, solo desde noviembre de 2025:

  • SCD tipo 1
    • ~22 % de progreso en la latencia
    • ~40% de reducción en el costo
    • ~71 % de beneficio neto en relación precio-rendimiento
  • SCD tipo 2
    • ~45 % de reducción en la latencia
    • ~35% de reducción en el costo de las actualizaciones incrementales
    • ~96 % de beneficio neto en relación precio-rendimiento

Estas ganancias son importantes para los oleoductos del mundo existente que funcionan continuamente a escalera. Mientras MERGE INTO sigue siendo una primitiva fundamental de Spark, AutoCDC se pedestal en ella para manejar datos fuera de secuencia y el procesamiento incremental de forma más capaz a medida que crecen los volúmenes de datos.

Éxito del cliente con AutoCDC

Los equipos que ejecutan canalizaciones CDC y SCD en producción han citado explícitamente a AutoCDC por ofrecer un valencia significativo:

Navy Federal Credit Union utiliza AutoCDC en Lakeflow Spark Declarative Pipelines para impulsar el procesamiento de eventos a gran escalera en tiempo existente, manejando miles de millones de eventos de aplicaciones de forma continua y eliminando el código CDC personalizado y el mantenimiento continuo de los canales.

La simplicidad del maniquí de programación Spark Declarative Pipelines combinada con sus capacidades de servicio dio como resultado un tiempo de respuesta increíblemente rápido. — Jian (Miracle) Zhou, administrador sénior de ingeniería, Navy Federal Credit Union

Block utiliza AutoCDC en Lakeflow Spark Declarative Pipelines para simplificar la captura de datos de cambios y las canalizaciones de transmisión en tiempo existente en Delta Lake, reemplazando el CDC codificado a mano y la dialéctica de fusión con un enfoque declarativo que es rápido de implementar y posible de ejecutar.

Con la apadrinamiento de Spark Declarative Pipelines, el tiempo necesario para concretar y desarrollar un canal de transmisión ha pasado de días a horas. — Yue Zhang, ingeniero de software, bases de datos, Block

Valora Group, un proveedor líder de “foodvenience” con sede en Suiza, utiliza AutoCDC en Lakeflow Spark Declarative Pipelines para avivar la captura de datos de cambios para datos maestros y observación minoristas en tiempo existente, reemplazando el código CDC personalizado con un enfoque declarativo que es posible de implementar, repetir y medrar en todos los equipos.

Ganamos mucho al hacer CDC en SDP, porque no se escribe ningún código: todo está resumido en segundo plano. AutoCDC minimiza el número de líneas… es muy posible de hacer. — Alexane Rose, arquitecta de datos e inteligencia químico, Valora Holding

emprender

AutoCDC está adecuado como parte de Tuberías declarativas de Lakeflow Spark en ladrillos de datos.

Para conocer más:

Pruebe AutoCDC en sus propios canales y elimine la dialéctica CDC hecha a mano.

Apéndice

Ejemplo de SCD tipo 1

UNIR AutoCDC
from delta.tables import DeltaTable
from pyspark.sql.functions import max_by, struct

# Deduplicate: keep latest record per userId
updates = (spark.read.table("cdc_data.users")
    .groupBy("userId")
    .agg(max_by(struct("*"), "sequenceNum").apelativo("row"))
    .select("row.*"))

# Apply SCD Type 1: upsert updates, delete deletions
(DeltaTable.forName(spark, "target")
    .apelativo("t")
    .merge(updates.apelativo("s"), "s.userId = t.userId")

    .whenMatchedDelete(condition="s.operation = 'DELETE'")
    .whenMatchedUpdate(
        condition="s.sequenceNum > t.sequenceNum",
        set={"name": "s.name", "city": "s.city", "sequenceNum": "s.sequenceNum"}
    )
    .whenNotMatchedInsertAll(condition="s.operation != 'DELETE'")
    .execute())
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
    return spark.readStream.table("cdc_data.users")

dp.create_streaming_table("target")

dp.create_auto_cdc_flow(
    target="target",
    source="users",
    keys=("userId"),
    sequence_by=col("sequenceNum"),
    apply_as_deletes=expr("operation = 'DELETE'"),
    stored_as_scd_type=1
)

Ejemplo de SCD tipo 2

UNIR AutoCDC
from delta.tables import DeltaTable
from pyspark.sql.functions import col, lit, max_by, struct

# Deduplicate: keep latest record per userId
updates = (spark.read.table("cdc_data.users")
    .groupBy("userId")
    .agg(max_by(struct("*"), "sequenceNum").apelativo("row"))
    .select("row.*"))

# Step 1: close out active rows for records being updated or deleted
(DeltaTable.forName(spark, "target")
    .apelativo("t")
    .merge(
        updates.apelativo("s"),
        "s.userId = t.userId AND t.__END_AT IS NULL AND s.sequenceNum > t.__START_AT"
    )
    
    .whenMatchedUpdate(set={"__END_AT": "s.sequenceNum"})
    .execute())

# Step 2: insert new rows for inserts and updates (not deletes)
new_rows = (updates
    .filter("operation != 'DELETE'")
    .withColumn("__START_AT", col("sequenceNum"))
    .withColumn("__END_AT", lit(None).cast("long"))
    .drop("operation"))

new_rows.write.mode("append").saveAsTable("target")
                    
dp.create_auto_cdc_flow(
    target="target",
    source="users",
    keys=("userId"),
    sequence_by=col("sequenceNum"),
    apply_as_deletes=expr("operation = 'DELETE'"),
    stored_as_scd_type=2
)

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *