Apache Hudi: Escrituras de alto rendimiento para flujos de datos incrementales en data lakes

·

·

, , ,

En el anterior blog, conocimos Deltalake, un formato de lakehouse basado en un log transaccional para brindar garantías ACID sobre almacenes de datos immutables. En este blog, introducimos una alternativa a Deltalake: Apache Hudi. Hudi (Hadoop Upserts Deletes and Incrementals) es un framework de código abierto que habilita capacidades transaccional para data lakes, almacenes de datos inmutables a gran escala, como Apache Hadoop o almacenamiento de objetos en la nube (como Amazon S3). Desarrollado originalmente en Uber en 2016, Hudi fue creado para satisfacer los exigentes requisitos de ingestión y gestión de datos analíticos a escala de petabytes en tiempo casi real, con una fuerte consistencia de datos y soporte para consultas de baja latencia.

Anteriormente a Hudi, las arquitecturas tradicionales de lagos de datos eran fundamentalmente de solo anexado y orientadas a procesamiento por lotes. Aunque este modelo funcionaba para algunas cargas de trabajo analíticas, presentaba desafíos importantes para casos de uso modernos, particularmente en entornos que requerían:

  • Ingesta incremental de datos: Procesar eficientemente solo los registros nuevos o modificados. sin reprocesar los conjuntos de datos completos.
  • Actualizaciones y eliminaciones a nivel de registro: Facilitar el cumplimiento del GDPR, corregir problemas de calidad de datos o reflejar cambios del mundo real (por ejemplo, cambios de estado en transacciones).
  • Unificación de streaming y batch: Simplificar los flujos de datos al admitir ambos paradigmas en el mismo formato de almacenamiento.
  • Frescura de los datos y latencia: Hacer que los datos recientes estén disponibles para consultas rápidamente, a menudo dentro de los minutos posteriores a su llegada.

Estos desafíos eran especialmente críticos en Uber, donde las operaciones impulsadas por datos requerían un lago de datos constantemente actualizado, preciso y con actualizaciones rápidas. Las herramientas existentes como Hive y las primeras versiones de Spark carecían de los elementos necesarios para manejar mutaciones a nivel de fila o para indexar y rastrear eficientemente los cambios dentro de conjuntos de datos masivos.

Para abordar estos problemas, Hudi introdujo innovaciones clave en la arquitectura como una capa de indexación basada en metadatos, logs de escritura anticipada para escrituras transaccionales, y soporte para diferentes tipos de tablas optimizadas para consultas rápidas (Copy-On-Write) o para alto rendimiento de escritura (Merge-On-Read). Hudi, en esencia, aportó gestión de registros al estilo de bases de datos y semánticas de transacciones a la escala y flexibilidad de un lago de datos.

El formato de Hudi

Comenzemos aprendiendo cómo Hudi organiza los datos a nivel de almacenamiento. Cada tabla de Hudi contiene un subdirectorio especial llamado .hoodie/ en su raíz. Este actúa como el plano de control de metadatos de la tabla y sirve como fuente de verdad para su estado transaccional. Un ejemplo de cómo se vería el directorio .hoodie en una tabla sería el siguiente:

.hoodie/
├── 20240628010101.commit
├── 20240629010202.deltacommit
├── 20240630010303.deltacommit
├── 20240701010404.compaction.requested
├── 20240701010404.compaction.inflight
├── 20240701010404.compaction
├── 20240701010505.clean
├── 20240701010606.savepoint
├── auxiliary/
│   └── .index/
│       └── ...
├── .hoodie.properties
├── hoodie.table

Transacciones sobre la tabla (.commit, .inflight, .requested)

Hudi mantiene una línea de tiempo de todas las transacciones realizadas en la tabla: commits (añadir, modificar o eliminar filas), compactaciones (juntar varios ficheros pequeños en uno mas grande), limpiezas (eliminar ficheros obsoletos), o rollbacks (restaurar el estado de la tabla a una versión anterior). Estas acciones se registran mediante archivos de metadatos como:

  • <timestamp>.commit o <timestamp>.deltacommit: Representan operaciones de escritura exitosas. Los ficheros .commit se usan en tablas tipo Copy-On-Write, mientras que los ficheros .deltacommit se usan en tablas Merge-On-Read. Más ademante veremos qué significan estos términos.
  • <timestamp>.inflight: Denotan operaciones en curso aún no completadas.
  • <timestamp>.requested: Representan acciones programadas pendientes de ejecución.

Cada uno de estos archivos contiene información como el tipo de operación, rutas de partición afectadas, estadísticas y los identificadores de archivos involucrados. Más adelante veremos qué utilidad tiene el <timestamp> en el nombre de los ficheros.

Dentro de un archivo .commit en Apache Hudi

