Ejemplo MapReduce Hadoop 3

Clase Map

Analizado el texto del cual obtendremos la informacion, vamos a crear la clase map la cual nos agrupara los paises por region, descartando todas las partes del texto que no sean necesarias.

En esta clase map lo que se va a hacer es identificar los paises y los datos necesarios por region:

 

  context.write(new Text(Pais.getRegion()), new Text(Pais.getNombre() + ":" +poblacion));

Se utiliza tambien patterns para identificar los datos necesarios:

 

  final static Pattern PAIS_PATTERN = Pattern.compile("[@][A-Z][a-z]*(,)?(\\s)?([A-Z][a-z]*)?(\\s)?([A-Z][a-z]*)?(\\s)?([A-Z][a-z]*)?(\\s)?([A-Z][a-z]*)?(\\s)?([A-Z][a-z]*)?[:][Geography]{9}");
 final static Pattern POBLACION_PATTERN = Pattern.compile("^\\s[Population]{10}[:]{1}\\s*");//Population: 65,780 (July 1995 est.)
 final static Pattern REGION_PATTERN = Pattern.compile("^\\s[Map]{3}\\s[references]{10}[:]*");// Map references: Africa
       

Se utiliza una clase plana para guardar los datos de pais

 

  Pais.nombre = pais;
 Pais.region = region;
       

debemos quitar los datos del mundo para que no se contabilice

Pais.getRegion().regionMatches(0, "World", 0, 5)

Aqui el codigo completo del Map

 

  package mx.com.sinapsis.ds.test.ComparaPais;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang.math.NumberUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 *El map va a sacar una key/value de tipo Long(1)/text ya que queremos que entre al reduce una sola key la cual
 *engloba a pares de valores pais-poblacion para poder buscar la maxima poblacion y a que pais pertenece
 */

public class MaximaPoblacionMapper extends Mapper<LongWritable, Text, Text, Text> {
       
        final static Pattern PAIS_PATTERN = Pattern.compile("[@][A-Z][a-z]*(,)?(\\s)?([A-Z][a-z]*)?(\\s)?([A-Z][a-z]*)?(\\s)?([A-Z][a-z]*)?(\\s)?([A-Z][a-z]*)?(\\s)?([A-Z][a-z]*)?[:][Geography]{9}");
        final static Pattern POBLACION_PATTERN = Pattern.compile("^\\s[Population]{10}[:]{1}\\s*");//Population: 65,780 (July 1995 est.)
        final static Pattern REGION_PATTERN = Pattern.compile("^\\s[Map]{3}\\s[references]{10}[:]*");// Map references: Africa
       
        /**
     *Se procesa linea por linea y se tiene que encontrar el match para el pais el cual se guarda y
     *el total de su poblacion
     */

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {        
        Matcher matcherPais = PAIS_PATTERN.matcher(value.toString());
        Matcher matcherPoblacion = POBLACION_PATTERN.matcher(value.toString());
        Matcher matcherRegion = REGION_PATTERN.matcher(value.toString());
       
        //Una vez localizado el patron PAIS_PATTERN significa que las siguientes lineas pertenecen a un pais
        //se procede a localizar el POBLACION_PATTERN para ligarlo al pais
       
        String[] paismatch;
        String[] regionmatch;
        String pais = "";
        String region = "";
        String poblacion = "";
       
        if(matcherPais.find()){
                paismatch = value.toString().split(":");               
                pais = paismatch[0].substring(1,paismatch[0].length());
                System.out.println("pais : " + pais );
                //Se guarda el nombre pais hasta encontrar su poblacion
                Pais.nombre = pais;
        }
       
        if(matcherRegion.find()){
                regionmatch = value.toString().split(":");             
                region = regionmatch[1].trim();
                System.out.println("region : " + region );
                //Se guarda region del pais hasta encontrar su poblacion
                Pais.region = region;
        }
       
        if(matcherPoblacion.find()){
                String values[] = value.toString().split(" ");                                 
                if( values.length > 2 && values[2] != null && !values[2].isEmpty()){
                        poblacion = values[2].trim().replace(",", "");
                }              
        }
       
        if(Pais.getNombre() != null && !Pais.getNombre().equals("") && Pais.getRegion() != null && !Pais.getRegion().equals("")
                        && !Pais.getRegion().regionMatches(0, "World", 0, 5)    && !poblacion.equals("") && NumberUtils.isNumber(poblacion)){
                                context.write(new Text(Pais.getRegion()), new Text(Pais.getNombre() + ":" +poblacion));
        }
       

    }

}

       

estos son los datos que se pintan al ejecutarse el map:

14/04/25 10:45:55 INFO mapred.MapTask: Processing split: file:/home/cloudera/workspace/training/input/CW.txt:0+3013209
14/04/25 10:45:55 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
14/04/25 10:45:55 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
14/04/25 10:45:55 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
14/04/25 10:45:55 INFO mapred.MapTask: soft limit at 83886080
14/04/25 10:45:55 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
14/04/25 10:45:55 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
pais : Afghanistan
region : Asia
pais : Albania
region : Ethnic Groups in Eastern Europe, Europe
pais : Algeria
region : Africa
pais : American Samoa
region : Oceania
pais : Andorra
region : Europe
pais : Angola
region : Africa
pais : Anguilla
region : Central America and the Caribbean
pais : Antarctica
region : Antarctic Region
pais : Antigua And Barbuda
region : Central America and the Caribbean
pais : Arctic Ocean
region : Arctic Region
pais : Argentina
region : South America
pais : Armenia
region : Commonwealth of Independent States - European States
pais : Aruba
region : Central America and the Caribbean

etc....