Capítulo 4. Reducciones en Spark

Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com

Este capítulo se centra en las transformaciones de reducción sobre RDDs en Spark. En concreto, trabajaremos con RDDs de pares (clave, valor), que son una abstracción de datos común necesaria para muchas operaciones en Spark. Pueden ser necesarias algunas operaciones ETL iniciales para obtener tus datos en forma (clave, valor), pero con los RDD de pares puedes realizar cualquier agregación que desees sobre un conjunto de valores.

Spark admite varias potentes transformaciones y acciones de reducción. Las transformaciones de reducción más importantes son:

  • reduceByKey()

  • combineByKey()

  • groupByKey()

  • aggregateByKey()

Todas las transformaciones de *ByKey() aceptan un origenRDD[(K, V)] y crean un destinoRDD[(K, C)] (para algunas transformaciones, como reduceByKey(), V y Cson lo mismo). La función de estas transformaciones es reducir todos los valores de una clave dada (para todas las claves únicas), encontrando, por ejemplo

  • La media de todos los valores

  • La suma y el recuento de todos los valores

  • La moda y la mediana de todos los valores

  • La desviación típica de todos los valores

Selección de la Transformación de Reducción

Al igual que con las transformaciones del mapeador, es importante seleccionar la herramienta adecuada para el trabajo. Para algunas operaciones de reducción (como hallar la mediana), el reductor necesita acceder a todos los valores al mismo tiempo. Para otras, como hallar la suma o el recuento de todos los valores, no. Si quieres hallar la mediana de los valores por clave, entonces groupByKey()será una buena elección, pero esta transformación no va bien si una clave tiene muchos valores (lo que podría causar un problema de OOM). En cambio, si quieres hallar la suma o recuento de todos los valores, entonces reduceByKey() puede ser una buena elección: fusiona los valores de cada clave utilizando una función de reducción asociativa y conmutativa.

Este capítulo te mostrará cómo utilizar las transformaciones de reducción más importantes de Spark, mediante sencillos ejemplos prácticos de PySpark. Nos centraremos en las transformaciones más utilizadas en las aplicaciones Spark. También hablaré del concepto general de reducción, y de los monoides como principio de diseño de algoritmos de reducción eficientes. Empezaremos viendo cómo crear RDD de pares, que son necesarios para las transformaciones de reducción de Spark.

Crear RDD de pares

Dado un conjunto de claves y sus valores asociados, una transformación de reducción reduce los valores de cada clave mediante un algoritmo (suma de valor, mediana de valores, etc.). Por tanto, las transformaciones de reducción presentadas en este capítulo trabajan sobre pares (clave, valor), lo que significa que los elementos del RDD deben ajustarse a este formato. Hay varias formas de crear RDD de pares en Spark. Por ejemplo, también puedes utilizarparallelize() sobre colecciones (como listas de tuplas y diccionarios), como se muestra aquí:

>>> key_value = [('A', 2), ('A', 4), ('B', 5), ('B', 7)]
>>> pair_rdd = spark.sparkContext.parallelize(key_value)
>>> pair_rdd.collect() 1
[('A', 2), ('A', 4), ('B', 5), ('B', 7)]
>>> pair_rdd.count()
4
>>> hashmap = pair_rdd.collectAsMap()
>>> hashmap
{'A': 4, 'B': 7}
1

pair_rdd tiene dos teclas, {'A', 'B'}.

A continuación, supongamos que tienes datos meteorológicos y quieres crear pares de (city_id, temperature). Puedes hacerlo utilizando la transformación map(). Supongamos que tu entrada tiene el siguiente formato:

<city_id><,><latitude><,><longitude><,><temperature>

Primero, define una función para crear los pares (clave, valor) deseados:

def create_key_value(rec):
  tokens = rec.split(",")
  city_id = tokens[0]
  temperature = tokens[3]
  return (city_id, temperature) 1
1

La clave es city_id y el valor es temperature.

A continuación, utiliza map() para crear tu par RDD:

input_path = <your-temperature-data-path>
rdd = spark.sparkContext.textFile(input_path)
pair_rdd = rdd.map(create_key_value)
# or you can write this using a lambda expression as:
# pair_rdd = rdd.map(lambda rec: create_key_value(rec))

Hay muchas otras formas de crear RDD de pares (clave, valor): reduceByKey() por ejemplo, acepta un origen RDD[(K, V)]y produce un destino RDD[(K, V)], y combineByKey() acepta un origen RDD[(K, V)] y produce un destino RDD[(K, C)].

Transformaciones de reducción

Normalmente, una transformación de reducción reduce el tamaño de los datos de un lote grande de valores (como una lista de números) a uno más pequeño. Algunos ejemplos de reducciones son:

  • Hallar la suma y la media de todos los valores

  • Encontrar la media, la moda y la mediana de todos los valores

  • Calcular la media y la desviación típica de todos los valores

  • Encontrar el (min, max, count) de todos los valores

  • Encontrar los 10 primeros de todos los valores

En pocas palabras, una transformación de reducción se corresponde aproximadamente con la operación de pliegue (también llamada reducir, acumular o agregar) en programación funcional. La transformación se aplica a todos los elementos de los datos (por ejemplo, al hallar la suma de todos los elementos) o a todos los elementos por clave (por ejemplo, al hallar la suma de todos los elementos por clave).

En la Figura 4-1 se ilustra una reducción simple de la suma sobre un conjunto de números {47, 11, 42, 13} para una sola partición.

daws 0401
Figura 4-1. Una reducción por adición en una sola partición

La Figura 4-2 muestra una reducción que suma los elementos de dos particiones. Los valores reducidos finales de la Partición-1 y la Partición-2 son 21 y 18. Cada partición realiza reducciones locales y, finalmente, se reducen los resultados de las dos particiones.

daws 0402
Figura 4-2. Una reducción de suma sobre dos particiones

El reductor es un concepto básico de la programación funcional, que se utiliza para transformar un conjunto de objetos (como números, cadenas o listas) en un único valor (como la suma de números o la concatenación de objetos de cadena). Spark y el paradigma MapReduce utilizan este concepto para agregar un conjunto de valores en un único valor por clave. Considera los siguientes pares (clave, valor), donde la clave es unString y el valor es una lista de Integers:

(key1, [1, 2, 3])
(key2, [40, 50, 60, 70, 80])
(key3, [8])

El reductor más sencillo será una función de suma sobre un conjunto de valores por clave. Después de aplicar esta función, el resultado será:

(key1, 6)
(key2, 300)
(key3, 8)

O puedes reducir cada (clave, valor) a (clave, par) donde el par es(sum-of-values, count-of-values):

(key1, (6, 3))
(key2, (300, 5))
(key3, (8, 1))

Los reductores están diseñados para funcionar de forma concurrente e independiente, lo que significa que no hay sincronización entre los reductores. Cuantos más recursos tenga un clúster Spark, más rápido se podrán realizar las reducciones. En el peor de los casos, si sólo tenemos un reductor, la reducción funcionará como una operación de cola. En general, un clúster ofrecerá muchos reductores (dependiendo de la disponibilidad de recursos) para la transformación de reducción.

En MapReduce y en los algoritmos distribuidos, la reducción es una operación necesaria para resolver un problema. En el paradigma de programación MapReduce, el programador define un mapeador y un reductor con las siguientes firmas map()y reduce() (observa que [] denota un iterable):

map()

(K1, V1) → [(K2, V2)]

reduce()

(K2, [V2]) → [(K3, V3)]

La función map() mapea un par (clave1, valor1) en un conjunto de pares (clave2, valor2). Una vez completadas todas las operaciones de mapeo, la ordenación y barajado se realizan automáticamente (esta funcionalidad la proporciona el paradigma MapReduce, no la implementa el programador). La fase de ordenación y barajado de MapReduce es muy similar a la transformación groupByKey()de Spark.

La función reduce() reduce un par (clave2, [valor2]) a un conjunto de pares (clave3, valor3). Por tanto, podemos decir que una transformación de reducción toma una lista de valores y la reduce a un resultado tangible (como la suma de valores, la media de valores o la estructura de datos que desees).

Reducciones de Spark

Spark proporciona un rico conjunto de transformaciones de reducción fáciles de usar. Como se indicó al principio de este capítulo, nos centraremos en las reducciones de RDD de pares. Por tanto, supondremos que cada RDD tiene un conjunto de claves y para cada clave (como K) tenemos un conjunto de valores:

{ (K, V1), (K, V2), ..., (K, Vn) }

La Tabla 4-1 enumera las transformaciones de reducción disponibles en Spark.

Tabla 4-1. Transformaciones de reducción de Spark
Transformación Descripción

aggregateByKey()

Agrega los valores de cada clave utilizando las funciones combinadas dadas y un "valor cero" neutro

combineByKey()

Función genérica para combinar los elementos de cada clave utilizando un conjunto personalizado de funciones de agregación

countByKey()

Cuenta el número de elementos de cada clave y devuelve el resultado al maestro como diccionario

foldByKey()

Fusiona los valores de cada clave utilizando una función asociativa y un "valor cero" neutro

groupByKey()

Agrupa los valores de cada clave del RDD en una única secuencia

reduceByKey()

Fusiona los valores de cada clave utilizando una función de reducción asociativa y conmutativa

sampleByKey()

Devuelve un subconjunto de este RDD muestreado por clave, utilizando frecuencias de muestreo variables para las distintas claves especificadas por fracciones

sortByKey()

Ordena el RDD por clave, de forma que cada partición contenga un rango ordenado de los elementos en orden ascendente

Todas estas funciones de transformación actúan sobre pares (clave, valor) representados por RDDs. En este capítulo, sólo veremos las reducciones de datos sobre un conjunto de claves únicas dadas. Por ejemplo, dados los siguientes pares (clave, valor) para la clave K:

{ (K, V1), (K, V2), ..., (K, Vn) }

suponemos que K tiene una lista de valores n (> 0):

[ V1, V2, ..., Vn ]

Para simplificarlo, el objetivo de la reducción es generar el siguiente par (o conjunto de pares):

(K, R)

donde:

f(V1, V2, ..., Vn) -> R

La función f() se denominareductor o función de reducción. Las transformaciones de reducción de Spark aplican esta función sobre una lista de valores para encontrar el valor reducido, R. Ten en cuenta que Spark no impone ningún orden entre los valores ([V1, V2, ..., Vn]) a reducir.

Este capítulo incluirá ejemplos prácticos de soluciones que demuestran el uso de las transformaciones de reducción más comunes de Spark: reduceByKey(), groupByKey(), aggregateByKey(), y combineByKey(). Para empezar, veamos un ejemplo muy sencillo de la transformación groupByKey(). Como muestra el ejemplo de la Figura 4-3, funciona de forma similar a la sentencia SQL GROUP BY. En este ejemplo, tenemos cuatro claves, {A, B, C, P}, y sus valores asociados se agrupan como una lista de enteros. El RDD de origen es un RDD[(String, Integer)], donde cada elemento es un par de (String, Integer). El RDD de destino es un RDD[(String, [Integer])], donde cada elemento es un par de (String, [Integer]); el valor es una lista iterable de enteros.

daws 0403
Figura 4-3. La transformación groupByKey()
Nota

Por defecto, las reducciones Spark no ordenan los valores reducidos. Por ejemplo, en la Figura 4-3, el valor reducido de la clave B podría ser [4, 8] o [8, 4]. Si lo deseas, puedes ordenar los valores antes de la reducción final. Si tu algoritmo de reducción requiere ordenación, debes ordenar los valores explícitamente.

Ahora que ya tienes una idea general de cómo funcionan los reductores, pasemos a un ejemplo práctico que demuestra cómo se pueden utilizar diferentes transformaciones de reducción de Spark para resolver un problema de datos.

Ejemplo sencillo de calentamiento

Supongamos que tenemos una lista de pares (K, V), donde K (la clave) es un String y V (el valor) es un Integer:

[
 ('alex', 2), ('alex', 4), ('alex', 8),
 ('jane', 3), ('jane', 7),
 ('rafa', 1), ('rafa', 3), ('rafa', 5), ('rafa', 6),
 ('clint', 9)
]

En este ejemplo, tenemos cuatro claves únicas:

{ 'alex', 'jane', 'rafa', 'clint' }

Supongamos que queremos combinar (sumar) los valores por clave. El resultado de esta reducción será

[
 ('alex', 14),
 ('jane', 10),
 ('rafa', 15),
 ('clint', 9)
]

donde:

key: alex =>    14 = 2+4+8
key: jane =>    10 = 3+7
key: rafa =>    15 = 1+3+5+6
key: clint =>    9 (single value, no operation is done)

Hay muchas formas de sumar estos números para obtener el resultado deseado. ¿Cómo llegamos a estos pares reducidos (clave, valor)? Para este ejemplo, podríamos utilizar cualquiera de las transformaciones habituales de Spark. Agregar o combinar los valores porclave es un tipo de reducción-en el paradigma clásico MapReduce, esto se denomina función reducir por clave(o simplemente reducir). El marco MapReduce llama a la función de reducción de la aplicación (definida por el usuario) una vez por cada clave única. La función itera a través de los valores que están asociados a esa clave y produce cero o más salidas como pares (clave, valor), resolviendo el problema de combinar los elementos de cada clave única en un único valor. (Ten en cuenta que, en algunas aplicaciones, el resultado puede ser más de un único valor).

Aquí presento cuatro soluciones diferentes utilizando las transformaciones de Spark. Para todas las soluciones, utilizaremos los siguientes RDD de Python data ykey_value_pairs:

>>> data = 1
[
 ('alex', 2), ('alex', 4), ('alex', 8),
 ('jane', 3), ('jane', 7),
 ('rafa', 1), ('rafa', 3), ('rafa', 5), ('rafa', 6),
 ('clint', 9)
]
>>> key_value_pairs = spark.SparkContext.parallelize(data) 2
>>> key_value_pairs.collect()
[
 ('alex', 2), ('alex', 4), ('alex', 8),
 ('jane', 3), ('jane', 7),
 ('rafa', 1), ('rafa', 3), ('rafa', 5), ('rafa', 6),
 ('clint', 9)
]
1

data es una colección de Python: una lista de pares (clave, valor).

2

key_value_pairs es un RDD[(String, Integer)].

Resolver con reduceByKey()

Sumar los valores de una clave determinada es bastante sencillo: suma los dos primeros valores, luego el siguiente, y sigue así. La transformación reduceByKey() de Spark fusiona los valores de cada clave utilizando una función reductora asociativa y conmutativa. Se utilizan combinadores (minirreductores optimizados) en todos los nodos del clúster antes de fusionar los valores por partición.

Para la transformación reduceByKey(), el RDD de origen es un RDD[(K, V)] y el RDD de destino es un RDD[(K, V)]. Ten en cuenta que los tipos de datos de origen y destino de los valores del RDD (V) son los mismos. Se trata de una limitación de reduceByKey(), que puede evitarse utilizandocombineByKey() o aggregateByKey()).

Podemos aplicar la transformación reduceByKey() utilizando una expresión lambda (función anónima):

# a is (an accumulated) value for key=K
# b is a value for key=K
sum_per_key = key_value_pairs.reduceByKey(lambda a, b: a+b)
sum_per_key.collect()
[('jane', 10), ('rafa', 15), ('alex', 14), ('clint', 9)]

Como alternativa, podemos utilizar una función definida, como add:

from operator import add
sum_per_key = key_value_pairs.reduceByKey(add)
sum_per_key.collect()
[('jane', 10), ('rafa', 15), ('alex', 14), ('clint', 9)]

La agregación de valores por clave mediante reduceByKey() es una solución optimizada, ya que la agregación se produce a nivel de partición antes de la agregación final de todas las particiones.

Resolver con groupByKey()

También podemos resolver este problema utilizando la transformación groupByKey(), pero esta solución no funcionará tan bien porque implica mover muchos datos a los nodos reductores (aprenderás más sobre por qué es así cuando hablemos del paso de barajar más adelante en este capítulo).

Con la transformación reduceByKey(), el RDD de origen es un RDD[(K, V)] y el RDD de destino es un RDD[(K, [V])]. Ten en cuenta que los tipos de datos de origen y destino no son iguales: el tipo de datos de valor para el RDD de origen es V, mientras que para el RDD de destino es [V] (un iterable/lista de Vs).

El siguiente ejemplo demuestra el uso de groupByKey() con una expresión lambda para sumar los valores por clave:

sum_per_key = key_value_pairs
                 .grouByKey() 1
                 .mapValues(lambda values: sum(values)) 2
sum_per_key.collect()
[('jane', 10), ('rafa', 15), ('alex', 14), ('clint', 9)]
1

Agrupa los valores por clave (similar a GROUP BY de SQL). Ahora cada clave tendrá un conjunto de valores Integer; por ejemplo, los tres pares{('alex', 2), ('alex', 4), ('alex', 8)}se reducirán a un único par, ('alex', [2, 4, 8]).

2

Añade valores por clave utilizando la función sum() de Python.

Resolver con aggregateByKey()

En su forma más simple, la transformación aggregateByKey()se define como:

aggregateByKey(zero_value, seq_func, comb_func)

source RDD: RDD[(K, V)]
target RDD: RDD[(K, C))

Agrega los valores de cada clave del RDD de origen en un RDD de destino, utilizando las funciones de combinación dadas y un "valor cero" neutro (el valor inicial utilizado para cada partición). Esta función puede devolver un tipo de resultado distinto (C) del tipo de losvalores del RDD de origen (V), aunque en este ejemplo ambos son tipos de datos Integer. Por lo tanto, necesitamos una operación para fusionar valores dentro de una única partición (fusionar valores de tipo V en un valor de tipo C) y una operación para fusionar valores entre particiones (fusionar valores de tipo C de múltiples particiones). Para evitar la asignación innecesaria de memoria, ambas funciones pueden modificar y devolver su primer argumento en lugar de crear un nuevo C.

El siguiente ejemplo demuestra el uso de latransformación aggregateByKey():

# zero_value -> C
# seq_func: (C, V) -> C
# comb_func: (C, C) -> C

>>> sum_per_key = key_value_pairs.aggregateByKey(
... 0, 1
... (lambda C, V: C+V), 2
... (lambda C1, C2: C1+C2) 3
... )
>>> sum_per_key.collect()
[('jane', 10), ('rafa', 15), ('alex', 14), ('clint', 9)]
1

El zero_value aplicado en cada partición es 0.

2

seq_func se utiliza en una única partición.

3

comb_func se utiliza para combinar los valores de las particiones.

Resolver con combineByKey()

La transformación combineByKey() es la más general y potente de las transformaciones de reducción de Spark. En su forma más simple, se define como:

combineByKey(create_combiner, merge_value, merge_combiners)

source RDD: RDD[(K, V)]
target RDD: RDD[(K, C))

Al igual que aggregateByKey(), la transformación combineByKey() convierte un origenRDD[(K, V)] en un destino RDD[(K, C)]. De nuevo, V y Cpueden ser tipos de datos diferentes (esto forma parte de la potencia de combineByKey()-por ejemplo, V puede ser una String o Integer, mientras que C puede ser una lista, tupla o diccionario), pero para este ejemplo ambos son tipos de datos Integer.

La interfaz combineByKey() nos permite personalizar el comportamiento de reducción y combinación, así como el tipo de datos. Así, para utilizar esta transformación tenemos que proporcionar tres funciones:

create_combiner

Esta función convierte un único V en un C (por ejemplo, creando una lista de un elemento). Se utiliza dentro de una única partición para inicializar un C.

merge_value

Esta función fusiona un Ven un C (por ejemplo, añadiéndolo al final de una lista). Se utiliza dentro de una misma partición para agregar valores en un C.

merge_combiners

Esta función combina dos Cs en un único C(por ejemplo, fusionando las listas). Se utiliza para fusionar valores de dos particiones.

Nuestra solución con combineByKey() tiene este aspecto:

>>> sum_per_key = key_value_pairs.combineByKey(
...           (lambda v: v), 1
...           (lambda C,v: C+v), 2
...           (lambda C1,C2: C1+C2) 3
... )
>>> sum_per_key.collect()
[('jane', 10), ('rafa', 15), ('alex', 14), ('clint', 9)]
1

create_combiner crea los valores iniciales en cada partición.

2

merge_value fusiona los valores de una partición.

3

merge_combiners fusiona los valores de las distintas particiones en el resultado final.

Para que te hagas una mejor idea de la potencia de la transformación combineByKey(), veamos otro ejemplo. Supongamos que queremos hallar la media de valores por clave. Para solucionarlo, podemos crear un tipo de datos combinado (C) como (sum, count), que contendrá las sumas de valores y sus recuentos asociados:

# C = combined type as (sum, count)
>>> sum_count_per_key = key_value_pairs.combineByKey(
...           (lambda v: (v, 1)),
...           (lambda C,v: (C[0]+v, C[1]+1),
...           (lambda C1,C2: (C1[0]+C2[0], C1[1]+C2[1]))
... )
>>> mean_per_key = sum_count_per_key.mapValues(lambda C: C[0]/C[1])

Dadas tres particiones denominadas {P1, P2, P3},la Figura 4-4 muestra cómo crear un combinador (tipo de datos C), cómo fusionar un valor en un combinador y, por último, cómo fusionar dos combinadores.

daws 0404
Figura 4-4. Ejemplo de transformacióncombineByKey()

A continuación, hablaré del concepto de monoide, que te ayudará a comprender cómo funcionan los combinadores en las transformaciones reductoras.

¿Qué es un monoide?

Los monoides son un principio de diseño útil para escribir algoritmos MapReduce eficientes.1 Si no entiendes los monoides, puedes escribir algoritmos reductores que no produzcan resultados semánticamente correctos. Si tu reductor es un monoide, puedes estar seguro de que producirá resultados correctos en un entorno distribuido.

Dado que las reducciones de Spark se ejecutan partición a partición (es decir, tu función reductora está distribuida en lugar de ser una función secuencial), para obtener la salida adecuada debes asegurarte de que tu función reductora es semánticamente correcta. En breve veremos algunos ejemplos de utilización de monoides, pero antes examinemos el concepto matemático subyacente.

En álgebra, un monoide es una estructura algebraica con una única operación binaria asociativa y un elemento identidad (también llamado elemento cero).

Para nuestros propósitos, podemos definir informalmente un monoide como M = (T, f, Zero), donde:

  • T es un tipo de dato.

  • f() es una operación binaria: f: (T, T) -> T.

  • Zero es una instancia de T.

Nota

Zero es un elemento identidad (neutro) de tipo T; no es necesariamente el número cero.

Si a, b, c, y Zero son del tipo T, para que el triple (T, f, Zero) sea un monoide deben cumplirse las siguientes propiedades:

  • Operación binaria

    f: (T, T) -> T
  • Elemento neutro

    for all a in T:
    
    f(Zero, a) = a
    f(a, Zero) = a
  • Asociatividad

    for all a, b, c in T:
    
    f(f(a, b), c) = f(a, f(b, c))

No toda operación binaria es un monoide. Por ejemplo, la función mean() sobre un conjunto de enteros no es una función asociativa y, por tanto, no es un monoide, como muestra la siguiente demostración:

mean(10, mean(30, 50)) != mean(mean(10, 30), 50)

where

   mean(10, mean(30, 50))
      = mean (10, 40)
      = 25

   mean(mean(10, 30), 50)
      = mean (20, 50)
      = 35

   25 != 35

¿Qué significa esto? Dado unRDD[(String, Integer)], podríamos tener la tentación de escribir la siguiente transformación para hallar una media por clave:

# rdd: RDD[(String, Integer)]
# WRONG REDUCTION to find average by key
avg_by_key = rdd.reduceByKey(lambda x, y: (x+y)/2)

Pero esto no producirá los resultados correctos, porque la media de medias no es una media; en otras palabras, la función media/promedio utilizada aquí no es un monoide. Supongamos que este rdd tiene tres elementos:{("A", 1), ("A", 2), ("A", 3)}; {("A", 1), ("A", 2)} están en la partición 1 y {("A", 3)}está en la partición 2. Utilizando la solución anterior se obtendrán los valores agregados de ("A", 1.5) para la partición 1 y ("A", 3.0) para la partición 2. Combinando los resultados de las dos particiones obtendremos una media final de (1,5 + 3,0) / 2 = 2,25, que no es el resultado correcto (la media de los tres valores es 2,0). Si tu reductor es un monoide, está garantizado que se comporta correctamente y produce resultados correctos.

Ejemplos Monoides y No Monoides

Para ayudarte a comprender y reconocer los monoides, veamos algunos ejemplos de monoides y no monoides. Los siguientes son ejemplos de monoides:

  • Números enteros con suma:

    ((a + b ) + c) = (a + (b + c))
    0 + n = n
    n + 0 = n
    The zero element for addition is the number 0.
  • Enteros con multiplicación:

    ((a * b) * c) = (a * (b * c))
    1 * n = n
    n * 1 = n
    The zero element for multiplication is the number 1.
  • Cadenas con concatenación:

    (a + (b + c)) = ((a + b) + c)
    "" + s = s
    s + "" = s
    The zero element for concatenation is an empty string of size 0.
  • Listas con concatenación:

    List(a, b) + List(c, d) = List(a,b,c,d)
  • Conjuntos con su unión:

    Set(1,2,3) + Set(2,4,5)
       = Set(1,2,3,2,4,5)
       = Set(1,2,3,4,5)
    
    S + {} = S
    {} + S = S
    The zero element is an empty set {}.

Y aquí tienes algunos ejemplos no monoides:

  • Enteros con función media:

    mean(mean(a,b),c) != mean(a, mean(b,c))
  • Números enteros con resta:

    ((a - b) -c) != (a - (b - c))
  • Enteros con división:

    ((a / b) / c) != (a / (b / c))
  • Enteros con función de modo:

    mode(mode(a, b), c) != mode(a, mode(b, c))
  • Enteros con función mediana:

    median(median(a, b), c) != median(a, median(b, c))

En algunos casos, es posible convertir un no monoide en un monoide. Por ejemplo, con un simple cambio en nuestras estructuras de datos podemos hallar la media correcta de un conjunto de números. Sin embargo, no existe ningún algoritmo para convertir automáticamente una estructura no monoide en un monoide.

Escribir algoritmos distribuidos en Spark es muy diferente de escribir algoritmos secuenciales en un único servidor, porque los algoritmos operan en paralelo sobre datos particionados. Por lo tanto, al escribir un reductor, debes asegurarte de que tu función de reducción es un monoide. Ahora que entiendes este importante concepto, pasemos a algunos ejemplos prácticos.

El problema de la película

El objetivo de este primer ejemplo es presentar un problema básico y, a continuación, ofrecer soluciones utilizando distintas transformaciones de reducción de Spark mediante PySpark. Para todas las transformaciones de reducción, he seleccionado cuidadosamente los tipos de datos de forma que formen un monoide.

El problema de las películas se puede plantear de la siguiente manera: dado un conjunto de usuarios, películas y valoraciones, (en el intervalo de 1 a 5), queremos encontrar la valoración media de todas las películas de un usuario. Así, si el usuario con userID=100) ha valorado cuatro películas:

(100, "Lion King", 4.0)
(100, "Crash", 3.0)
(100, "Dead Man Walking", 3.5)
(100, "The Godfather", 4.5)

queremos generar el siguiente resultado

(100, 3.75)

donde:

3.75 = mean(4.0, 3.0, 3.5, 4.5)
     = (4.0 + 3.0 + 3.5 + 4.5) / 4
     = 15.0 / 4

Para este ejemplo, observa que la transformación reduceByKey()sobre un conjunto de valoraciones no siempre producirá la salida correcta, ya que la media (o promedio) no es un monoide algebraico sobre un conjunto de números flotantes/enteros. En otras palabras, como se ha comentado en el apartado anterior, la media de las medias no es igual a la media de todos los números de entrada.He aquí una prueba sencilla. Supongamos que queremos hallar la media de seis valores (los números 1-6), almacenados en una única partición. Podemos hacerlo con la función mean() de la siguiente manera:

mean(1, 2, 3, 4, 5, 6)
   = (1 + 2 + 3 + 4 + 5 + 6) / 6
   = 21 / 6
   = 3.5 [correct result]

Ahora, hagamos que mean() funcione como una función distribuida. Supongamos que los valores se almacenan en tres particiones:

Partition-1: (1, 2, 3)
Partition-2: (4, 5)
Partition-3: (6)

En primer lugar, calculamos la media de cada partición:

mean(1, 2, 3, 4, 5, 6)
  =  mean (
           mean(Partition-1),
           mean(Partition-2),
           mean(Partition-3)
          )

mean(Partition-1)
  = mean(1, 2, 3)
  = mean( mean(1,2), 3)
  = mean( (1+2)/2, 3)
  = mean(1.5, 3)
  = (1.5+3)/2
  = 2.25

mean(Partition-2)
  = mean(4,5)
  = (4+5)/2
  = 4.5

mean(Partition-3)
  = mean(6)
  = 6

A continuación, hallamos la media de estos valores. Por tanto, una vez procesadas todas las particiones, obtenemos

mean(1, 2, 3, 4, 5, 6)
  =  mean (
           mean(Partition-1),
           mean(Partition-2),
           mean(Partition-3)
          )
  = mean(2.25, 4.5, 6)
  = mean(mean(2.25, 4.5), 6)
  = mean((2.25 + 4.5)/2, 6)
  = mean(3.375, 6)
  = (3.375 + 6)/2
  = 9.375 / 2
  = 4.6875  [incorrect result]

Para evitar este problema, podemos utilizar una estructura de datos monoide (que admite la asociatividad y la conmutatividad) como un par de (sum, count), donde sum es la suma total de todos los números que hemos visto hasta ahora (por partición) y count es el número de valoraciones que hemos visto hasta ahora. Si definimos nuestra función mean() como

mean(pair(sum, count)) = sum / count

obtenemos:

mean(1,2,3,4,5,6)
  = mean(mean(1,2,3), mean(4,5), mean(6))
  = mean(pair(1+2+3, 1+1+1), pair(4+5, 1+1), pair(6,1))
  = mean(pair(6, 3), pair(9, 2), pair(6,1))
  = mean(mean(pair(6, 3), pair(9, 2)), pair(6,1))
  = mean(pair(6+9, 3+2), pair(6,1))
  = mean(pair(15, 5), pair(6,1))
  = mean(pair(15+6, 5+1))
  = mean(pair(21, 6))
  = 21 / 6 = 3.5 [correct result]

Como muestra este ejemplo, utilizando un monoide podemos conseguir la asociatividad. Por tanto, puedes aplicar la transformación reduceByKey() cuando tu función f() sea conmutativa y asociativa:

# a = (sum1, count1)
# b = (sum2, count2)
# f(a, b) = a + b
#         = (sum1+sum2, count1+count2)
#
reduceByKey(lambda a, b: f(a, b))

Por ejemplo, la operación de suma (+) es conmutativa y asociativa, pero la función media/promedio no satisface estas propiedades.

Nota

Como vimos en el Capítulo 1, una función conmutativa garantiza que el resultado sea independiente del orden de los elementos del RDD que se está agregando:

f(A, B) = f(B, A)

Una función asociativa garantiza que el orden en que se agrupan los elementos durante la agregación no afecte al resultado final:

f(f(A, B), C) = f(A, f(B, C))

Conjunto de datos de entrada para analizar

Los datos de muestra que utilizaremos para este problema son un conjunto de datos deMovieLens. Para simplificar, supondré que has descargado y descomprimido los archivos en un directorio /tmp/movielens/. Ten en cuenta que no es obligatorio colocar los archivos en la ubicación sugerida; puedes colocarlos en el directorio que prefieras y actualizar las rutas de entrada en consecuencia.

Consejo

El conjunto de datos completo de MovieLens(ml-latest.zip) ocupa 265 MB. Si quieres utilizar un conjunto de datos más pequeño para ejecutar, probar y depurar los programas que se indican aquí, puedes descargar en su lugar el conjunto de datos pequeño MovieLens, un archivo de 1 MB que consta de 100.000 valoraciones y 3.600 aplicaciones de etiquetas aplicadas a 9.000 películas por 600 usuarios.

Todas las valoraciones están contenidas en el archivovaloraciones.csv. Cada línea de este archivo, después de la fila de encabezamiento, representa una valoración de una película por un usuario, y tiene el siguiente formato:

<userId><,><movieId><,><rating><,><timestamp>