Un archivo .commit es un archivo de metadatos codificado en JSON que registra los detalles de una operación de escritura exitosa — incluyendo qué se escribió, dónde, y cómo afecta la estructura de la tabla.

Estos archivos se almacenan en el directorio .hoodie/ y se nombran utilizando una marca de tiempo como instantáneo, por ejemplo: 20240630010303.commit.

Un ejemplo de contenido simplificado de un archivo .commit sería el siguiente:

{
  "commitTime": "20240630010303",
  "operationType": "UPSERT",
  "partitionToWriteStats": {
    "year=2025/month=06/day=30": [
      {
        "fileId": "f1c7b1a3-3a84-4c9e-b3a0-cafdc8c2cb3e",
        "path": "year=2025/month=06/day=30/f1c7b1a3_20240630010303.parquet",
        "prevCommit": "20240628010101",
        "numWrites": 5000,
        "numDeletes": 200,
        "totalLogRecords": 0,
        "totalWriteBytes": 2048000,
        "fileSizeInBytes": 2048000,
        "minRecordKey": "user_001",
        "maxRecordKey": "user_999"
      }
    ]
  },
  "totalWriteStats": {
    "totalRecords": 5000,
    "totalUpdateRecords": 200,
    "totalBytesWritten": 2048000
  },
  "metadata": {
    "sparkVersion": "3.5.0",
    "hudiVersion": "0.14.0",
    "operation": "UPSERT",
    "schema": "{\"type\":\"record\",\"name\":\"user_event\",...}"
  }
}

Los campos clave incluyen commitTime (un identificador único basado en marca de tiempo), operationType (por ejemplo, INSERT o UPSERT), y partitionToWriteStats, que enumera cada partición escrita junto con estadísticas como el fileId, la ruta del archivo, número de registros escritos o eliminados, tamaño del archivo, y las claves mínimas/máximas de los registros.

También se incluye una instantánea del esquema de datos, el prevCommit para el seguimiento de versiones, y métricas agregadas como totalRecords y totalBytesWritten. Estos metadatos permiten la consistencia transaccional de Hudi, así como funciones de reversión, indexación y consultas incrementales.

Merge-On-Read y Copy-On-Write: Dos tipos de tablas, dos enfoques

Apache Hudi admite dos tipos de tablas: Copy-On-Write (COW) y Merge-On-Read (MOR) — cada uno optimizado para distintos tipos de cargas de trabajo y casos de uso. En esencia, la diferencia radica en cómo se manejan las actualizaciones y eliminaciones en almacenamiento inmutable, y en qué momento los cambios están disponibles para consulta.

Copy-On-Write (COW)

En el modelo Copy-On-Write, cada operación de escritura (INSERT, UPSERT, DELETE) produce nuevos archivos de datos (normalmente en formato Parquet), y las versiones anteriores se marcan como obsoletas. Estas escrituras son atómicas y consistentes — los consumidores que leen la tabla verán solo escrituras totalmente materializados.

COW está optimizado para lectura, ya que todos los datos están en formato columnar (Parquet), lo que permite consultas rápidas sin necesidad de fusionar registros. Sin embargo, esto es costoso en escritura. Las actualizaciones y eliminaciones provocan reescrituras completas de los archivos afectados. El mejor uso de tablas COW es para cargas analíticas con pocas actualizaciones (ej. dashboards, informes por lotes), o escenarios donde la velocidad de consulta y la simplicidad son más importantes que la eficiencia en escritura.

Merge-On-Read (MOR)

El modelo Merge-On-Read aplaza el coste de reescribir datos escribiendo los cambios en archivos de registro (.log.), mientras que los archivos base existentes permanecen intactos. Estos archivos de registro luego se fusionan con los archivos base, ya sea al leer (en tiempo de consulta) o mediante un trabajo de compactación en segundo plano.

Las tablas MOR están optimizadas para escritura. Los cambios se agregan rápidamente a los registros, evitando reescrituras costosas. No obstante, esto afecta negativamente al rendimiento de las lecturas bajo demanda: las consultas deben fusionar archivos base y registros. Para disminuir el numero de fusiones de ficheros .log., se requiere compactar periódicamente la tabla para convertir los registros .log. en nuevos archivos base. El mejor uso de tablas MOR es para ingesta de datos de alto rendimiento (ej. flujos de eventos, pipelines CDC), casos que necesitan datos actualizados con latencia mínima en escritura, o casos casi en tiempo real donde la latencia es crítica y se tolera menor rendimiento en consulta.

Comportamiento de los metadatos para tablas MOR:

Las escrituras generan archivos *.deltacommit en lugar de *.commit. Cada grupo de archivos consiste en un archivo base (opcional) y uno o más archivos log. En lugar de un fichero .parquet en el campo “path” de un fichero *.commit, para ficheros *.deltacommit encontraríamos un path de un fichero log: region=us/state=ny/5df98c2b_20250630123000.log.1. A diferencia de los archivos .parquet, los archivos de registro de Hudi utilizan un formato binario personalizado llamado Hoodie Log Format, y no están codificados en JSON legible por humanos. Sin embargo, es posible describir su estructura y ofrecer un desglose conceptual simplificado.

Indexación en tablas Hudi

Al realizar operaciones como UPSERT o DELETE, Hudi necesita saber qué archivos contienen qué registros. Sin ayuda, esto implicaría escanear miles de footers de archivos Parquet o directorios completos — lo cual resulta especialmente costoso en almacenamientos en la nube como Amazon S3.

Apache Hudi resuelve esto con su tabla de metadatos, un sistema de indexación incorporado que rastrea dónde reside cada archivo y qué rango de claves de registro contiene cada archivo. Esto permite a Hudi omitir por completo el escaneo de archivos durante operaciones de UPSERT y DELETE.

Concepto de Grupo de Archivos (File Group)

Para comprender cómo funciona esto en la práctica, primero hay que introducir el concepto de grupo de archivos. En Hudi, los datos no se gestionan como archivos Parquet o registros aislados, sino como unidades lógicas llamadas file groups.

Un grupo de archivos representa todas las versiones de un fragmento de conjunto de datos a lo largo del tiempo — normalmente compuesto por un archivo base (en formato Parquet) y opcionalmente uno o más archivos de log (utilizados en tablas MOR). Cada grupo se identifica de forma única mediante un file ID y se asocia a una partición específica. Todas las operaciones de INSERT, UPDATE y DELETE dirigidas a una porción específica del conjunto de datos se aplican al grupo de archivos correspondiente, el cual evoluciona de forma incremental con el tiempo.

La Tabla de Metadatos

Para evitar escanear archivos completos, Hudi utiliza una tabla interna especial — la tabla de metadatos — que mantiene un seguimiento de 1. Qué particiones existen, 2. Qué archivos (file ID) hay en cada partición y 3. Qué rangos de claves de registros o filtros Bloom están presentes en esos archivos.

Entonces, cuando un usuario envía una actualización — por ejemplo:

UPDATE users SET email = 'new@domain.com' WHERE user_id = 'user_123';

Hudi no abre cada archivo del conjunto de datos para buscar el usuario user_123. En su lugar, consulta la tabla de metadatos para identificar rápidamente qué grupo(s) de archivos podrían contener a user_123. La tabla de metadatos puede hacer esta determinación utilizando resúmenes livianos como rangos de claves, filtros Bloom u otras estadísticas auxiliares almacenadas como parte de cada escritura.

Transacciones ACID en Apache Hudi: La Hudi Timeline

Una de las características de Apache Hudi es su capacidad para ofrecer garantías ACID sobre almacenes de objetos distribuidos y eventualmente consistentes como Amazon S3. Hudi lo logra mediante un protocolo transaccional diseñado que se basa en dos bloques fundamentales: el mecanismo de línea de tiempo (timeline) y un concepto inspirado en TrueTime de Google — un sistema de marcas de tiempo monotónicas y sincronizadas de forma laxa.

Timeline: El registro de transacciones de Hudi

En el núcleo del modelo ACID de Hudi se encuentra la línea de tiempo (o timeline), almacenada como una serie de pequeños archivos de metadatos en el directorio .hoodie de cada tabla. Como hemos visto antes, cada cambio en el conjunto de datos — inserción, actualización, compactación, limpieza, reversión, agrupamiento, etc. — se captura como un timeline instant, una acción puntual en el tiempo representada por un identificador único y ordenable: el instant time.

Este instant time suele tener el formato yyyyMMddHHmmssSSS, actuando como identificador de transacción y como marcador de instantánea. Cada transacción tiene un estado, como .requested, .inflight o .committed. Juntos, estos archivos conforman una línea de tiempo versionada y de solo anexado — similar al registro de transacciones en una base de datos relacional.

La línea de tiempo garantiza aislamiento y visibilidad atómica al exponer únicamente operaciones completas (committed) a los lectores. Hasta que un archivo .commit o .deltacommit se escribe por completo y se marca como exitoso, ese instante se considera “inflight” y es ignorado por los lectores. Así se asegura que nunca se exponga información parcial, cumpliendo con el aislamiento de tipo read-committed.

Ordenamiento tipo TrueTime: Monotónico y sincronizado

En sistemas distribuidos, especialmente sobre almacenes de objetos como Amazon S3, confiar en la sincronización de relojes entre nodos puede ser arriesgado. Para garantizar un orden total de las operaciones y evitar condiciones de carrera, Hudi utiliza un enfoque similar a TrueTime de Google. Aunque no requiere relojes estrictamente sincronizados, Hudi exige que todos los escritores asignen instant times estrictamente incrementales que actúan como marcas de tiempo lógicas.

Cada cliente de escritura (como un ejecutor de Spark o una tarea de Flink) genera un instant time antes de iniciar una operación. El servicio de línea de tiempo de Hudi (o el mecanismo de coordinación) garantiza que solo una operación por instante puede ejecutarse, y que los instantes aumentan de forma monótonica.

Esta combinación de línea de tiempo centralizada e instantes ordenados globalmente elimina conflictos de escritura y asegura consistencia transaccional incluso cuando múltiples trabajos modifican el conjunto de datos al mismo tiempo.

En conjunto: ACID sobre almacenes de objetos

Así es como Hudi garantiza cada propiedad de ACID utilizando la línea de tiempo y TrueTime:

  • Atomicidad: Una escritura solo se considera completa si su archivo .commit o .deltacommit está totalmente escrito. Si un trabajo falla a mitad de camino, Hudi deja un marcador .inflight y puede revertir o reintentar de forma segura.
  • Consistencia: Los lectores solo ven una instantánea consistente — el último instante exitoso — nunca datos parciales o corruptos.
  • Aislamiento: Como las operaciones se serializan por instant time y las escrituras en curso están ocultas, las lecturas y escrituras concurrentes no interfieren.
  • Durabilidad: Una vez que el marcador de commit se persiste (por ejemplo, en S3), la escritura es duradera y será visible en todas las lecturas futuras.

Además, Hudi admite control de concurrencia multi-escritor mediante control de concurrencia optimista (OCC) y puede integrarse con gestores de bloqueo externos si es necesario, lo que fortalece su modelo transaccional.

Ejemplo

Considera un caso donde dos escritores intentan hacer un UPSERT de registros que pertenecen al mismo grupo de archivos existente, aproximadamente al mismo tiempo.

  • El escritor A selecciona el instante 20250708120000 y planea actualizar el grupo de archivos fileId-123.
  • El escritor B selecciona el instante 20250708120002 y también desea actualizar fileId-123.

Como el grupo de archivos solo puede ser actualizado de forma segura por un escritor a la vez, Hudi utiliza el mecanismo de línea de tiempo y un protocolo de bloqueo para garantizar la consistencia:

  • El escritor A obtiene un bloqueo (o utiliza control de concurrencia optimista) y procede a escribir sus datos actualizados, creando finalmente el archivo 20250708120000.commit.
  • Mientras, el escritor B también crea nuevos archivos de datos (o archivos de registro) basados en el estado del grupo de archivos que observó inicialmente. No obstante, antes de finalizar el commit del escritor B (es decir, antes de escribir el archivo .commit o .deltacommit), Hudi realiza una verificación de conflictos contra la línea de tiempo. Específicamente, Hudi verifica si algún nuevo commit con un instante de tiempo superior ya ha modificado los mismos grupos de archivos que el escritor B pretende actualizar. Esto es posible porque la línea de tiempo en .hoodie/ contiene un historial secuencial de todos los instantes comprometidos, y para cada commit, los metadatos registran qué grupos de archivos fueron modificados.

⚠️ La línea de tiempo muestra que el grupo de archivos fileId-123 fue actualizado por un instante con una marca de tiempo posterior a la instantánea que usó el escritor B, significa que el escritor A realizó primero los cambios en ese grupo. Esto deja obsoleta la vista del escritor B.
🚫 ¿Qué ocurre a continuación? Dado que la vista del escritor B está desactualizada y hay un conflicto, Hudi rechaza el intento de commit del escritor B para evitar sobrescribir o corromper datos.
✅ El escritor B entonces debe actualizar su instantánea — volver a cargar la línea de tiempo más reciente y los metadatos del grupo de archivos — y reaplicar sus actualizaciones sobre los datos más actuales. Este mecanismo de reintento forma parte del enfoque de control de concurrencia optimista (OCC) de Hudi.

Conclusión

Apache Hudi aporta integridad transaccional, gestión eficiente de datos y capacidades de procesamiento en tiempo real a los data lakes modernos — todo ello sin sacrificar la apertura y flexibilidad del almacenamiento basado en archivos.

Gracias a innovaciones como el mecanismo de línea de tiempo, la tabla de metadatos y el soporte para los modelos de almacenamiento Copy-on-Write y Merge-on-Read, Hudi habilita casos de uso potentes como operaciones tipo upsert, consultas incrementales y escrituras concurrentes a gran escala.

Al abstraer la complejidad relacionada con el seguimiento de archivos, la resolución de conflictos y el cumplimiento de propiedades ACID, Hudi transforma tu data lake en una plataforma de datos confiable y de alto rendimiento, cerrando la brecha entre el almacenamiento bruto y la analítica operacional.


Discover more from Catedra T-Systems X URV

Subscribe to get the latest posts sent to your email.


Leave a Reply

Your email address will not be published. Required fields are marked *