Exemplos de Hadoop e Mapreduce: crie o primeiro programa em Java

Neste tutorial, vocรช aprenderรก a usar o Hadoop com exemplos de MapReduce. Os dados de entrada usados โ€‹โ€‹sรฃo VendasJan2009.csv. Ele contรฉm informaรงรตes relacionadas a vendas, como nome do produto, preรงo, forma de pagamento, cidade, paรญs do cliente, etc. Descubra o nรบmero de produtos vendidos em cada paรญs.

Primeiro programa Hadoop MapReduce

Agora neste Tutorial MapReduce, criaremos nosso primeiro Java Programa MapReduce:

Primeiro programa Hadoop MapReduce

Dados de VendasJan2009

Certifique-se de ter o Hadoop instalado. Antes de iniciar o processo real, altere o usuรกrio para 'hduser' (id usado durante a configuraรงรฃo do Hadoop, vocรช pode mudar para o ID do usuรกrio usado durante a configuraรงรฃo de programaรงรฃo do Hadoop).

su - hduser_

Primeiro programa Hadoop MapReduce

Passo 1)

Crie um novo diretรณrio com nome Tutorial MapReduce como mostrado no exemplo MapReduce abaixo

sudo mkdir MapReduceTutorial

Primeiro programa Hadoop MapReduce

Dรช permissรตes

sudo chmod -R 777 MapReduceTutorial

Primeiro programa Hadoop MapReduce

SalesMapper.java

package SalesCountry;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);

	public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException {

		String valueString = value.toString();
		String[] SingleCountryData = valueString.split(",");
		output.collect(new Text(SingleCountryData[7]), one);
	}
}

SalesCountryReducer.java

package SalesCountry;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

	public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
		Text key = t_key;
		int frequencyForCountry = 0;
		while (values.hasNext()) {
			// replace type of value with the actual type of our value
			IntWritable value = (IntWritable) values.next();
			frequencyForCountry += value.get();
			
		}
		output.collect(key, new IntWritable(frequencyForCountry));
	}
}

SalesCountryDriver.java

package SalesCountry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class SalesCountryDriver {
    public static void main(String[] args) {
        JobClient my_client = new JobClient();
        // Create a configuration object for the job
        JobConf job_conf = new JobConf(SalesCountryDriver.class);

        // Set a name of the Job
        job_conf.setJobName("SalePerCountry");

        // Specify data type of output key and value
        job_conf.setOutputKeyClass(Text.class);
        job_conf.setOutputValueClass(IntWritable.class);

        // Specify names of Mapper and Reducer Class
        job_conf.setMapperClass(SalesCountry.SalesMapper.class);
        job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);

        // Specify formats of the data type of Input and output
        job_conf.setInputFormat(TextInputFormat.class);
        job_conf.setOutputFormat(TextOutputFormat.class);

        // Set input and output directories using command line arguments, 
        //arg[0] = name of input directory on HDFS, and arg[1] =  name of output directory to be created to store the output file.

        FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));

        my_client.setConf(job_conf);
        try {
            // Run the job 
            JobClient.runJob(job_conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Baixe os arquivos aqui

Primeiro programa Hadoop MapReduce

Verifique as permissรตes de arquivo de todos esses arquivos

Primeiro programa Hadoop MapReduce

e se as permissรตes de 'leitura' estiverem faltando, conceda o mesmo-

Primeiro programa Hadoop MapReduce

Passo 2)

Exporte o caminho de classe conforme mostrado no exemplo do Hadoop abaixo

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

Primeiro programa Hadoop MapReduce

Passo 3)

Compilar Java arquivos (esses arquivos estรฃo presentes no diretรณrio Final-MapReduceHandsOn). Seus arquivos de classe serรฃo colocados no diretรณrio do pacote

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

Primeiro programa Hadoop MapReduce

Este aviso pode ser ignorado com seguranรงa.

Esta compilaรงรฃo criarรก um diretรณrio em um diretรณrio atual nomeado com o nome do pacote especificado no arquivo fonte java (ou seja, Paรญs de Vendas no nosso caso) e coloque todos os arquivos de classe compilados nele.

Primeiro programa Hadoop MapReduce

Passo 4)

Crie um novo arquivo Manifesto.txt

sudo gedit Manifest.txt

adicione as seguintes linhas a ele,

Main-Class: SalesCountry.SalesCountryDriver

Primeiro programa Hadoop MapReduce

SalesCountry.SalesCountryDriver รฉ o nome da classe principal. Observe que vocรช deve pressionar a tecla Enter no final desta linha.