En este archivo:

  • Las líneas se ordenan primero por userId, y luego, para cada usuario, por movieId.

  • Las valoraciones se hacen en una escala de 5 estrellas, con incrementos de media estrella (de 0,5 estrellas a 5,0 estrellas).

  • Las marcas de tiempo representan segundos desde la medianoche del Tiempo Universal Coordinado (UTC) del 1 de enero de 1970 (este campo se ignora en nuestro análisis).

Después de descomprimir el archivo descargado, deberías tener los siguientes archivos:

$ ls -l /tmp/movielens/
       8,305  README.txt
     725,770  links.csv
   1,729,811  movies.csv
 620,204,630  ratings.csv
  21,094,823  tags.csv

En primer lugar, comprueba el número de registros (el número de registros que ves puede ser diferente en función de cuándo descargaste el archivo):

$ wc -l /tmp/movielens/ratings.csv
22,884,378 /tmp/movielens/ratings.csv

A continuación, echa un vistazo a los primeros registros:

$ head -6 /tmp/movielens/ratings.csv
userId,movieId,rating,timestamp
1,169,2.5,1204927694
1,2471,3.0,1204927438
1,48516,5.0,1204927435
2,2571,3.5,1436165433
2,109487,4.0,1436165496

Como estamos utilizando RDDs, no necesitamos los metadatos asociados a los datos. Por lo tanto, podemos eliminar la primera línea (la línea de encabezamiento) del archivoratings.csv:

$ tail -n +2 ratings.csv > ratings.csv.no.header
$ wc -l ratings.csv ratings.csv.no.header
22,884,378 ratings.csv
22,884,377 ratings.csv.no.header

Ahora que hemos adquirido nuestros datos de muestra, podemos trabajar con algunas soluciones a este problema. La primera solución utilizará aggregateByKey(), pero antes presentaré la lógica de esta transformación.

La transformación aggregateByKey()

La transformación aggregateByKey() de Spark inicializa cada clave de cada partición con el valor cero, que es un tipo de datos combinado inicial (C); se trata de un valor neutro, normalmente (0, 0) si el tipo de datos combinado es (sum, count). Este valor cero se fusiona con el primer valor de la partición para crear un nuevo C, que luego se fusiona con el segundo valor. Este proceso continúa hasta que hayamos fusionado todos los valores de esa clave. Por último, si la misma clave existe en varias particiones, estos valores se combinan para producir el C final.

Las figuras 4-5 y 4-6 muestran cómo funcionaaggregateByKey() con distintos valores cero. El valor cero se aplica por clave, por partición. Esto significa que si una clave X está en N particiones, el valor cero se aplica N veces (cada una de estas N particiones se inicializará con el valor cero para la clave X). Por lo tanto, es importante seleccionar este valor con cuidado.

La Figura 4-5 muestra cómo funciona aggregateByKey()con zero-value=(0, 0).

daws 0405
Figura 4-5. aggregateByKey() con zero-value=(0, 0)

Normalmente, utilizarías (0, 0), pero la Figura 4-6 muestra cómo funciona la misma transformación con un valor cero de (10, 20).

daws 0406
Figura 4-6. aggregateByKey() con zero-value=(10, 20)

Primera solución utilizando aggregateByKey()

Para encontrar la valoración media de cada usuario, el primer paso es mapear cada registro en pares (clave, valor) de la forma

(userID-as-key, rating-as-value)

La forma más sencilla de sumar valores por clave es utilizar la transformación reduceByKey(), pero no podemos utilizar reduceByKey()para hallar la valoración media por usuario porque, como hemos visto, la función media/promedio no es un monoide sobre un conjunto de valoraciones (como números flotantes). Para que sea una operación monoide, utilizamos una estructura de datos de pares (una tupla de dos elementos) para contener un par de valores, (sum, count), donde sum es la suma agregada de valoraciones y countes el número de valoraciones que hemos añadido (sumado) hasta ahora, y utilizamos la transformación aggregateByKey().

Vamos a demostrar que la estructura de pares(sum, count) con un operador de suma sobre un conjunto de números es un monoide.

Si utilizamos (0.0, 0) como elemento cero, es neutro:

f(A, Zero) = A
f(Zero, A) = A

A = (sum, count)

f(A, Zero)
  = (sum+0.0, count+0)
  = (sum, count)
  = A

f(Zero, A)
  = (0.0+sum, 0+count)
  = (sum, count)
  = A

La operación es conmutativa (es decir, el resultado es independiente del orden de los elementos del RDD que se agregan):

f(A, B) = f(B, A)

A = (sum1, count1)
B = (sum2, count2)

f(A, B)
  = (sum1+sum2, count1+count2)
  = (sum2+sum1, count2+count1)
  = f(B, A)

También es asociativo (el orden en que se agregan los elementos no afecta al resultado final):

f(f(A, B), C) = f(A, f(B, C))

A = (sum1, count1)
B = (sum2, count2)
C = (sum3, count3)

f(f(A, B), C)
  = f((sum1+sum2, count1+count2), (sum3, count3))
  = (sum1+sum2+sum3, count1+count2+count3)
  = (sum1+(sum2+sum3), count1+(count2+count3))
  = f(A, f(B, C))

Para simplificar las cosas, definiremos una función Python muy básica, create_pair(), que acepta un registro de datos de clasificación de películas y devuelve un par de (userID, rating):

# Define a function that accepts a CSV record
# and returns a pair of (userID, rating)
# Parameters: rating_record (as CSV String)
# rating_record = "userID,movieID,rating,timestamp"
def create_pair(rating_record):
	tokens = rating_record.split(",")
	userID = tokens[0]
	rating = float(tokens[2])
	return (userID, rating)
#end-def

A continuación, probamos la función:

key_value_1 = create_pair("3,2394,4.0,920586920")
print key_value_1
('3', 4.0)

key_value_2 = create_pair("1,169,2.5,1204927694")
print key_value_2
('1', 2.5)

Aquí tienes una solución PySpark utilizando aggregateByKey() y nuestra función create_pair(). El tipo combinado (C) para denotar valores para la operaciónaggregateByKey() es un par de (sum-of-ratings, count-of-ratings).

# spark: an instance of SparkSession
ratings_path = "/tmp/movielens/ratings.csv.no.header"
rdd = spark.sparkContext.textFile(ratings_path)
# load user-defined Python function
ratings = rdd.map(lambda rec : create_pair(rec)) 1
ratings.count()
#
# C = (C[0], C[1]) = (sum-of-ratings, count-of-ratings)
# zero_value -> C = (0.0, 0)
# seq_func: (C, V) -> C
# comb_func: (C, C) -> C
sum_count = ratings.aggregateByKey( 2
    (0.0, 0), 3
    (lambda C, V: (C[0]+V, C[1]+1)), 4
    (lambda C1, C2: (C1[0]+C2[0], C1[1]+C2[1])) 5
)
1

El RDD fuente, ratings, es un RDD[(String, Float)] donde la clave es un userID y el valor es un rating.

2

El RDD objetivo, sum_count, es un RDD[(String, (Float, Integer))] donde la clave es un userID y el valor es un par (sum-of-ratings, count-of-ratings).

3

C se inicializa con este valor en cada partición.

4

Se utiliza para combinar valores dentro de una misma partición.

5

Se utiliza para combinar los resultados de diferentes particiones.

Vamos a desglosar lo que ocurre aquí. Primero, utilizamos la función aggregateByKey() y creamos un conjunto de resultados "plantilla" con los valores iniciales. Vamos a comenzar los datos como (0.0, 0), por lo que la suma inicial de valoraciones es 0.0 y el recuento inicial de registros es 0. Para cada fila de datos, vamos a hacer algunas sumas. C es la nueva plantilla, así que C[0] se refiere a nuestro elemento "suma" (sum-of-ratings), mientras que C[1] es el elemento "recuento" (count-of-ratings). Por último, combinamos los valores de las distintas particiones. Para ello, simplemente añadimos los valores de C1 a los de C2 basándonos en la plantilla que hemos creado.

Los datos del RDD sum_count acabarán teniendo el siguiente aspecto:

sum_count
  = [(userID, (sum-of-ratings, count-of-ratings)), ...]
  = RDD[(String, (Float, Integer))]

[
  (100, (40.0, 10)),
  (200, (51.0, 13)),
  (300, (340.0, 90)),
  ...
]

Esto nos dice que el usuario 100 ha valorado 10 películas y la suma de todas sus valoraciones ha sido 40,0; el usuario 200 ha valorado 13 películas y la suma de todas sus valoraciones ha sido 51,0

Ahora, para obtener la valoración media real por usuario, tenemos que utilizar la transformación mapValues() y dividir la primera entrada (sum-of-ratings) por la segunda (count-of-ratings):

# x =  (sum-of-ratings, count-of-ratings)
# x[0] = sum-of-ratings
# x[1] = count-of-ratings
# avg = sum-of-ratings / count-of-ratings
average_rating = sum_count.mapValues(lambda x: (x[0]/x[1])) 1
1

average_rating es un RDD[(String, Float)] donde la clave es un userID y el valor es un average-rating.

El contenido de este RDD es el siguiente, que nos da el resultado que buscamos:

average_rating
[
  (100, 4.00),
  (200, 3.92),
  (300, 3.77),
  ...
]

Segunda solución utilizando aggregateByKey()

Aquí presentaré otra solución utilizando la transformación aggregateByKey(). Ten en cuenta que, para ahorrar espacio, he recortado la salida generada por el intérprete de comandos PySpark.

El primer paso es leer los datos y crear pares (clave, valor), donde la clave es un userID y el valor es un rating:

# ./bin/pyspark
SparkSession available as 'spark'.
>>># create_pair() returns a pair (userID, rating)
>>># rating_record = "userID,movieID,rating,timestamp"
>>> def create_pair(rating_record):
...     tokens = rating_record.split(",")
...     return (tokens[0], float(tokens[2]))
...
>>> key_value_test = create_pair("3,2394,4.0,920586920")
>>> print key_value_test
('3', 4.0)
>>> ratings_path = "/tmp/movielens/ratings.csv.no.header"
>>> rdd = spark.sparkContext.textFile(ratings_path)
>>> rdd.count()
22884377
>>> ratings = rdd.map(lambda rec : create_pair(rec))
>>> ratings.count()
22884377
>>> ratings.take(3)
[(u'1', 2.5), (u'1', 3.0), (u'1', 5.0)]

Una vez creados los pares (clave, valor), podemos aplicar la transformación aggregateByKey() para sumar las valoraciones. El valor inicial de (0.0, 0)se utiliza para cada partición, donde 0.0 es la suma de las valoraciones y 0 es el número de valoraciones:

>>># C is a combined data structure, (sum, count)
>>> sum_count = ratings.aggregateByKey( 1
...     (0.0, 0), 2
...     (lambda C, V: (C[0]+V, C[1]+1)), 3
...     (lambda C1, C2: (C1[0]+C2[0], C1[1]+C2[1]))) 4

>>> sum_count.count()
247753

>>> sum_count.take(3)
[
 (u'145757', (148.0, 50)),
 (u'244330', (36.0, 17)),
 (u'180162', (1882.0, 489))
]
1

El RDD objetivo es un RDD[(String, (Float, Integer))].

2

C se inicializa en (0.0, 0) en cada partición.

3

Esta expresión lambda añade un único valor de Va C (utilizado en una única partición).

4

Esta expresión lambda combina los valores de las particiones (suma dos Cs para crear un único C).

Podríamos utilizar funciones Python en lugar de expresiones lambda. Para ello, tendríamos que escribir las siguientes funciones:

# C = (sum, count)
# V is a single value of type Float
def seq_func(C, V):
    return (C[0]+V, C[1]+1)
#end-def

# C1 = (sum1, count1)
# C2 = (sum2, count2)
def comb_func(C1, C2):
    return (C1[0]+C2[0], C1[1]+C2[1])
#end-def

Ahora, podemos calcular sum_countutilizando las funciones definidas:

sum_count = ratings.aggregateByKey(
    (0.0, 0),
    seq_func,
    comb_func
)

En el paso anterior se crearon elementos RDD del siguiente tipo:

(userID, (sum-of-ratings, number-of-ratings))

A continuación, hacemos el cálculo final para hallar la valoración media por usuario:

>>># x refers to a pair of (sum-of-ratings, number-of-ratings)
>>># where
>>>#      x[0] denotes sum-of-ratings
>>>#      x[1] denotes number-of-ratings
>>>
>>> average_rating = sum_count.mapValues(lambda x:(x[0]/x[1]))
>>> average_rating.count()
247753

>>> average_rating.take(3)
[
 (u'145757', 2.96),
 (u'244330', 2.1176470588235294),
 (u'180162', 3.8486707566462166)
]

A continuación, presentaré una solución al problema de las películas utilizando groupByKey().

Solución completa de PySpark con groupByKey()

Para un conjunto dado de pares (K, V),groupByKey() tiene la siguiente firma:

groupByKey(numPartitions=None, partitionFunc=<function portable_hash>)
groupByKey : RDD[(K, V)] --> RDD[(K, [V])]

Si el RDD de origen es un RDD[(K, V)], la transformación groupByKey() agrupa los valores de cada clave (K) del RDD en una única secuencia como una lista/iterable de Vs. A continuación, hace una partición hash del RDD resultante con el particionador/nivel de paralelismo existente. El orden de los elementos dentro de cada grupo no está garantizado, e incluso puede diferir cada vez que se evalúa el RDD resultante.

Consejo

Puedes personalizar tanto el número de particiones (numPartitions) como la función de partición (partitionFunc).

Aquí presento una solución completa utilizando la transformación groupByKey().

El primer paso es leer los datos y crear pares (clave, valor), donde la clave es un userID y el valor es un rating:

>>># spark: SparkSession
>>> def create_pair(rating_record):
...     tokens = rating_record.split(",")
...     return (tokens[0], float(tokens[2]))
...
>>> key_value_test = create_pair("3,2394,4.0,920586920")
>>> print key_value_test
('3', 4.0)

>>> ratings_path = "/tmp/movielens/ratings.csv.no.header"
>>> rdd = spark.sparkContext.textFile(ratings_path)
>>> rdd.count()
22884377
>>> ratings = rdd.map(lambda rec : create_pair(rec)) 1
>>> ratings.count()
22884377
>>> ratings.take(3)
[
 (u'1', 2.5),
 (u'1', 3.0),
 (u'1', 5.0)
]
1

ratings es unaRDD[(String, Float)]

Una vez creados los pares (clave, valor), podemos aplicar la transformación groupByKey() para agrupar todas las valoraciones de un usuario. Este paso crea (userID, [R1, ..., Rn]) pares, donde R1, ..., Rn son todas las valoraciones de un único userID.

Como verás, la transformación groupByKey()funciona exactamente igual que la GROUP BY de SQL. Agrupa valores de la misma clave como una iterable de valores:

>>> ratings_grouped = ratings.groupByKey() 1
>>> ratings_grouped.count()
247753
>>> ratings_grouped.take(3)
[
 (u'145757', <ResultIterable object at 0x111e42e50>), 2
 (u'244330', <ResultIterable object at 0x111e42dd0>),
 (u'180162', <ResultIterable object at 0x111e42e10>)
]
>>> ratings_grouped.mapValues(lambda x: list(x)).take(3) 3
[
 (u'145757', [2.0, 3.5, ..., 3.5, 1.0]),
 (u'244330', [3.5, 1.5, ..., 4.0, 2.0]),
 (u'180162', [5.0, 4.0, ..., 4.0, 5.0])
]
1

ratings_grouped es un RDD[(String, [Float])] donde la clave es un userID y el valor es una lista de ratings.

2

El nombre completo de ResultIterable es pyspark.resultiterable.ResultIterable.

3

Para depurar, convierte el objeto ResultIterableen una lista de Integers.

Para hallar la valoración media por usuario, sumamos todas las valoraciones de cada userID y luego calculamos las medias:

>>># x refers to all ratings for a user as [R1, ..., Rn]
>>># x: ResultIterable object
>>> average_rating = ratings_grouped.mapValues(lambda x: sum(x)/len(x)) 1
>>> average_rating.count()
247753
>>> average_rating.take(3)
[
 (u'145757', 2.96),
 (u'244330', 2.12),
 (u'180162', 3.85)
]
1

average_rating es un RDD[(String, Float)] donde la clave es userID y el valor es average-rating.

Solución PySpark completa con reduceByKey()

En su forma más simple, reduceByKey() tiene la siguiente firma (los tipos de datos de origen y destino, V, deben ser iguales):

reduceByKey(func, numPartitions=None, partitionFunc)
reduceByKey: RDD[(K, V)] --> RDD[(K, V)]

reduceByKey() fusiona los valores de cada clave utilizando una función reductora asociativa y conmutativa. También realizará la fusión localmente encada mapeador antes de enviar los resultados a un reductor, de forma similar a un combinador enMapReduce. La salida se particionará con particiones numPartitions, o el nivel de paralelismo por defecto si no se especifica numPartitions. El particionador por defecto es HashPartitioner.

Como queremos hallar la valoración media de todas las películas valoradas por un usuario, y sabemos que la media de medias no es una media (la función media no es un monoide), tenemos que sumar todas las valoraciones de cada usuario y llevar la cuenta del número de películas que ha valorado. Entonces, (sum_of_ratings, number_of_ratings) es un monoide sobre una función de suma, pero al final tenemos que realizar una transformación más mapValues()para hallar la valoración media real dividiendo sum_of_ratings por number_of_ratings. Aquí se da la solución completa utilizando reduceByKey(). Observa que reduceByKey() es más eficiente y escalable que una transformación groupByKey(), ya que la fusión y la combinación se realizan localmente antes de enviar los datos para la reducción final.

Paso 1: Leer datos y crear pares

El primer paso es leer los datos y crear pares (clave, valor), donde la clave es un userID y el valor es un par de (rating, 1). Para utilizar reduceByKey()para hallar medias, necesitamos encontrar el(sum_of_ratings, number_of_ratings). Empezamos leyendo los datos de entrada y creando un RDD[String]:

>>># spark: SparkSession
>>> ratings_path = "/tmp/movielens/ratings.csv.no.header"
>>># rdd: RDD[String]
>>> rdd = spark.sparkContext.textFile(ratings_path)
>>> rdd.take(3)
[
 u'1,169,2.5,1204927694',
 u'1,2471,3.0,1204927438',
 u'1,48516,5.0,1204927435'
]

A continuación, transformamos el RDD[String] en un RDD[(String, (Float, Integer))]:

>>> def create_combined_pair(rating_record):
...     tokens = rating_record.split(",")
...     userID = tokens[0]
...     rating = float(tokens[2])
...     return (userID, (rating, 1))
...
>>># ratings: RDD[(String, (Float, Integer))]
>>> ratings = rdd.map(lambda rec : create_combined_pair(rec)) 1
>>> ratings.count()
22884377
>>> ratings.take(3)
[
 (u'1', (2.5, 1)),
 (u'1', (3.0, 1)),
 (u'1', (5.0, 1))
]
1

Crea el par RDD.

Paso 2: Utiliza reduceByKey() para sumar las valoraciones

Una vez creados los pares (userID, (rating, 1)), podemos aplicar la transformación reduceByKey() para sumar todas las valoraciones y el número de valoraciones de un usuario determinado. El resultado de este paso serán tuplas de(userID, (sum_of_ratings,number_of_ratings)):

>>># x refers to (rating1, frequency1)
>>># y refers to (rating2, frequency2)
>>># x = (x[0] = rating1, x[1] = frequency1)
>>># y = (y[0] = rating2, y[1] = frequency2)
>>># x + y = (rating1+rating2, frequency1+frequency2)
>>># ratings is the source RDD 1
>>> sum_and_count = ratings.reduceByKey(lambda x, y: (x[0]+y[0],x[1]+y[1])) 2
>>> sum_and_count.count()
247753
>>> sum_and_count.take(3)
[
 (u'145757', (148.0, 50)),
 (u'244330', (36.0, 17)),
 (u'180162', (1882.0, 489))
]
1

El RDD fuente (ratings) es unRDD[(String, (Float, Integer))].

2

El RDD de destino (sum_and_count) es unRDD[(String, (Float, Integer))]. Observa que los tipos de datos del origen y del destino son iguales.

Paso 3: Encontrar la valoración media

Divide sum_of_ratings por number_of_ratings para hallar la valoración media por usuario:

>>># x refers to (sum_of_ratings, number_of_ratings)
>>># x = (x[0] = sum_of_ratings, x[1] = number_of_ratings)
>>># avg = sum_of_ratings / number_of_ratings = x[0] / x[1]
>>> avgRating = sum_and_count.mapValues(lambda x : x[0] / x[1])
>>> avgRating.take(3)
[
 (u'145757', 2.96),
 (u'244330', 2.1176470588235294),
 (u'180162', 3.8486707566462166)
]

Solución PySpark completa utilizando combineByKey()

combineByKey() es una versión más general y ampliada de reduceByKey() en la que el tipo del resultado puede ser distinto del tipo de los valores que se agregan. Ésta es una limitación de reduceByKey(); significa que, dado lo siguiente:

# let rdd represent (key, value) pairs
# where value is of type T
rdd2 = rdd.reduceByKey(lambda x, y: func(x,y))

func(x,y) debe crear un valor de tipo T.

La transformación combineByKey() es una optimización que agrega valores para una clave determinada antes de enviar los valores agregados de la partición al reductor designado. Esta agregación se realiza en cada partición, y luego los valores de todas las particiones se fusionan en un único valor. Así, al igual que en reduceByKey(), cada partición emite como máximo un valor por cada clave para enviar a través de la red, lo que acelera el paso de barajar. Sin embargo, a diferencia de reduceByKey(), el tipo del valor combinado (resultado) no tiene por qué coincidir con el tipo del valor original.

Para un conjunto dado de pares (K, V), combineByKey()tiene la siguiente firma (esta transformación tiene muchas versiones diferentes; ésta es la forma más sencilla):

combineByKey(create_combiner, merge_value, merge_combiners)
combineByKey : RDD[(K, V)] --> RDD[(K, C)]

V and C can be different data types.

Se trata de una función genérica para combinar los elementos de cada clave utilizando un conjunto personalizado de funciones de agregación. Convierte un RDD[(K, V)] en un resultado de tipoRDD[(K, C)], donde C es un tipo combinado. Puede ser un tipo de datos simple como Integer o String, o puede ser una estructura de datos compuesta como un par (clave, valor), un triplete (x, y, z), o cualquier otra cosa que desees. Esta flexibilidad, hace de combineByKey()un reductor muy potente.

Como ya se ha comentado en este capítulo, dado un RDD fuente RDD[(K, V)], tenemos que proporcionar tres funciones básicas:

create_combiner: (V) -> C
merge_value: (C, V) -> C
merge_combiners: (C, C) -> C

Para evitar la asignación de memoria, tanto merge_valuecomo merge_combiners pueden modificar y devolver su primer argumento en lugar de crear un nuevo C (esto evita crear nuevos objetos, lo que puede ser costoso si tienes muchos datos).

Además, los usuarios pueden controlar (proporcionando parámetros adicionales) la partición del RDD de salida, el serializador que se utiliza para la mezcla y si se realiza la agregación del lado del mapa (es decir, si un mapeador puede producir varios elementos con la misma clave). Así pues, la transformación combineByKey()proporciona bastante flexibilidad, pero es un poco más compleja de utilizar que otras transformaciones de reducción.

Veamos cómo podemos utilizar combineByKey() para resolver el problema de las películas.

Paso 1: Leer datos y crear pares

Como en las soluciones anteriores, el primer paso es leer los datos y crear pares (clave, valor) donde la clave es un userID y el valor es un rating:

>>># spark: SparkSession
>>># create and return a pair of (userID, rating)
>>> def create_pair(rating_record):
...     tokens = rating_record.split(",")
...     return (tokens[0], float(tokens[2]))
...
>>> key_value_test = create_pair("3,2394,4.0,920586920")
>>> print key_value_test
('3', 4.0)

>>> ratings_path = "/tmp/movielens/ratings.csv.no.header"
>>> rdd = spark.sparkContext.textFile(ratings_path) 1
>>> rdd.count()
22884377
>>> ratings = rdd.map(lambda rec : create_pair(rec)) 2
>>> ratings.count()
22884377
>>> ratings.take(3)
[
 (u'1', 2.5),
 (u'1', 3.0),
 (u'1', 5.0)
]
1

rdd es un RDD[String].

2

ratings es un RDD[(String, Float)].

Paso 2: Utiliza combineByKey() para sumar las valoraciones

Una vez creados los pares (userID, rating), podemos aplicar la transformación combineByKey()para sumar todas las valoraciones y el número de valoraciones de cada usuario. El resultado de este paso serán los pares (userID, (sum_of_ratings, number_of_ratings)):

>>># v is a rating from (userID, rating)
>>># C represents (sum_of_ratings, number_of_ratings)
>>># C[0] denotes sum_of_ratings
>>># C[1] denotes number_of_ratings
>>># ratings: source RDD  1
>>> sum_count = ratings.combineByKey( 2
          (lambda v: (v, 1)), 3
          (lambda C,v: (C[0]+v, C[1]+1)), 4
          (lambda C1,C2: (C1[0]+C2[0], C1[1]+C2[1])) 5
    )
>>> sum_count.count()
247753
>>> sum_count.take(3)
[
 (u'145757', (148.0, 50)),
 (u'244330', (36.0, 17)),
 (u'180162', (1882.0, 489))
]
1

El RDD de origen es un RDD[(String, Float)].

2

El RDD objetivo es un RDD[(String, (Float, Integer))].

3

Esto convierte un V (un único valor) en un C como (V, 1).

4

Esto fusiona un V (clasificación) en un Ccomo (sum, count).

5

Esto combina dos Cs en un único C.

Paso 3: Encontrar la valoración media

Divide sum_of_ratings por number_of_ratings para hallar la valoración media por usuario:

>>># x = (sum_of_ratings, number_of_ratings)
>>># x[0] = sum_of_ratings
>>># x[1] = number_of_ratings
>>># avg = sum_of_ratings / number_of_ratings
>>> average_rating = sum_count.mapValues(lambda x:(x[0] / x[1]))
>>> average_rating.take(3)
[
 (u'145757', 2.96),
 (u'244330', 2.1176470588235294),
 (u'180162', 3.8486707566462166)
]

A continuación, examinaremos el paso de barajar en las transformaciones de reducción de Spark.

El paso aleatorio en las reducciones

Una vez que todos los mapeadores han terminado de emitir pares (clave, valor), se produce la magia de MapReduce: el paso ordenar y barajar. Este paso agrupa (ordena) la salida de la fase de mapa por claves y envía los resultados al reductor o reductores. Desde el punto de vista de la eficiencia y la escalabilidad, es diferente para las distintas transformaciones.

La idea de ordenar por claves ya debería resultarte familiar, así que aquí me centraré en la ordenación aleatoria. En pocas palabras, barajar es el proceso de redistribuir datos entre particiones. Puede o no hacer que los datos se muevan entre procesos de la JVM, o incluso por cable (entre ejecutores de servidores distintos).

Explicaré el concepto de barajar con un ejemplo. Imagina que tienes un clúster Spark de 100 nodos. Cada nodo tiene registros que contienen datos sobre la frecuencia de visitas a URL, y quieres calcular la frecuencia total por URL. Como ya sabrás, puedes conseguirlo leyendo los datos y creando pares (clave, valor), donde la clave es un URL y el valor es un frequency, y luego sumando las frecuencias de cada URL. Pero si los datos están repartidos por todo el clúster, ¿cómo puedes sumar los valores de la misma clave almacenados en distintos servidores? La única forma de hacerlo es poner todos los valores de la misma clave en el mismo servidor; entonces podrás sumarlos fácilmente. Este proceso se llama barajar.

Hay muchas transformaciones (comoreduceByKey() y join()) que requieren barajar los datos en el clúster, pero puede ser una operación costosa. Barajar los datos para groupByKey() es diferente de barajar los datos para reduceByKey(), y esta diferencia afecta al rendimiento de cada transformación. Por tanto, es muy importante seleccionar y utilizar adecuadamente las transformaciones de reducción.

Considera la siguiente solución PySpark a un sencillo problema de recuento de palabras:

# spark: SparkSession
# We use 5 partitions for textFile(), flatMap(), and map()
# We use 3 partitions for the reduceByKey() reduction
rdd = spark.sparkContext.textFile("input.txt", 5)\
   .flatMap(lambda line: line.split(" "))\
   .map(lambda word: (word, 1))\
   .reduceByKey(lambda a, b: a + b, 3)\ 1
   .collect()
1

3 es el número de particiones.

Como hemos dirigido la transformación reduceByKey()para crear tres particiones, el RDD resultante se particionará en tres trozos, como se muestra en la Figura 4-7. Las operaciones RDD se compilan en un grafo acíclico dirigido de objetos RDD, donde cada RDD mantiene un puntero al padre o padres de los que depende. Como muestra esta figura, en los límites de la barajada, el DAG se divide enetapas (Etapa 1, Etapa 2, etc.) que se ejecutan en orden.

daws 0407
Figura 4-7. El concepto de barajar de Spark

Dado que barajar implica copiar datos entre ejecutores y servidores, se trata de una operación compleja y costosa. Veamos más detenidamente cómo funciona para dos transformaciones de reducción de Spark, groupByKey() y reduceByKey(). Esto ayudará a ilustrar la importancia de elegir la reducción adecuada.

Paso aleatorio para groupByKey()

El paso de barajar groupByKey() es bastante sencillo. No fusiona los valores de cada clave, sino que baraja directamente. Esto significa que se envía un gran volumen de datos a cada partición, porque no se reducen los valores iniciales de los datos. La fusión de los valores de cada clave se produce después del paso de barajar. CongroupByKey(), es necesario almacenar muchos datos en los nodos trabajadores finales (reductores), lo que significa que puedes encontrarte con errores OOM si hay muchos datos por clave.La Figura 4-8 ilustra el proceso. Observa que después de groupByKey(), necesitas llamar a mapValues()para generar la salida final deseada.

daws 0408
Figura 4-8. Paso de barajar para groupByKey()

Como groupByKey() no fusiona ni combina valores, es una operación cara que requiere mover grandes cantidades de datos por la red.

Paso aleatorio para reduceByKey()

Con reduceByKey(), los datos de cada partición se combinan de modo que haya como máximo un valor para cada clave en cada partición. A continuación se barajan los datos y se envían por la red a los reductores, como se ilustra en la Figura 4-9. Observa que conreduceByKey(), no necesitas llamar a mapValues() para generar la salida final deseada. En general, es equivalente a utilizar groupByKey() y mapValues(), pero debido a la reducción de la cantidad de datos enviados por la red, es una solución mucho más eficiente y eficaz.

daws 0409
Figura 4-9. Paso de barajar para reduceByKey()

Resumen

En este capítulo se han introducido las transformaciones de reducción de Spark y se han presentado múltiples soluciones a un problema de datos del mundo real con las más utilizadas de estas transformaciones: reduceByKey(),aggregateByKey(), combineByKey(), y groupByKey(). Como has visto, hay muchas formas de resolver el mismo problema de datos, pero no todas tienen el mismo rendimiento.

La Tabla 4-2 resume los tipos de transformaciones que realizan estas cuatro transformaciones de reducción (ten en cuenta que Vy C pueden ser tipos de datos diferentes).

Tabla 4-2. Comparación de las reducciones de chispa
Reducción Fuente RDD RDD objetivo

reduceByKey()

RDD[(K, V)]

RDD[(K, V)]

groupByKey()

RDD[(K, V)]

RDD[(K, [V])]

aggregateByKey()

RDD[(K, V)]

RDD[(K, C)]

combineByKey()

RDD[(K, V)]

RDD[(K, C)]

Hemos aprendido que algunas de las transformaciones de reducción (como reduceByKey() y combineByKey()) son preferibles a groupByKey(), debido a que el paso de barajar para groupByKey() es más caro. Cuando sea posible, deberías utilizar reduceByKey() en lugar de groupByKey(), o utilizar combineByKey() cuando estés combinando elementos pero tu tipo de retorno difiera del tipo de valor de entrada. En general, para grandes volúmenes de datos, reduceByKey() ycombineByKey() funcionarán y escalarán mejor que groupByKey().

La transformación aggregateByKey()es más adecuada para las agregaciones por clave que implican cálculos, como hallar la suma, la media, la varianza, etc. La consideración importante aquí es que el cómputo extra empleado para la combinación del lado del mapa puede reducir la cantidad de datos enviados a otros nodos trabajadores y al controlador.

En el próximo capítulo pasaremos a tratar la partición de datos.

Get Algoritmos de datos con Spark now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.