MapReduce con apache hadoop

Introducción

En esta entrada (y como continuación a Apache Hadoop) se empleará Apache Hadoop para realizar procesamiento de grandes cantidades de datos en sistemas de archivos distribuidos usando Spring y MapReduce.

Descripción

MapReduce es un framework (modelo de programación) utilizado para dar soporte a la computación paralela sobre grandes colecciones de datos en grupos de computadoras; es por esta razón por la que este framework suele ejecutarse en sistema de archivos distribuidos (HDFS).

Función Map()

Map toma uno de estos pares de datos con un tipo en un dominio de datos, y devuelve una lista de pares en un dominio diferente:

Map(k1,v1) -> list(k2,v2)

La función map(): se encarga del mapeo y es aplicada en paralelo para cada ítem en la entrada de datos. Esto produce una lista de pares (k2,v2) por cada llamada.

Función Reduce()

La función reduce es aplicada en paralelo para cada grupo, produciendo una colección de valores para cada dominio:

Reduce(k2, list (v2)) -> list(v3)

La función reduce(): cada llamada a Reduce típicamente produce un valor v3 o una llamada vacía, aunque una llamada puede retornar más de un valor.

Y a grandes rasgos, MapReduce transforma una lista de pares (clave, valor) en una lista de valores, los cuales serán procesados en paralelo dentro del sistema distribuidor; esta ejecución distribuida es administrada por el framework (en este caso hadoop), determinando que nodo puede ejecutar que proceso y en que momento, y este administrador se mantiene a la espera hasta que todos los nodos terminen sus ejecuciones para poder "unir" los resultados.

Vamos al ejemplo

Para mostrar la manera en que se puede delegar el proceso de grandes cantidades de datos a Hadoop, escribiremos código para procesar un archivo de textos (un libro en formato texto - o varios incluso -) , contara las palabras y nos entregará un resumen en el que se nos indicarán las palabras que contiene el archivo de texto ( o los archivos ), asi como la cantidad de veces que aparecen en este archivo (o los archivos); esto es equivalente a SELECT palabra, count(*) FROM tabla GROUP BY palabra en SQL.

Para esta tarea debemos escribir un Mapper y un Reducer:

- Mapper (SpringMapper.groovy)

package com.demos

import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Mapper

class SpringMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text word = new Text()

    @Override
    protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException {
        def line = value.toString()
        def lineTokenizer = new StringTokenizer(line)

        while (lineTokenizer.hasMoreTokens()) {
            def cleaned = removeNonLettersOrNumbers(lineTokenizer.nextToken())
            word.set(cleaned)
            context.write(word, new IntWritable(1))
        }
    }

    /**
     * Reemplaza todos los caracteres UNICODE que no son letras ni numeros.
     */

    private String removeNonLettersOrNumbers(String original) {
       original.replaceAll("[^\\p{L}\\p{N}]", "")
    }
}

Este mapper agrupara cada palabra en un "mapa" similar a Map y ya, el resto del mapping lo hará el framework.

- Reducer

package com.demos

import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Reducer
import org.apache.commons.logging.LogFactory

class SpringReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, org.apache.hadoop.mapreduce.Reducer.Context context) {
            int wordCount = 0
            for (IntWritable value: values) {
                wordCount += value.get()
            }
            context.write(key, new IntWritable(wordCount))
    }
}

Finalmente, el reducer se encarga de hacer el group by, obteniendo del iterador (que contiene ya todas las palabras generadas por el mapper) el count por cada palabra e incrementando en uno este contador.

Más detalles ...

Ahora, como funciona esto a mas detalle ? :

La función map() se ejecuta de forma distribuida a lo largo de varias máquinas. Los datos de entrada, procedentes por regla general de un gran archivo, se dividen en un conjunto de M particiones de entrada de generalmente 16 megabytes. Estas particiones pueden ser procesadas en diversas máquinas. En una invocación de MapReduce suelen ocurrir varias operaciones:

  • Se procede a dividir las entradas en en M particiones de tamaño aproximado de 64 megabytes. El programa MapReduce se comienza a instanciar en las diversas máquinas del cluster. Por regla general, el número de instancias se configura en las aplicaciones.
  • Una de las copias del programa es especial y toma el papel de "maestro". El resto de copias se denominan "workers" y reciben la asignación de sus tareas desde el master. Se considera que existen una cantidad de M map() tareas y de R reduce(). El "maestro" se encarga de recopilar "workers" en reposo (es decir sin tarea asignada) y le asignará una tarea específica de map() o de reduce(). Un worker sólo puede tener tres estados: reposo, trabajando, completo.
  • Un worker que tenga asignada una tarea específica de map() tomará como entrada la partición que le corresponda. Se dedicará a parsear los pares (clave, valor) para crear una nueva pareja de salida, tal y como se especifica en su programación. Los pares clave y valor producidos por la función map() se almacenan como buffer en la memoria.
  • Periodicamente, los pares clave-valor almacenados en el buffer se escriben en el disco local, repartidos en R regiones. Las regiones de estos pares clave-valor son pasados al master, que es responsable de redirigir a los "workers" que tienen tareas de reduce().
  • Cuando un worker de tipo reduce es notificado por el "maestro" con la localización de una partición, éste emplea llamadas remotas para hacer lecturas de la información almacenada en los discos duros de los diversos workers de tipo map(). cuando un worker de tipo reduce() lee todos los datos intermedios, ordena las claves de tal suerte que a se agrupan los datos encontrados que poseen la misma clave. El ordenamiento es necesario debido a que, por regla general, muchas claves de funciones map() diversas pueden ir a una misma función reduce(). En aquellos casos en los que la cantidad de datos intermedios sean muy grandes, se suele emplear un ordenamiento externo.
  • El worker de tipo reduce() itera sobre el conjunto de valores ordenados intermedios, y lo hace por cada una de las claves únicas encontradas. Toma la clave y el conjunto de valores asociados a ella y se los pasa a la función reduce(). La salida de reduce() se añade al archivo de salida de MapReduce.
  • Cuando todas las tareas map() y reduce() se han completado, el "maestro" levanta al programa del usuario. Llegados a este punto la llamada MapReduce retorna el control al código de un usuario.

Se considera que ha habido un final de las tareas cuando este control se ha devuelto al usuario. Las salidas se ditribuyen en un archivo completo, o en su defecto se reparten en R archivos. Estos R archivos pueden ser la entrada de otro MapReduce o puede ser procesado por cualquier otro programa que necesite estos datos.

Finalmente, y para hacer esto mas sencillo, le pediremos a Spring que nos ayude a correr esto en el sistema de archivos distribuido; para ello crearemos el archivo XML que contendrá la definición de beans necesarios :

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:hdp="http://www.springframework.org/schema/hadoop"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans <a href="http://www.springframework.org/schema/beans/spring-beans.xsd" title="http://www.springframework.org/schema/beans/spring-beans.xsd">http://www.springframework.org/schema/beans/spring-beans.xsd</a> <a href="http://www.springframework.org/schema/hadoop" title="http://www.springframework.org/schema/hadoop">http://www.springframework.org/schema/hadoop</a> <a href="http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">

" title="http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">

">http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">

</a>    <!-- le indicamos a spring como conectarse a Hadoop -->
    <hdp:configuration>
        fs.default.name=hdfs://localhost:8020
    </hdp:configuration>

    <!-- Definimos nuestro job, indicandole donde estaran los archivos de texto de entrada, donde escribira el resultado y que clases son las encargadas de hacer map y reduce -->
    <hdp:job id="wordCountJob"
             input-path="input"
             output-path="output"
             mapper="com.demos.SpringMapper"
             reducer="com.demos.SpringReducer"/>

    <!-- Le decimos al JobRunner de spring que jobs debe de ejecutar en el HDFS indicado al inicio. -->
    <bean class="org.springframework.data.hadoop.mapreduce.JobRunner">
        <property name="jobs">
            <list>
                <!-- Configures the reference to the actual Hadoop job. -->
                <ref bean="wordCountJob"/>
            </list>
        </property>
    </bean>

</beans>

Finalmente, debemos crear o subir los archivos a procesar en nuestro HDFS; para este ejemplo, podremos usar el "Quijote de la Mancha" (http://goo.gl/dldp9) y "El manco de lepanto" (http://goo.gl/akuV4):

$ hadoop fs -mkdir input
$ hadoop fs -put /tmp/pg27900.txt input
$ hadoop fs -put /tmp/2donq10.txt input
$ hadoop fs -ls input

Found 2 items
-rw-r--r--   1 user supergroup    2143292 2012-11-26 11:55 /user/user/input/2donq10.txt
-rw-r--r--   1 user supergroup     343520 2012-11-26 11:55 /user/user/input/pg27900.txt

Finalmente, desde el IDE podemos correr nuestra clase principal (Main.groovy) :

package com.demos

import org.springframework.context.support.ClassPathXmlApplicationContext
import groovy.json.JsonSlurper

class Main {

    def load() {
        new ClassPathXmlApplicationContext("applicationContext.xml");
    }

    static void main(args) {
        new TwitterLoader().load()
    }

}

Veremos algo asi :

26-nov-2012 11:57:29 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
INFO: Total input paths to process : 2
26-nov-2012 11:57:31 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO:  map 0% reduce 0%
26-nov-2012 11:57:36 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO:  map 100% reduce 0%
26-nov-2012 11:57:37 org.apache.hadoop.mapred.Task commit
INFO: Task attempt_local_0001_r_000000_0 is allowed to commit now
26-nov-2012 11:57:37 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
INFO: Saved output of task 'attempt_local_0001_r_000000_0' to output
26-nov-2012 11:57:37 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO:  map 100% reduce 100%
26-nov-2012 11:57:37 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Job complete: job_local_0001
26-nov-2012 11:57:37 org.apache.hadoop.mapred.Counters log
INFO:   File Output Format Counters

En esta salida vemos que no tuvimos que indicarle que archivos procesar, mas bien, lee los dos archivos que encontro en el directorio 'input' que creamos; tambien vemos el proceso al hacer Map y el proceso al hacer Reduce.

Para ver el resultado debemos hacer :

$ hadoop fs -cat output/part-r-00000

En este ejemplo no se ve mucho diferencia de hacer un script en perl, sh o groovy, pero si se necesita procesar muchos registros (sin importar si vienen de un archivo de texto, o de una base de datos), se puede procesar en un cluster real hadoop y el procesamiento será notoriamente mas rapido.