Passo 5)

Crie um arquivo Jar

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

Primeiro programa Hadoop MapReduce

Verifique se o arquivo jar foi criado

Primeiro programa Hadoop MapReduce

Passo 6)

Inicie o Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

Passo 7)

Copie o arquivo VendasJan2009.csv para dentro ~/inputMapReduce

Agora use o comando abaixo para copiar ~/inputMapReduce para HDFS.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

Primeiro programa Hadoop MapReduce

Podemos ignorar este aviso com seguranรงa.

Verifique se um arquivo foi realmente copiado ou nรฃo.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

Primeiro programa Hadoop MapReduce

Passo 8)

Execute o trabalho MapReduce

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Primeiro programa Hadoop MapReduce

Isto criarรก um diretรณrio de saรญda chamado mapreduce_output_sales em HDFS. O conteรบdo deste diretรณrio serรก um arquivo contendo as vendas de produtos por paรญs.

Passo 9)

O resultado pode ser visto atravรฉs da interface de comando como,

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

Primeiro programa Hadoop MapReduce

Os resultados tambรฉm podem ser vistos atravรฉs de uma interface web como-

Abra r em um navegador da web.

Primeiro programa Hadoop MapReduce

Agora selecione 'Navegar no sistema de arquivos' e navegue atรฉ /mapreduce_output_sales

Primeiro programa Hadoop MapReduce

Abra parte-r-00000

Primeiro programa Hadoop MapReduce

Explicaรงรฃo da classe SalesMapper

Nesta seรงรฃo, entenderemos a implementaรงรฃo de SalesMapper classe.

1. Comeรงamos especificando um nome de pacote para nossa classe. Paรญs de Vendas รฉ o nome do nosso pacote. Observe que a saรญda da compilaรงรฃo, SalesMapper.class irรก para um diretรณrio nomeado por este nome de pacote: Paรญs de Vendas.

Em seguida, importamos pacotes de biblioteca.

O instantรขneo abaixo mostra uma implementaรงรฃo de SalesMapper aula-

Explicaรงรฃo da classe SalesMapper

Explicaรงรฃo do cรณdigo de exemplo:

1. Definiรงรฃo de classe SalesMapper-

