Capítulo 4. Borrado, actualización y fusión de tablas
Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com
Como Delta Lake añade una capa transaccional a los lagos de datos clásicos, podemos realizar operaciones DML clásicas, como actualizaciones, eliminaciones y fusiones. Cuando realizas una operación DELETE
en una tabla Delta, la operación se realiza a nivel de archivo de datos, eliminando y añadiendo archivos de datos según sea necesario. Los archivos de datos eliminados ya no forman parte de la versión actual de la tabla Delta, pero no deben borrarse físicamente de inmediato, ya que es posible que quieras volver a una versión anterior de la tabla con el viaje en el tiempo (el viaje en el tiempo se trata en el Capítulo 6). Lo mismo ocurre cuando ejecutas una operación UPDATE
. Los archivos de datos se añadirán y eliminarán de tu tabla Delta según sea necesario.
La operación DML más potente de Delta Lake es la operación MERGE
, que te permite realizar una operación "upsert", que es una mezcla de las operaciones UPDATE
, DELETE
y INSERT
, en tu tabla Delta. Unes una tabla de origen y una de destino, escribes una condición de coincidencia y, a continuación, especificas lo que debe ocurrir con los registros que coincidan o no.
Borrar datos de una tabla Delta
Empezaremos con una tabla taxidb.YellowTaxis
limpia. Esta tabla la crea el script de "Inicialización del Capítulo" para el Capítulo 4.1 Tiene 9.999.995 millones de filas:
%sql SELECT COUNT(id) FROM taxidb.YellowTaxis
Salida:
+----------+ | count(1) | +----------+ | 9999995 | +----------+
Creación de tablas y DESCRIBIR HISTORIA
La tabla taxidb.YellowTaxis
Delta se creó en el script "Inicialización del capítulo" y se copió en nuestra carpeta /capítulo04. Veamos DESCRIBE HISTORY
para la tabla:2
%sql DESCRIBE HISTORY taxidb.YellowTaxis
Salida (sólo se muestran las partes relevantes):
+-----------+--------------------------------+---------------------+ | operation | operationParameters | operationMetrics | +-----------+--------------------------------+---------------------+ | WRITE | [('mode', 'Overwrite'), (...)] | [('numFiles', '2'), | | | | ('numOutputRows', | | | | '9999995'), ...] | +-----------+--------------------------------+---------------------+
Podemos ver que tenemos una transacción que contiene una operación WRITE
, escribiendo dos archivos de datos por un total de 9.999.995 filas. Averigüemos algunos detalles sobre ambos archivos.
En el Capítulo 2 aprendiste cómo puedes utilizar el registro de transacciones para ver las acciones de añadir y eliminar archivos. Echemos un vistazo al directorio _delta_log:
%sh ls /dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta/_delta_log/*.json
Como era de esperar, sólo vemos una entrada en el registro de transacciones:
/dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta/_delta_log/….0000.json
Esta entrada de registro debería tener dos acciones de añadir archivo, ya que la entrada numfiles
de DESCRIBE HISTORY
tenía dos. De nuevo, utilicemos nuestro comando grep
para encontrar esas secciones:
%sh grep \"add\" /dbfs/…/chapter04/YellowTaxisDelta/_delta_log/..0000.json | sed -n 1p > /tmp/commit.json python -m json.tool < /tmp/commit.json
Una variación del comando anterior es que, como ahora tienes dos entradas, tenemos que utilizar el comando sed
para extraer la entrada add correcta.
Consejo
Puedes canalizar la salida del comando grep
al comando sed
3sed
es un editor de flujo que realiza transformaciones básicas de texto en un flujo de entrada y escribe el resultado en un flujo de salida. La opción -n
suprime la salida normal, y el comando 1p
sólo imprime la primera línea de la entrada. Para encontrar la siguiente entrada añadida, puedes utilizar simplemente sed -n 2p
, que imprime la segunda línea.
Salida producida (sólo se muestran las partes relevantes):
{ "add": { "path": "part-00000-...-c000.snappy.parquet", … "stats": "{\"numRecords\":5530100,...}}", "tags": { … } }
Aquí vemos el nombre del primer archivo de datos creado para nuestra tabla, y el número de registros de ese archivo. Podemos utilizar el mismo comando con sed -n 2p
para obtener la segunda acción añadir para obtener el segundo archivo de datos:
{ "add": { "path": "part-00001-...-c000.snappy.parquet", "...: "” stats”: {\"numRecords\":4469895,...}}", "tags": { … } } }
Ahora sabemos que nuestra tabla tiene los siguientes archivos de datos:
Nombre de archivo Parquet | Número de registros |
---|---|
part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet | 5,530,100 |
part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet | 4,469,895 |
Estos archivos se corresponden con nuestro listado de directorios, por lo que el registro de transacciones y el informe del listado de directorios son coherentes:
%sh ls -al /dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta drwxrwxrwx 2 _delta_log -rwxrwxrwx 1 part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet -rwxrwxrwx 1 part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet
Cómo realizar la operación BORRAR
Supongamos que queremos eliminar un registro único de , en este caso el registro con RideId = 100000
. En primer lugar, debemos asegurarnos de que el registro sigue estando en la tabla con un SELECT de SQL:4
%sql -- First, show that you have data for RideId = 10000 SELECT RideId, VendorId, CabNumber, TotalAmount FROM taxidb.YellowTaxis WHERE RideId = 100000
Salida:
+--------+----------+-----------+-------------+ | RideId | VendorId | CabNumber | TotalAmount | +--------+----------+-----------+-------------+ | 100000 | 2 | T478827C | 7.56 | +--------+----------+-----------+-------------+
Para eliminar esta fila, podemos utilizar un simple SQL DELETE
. Podemos utilizar el comando DELETE
para eliminar selectivamente filas basándonos en un predicado, o condición de filtrado:
%sql DELETE FROM taxidb.YellowTaxis WHERE RideId = 100000
Salida:
+------------------+ |num_affected_rows | +------------------+ | 1 | +------------------+
Podemos confirmar que, efectivamente, hemos eliminado una fila. Si utilizamos el comando DESCRIBE HISTORY
para ver las distintas operaciones en la tabla, obtenemos lo siguiente para la versión 1 (la salida de la fila está pivotada para facilitar la lectura):
version: 1 timestamp: 2022-12-14T17:50:23.000+0000 operation: DELETE operationParameters: [('predicate', '["(spark_catalog.taxidb.YellowTaxis.RideId = 100000)"]')] operationMetrics: [('numRemovedFiles', '1'), ('numCopiedRows', '5530099'), ('numAddedChangeFiles', '0'), ('executionTimeMs', '32534'), ('numDeletedRows', '1'), ('scanTimeMs', '1524'), ('numAddedFiles', '1'), ('rewriteTimeMs', '31009')]
Podemos ver que la operación fue una DELETE
y que el predicado que utilizamos para la eliminación fue WHERE RideId = 100000
. Delta Lake eliminó un archivo (numRemovedFiles
= 1
) y añadió uno nuevo (numAddedFiles =
1
). Si utilizamos nuestro comando de confianza grep
para conocer los detalles, las cosas tienen el siguiente aspecto:
Acción | Archivo | # Número de registros |
---|---|---|
Añade | part-00000-96c2f047-99cc-4a43-b2ea-0d3e0e77c4c1-c000.snappy.parquet | 5,530,099 |
Elimina | part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet | 4,469,895 |
La Figura 4-1 ilustra las acciones de Delta Lake cuando borramos el registro.
Delta Lake realiza las siguientes acciones como parte de la operación DELETE
:
Delta Lake realizó la primera exploración de los datos para identificar cualquier archivo que contuviera filas que coincidieran con la condición del predicado. En este caso, el archivo es el archivo de datos
e229aa1d5616
; contiene el registro conRideId = 100000
.En una segunda exploración, Delta Lake lee en memoria los archivos de datos coincidentes. En este punto, Delta Lake borra las filas en cuestión antes de escribir un nuevo archivo de datos limpio en el almacenamiento. Este nuevo archivo de datos es el archivo de datos
0d3e0e77c4c1
. Como Delta Lake eliminó un registro, este nuevo archivo de datos contiene 5.530.099 registros (5.530.100 - 1).Cuando Delta Lake completa la operación
DELETE
, el archivo de datose229aa1d5616
se elimina ahora del registro de transacciones Delta, puesto que ya no forma parte de la tabla Delta. Este proceso se denomina "tombstoning". Sin embargo, es importante tener en cuenta que este antiguo archivo de datos no se elimina, porque aún podrías necesitarlo para viajar en el tiempo a una versión anterior de la tabla. Puedes utilizar el comandoVACUUM
para borrar archivos anteriores a un determinado periodo de tiempo. El viaje en el tiempo y el comandoVACUUM
se tratan en detalle en el Capítulo 6.El archivo de datos
fedaa0f6623d
sigue formando parte de la tabla Delta, ya que no se le ha aplicado ningún cambio.
Podemos ver el único archivo de datos (0d3e0e77c4c1
) que se ha añadido al directorio en nuestro listado de directorios:
%sh ls -al /dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta/ drwxrwxrwx _delta_log -rwxrwxrwx part-00000-96c2f047-99cc-4a43-b2ea-0d3e0e77c4c1-c000.snappy.parquet -rwxrwxrwx part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet -rwxrwxrwx part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet
El archivo de datos e229aa1d5616
no se borró físicamente.
El mensaje más importante que hay que extraer de esto es que la transacción de eliminación se produce a nivel del archivo de datos. Delta Lake creará nuevas particiones e insertará nuevas acciones de añadir archivo yeliminar archivo en el registro de transacciones, según sea necesario. En el capítulo 6 sobre el ajuste del rendimiento se tratará el comando VACUUM
y otras formas de limpiar los archivos de datos tombstoned que ya no son necesarios.
Consejos para ajustar el rendimiento de DELETE
La principal forma de mejorar el rendimiento de una operación DELETE
en Delta Lake es añadir más predicados para acotar el espectro de búsqueda. Por ejemplo, si tienes datos particionados y conoces la partición de la que forman parte los registros que debes borrar, puedes añadir su cláusula de partición al predicado DELETE
.
Delta Lake también proporciona otras condiciones de optimización, como la omisión de datos y la optimización del orden Z. El orden en Z reorganiza la disposición de cada archivo de datos, de modo que los valores de columna similares de se coloquen estratégicamente cerca unos de otros para lograr la máxima eficacia. Consulta el Capítulo 5 para obtener más detalles.
Actualizar datos en una tabla
Ahora que has visto el impacto de una operación DELETE
en la tabla YellowTaxis
, echemos un vistazo rápido a una operación UPDATE
. Puedes utilizar la operación UPDATE
para actualizar selectivamente cualquier fila que coincida con una condición de filtrado, también conocida como predicado.
Descripción del caso práctico
Supongamos que se ha producido un error con el DropLocationId
para el registro donde RideId =
9999994
. Primero, asegurémonos de que este registro está presente en nuestra tabla con lo siguiente SELECT
:
SELECT INPUT_FILE_NAME(), RideId, VendorId, DropLocationId FROM taxidb.YellowTaxis WHERE RideId = 9999994
La función Spark SQL INPUT_FILE_NAME()
es una práctica función que nos da el nombre de archivo en el que se encuentra el registro:
+---------------------------+---------+----------+----------------+ | input_file_name() | RideId | VendorId | DropLocationId | +---------------------------+---------+----------+----------------+ | .../part-00001-...parquet | 9999994 | 1 | 243 | +---------------------------+---------+----------+----------------+
La función INPUT_FILE_NAME
muestra que nuestro registro se encuentra en el archivo de datos fedaa0f6623d
, lo cual tiene sentido, ya que es uno de los últimos registros, por lo que lógicamente se encuentra en el último archivo de datos creado. Podemos ver que el DropLocationId
existente es actualmente 243. Supongamos que queremos actualizar este campo para que tenga un valor de 250. A continuación veremos la operación DELETE
propiamente dicha.
Actualizar datos en una tabla
Ahora podemos escribir la sentencia SQL UPDATE
como sigue:
%sql UPDATE taxidb.YellowTaxis SET DropLocationId = 250 WHERE RideId = 9999994
Vemos que hemos actualizado una sola fila:
+-------------------+ | num_affected_rows | +-------------------+ | 1 | +-------------------+
Comprobemos primero que hemos actualizado la tabla correctamente:
%sql SELECT RideId, DropLocationId FROM taxidb.YellowTaxis WHERE RideId = 9999994 +---------+----------------+ | RideId | DropLocationId | +---------+----------------+ | 9999994 | 250 | +---------+----------------+
La salida muestra que el registro se actualizó correctamente. Cuando utilizamos el comando DESCRIBE HISTORY
en la tabla, vemos la operación UPDATE
en la versión 3 de la tabla (salida pivotada para mayor claridad):
version: 3 timestamp: 2022-12-23 17:20:45+00:00 operation: UPDATE operationParameters: [('predicate', '(RideId = 9999994)')] operationMetrics: [('numRemovedFiles', '1'), ('numCopiedRows', '4469894'), ('numAddedChangeFiles', '0'), ('executionTimeMs', '25426'), ('scanTimeMs', '129'), ('numAddedFiles', '1'), ('numUpdatedRows', '1'), ('rewriteTimeMs', '25293')]
Se eliminó un archivo ('numRemovedFiles', '1'
) y se añadió otro ('numAddedFiles', '1'
). También podemos ver nuestro predicado UPDATE
[('predicate', '(RideId = 9999994)')]
. Si utilizamos el comando grep
para conocer los detalles, las cosas tienen el siguiente aspecto:
Acción | Archivo | # Número de registros |
---|---|---|
Añade | part-00000-da1ef656-46e-4de5-a189-50807db851f6-c000.snappy.parquet | 4,469,895 |
Elimina | part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet | 4,469,895 |
La Figura 4-2 ilustra las acciones que realizó Delta Lake cuando borramos el registro.
Delta Lake realiza un UPDATE
en una mesa en dos pasos:
Busca y selecciona los archivos de datos que contienen datos que coinciden con el predicado y que, por tanto, deben actualizarse. Delta Lake utiliza la omisión de datos siempre que es posible para acelerar este proceso. En este caso, se trata del archivo de datos
fedaa0f6623d
. También podríamos comprobarlo con la función SQLINPUT_FILE_NAME()
.A continuación, Delta Lake lee cada archivo coincidente en memoria, actualiza las filas pertinentes y escribe el resultado en un nuevo archivo de datos. El nuevo archivo de datos en este caso es el archivo
50807db851f6
. Ahora contiene todos los registros de la particiónfedaa0f6623d
, pero con las actualizaciones aplicadas, que en este caso es la actualización deRideId = 9999994
. Este archivo de datos es50807db851f6
. Este archivo de datos sigue conteniendo 4.469.895 registros. Una vez que Delta Lake ha ejecutado correctamenteUPDATE
, añade una acción de añadir archivo para el nuevo archivo de datos.
Como ya no es necesario, el archivo de datos fedaa0f6623d
se elimina de la tabla Delta con una acción de confirmación de eliminación en el registro de transacciones. Sin embargo, al igual que la operación DELETE
, el archivo no se elimina físicamente, por si quisiéramos consultar una versión antigua de la tabla con un viaje en el tiempo.
El archivo de datos 0d3e0e77c4c1
no se ha visto afectado por nuestra actualización, por lo que sigue formando parte de la tabla Delta y sigue conteniendo 5.530.099 registros.
Consejos de ajuste del rendimiento UPDATE
Al igual que la operación DELETE
, la principal forma de mejorar el rendimiento de un comando UPDATE
en Delta Lake es añadir más predicados para restringir el ámbito de la búsqueda. Cuanto más específica sea la búsqueda, menos archivos tendrá que escanear y/o modificar Delta Lake.
Como se ha mencionado en la sección anterior, se pueden utilizar otras funciones de Delta Lake, como la ordenación en Z, para acelerar aún más las operaciones de UPDATE
. Consulta el Capítulo 5 para más detalles sobre la optimización de Delta Lake.
Insertar datos utilizando la operación FUSIONAR
El comando MERGE
de Delta Lake te permite realizar upserts en tus datos. Un upsert es una mezcla de un comando UPDATE
y uno INSERT
. Para entender los upserts, supongamos que tenemos una tabla existente (la tabla de destino ) y una tabla de origen que contienen una mezcla de registros nuevos y actualizaciones de registros existentes. He aquí cómo funciona realmente un upsert:
Cuando un registro de la tabla de origen coincide con un registro preexistente en la tabla de destino, Delta Lake actualiza el registro.
Cuando no existe tal coincidencia, Delta Lake inserta el nuevo registro.
Descripción del caso práctico
Apliquemos una operación MERGE
a nuestra tabla YellowTaxis
. Realicemos un recuento de nuestra tabla YellowTaxis
:
%sql SELECT COUNT(*) FROM taxidb.YellowTaxis
Vemos que tenemos 9.999.994 registros.
+----------+ | count(1) | +----------+ | 9999994 | +----------+
Queremos reinsertar el registro con RideId = 100000
que eliminamos en la sección DELETE
de este capítulo. Por lo tanto, en nuestros datos de origen, necesitamos un registro con RideId
establecido en 100000
.
Para este ejemplo, supongamos que también queremos actualizar los registros con RideId
= 999991
porque el VendorId
se insertó mal, y hay que actualizarlo a 1 (VendorId = 1
) para estos cinco registros. Por último, queremos llevar el recuento de registros a un número par de 10.000.000 registros, así que tenemos 5 registros más, con RideId
s que van desde 999996
hasta 10000000
.
El conjunto de datos MERGE
En nuestros archivos de datos fuente complementarios para el libro, tenemos un archivo llamado YellowTaxisMergeData.csv, que tiene estos registros. Como tenemos que proporcionar un esquema, primero leemos el esquema de nuestra tabla existente:
df = spark.read.format("delta").table("taxidb.YellowTaxis") yellowTaxiSchema = df.schema print(yellowTaxiSchema)
Una vez cargado el esquema, podemos cargar nuestro archivo CSV de datos de fusión:
yellowTaxisMergeDataFrame = spark \ .read \ .option("header", "true") \ .schema(yellowTaxiSchema) \ .csv("/mnt/datalake/book/chapter04/YellowTaxisMergeData.csv") .sort(col("RideId")) yellowTaxisMergeDataFrame.show()
Aquí se muestra una salida parcial:
+----------+----------+------------------------------+ | RideId | VendorId | PickupTime | +----------+----------+------------------------------+ | 100000 | 2 | 2022-03-01T00:00:00.000+0000 | | 9999991 | 1 | 2022-04-04T20:54:04.000+0000 | | 9999992 | 1 | 2022-04-04T20:54:04.000+0000 | | 9999993 | 1 | 2022-04-04T20:54:04.000+0000 | | 9999994 | 1 | 2022-04-04T20:54:04.000+0000 | | 9999995 | 1 | 2022-04-04T20:54:04.000+0000 | | 9999996 | 3 | 2022-03-01T00:00:00.000+0000 | | 9999997 | 3 | 2022-03-01T00:00:00.000+0000 | | 9999998 | 3 | 2022-03-01T00:00:00.000+0000 | | 9999999 | 3 | 2022-03-01T00:00:00.000+0000 | | 10000000 | 3 | 2022-03-01T00:00:00.000+0000 | +----------+----------+------------------------------+
Podemos ver nuestro registro con RideId = 100000
, los cinco registros (9999991
a 9999995
) con sus nuevos VendorId
de 1
, y los cinco nuevos registros, a partir de 9999996
.
Queremos escribir nuestra sentencia MERGE
en SQL, por lo que necesitamos tener nuestro DataFrame disponible en SQL. La clase DataFrame
tiene un práctico método llamado createOrReplaceTempView
, que hace exactamente eso:
# Create a Temporary View on top of our DataFrame, making it # accessible to the SQL MERGE statement below yellowTaxisMergeDataFrame.createOrReplaceTempView("YellowTaxiMergeData")
Ahora podemos utilizar simplemente el nombre de la vista en SQL:
%sql SELECT * FROM YellowTaxiMergeData
Esto muestra exactamente el mismo resultado que con el método display()
del DataFrame.
La declaración MERGE
Ahora puedes escribir tu declaración MERGE
como sigue:
%sql MERGE INTO taxidb.YellowTaxis AS target USING YellowTaxiMergeData AS source ON target.RideId = source.RideId -- You need to update the VendorId if the records -- matched WHEN MATCHED THEN -- If you want to update all columns, -- you can say "SET *" UPDATE SET target.VendorId = source.VendorId WHEN NOT MATCHED THEN -- If all columns match, you can also do a "INSERT *" INSERT(RideId, VendorId, PickupTime, DropTime, PickupLocationId, DropLocationId, CabNumber, DriverLicenseNumber, PassengerCount, TripDistance, RateCodeId, PaymentType, TotalAmount, FareAmount, Extra, MtaTax, TipAmount, TollsAmount, ImprovementSurCharge) VALUES(RideId, VendorId, PickupTime, DropTime, PickupLocationId, DropLocationId, CabNumber, DriverLicenseNumber, PassengerCount, TripDistance, RateCodeId, PaymentType, TotalAmount, FareAmount, Extra, MtaTax, TipAmount, TollsAmount, ImprovementSurCharge)
Analicemos esta afirmación:
Vamos a
MERGE INTO
la tablaYellowTaxis
Delta. Observa que damos a la tabla un alias de fuente.Mediante la cláusula
USING
especificamos el conjunto de datos fuente, que en este caso es la vistaYellowTaxiMergeData
, y le damos un alias de fuente.Define la condición de unión entre el conjunto de datos de origen y el de destino. En nuestro caso, simplemente queremos unirnos en
VendorId
. Si tienes datos particionados y quieres dirigirte a una partición, quizá quieras añadir esa condición aquí con una sentenciaAND
.Especifica la acción cuando el
RideId
coincida entre elsource
y eltarget
. En este caso de uso, queremos actualizar la fuente con elVendorId
de la fuente, que se establece en 1. Aquí, sólo estamos actualizando una columna, pero si lo necesitamos, podemos proporcionar una lista de columnas, separadas por comas. Si queremos actualizar todas las columnas, decimos simplementeUPDATE SET *
.Define la acción cuando el registro exista en el origen, pero no en el destino. No tenemos ninguna condición adicional con el
WHEN NOT MATCHED
, pero puedes añadir cláusulas adicionales si el caso de uso lo requiere. La mayoría de las veces proporcionarás una sentenciaINSERT
como acción. Como los nombres de nuestras columnas de origen y destino son idénticos, también podríamos haber utilizado una simpleINSERT
*
.
Cuando ejecutamos esta sentencia MERGE
, obtenemos la siguiente salida:
+-------------------+------------------+------------------+-------------------+ | num_affected_rows | num_updated_rows | num_deleted_rows | num_inserted_rows | +-------------------+------------------+------------------+-------------------+ | 11 | 5 | 0 | 6 | +-------------------+------------------+------------------+-------------------+
Este resultado es exactamente el que esperabas:
Actualizamos cinco filas (
VendorId
s9999991
hasta9999995
)Insertamos seis filas:
Una fila con un
RideId
de100000
Las cinco filas del final (
9999996
a10000000
)
Podemos ver las actualizaciones en las cinco primeras filas:
%sql -- Make sure that the VendorId has been updated -- for the records with RideId between -- 9999991 and 9999995 SELECT RideId, VendorId FROM taxidb.YellowTaxis WHERE RideId BETWEEN 9999991 and 9999995
+---------+----------+ | RideId | VendorId | +---------+----------+ | 9999991 | 1 | | 9999992 | 1 | | 9999993 | 1 | | 9999994 | 1 | | 9999995 | 1 | +---------+----------+
Ahora todas las filas tienen el origen VendorId
de 1.
Podemos ver el registro insertado con RideId
= 100000
:
%sql --Make sure that you have a record with VendorId = 100000 SELECT * FROM taxidb.YellowTaxis WHERE RideId = 100000
Salida (se muestra una salida parcial):
+--------+----------+---------------------------+---------------------------+ | RideId | VendorId | PickupTime | DropTime | +--------+----------+---------------------------+---------------------------+ | 100000 | 2 | 2022-03-01 00:00:00+00:00 | 2022-03-01 00:12:01+00:00 | +--------+----------+---------------------------+---------------------------+
Y por último, podemos ver las nuevas filas con RideId >
9999995
:
%sql SELECT * FROM taxidb.YellowTaxis WHERE RideId > 9999995 +----------+----------+---------------------------+ | RideId | VendorId | PickupTime | +----------+----------+---------------------------+ | 9999996 | 3 | 2022-03-01 00:00:00+00:00 | | 9999997 | 3 | 2022-03-01 00:00:00+00:00 | | 9999998 | 3 | 2022-03-01 00:00:00+00:00 | | 9999999 | 3 | 2022-03-01 00:00:00+00:00 | | 10000000 | 3 | 2022-03-01 00:00:00+00:00 | +----------+----------+---------------------------+
Y un gran total de 10 millones de registros:
%sql SELECT COUNT(RideId) FROM taxidb.YellowTaxis +----------+ | count(1) | +----------+ | 10000000 | +----------+
Modificar filas sin emparejar utilizando MERGE
Una adición importante a la operación Delta Lake MERGE
es la cláusula WHEN NOT MATCHED BY SOURCE
, lanzada recientemente. Esta cláusula puede utilizarse para UPDATE
o DELETE
registros de la tabla destino que no tienen registros correspondientes en la tabla origen. Puede ser una operación útil para borrar registros de la tabla de destino que ya no existen en la tabla de origen, o para marcar registros que indiquen que los datos deben considerarse borrados o inactivos, pero manteniendo los registros en la tabla de destino (es decir, borrado suave).
Nota
WHEN NOT MATCHED BY SOURCE
son compatibles con las API Delta Lake de Scala, Python y Java a partir de la versión Delta 2.3. SQL es compatible con Delta 2.4 y superiores.
Para borrar registros que existen en las tablas de origen y no en la tabla de destino (es decir, borrado duro), utiliza la cláusula WHEN NOT MATCHED BY SOURCE
, como se ve en el siguiente ejemplo de código:
Nota
El código WHEN NOT MATCHED BY SOURCE
sólo tiene fines demostrativos y no debe ejecutarse en secuencia con los ejemplos de código anteriores. Ten en cuenta que si ejecutas los ejemplos de código WHEN NOT MATCHED BY SOURCE
, el resto de salidas de código de este capítulo no coincidirán con los ejemplos y las salidas esperadas de este capítulo.
%sql MERGE INTO taxidb.YellowTaxis AS target USING YellowTaxiMergeData AS source ON target.RideId = source.RideId WHEN MATCHED UPDATE SET * WHEN NOT MATCHED INSERT * -- DELETE records in the target that are not matched by the source WHEN NOT MATCHED BY SOURCE DELETE
Si deseas marcar los registros de la tabla de destino que ya no existen en la tabla de origen (es decir, un borrado suave) que cumplan una determinada condición, puedes especificar una condición MERGE
y una UPDATE
:
%sql MERGE INTO taxidb.YellowTaxis AS target USING YellowTaxiMergeData AS source ON target.RideId = source.RideId WHEN MATCHED UPDATE SET * WHEN NOT MATCHED INSERT * -- Set target.status = 'inactive' when records in the target table -- don’t exist in the source table and condition is met WHEN NOT MATCHED BY SOURCE target.PickupTime >= (current_date() - INTERVAL '5' DAY) THEN UPDATE SET target.status = 'inactive'
Es una buena práctica añadir una condición MERGE
opcional cuando añadas la cláusula WHEN NOT MATCHED BY SOURCE
a las filas de destino UPDATE
o DELETE
. Esto se debe a que cuando no se especifica una condición MERGE
, se puede modificar un gran número de filas de destino. Por lo tanto, para obtener el mejor rendimiento, aplica una condición MERGE
a la cláusula WHEN NOT MATCHED BY SOURCE
(por ejemplo, target.PickupTime >= (current_date() - INTERVAL '5' DAY
en el ejemplo de código anterior) para limitar el número de filas de destino que se actualizan o eliminan, porque entonces una fila de destino sólo se modifica si esa condición es verdadera para esa fila.
También puedes añadir varias cláusulas WHEN NOT MATCHED BY SOURCE
a una operación MERGE
. Cuando hay varias cláusulas, se evalúan en el orden en que se especifican y todas las WHEN NOT MATCHED BY SOURCE
cláusulas, excepto la última, deben tener condiciones.
Analizar la operación FUSIONAR con DESCRIBIR HISTORIA
Cuando ejecutamos DESCRIBE HISTORY
en la tabla YellowTaxis
en la sección operationsParameters
de la salida, podemos ver nuestro predicado MERGE
:
operation: MERGE [('predicate', '(target.RideId = source.RideId)'), ('matchedPredicates', '[{"actionType":"update"}]'), ('notMatchedPredicates', '[{"actionType":"insert"}]')]
Podemos ver la condición de unión (target.RideId = source.RideId
), el botón matchedPredicate
que especifica una actualización, y el notMatchedPredicate
, que especifica una inserción.
Las secciones de salida de operationMetrics
muestran los detalles de las diferentes acciones:
[('numTargetRowsCopied', '4469890'), ('numTargetRowsDeleted', '0'), ('numTargetFilesAdded', '4'), ('executionTimeMs', '91902'), ('numTargetRowsInserted', '6'), ('scanTimeMs', '8452'), ('numTargetRowsUpdated', '5'), ('numOutputRows', '4469901'), ('numTargetChangeFilesAdded', '0'), ('numSourceRows', '11'), ('numTargetFilesRemoved', '1'), ('rewriteTimeMs', '16782')]
Aquí podemos ver de nuevo que se insertaron seis filas (numTargetRowsInserted
), y se actualizaron cinco (numTargetRowsUpdated
). Se añadieron cuatro nuevos archivos de datos a nuestra tabla Delta, y se eliminó un archivo de datos.
Funcionamiento interno de la operación FUSIONAR
Internamente, Delta Lake completa una operación MERGE
como ésta en dos pasos:
Primero realiza un
inner join
entre la tabla de destino y la tabla de origen para seleccionar todos los archivos de datos que contengan coincidencias. Esto evita que la operación baraje innecesariamente datos que pueden ignorarse con seguridad.A continuación, realiza un
outer join
entre los archivos seleccionados en las tablas destino y origen, y aplica la cláusulaINSERT
,DELETE
oUPDATE
adecuada, según especifique el usuario.
La principal diferencia entre un MERGE
y un UPDATE
o un DELETE
bajo el capó es que Delta Lake utiliza uniones para completar un MERGE
. Esto te permite utilizar algunas estrategias únicas cuando intentas mejorar el rendimiento.
Conclusión
Las operaciones DML como DELETE
, UPDATE
, y MERGE
son operaciones esenciales para cualquier formato de tabla y operaciones ETL, todas ellas habilitadas a través del registro de transacciones. Aprovechando estas operaciones, puedes empezar a gestionar eficazmente los cambios en los datos y mantener la integridad de los mismos en tu plataforma de datos.
De forma similar a las tablas de un RDBMS tradicional, en este capítulo has leído que con las tablas Delta puedes realizar operaciones DELETE
, UPDATE
, y MERGE
, pero también puedes aplicar estas operaciones utilizando SQL o la API DataFrame. Y lo que es más importante, aprendiste lo que ocurre bajo el capó en Delta Lake con los archivos subyacentes en el directorio de tablas Delta, y cómo el registro de transacciones registra y rastrea estos diferentes tipos de entradas. Utilizando el comando DESCRIBE HISTORY
, podemos ver detalles sobre la salida de las transacciones de una tabla. Cada una de estas operaciones también puede aprovechar predicados para reducir el número de archivos escaneados y mejorar el rendimiento. Aparte de utilizar predicados durante las operaciones, existen otras técnicas de ajuste del rendimiento que se pueden aplicar a las tablas Delta y que conocerás en el capítulo siguiente.
Get Lago Delta: En marcha now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.