MapReduce-Join多种应用

Join多种应用

Reduce Join

Map端的主要工作:为来自不同表或文件的key/value对,`打标签以区别不同来源的记录`。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些`来源于不同文件的记录(在Map阶段已经打标志)分开`,最后进行合并就ok了。

Reduce Join案例实操

1.写入数据

order.txt
1001	01	1
1002	02	2
1003	03	3
1004	01	4
1005	02	5
1006	03	6
id pid amount
1001 01 1
1002 02 2
1003 03 3
1004 01 4
1005 02 5
1006 03 6
pd.txt
01	小米
02	华为
03	格力
pid pname
01 小米
02 华为
03 格力

2.输出数据

id pname amount
1001 小米 1
1004 小米 4
1002 华为 2
1005 华为 5
1003 格力 3
1006 格力 6

3.需求分析

![(pho\reducejoin案例需求分析.png)

4.代码实现

Bean类
package com.saddam.bigdata.ShangGuiGu.Join;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TableBean implements Writable {

    private String order_id; // 订单id
    private String p_id;      // 产品id
    private int amount;       // 产品数量
    private String pname;     // 产品名称
    private String flag;      // 表的标记

    public TableBean() {
        super();
    }

    public TableBean(String order_id, String p_id, int amount, String pname, String flag) {
        this.order_id = order_id;
        this.p_id = p_id;
        this.amount = amount;
        this.pname = pname;
        this.flag = flag;
    }

    public String getOrder_id() {
        return order_id;
    }

    public void setOrder_id(String order_id) {
        this.order_id = order_id;
    }

    public String getP_id() {
        return p_id;
    }

    public void setP_id(String p_id) {
        this.p_id = p_id;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public String toString() {
        return order_id+"\t"+amount+"\t"+pname;
    }

    @Override
    public void write(DataOutput out) throws IOException {
            out.writeUTF(order_id);
            out.writeUTF(p_id);
            out.writeInt(amount);
            out.writeUTF(pname);
            out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
            order_id=in.readUTF();
            p_id=in.readUTF();
            amount=in.readInt();
            pname=in.readUTF();
            flag=in.readUTF();
    }
}
Mapper类
package com.saddam.bigdata.ShangGuiGu.Join;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;

public class TableMapper extends Mapper<LongWritable,Text,Text,TableBean> {
    String name;
    Text outK=new Text();
    TableBean tableBean=new TableBean();
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit fileSplit=(FileSplit)context.getInputSplit();
         name = fileSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //获取一行数据
        String line = value.toString();

        //分文件处理
        if (name.startsWith("order")){//order.txt
            /*
            1001	01	1
            1002	02	2
             */
            String[] fields = line.split("\t");

            /*
            private String order_id; // 订单id
            private String p_id;      // 产品id
            private int amount;       // 产品数量
            private String pname;     // 产品名称
            private String flag;      // 表的标记
             */
            //封装对象
            tableBean.setOrder_id(fields[0]);
            tableBean.setP_id(fields[1]);
            tableBean.setAmount(Integer.parseInt(fields[2]));
            tableBean.setPname("");
            tableBean.setFlag("order");
            //p_id作为key  两个文件都有
            outK.set(fields[1]);
        }else {//pd.txt
            /*
            01	小米
            02	华为
            03	格力
             */
            String[] fields=line.split("\t");
            tableBean.setOrder_id("");
            tableBean.setP_id(fields[0]);
            tableBean.setAmount(0);
            tableBean.setPname(fields[1]);
            tableBean.setFlag("pd");

            outK.set(fields[0]);
        }
        context.write(outK,tableBean);
    }
}
Reducer类
package com.saddam.bigdata.ShangGuiGu.Join;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

public class TableReducer extends Reducer<Text,TableBean,TableBean, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
        // 1准备存储订单的集合
        ArrayList<TableBean> orderBeans = new ArrayList<>();
        // 2 准备bean对象
        TableBean pdBean=new TableBean();

        for(TableBean bean:values){
            if ("order".equals(bean.getFlag())){
                TableBean orderBean=new TableBean();

                try {
                    BeanUtils.copyProperties(orderBean, bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                orderBeans.add(orderBean);
            }else {
                try {
                    BeanUtils.copyProperties(pdBean, bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
         3 表的拼接
        for(TableBean bean:orderBeans){

            bean.setPname (pdBean.getPname());

            // 4 数据写出去
            context.write(bean, NullWritable.get());
    }
}
}
Driver类
package com.saddam.bigdata.ShangGuiGu.Join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;

public class TableDriver{
    public static void main(String[] args)throws Exception {
        BasicConfigurator.configure();
        // 1 获取配置信息,或者job对象实例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 指定本程序的jar包所在的本地路径
        job.setJarByClass(TableDriver.class);

        // 3 指定本业务job要使用的Mapper/Reducer业务类
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        // 4 指定Mapper输出数据的kv类型
        job.setMapOutputKeyClass(Text .class);
        job.setMapOutputValueClass(TableBean.class);

        // 5 指定最终输出的数据的kv类型
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable .class);

        // 6 指定job的输入原始文件所在目录
        String inputPath1 ="D:\\MR\\MapReduce\\InputDatas\\Reduce Join\\order";
        String inputPath2 ="D:\\MR\\MapReduce\\InputDatas\\Reduce Join\\pd";
        String outputPath ="D:\\MR\\MapReduce\\OutputDatas\\output_join";
        FileInputFormat.setInputPaths(job,new Path(inputPath1),new Path(inputPath2));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

-----------------------------------------

Map Join

1.使用场景

Map Join适用于一张表十分小、一张表很大的场景。

2.优点

思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?

在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。

3.具体办法

采用DistributedCache

	(1)在Mapper的setup阶段,将文件读取到缓存集合中。

	(2)在驱动函数中加载缓存。

// 缓存普通文件到Task运行节点。

job.addCacheFile(new URI("file://e:/cache/pd.txt"));

Map Join案例实操

相关推荐

  1. MapReduce-Join多种应用

    2024-03-10 16:30:02       20 阅读
  2. MapReduce

    2024-03-10 16:30:02       25 阅读
  3. MapReduce

    2024-03-10 16:30:02       24 阅读
  4. MapReduce

    2024-03-10 16:30:02       8 阅读
  5. <span style='color:red;'>MapReduce</span>

    MapReduce

    2024-03-10 16:30:02      6 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-03-10 16:30:02       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-10 16:30:02       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-10 16:30:02       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-10 16:30:02       18 阅读

热门阅读

  1. MySQL中UNION和UNION ALL的区别

    2024-03-10 16:30:02       18 阅读
  2. 云计算高级课程作业

    2024-03-10 16:30:02       20 阅读
  3. CentOS 8使用笔记

    2024-03-10 16:30:02       18 阅读
  4. CCF-CSP真题201403-2《窗口》(结构体+数组)

    2024-03-10 16:30:02       17 阅读
  5. misc40

    misc40

    2024-03-10 16:30:02      18 阅读
  6. python实现回溯算法

    2024-03-10 16:30:02       21 阅读
  7. Svelte之基础知识一

    2024-03-10 16:30:02       24 阅读
  8. 读书·基于RISC-V和FPGA的嵌入式系统设计·第3章

    2024-03-10 16:30:02       21 阅读
  9. pytorch升级打怪(一)

    2024-03-10 16:30:02       22 阅读