classe pรบblica SalesMapper estende MapReduceBase implementa Mapper {

Cada classe de mapeador deve ser estendida de MapReduceBase classe e deve implementar Mapper interface.

2. Definindo a funรงรฃo 'mapa'-

public void map(LongWritable key,
         Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException

A parte principal da classe Mapper รฉ um 'mapa()' mรฉtodo que aceita quatro argumentos.

A cada chamada para 'mapa()' mรฉtodo, um valor chave par ('chave' e 'valor' neste cรณdigo) รฉ passado.

'mapa()' O mรฉtodo comeรงa dividindo o texto de entrada que รฉ recebido como argumento. Ele usa o tokenizer para dividir essas linhas em palavras.

String valueString = value.toString();
String[] SingleCountryData = valueString.split(",");

Aqui, ',' รฉ usado como delimitador.

Depois disso, um par รฉ formado usando um registro no 7ยบ รญndice do array 'Dados de Paรญs รšnico' e um valor '1'.

output.collect(novo Texto(SingleCountryData[7]), um);

Estamos escolhendo o registro no 7ยบ รญndice porque precisamos Paรญs dados e estรก localizado no 7ยบ รญndice da matriz 'Dados de Paรญs รšnico'.

Observe que nossos dados de entrada estรฃo no formato abaixo (onde Paรญs รฉ ร s 7th รญndice, com 0 como รญndice inicial)-

Data_da_transaรงรฃo,Produto,Preรงo,Tipo_de_pagamento,Nome,Cidade,Estado,Paรญs,Conta_criada,รšltimo_login,Latitude,Longitude

Uma saรญda do mapper รฉ novamente um valor chave par que รฉ emitido usando 'coletar ()' mรฉtodo de 'Coletor de saรญda'.

Explicaรงรฃo da classe SalesCountryReducer

Nesta seรงรฃo, entenderemos a implementaรงรฃo de SalesCountryReducer classe.

1. Comeรงamos especificando o nome do pacote para nossa classe. Paรญs de Vendas รฉ o nome do nosso pacote. Observe que a saรญda da compilaรงรฃo, SalesCountryReducer.class irรก para um diretรณrio nomeado por este nome de pacote: Paรญs de Vendas.

Em seguida, importamos pacotes de biblioteca.

O instantรขneo abaixo mostra uma implementaรงรฃo de SalesCountryReducer aula-

Explicaรงรฃo da classe SalesCountryReducer

Explicaรงรฃo do cรณdigo:

1. Definiรงรฃo da classe SalesCountryReducer-

classe pรบblica SalesCountryReducer estende MapReduceBase implementa Redutor {

Aqui, os dois primeiros tipos de dados, 'Texto' e 'Intgravรกvel' sรฃo tipos de dados de valor-chave de entrada para o redutor.

A saรญda do mapeador estรก na forma de , . Esta saรญda do mapeador torna-se uma entrada para o redutor. Entรฃo, para alinhar com seu tipo de dados, Texto e Intgravรกvel sรฃo usados โ€‹โ€‹โ€‹โ€‹como tipo de dados aqui.

Os dois รบltimos tipos de dados, 'Texto' e 'IntWritable' sรฃo tipos de dados de saรญda gerados pelo redutor na forma de par chave-valor.

Toda classe redutora deve ser estendida de MapReduceBase classe e deve implementar Redutor interface.

2. Definindo a funรงรฃo 'reduzir'-

public void reduce( Text t_key,
             Iterator<IntWritable> values,                           
             OutputCollector<Text,IntWritable> output,
             Reporter reporter) throws IOException {

Uma entrada para o reduzir() mรฉtodo รฉ uma chave com uma lista de vรกrios valores.

Por exemplo, no nosso caso, serรก-

, , , , , .

Isso รฉ dado ao redutor como

Portanto, para aceitar argumentos desta forma, os primeiros dois tipos de dados sรฃo usados, a saber, Texto e Iterador. Texto รฉ um tipo de dados de chave e Iterador รฉ um tipo de dados para a lista de valores dessa chave.

O prรณximo argumento รฉ do tipo Coletor de saรญda que coleta a saรญda da fase redutora.

reduzir() O mรฉtodo comeรงa copiando o valor da chave e inicializando a contagem de frequรชncia como 0.

Chave de texto = t_key;
int frequรชnciaForCountry = 0;

Entรฃo, usando 'enquanto' loop, iteramos pela lista de valores associados ร  chave e calculamos a frequรชncia final somando todos os valores.

 while (values.hasNext()) {
            // replace type of value with the actual type of our value
            IntWritable value = (IntWritable) values.next();
            frequencyForCountry += value.get();
            
        }

Agora, enviamos o resultado para o coletor de saรญda na forma de chave e obtido contagem de freqรผรชncia.

O cรณdigo abaixo faz isso-

output.collect(key, new IntWritable(frequencyForCountry));

Explicaรงรฃo da classe SalesCountryDriver

Nesta seรงรฃo, entenderemos a implementaรงรฃo de SalesCountryDriver classe

1. Comeรงamos especificando um nome de pacote para nossa classe. Paรญs de Vendas รฉ o nome do nosso pacote. Observe que a saรญda da compilaรงรฃo, SalesCountryDriver.class irรก para o diretรณrio nomeado por este nome de pacote: Paรญs de Vendas.

Aqui estรก uma linha especificando o nome do pacote seguido do cรณdigo para importar pacotes de biblioteca.

Explicaรงรฃo da classe SalesCountryDriver

2. Defina uma classe de driver que criarรก um novo trabalho de cliente, objeto de configuraรงรฃo e anunciarรก classes Mapeadora e Redutora.

A classe driver รฉ responsรกvel por configurar nosso trabalho MapReduce para ser executado em Hadoop. Nesta classe, especificamos nome do trabalho, tipo de dados de entrada/saรญda e nomes das classes mapeadoras e redutoras.

Explicaรงรฃo da classe SalesCountryDriver

3. No trecho de cรณdigo abaixo, definimos diretรณrios de entrada e saรญda que sรฃo usados โ€‹โ€‹para consumir o conjunto de dados de entrada e produzir saรญda, respectivamente.

arg [0] e arg [1] sรฃo os argumentos de linha de comando passados โ€‹โ€‹โ€‹โ€‹com um comando fornecido no MapReduce prรกtico, ou seja,

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Explicaรงรฃo da classe SalesCountryDriver

4. Acione nosso trabalho

Abaixo do cรณdigo inicia a execuรงรฃo do trabalho MapReduce-

try {
    // Run the job 
    JobClient.runJob(job_conf);
} catch (Exception e) {
    e.printStackTrace();
}

Resuma esta postagem com: