Capítulo 4. Borrado, actualización y fusión de tablas

Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com

Como Delta Lake añade una capa transaccional a los lagos de datos clásicos, podemos realizar operaciones DML clásicas, como actualizaciones, eliminaciones y fusiones. Cuando realizas una operación DELETE en una tabla Delta, la operación se realiza a nivel de archivo de datos, eliminando y añadiendo archivos de datos según sea necesario. Los archivos de datos eliminados ya no forman parte de la versión actual de la tabla Delta, pero no deben borrarse físicamente de inmediato, ya que es posible que quieras volver a una versión anterior de la tabla con el viaje en el tiempo (el viaje en el tiempo se trata en el Capítulo 6). Lo mismo ocurre cuando ejecutas una operación UPDATE. Los archivos de datos se añadirán y eliminarán de tu tabla Delta según sea necesario.

La operación DML más potente de Delta Lake es la operación MERGE, que te permite realizar una operación "upsert", que es una mezcla de las operaciones UPDATE, DELETE y INSERT, en tu tabla Delta. Unes una tabla de origen y una de destino, escribes una condición de coincidencia y, a continuación, especificas lo que debe ocurrir con los registros que coincidan o no.

Borrar datos de una tabla Delta

Empezaremos con una tabla taxidb.YellowTaxis limpia. Esta tabla la crea el script de "Inicialización del Capítulo" para el Capítulo 4.1 Tiene 9.999.995 millones de filas:

%sql
SELECT  
    COUNT(id)
FROM    
    taxidb.YellowTaxis

Salida:

+----------+
| count(1) |
+----------+
| 9999995  |
+----------+

Creación de tablas y DESCRIBIR HISTORIA

La tabla taxidb.YellowTaxis Delta se creó en el script "Inicialización del capítulo" y se copió en nuestra carpeta /capítulo04. Veamos DESCRIBE HISTORY para la tabla:2

%sql
DESCRIBE HISTORY taxidb.YellowTaxis

Salida (sólo se muestran las partes relevantes):

+-----------+--------------------------------+---------------------+
| operation | operationParameters            | operationMetrics    |
+-----------+--------------------------------+---------------------+
| WRITE     | [('mode', 'Overwrite'), (...)] | [('numFiles', '2'), |
|           |                                |  ('numOutputRows',  | 
|           |                                |  '9999995'), ...]   |
+-----------+--------------------------------+---------------------+

Podemos ver que tenemos una transacción que contiene una operación WRITE, escribiendo dos archivos de datos por un total de 9.999.995 filas. Averigüemos algunos detalles sobre ambos archivos.

En el Capítulo 2 aprendiste cómo puedes utilizar el registro de transacciones para ver las acciones de añadir y eliminar archivos. Echemos un vistazo al directorio _delta_log:

%sh
ls /dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta/_delta_log/*.json

Como era de esperar, sólo vemos una entrada en el registro de transacciones:

/dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta/_delta_log/….0000.json

Esta entrada de registro debería tener dos acciones de añadir archivo, ya que la entrada numfiles de DESCRIBE HISTORY tenía dos. De nuevo, utilicemos nuestro comando grep para encontrar esas secciones:

%sh
grep \"add\" /dbfs/…/chapter04/YellowTaxisDelta/_delta_log/..0000.json |
  sed -n 1p > /tmp/commit.json
python -m json.tool < /tmp/commit.json

Una variación del comando anterior es que, como ahora tienes dos entradas, tenemos que utilizar el comando sed para extraer la entrada add correcta.

Consejo

Puedes canalizar la salida del comando grep al comando sed3sed es un editor de flujo que realiza transformaciones básicas de texto en un flujo de entrada y escribe el resultado en un flujo de salida. La opción -n suprime la salida normal, y el comando 1p sólo imprime la primera línea de la entrada. Para encontrar la siguiente entrada añadida, puedes utilizar simplemente sed -n 2p, que imprime la segunda línea.

Salida producida (sólo se muestran las partes relevantes):

{
    "add": {
        "path": "part-00000-...-c000.snappy.parquet",
        …
        "stats": "{\"numRecords\":5530100,...}}",
        "tags": {
            …
        }
  }

Aquí vemos el nombre del primer archivo de datos creado para nuestra tabla, y el número de registros de ese archivo. Podemos utilizar el mismo comando con sed -n 2p para obtener la segunda acción añadir para obtener el segundo archivo de datos:

{
    "add": {
        "path": "part-00001-...-c000.snappy.parquet",
        "...: "”
stats”: {\"numRecords\":4469895,...}}",
        "tags": {
            …
        }
    }
}

Ahora sabemos que nuestra tabla tiene los siguientes archivos de datos:

Tabla 4-1. Ficheros parquet creados
Nombre de archivo Parquet Número de registros
part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet 5,530,100
part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet 4,469,895

Estos archivos se corresponden con nuestro listado de directorios, por lo que el registro de transacciones y el informe del listado de directorios son coherentes:

%sh
ls -al /dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta

drwxrwxrwx 2 _delta_log
-rwxrwxrwx 1 part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet
-rwxrwxrwx 1 part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet

Cómo realizar la operación BORRAR

Supongamos que queremos eliminar un registro único de , en este caso el registro con RideId = 100000. En primer lugar, debemos asegurarnos de que el registro sigue estando en la tabla con un SELECT de SQL:4

%sql
-- First, show that you have data for RideId = 10000
SELECT  
    RideId,
    VendorId,
    CabNumber,
    TotalAmount
FROM    
    taxidb.YellowTaxis
WHERE  
    RideId = 100000

Salida:

+--------+----------+-----------+-------------+
| RideId | VendorId | CabNumber | TotalAmount |
+--------+----------+-----------+-------------+
| 100000 | 2        | T478827C  | 7.56        |
+--------+----------+-----------+-------------+

Para eliminar esta fila, podemos utilizar un simple SQL DELETE. Podemos utilizar el comando DELETE para eliminar selectivamente filas basándonos en un predicado, o condición de filtrado:

%sql
DELETE FROM
    taxidb.YellowTaxis
WHERE RideId = 100000

Salida:

+------------------+
|num_affected_rows |
+------------------+
|  1               |
+------------------+

Podemos confirmar que, efectivamente, hemos eliminado una fila. Si utilizamos el comando DESCRIBE HISTORY para ver las distintas operaciones en la tabla, obtenemos lo siguiente para la versión 1 (la salida de la fila está pivotada para facilitar la lectura):

version:             1
timestamp:           2022-12-14T17:50:23.000+0000
operation:           DELETE
operationParameters: [('predicate', 
                     '["(spark_catalog.taxidb.YellowTaxis.RideId = 100000)"]')]
operationMetrics:    [('numRemovedFiles', '1'),
                      ('numCopiedRows', '5530099'),
                      ('numAddedChangeFiles', '0'),
                      ('executionTimeMs', '32534'),
                      ('numDeletedRows', '1'),
                      ('scanTimeMs', '1524'),
                      ('numAddedFiles', '1'),
                      ('rewriteTimeMs', '31009')]

Podemos ver que la operación fue una DELETE y que el predicado que utilizamos para la eliminación fue WHERE RideId = 100000. Delta Lake eliminó un archivo (numRemovedFiles = 1 ) y añadió uno nuevo (numAdded​Files = 1 ). Si utilizamos nuestro comando de confianza grep para conocer los detalles, las cosas tienen el siguiente aspecto:

Tabla 4-2. Resultado de la operación DELETE
Acción Archivo # Número de registros
Añade part-00000-96c2f047-99cc-4a43-b2ea-0d3e0e77c4c1-c000.snappy.parquet 5,530,099
Elimina part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet 4,469,895

La Figura 4-1 ilustra las acciones de Delta Lake cuando borramos el registro.

YellowTaxis Delta table before and after the DELETE operation
Figura 4-1. YellowTaxis Tabla delta antes y después de la operación DELETE

Delta Lake realiza las siguientes acciones como parte de la operación DELETE:

  1. Delta Lake realizó la primera exploración de los datos para identificar cualquier archivo que contuviera filas que coincidieran con la condición del predicado. En este caso, el archivo es el archivo de datos e229aa1d5616; contiene el registro con RideId = 100000.

  2. En una segunda exploración, Delta Lake lee en memoria los archivos de datos coincidentes. En este punto, Delta Lake borra las filas en cuestión antes de escribir un nuevo archivo de datos limpio en el almacenamiento. Este nuevo archivo de datos es el archivo de datos 0d3e0e77c4c1. Como Delta Lake eliminó un registro, este nuevo archivo de datos contiene 5.530.099 registros (5.530.100 - 1).

  3. Cuando Delta Lake completa la operación DELETE, el archivo de datos e229aa1d5616 se elimina ahora del registro de transacciones Delta, puesto que ya no forma parte de la tabla Delta. Este proceso se denomina "tombstoning". Sin embargo, es importante tener en cuenta que este antiguo archivo de datos no se elimina, porque aún podrías necesitarlo para viajar en el tiempo a una versión anterior de la tabla. Puedes utilizar el comando VACUUM para borrar archivos anteriores a un determinado periodo de tiempo. El viaje en el tiempo y el comando VACUUM se tratan en detalle en el Capítulo 6.

  4. El archivo de datos fedaa0f6623d sigue formando parte de la tabla Delta, ya que no se le ha aplicado ningún cambio.

Podemos ver el único archivo de datos (0d3e0e77c4c1) que se ha añadido al directorio en nuestro listado de directorios:

%sh
ls -al /dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta/

drwxrwxrwx _delta_log
-rwxrwxrwx part-00000-96c2f047-99cc-4a43-b2ea-0d3e0e77c4c1-c000.snappy.parquet
-rwxrwxrwx part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet
-rwxrwxrwx part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet

El archivo de datos e229aa1d5616 no se borró físicamente.

El mensaje más importante que hay que extraer de esto es que la transacción de eliminación se produce a nivel del archivo de datos. Delta Lake creará nuevas particiones e insertará nuevas acciones de añadir archivo yeliminar archivo en el registro de transacciones, según sea necesario. En el capítulo 6 sobre el ajuste del rendimiento se tratará el comando VACUUM y otras formas de limpiar los archivos de datos tombstoned que ya no son necesarios.

Consejos para ajustar el rendimiento de DELETE

La principal forma de mejorar el rendimiento de una operación DELETE en Delta Lake es añadir más predicados para acotar el espectro de búsqueda. Por ejemplo, si tienes datos particionados y conoces la partición de la que forman parte los registros que debes borrar, puedes añadir su cláusula de partición al predicado DELETE.

Delta Lake también proporciona otras condiciones de optimización, como la omisión de datos y la optimización del orden Z. El orden en Z reorganiza la disposición de cada archivo de datos, de modo que los valores de columna similares de se coloquen estratégicamente cerca unos de otros para lograr la máxima eficacia. Consulta el Capítulo 5 para obtener más detalles.

Actualizar datos en una tabla

Ahora que has visto el impacto de una operación DELETE en la tabla YellowTaxis, echemos un vistazo rápido a una operación UPDATE. Puedes utilizar la operación UPDATE para actualizar selectivamente cualquier fila que coincida con una condición de filtrado, también conocida como predicado.

Descripción del caso práctico

Supongamos que se ha producido un error con el DropLocationId para el registro donde RideId = 9999994 . Primero, asegurémonos de que este registro está presente en nuestra tabla con lo siguiente SELECT:

SELECT
    INPUT_FILE_NAME(),
    RideId,
    VendorId,
    DropLocationId
FROM    
    taxidb.YellowTaxis
WHERE
    RideId = 9999994

La función Spark SQL INPUT_FILE_NAME() es una práctica función que nos da el nombre de archivo en el que se encuentra el registro:

+---------------------------+---------+----------+----------------+
| input_file_name()         | RideId  | VendorId | DropLocationId |
+---------------------------+---------+----------+----------------+
| .../part-00001-...parquet | 9999994 | 1        | 243            |
+---------------------------+---------+----------+----------------+

La función INPUT_FILE_NAME muestra que nuestro registro se encuentra en el archivo de datos fedaa0f6623d, lo cual tiene sentido, ya que es uno de los últimos registros, por lo que lógicamente se encuentra en el último archivo de datos creado. Podemos ver que el DropLocationId existente es actualmente 243. Supongamos que queremos actualizar este campo para que tenga un valor de 250. A continuación veremos la operación DELETE propiamente dicha.

Actualizar datos en una tabla

Ahora podemos escribir la sentencia SQL UPDATE como sigue:

%sql

UPDATE
    taxidb.YellowTaxis
SET
    DropLocationId = 250
WHERE
    RideId = 9999994

Vemos que hemos actualizado una sola fila:

+-------------------+
| num_affected_rows |
+-------------------+
|   1               |
+-------------------+

Comprobemos primero que hemos actualizado la tabla correctamente:

%sql
SELECT  
    RideId,
    DropLocationId
FROM
    taxidb.YellowTaxis
WHERE
    RideId = 9999994

+---------+----------------+
| RideId  | DropLocationId |
+---------+----------------+
| 9999994 | 250            |
+---------+----------------+

La salida muestra que el registro se actualizó correctamente. Cuando utilizamos el comando DESCRIBE HISTORY en la tabla, vemos la operación UPDATE en la versión 3 de la tabla (salida pivotada para mayor claridad):

version:             3
timestamp:           2022-12-23 17:20:45+00:00
operation:           UPDATE
operationParameters: [('predicate', '(RideId = 9999994)')]
operationMetrics:    [('numRemovedFiles', '1'),
                      ('numCopiedRows', '4469894'),
                      ('numAddedChangeFiles', '0'),
                      ('executionTimeMs', '25426'),
                      ('scanTimeMs', '129'),
                      ('numAddedFiles', '1'),
                      ('numUpdatedRows', '1'),
                      ('rewriteTimeMs', '25293')]

Se eliminó un archivo ('numRemovedFiles', '1') y se añadió otro ('numAdded​Files', '1'). También podemos ver nuestro predicado UPDATE [('predicate', '(RideId = 9999994)')] . Si utilizamos el comando grep para conocer los detalles, las cosas tienen el siguiente aspecto:

Tabla 4-3. Medidas adoptadas a raíz de la operación UPDATE
Acción Archivo # Número de registros
Añade part-00000-da1ef656-46e-4de5-a189-50807db851f6-c000.snappy.parquet 4,469,895
Elimina part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet 4,469,895

La Figura 4-2 ilustra las acciones que realizó Delta Lake cuando borramos el registro.

Before and after an UPDATE operation
Figura 4-2. Antes y después de una operación UPDATE

Delta Lake realiza un UPDATE en una mesa en dos pasos:

  1. Busca y selecciona los archivos de datos que contienen datos que coinciden con el predicado y que, por tanto, deben actualizarse. Delta Lake utiliza la omisión de datos siempre que es posible para acelerar este proceso. En este caso, se trata del archivo de datos fedaa0f6623d. También podríamos comprobarlo con la función SQL INPUT_FILE_NAME().

  2. A continuación, Delta Lake lee cada archivo coincidente en memoria, actualiza las filas pertinentes y escribe el resultado en un nuevo archivo de datos. El nuevo archivo de datos en este caso es el archivo 50807db851f6. Ahora contiene todos los registros de la partición fedaa0f6623d, pero con las actualizaciones aplicadas, que en este caso es la actualización de RideId = 9999994. Este archivo de datos es 50807db851f6. Este archivo de datos sigue conteniendo 4.469.895 registros. Una vez que Delta Lake ha ejecutado correctamente UPDATE, añade una acción de añadir archivo para el nuevo archivo de datos.

Como ya no es necesario, el archivo de datos fedaa0f6623d se elimina de la tabla Delta con una acción de confirmación de eliminación en el registro de transacciones. Sin embargo, al igual que la operación DELETE, el archivo no se elimina físicamente, por si quisiéramos consultar una versión antigua de la tabla con un viaje en el tiempo.

El archivo de datos 0d3e0e77c4c1 no se ha visto afectado por nuestra actualización, por lo que sigue formando parte de la tabla Delta y sigue conteniendo 5.530.099 registros.

Consejos de ajuste del rendimiento UPDATE

Al igual que la operación DELETE, la principal forma de mejorar el rendimiento de un comando UPDATE en Delta Lake es añadir más predicados para restringir el ámbito de la búsqueda. Cuanto más específica sea la búsqueda, menos archivos tendrá que escanear y/o modificar Delta Lake.

Como se ha mencionado en la sección anterior, se pueden utilizar otras funciones de Delta Lake, como la ordenación en Z, para acelerar aún más las operaciones de UPDATE. Consulta el Capítulo 5 para más detalles sobre la optimización de Delta Lake.

Insertar datos utilizando la operación FUSIONAR

El comando MERGE de Delta Lake te permite realizar upserts en tus datos. Un upsert es una mezcla de un comando UPDATE y uno INSERT. Para entender los upserts, supongamos que tenemos una tabla existente (la tabla de destino ) y una tabla de origen que contienen una mezcla de registros nuevos y actualizaciones de registros existentes. He aquí cómo funciona realmente un upsert:

  1. Cuando un registro de la tabla de origen coincide con un registro preexistente en la tabla de destino, Delta Lake actualiza el registro.

  2. Cuando no existe tal coincidencia, Delta Lake inserta el nuevo registro.

Descripción del caso práctico

Apliquemos una operación MERGE a nuestra tabla YellowTaxis. Realicemos un recuento de nuestra tabla YellowTaxis:

%sql
SELECT
    COUNT(*)
FROM
    taxidb.YellowTaxis

Vemos que tenemos 9.999.994 registros.

+----------+
| count(1) |
+----------+
| 9999994  |
+----------+

Queremos reinsertar el registro con RideId = 100000 que eliminamos en la sección DELETE de este capítulo. Por lo tanto, en nuestros datos de origen, necesitamos un registro con RideId establecido en 100000.

Para este ejemplo, supongamos que también queremos actualizar los registros con RideId = 999991 porque el VendorId se insertó mal, y hay que actualizarlo a 1 (VendorId = 1) para estos cinco registros. Por último, queremos llevar el recuento de registros a un número par de 10.000.000 registros, así que tenemos 5 registros más, con RideIds que van desde 999996 hasta 10000000.

El conjunto de datos MERGE

En nuestros archivos de datos fuente complementarios para el libro, tenemos un archivo llamado YellowTaxisMergeData.csv, que tiene estos registros. Como tenemos que proporcionar un esquema, primero leemos el esquema de nuestra tabla existente:

df = spark.read.format("delta").table("taxidb.YellowTaxis")
yellowTaxiSchema = df.schema
print(yellowTaxiSchema)

Una vez cargado el esquema, podemos cargar nuestro archivo CSV de datos de fusión:

yellowTaxisMergeDataFrame = spark      \
            .read                      \
            .option("header", "true")  \
            .schema(yellowTaxiSchema)  \
            .csv("/mnt/datalake/book/chapter04/YellowTaxisMergeData.csv") 
            .sort(col("RideId"))

yellowTaxisMergeDataFrame.show()

Aquí se muestra una salida parcial:

+----------+----------+------------------------------+
| RideId   | VendorId | PickupTime                   |
+----------+----------+------------------------------+
|   100000 | 2        | 2022-03-01T00:00:00.000+0000 |
|  9999991 | 1        | 2022-04-04T20:54:04.000+0000 |
|  9999992 | 1        | 2022-04-04T20:54:04.000+0000 |
|  9999993 | 1        | 2022-04-04T20:54:04.000+0000 |
|  9999994 | 1        | 2022-04-04T20:54:04.000+0000 |
|  9999995 | 1        | 2022-04-04T20:54:04.000+0000 |
|  9999996 | 3        | 2022-03-01T00:00:00.000+0000 |
|  9999997 | 3        | 2022-03-01T00:00:00.000+0000 |
|  9999998 | 3        | 2022-03-01T00:00:00.000+0000 |
|  9999999 | 3        | 2022-03-01T00:00:00.000+0000 |
| 10000000 | 3        | 2022-03-01T00:00:00.000+0000 |
+----------+----------+------------------------------+

Podemos ver nuestro registro con RideId = 100000, los cinco registros (9999991 a 9999995) con sus nuevos VendorId de 1, y los cinco nuevos registros, a partir de 9999996.

Queremos escribir nuestra sentencia MERGE en SQL, por lo que necesitamos tener nuestro DataFrame disponible en SQL. La clase DataFrame tiene un práctico método llamado createOrReplaceTempView, que hace exactamente eso:

# Create a Temporary View on top of our DataFrame, making it
# accessible to the SQL MERGE statement below
yellowTaxisMergeDataFrame.createOrReplaceTempView("YellowTaxiMergeData")

Ahora podemos utilizar simplemente el nombre de la vista en SQL:

%sql
SELECT
    *
FROM    
    YellowTaxiMergeData

Esto muestra exactamente el mismo resultado que con el método display() del DataFrame.

La declaración MERGE

Ahora puedes escribir tu declaración MERGE como sigue:

%sql
MERGE INTO taxidb.YellowTaxis AS target
    USING YellowTaxiMergeData AS source
        ON target.RideId = source.RideId

-- You need to update the VendorId if the records
-- matched
WHEN MATCHED                                      
    THEN
        -- If you want to update all columns,
        -- you can say "SET *"
        UPDATE SET target.VendorId = source.VendorId
WHEN NOT MATCHED
    THEN
        -- If all columns match, you can also do a "INSERT *"
        INSERT(RideId, VendorId, PickupTime, DropTime, PickupLocationId,
               DropLocationId, CabNumber, DriverLicenseNumber, PassengerCount,
               TripDistance, RateCodeId, PaymentType, TotalAmount, FareAmount,
               Extra, MtaTax, TipAmount, TollsAmount, ImprovementSurCharge)
        VALUES(RideId, VendorId, PickupTime, DropTime, PickupLocationId,
               DropLocationId, CabNumber, DriverLicenseNumber, PassengerCount,
               TripDistance, RateCodeId, PaymentType, TotalAmount, FareAmount,
               Extra, MtaTax, TipAmount, TollsAmount, ImprovementSurCharge)

Analicemos esta afirmación:

  1. Vamos a MERGE INTO la tabla YellowTaxis Delta. Observa que damos a la tabla un alias de fuente.

  2. Mediante la cláusula USING especificamos el conjunto de datos fuente, que en este caso es la vista YellowTaxiMergeData, y le damos un alias de fuente.

  3. Define la condición de unión entre el conjunto de datos de origen y el de destino. En nuestro caso, simplemente queremos unirnos en VendorId. Si tienes datos particionados y quieres dirigirte a una partición, quizá quieras añadir esa condición aquí con una sentencia AND.

  4. Especifica la acción cuando el RideId coincida entre el source y el target. En este caso de uso, queremos actualizar la fuente con el VendorId de la fuente, que se establece en 1. Aquí, sólo estamos actualizando una columna, pero si lo necesitamos, podemos proporcionar una lista de columnas, separadas por comas. Si queremos actualizar todas las columnas, decimos simplemente UPDATE SET *.

  5. Define la acción cuando el registro exista en el origen, pero no en el destino. No tenemos ninguna condición adicional con el WHEN NOT MATCHED, pero puedes añadir cláusulas adicionales si el caso de uso lo requiere. La mayoría de las veces proporcionarás una sentencia INSERT como acción. Como los nombres de nuestras columnas de origen y destino son idénticos, también podríamos haber utilizado una simple INSERT * .

Cuando ejecutamos esta sentencia MERGE, obtenemos la siguiente salida:

+-------------------+------------------+------------------+-------------------+
| num_affected_rows | num_updated_rows | num_deleted_rows | num_inserted_rows |
+-------------------+------------------+------------------+-------------------+
|        11         |        5         |        0         |         6         |
+-------------------+------------------+------------------+-------------------+

Este resultado es exactamente el que esperabas:

  • Actualizamos cinco filas (VendorIds 9999991 hasta 9999995)

  • Insertamos seis filas:

    • Una fila con un RideId de 100000

    • Las cinco filas del final (9999996 a 10000000)

Podemos ver las actualizaciones en las cinco primeras filas:

%sql
-- Make sure that the VendorId has been updated
-- for the records with RideId between
-- 9999991 and 9999995
SELECT
    RideId,
    VendorId
FROM
    taxidb.YellowTaxis
WHERE RideId BETWEEN 9999991 and 9999995

+---------+----------+
| RideId  | VendorId |
+---------+----------+
| 9999991 | 1        |
| 9999992 | 1        |
| 9999993 | 1        |
| 9999994 | 1        |
| 9999995 | 1        |
+---------+----------+

Ahora todas las filas tienen el origen VendorId de 1.

Podemos ver el registro insertado con RideId = 100000 :

%sql
--Make sure that you have a record with VendorId = 100000
SELECT
    *
FROM    
    taxidb.YellowTaxis
WHERE
    RideId = 100000

Salida (se muestra una salida parcial):

+--------+----------+---------------------------+---------------------------+
| RideId | VendorId | PickupTime                | DropTime                  |
+--------+----------+---------------------------+---------------------------+
| 100000 | 2        | 2022-03-01 00:00:00+00:00 | 2022-03-01 00:12:01+00:00 |
+--------+----------+---------------------------+---------------------------+

Y por último, podemos ver las nuevas filas con RideId > 9999995 :

%sql
SELECT  
    *
FROM
    taxidb.YellowTaxis
WHERE
    RideId > 9999995

+----------+----------+---------------------------+
| RideId   | VendorId | PickupTime                |
+----------+----------+---------------------------+
| 9999996  | 3        | 2022-03-01 00:00:00+00:00 |
| 9999997  | 3        | 2022-03-01 00:00:00+00:00 |
| 9999998  | 3        | 2022-03-01 00:00:00+00:00 |
| 9999999  | 3        | 2022-03-01 00:00:00+00:00 |
| 10000000 | 3        | 2022-03-01 00:00:00+00:00 |
+----------+----------+---------------------------+

Y un gran total de 10 millones de registros:

%sql
SELECT  
    COUNT(RideId)
FROM    
    taxidb.YellowTaxis

+----------+
| count(1) |
+----------+
| 10000000 |
+----------+

Modificar filas sin emparejar utilizando MERGE

Una adición importante a la operación Delta Lake MERGE es la cláusula WHEN NOT MATCHED BY SOURCE, lanzada recientemente. Esta cláusula puede utilizarse para UPDATE o DELETE registros de la tabla destino que no tienen registros correspondientes en la tabla origen. Puede ser una operación útil para borrar registros de la tabla de destino que ya no existen en la tabla de origen, o para marcar registros que indiquen que los datos deben considerarse borrados o inactivos, pero manteniendo los registros en la tabla de destino (es decir, borrado suave).

Nota

WHEN NOT MATCHED BY SOURCE son compatibles con las API Delta Lake de Scala, Python y Java a partir de la versión Delta 2.3. SQL es compatible con Delta 2.4 y superiores.

Para borrar registros que existen en las tablas de origen y no en la tabla de destino (es decir, borrado duro), utiliza la cláusula WHEN NOT MATCHED BY SOURCE, como se ve en el siguiente ejemplo de código:

Nota

El código WHEN NOT MATCHED BY SOURCE sólo tiene fines demostrativos y no debe ejecutarse en secuencia con los ejemplos de código anteriores. Ten en cuenta que si ejecutas los ejemplos de código WHEN NOT MATCHED BY SOURCE, el resto de salidas de código de este capítulo no coincidirán con los ejemplos y las salidas esperadas de este capítulo.

%sql
MERGE INTO taxidb.YellowTaxis AS target
    USING YellowTaxiMergeData AS source
        ON target.RideId = source.RideId
WHEN MATCHED                                      
        UPDATE SET *
WHEN NOT MATCHED
        INSERT *
-- DELETE records in the target that are not matched by the source
WHEN NOT MATCHED BY SOURCE
    DELETE

Si deseas marcar los registros de la tabla de destino que ya no existen en la tabla de origen (es decir, un borrado suave) que cumplan una determinada condición, puedes especificar una condición MERGE y una UPDATE:

%sql
MERGE INTO taxidb.YellowTaxis AS target
    USING YellowTaxiMergeData AS source
        ON target.RideId = source.RideId
WHEN MATCHED                                      
        UPDATE SET *
WHEN NOT MATCHED
        INSERT *
-- Set target.status = 'inactive' when records in the target table 
-- don’t exist in the source table and condition is met
WHEN NOT MATCHED BY SOURCE target.PickupTime >= 
  (current_date() - INTERVAL '5' DAY) 
THEN
          UPDATE SET target.status = 'inactive'

Es una buena práctica añadir una condición MERGE opcional cuando añadas la cláusula WHEN NOT MATCHED BY SOURCE a las filas de destino UPDATE o DELETE. Esto se debe a que cuando no se especifica una condición MERGE, se puede modificar un gran número de filas de destino. Por lo tanto, para obtener el mejor rendimiento, aplica una condición MERGE a la cláusula WHEN NOT MATCHED BY SOURCE (por ejemplo, target.PickupTime >= (current_date() - INTERVAL '5' DAY en el ejemplo de código anterior) para limitar el número de filas de destino que se actualizan o eliminan, porque entonces una fila de destino sólo se modifica si esa condición es verdadera para esa fila.

También puedes añadir varias cláusulas WHEN NOT MATCHED BY SOURCE a una operación MERGE. Cuando hay varias cláusulas, se evalúan en el orden en que se especifican y todas las WHEN NOT MATCHED BY SOURCE cláusulas, excepto la última, deben tener condiciones.

Analizar la operación FUSIONAR con DESCRIBIR HISTORIA

Cuando ejecutamos DESCRIBE HISTORY en la tabla YellowTaxis en la sección operationsParameters de la salida, podemos ver nuestro predicado MERGE:

operation: MERGE

[('predicate',
  '(target.RideId = source.RideId)'),
   ('matchedPredicates', '[{"actionType":"update"}]'),
   ('notMatchedPredicates', '[{"actionType":"insert"}]')]

Podemos ver la condición de unión (target.RideId = source.RideId), el botón matchedPredicate que especifica una actualización, y el notMatchedPredicate, que especifica una inserción.

Las secciones de salida de operationMetrics muestran los detalles de las diferentes acciones:

 [('numTargetRowsCopied', '4469890'),
  ('numTargetRowsDeleted', '0'),
  ('numTargetFilesAdded', '4'),
  ('executionTimeMs', '91902'),
  ('numTargetRowsInserted', '6'),
  ('scanTimeMs', '8452'),
  ('numTargetRowsUpdated', '5'),
  ('numOutputRows', '4469901'),
  ('numTargetChangeFilesAdded', '0'),
  ('numSourceRows', '11'),
  ('numTargetFilesRemoved', '1'),
  ('rewriteTimeMs', '16782')]

Aquí podemos ver de nuevo que se insertaron seis filas (numTargetRowsInserted), y se actualizaron cinco (numTargetRowsUpdated). Se añadieron cuatro nuevos archivos de datos a nuestra tabla Delta, y se eliminó un archivo de datos.

Funcionamiento interno de la operación FUSIONAR

Internamente, Delta Lake completa una operación MERGE como ésta en dos pasos:

  1. Primero realiza un inner join entre la tabla de destino y la tabla de origen para seleccionar todos los archivos de datos que contengan coincidencias. Esto evita que la operación baraje innecesariamente datos que pueden ignorarse con seguridad.

  2. A continuación, realiza un outer join entre los archivos seleccionados en las tablas destino y origen, y aplica la cláusula INSERT, DELETE o UPDATE adecuada, según especifique el usuario.

La principal diferencia entre un MERGE y un UPDATE o un DELETE bajo el capó es que Delta Lake utiliza uniones para completar un MERGE. Esto te permite utilizar algunas estrategias únicas cuando intentas mejorar el rendimiento.

Conclusión

Las operaciones DML como DELETE, UPDATE, y MERGE son operaciones esenciales para cualquier formato de tabla y operaciones ETL, todas ellas habilitadas a través del registro de transacciones. Aprovechando estas operaciones, puedes empezar a gestionar eficazmente los cambios en los datos y mantener la integridad de los mismos en tu plataforma de datos.

De forma similar a las tablas de un RDBMS tradicional, en este capítulo has leído que con las tablas Delta puedes realizar operaciones DELETE, UPDATE, y MERGE, pero también puedes aplicar estas operaciones utilizando SQL o la API DataFrame. Y lo que es más importante, aprendiste lo que ocurre bajo el capó en Delta Lake con los archivos subyacentes en el directorio de tablas Delta, y cómo el registro de transacciones registra y rastrea estos diferentes tipos de entradas. Utilizando el comando DESCRIBE HISTORY, podemos ver detalles sobre la salida de las transacciones de una tabla. Cada una de estas operaciones también puede aprovechar predicados para reducir el número de archivos escaneados y mejorar el rendimiento. Aparte de utilizar predicados durante las operaciones, existen otras técnicas de ajuste del rendimiento que se pueden aplicar a las tablas Delta y que conocerás en el capítulo siguiente.

1 Ubicación del repositorio de GitHub: /capitulo04/00 - Inicialización del capítulo

2 Ubicación del repositorio de GitHub: /capítulo04/01 - Operaciones de eliminación

3 Página del manual para sed

4 Ubicación del repositorio de GitHub: /capítulo04/01 - Operaciones de eliminación

Get Lago Delta: En marcha now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.