博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MR案例:多文件输出MultipleOutputs
阅读量:4974 次
发布时间:2019-06-12

本文共 4544 字,大约阅读时间需要 15 分钟。

问题描述:现有 ip-to-hosts.txt 数据文件,文件中每行数据有两个字段:分别是ip地址和该ip地址对应的国家,以'\t'分隔。要求汇总不同国家的IP数,并以国家名为文件名将其输出。

测试数据:ip-to-hosts.txt

18.217.167.70 United States206.96.54.107 United States196.109.151.139 Mauritius174.52.58.113 United States142.111.216.8 Canada

代码实现:

package country;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class Ip2Hosts {    public static void main(String[] args) throws Exception {                 //指定输入输出路径        args =new String[] {"hdfs://10.16.17.182:9000/test/in/ip-to-hosts.txt","hdfs://10.16.17.182:9000/test/out/0821/09"};        System.exit(run(args));    }    public static int run(String[] args) throws Exception {        Job job = Job.getInstance(new Configuration());        job.setJarByClass(Ip2Hosts.class);        job.setMapperClass(IPCountryMapper.class);        job.setReducerClass(IPCountryReducer.class);        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(IntWritable.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(IntWritable.class);        FileInputFormat.addInputPath(job, new Path(args[0]));        FileOutputFormat.setOutputPath(job, new Path(args[1]));        /**         * 输出 08 和 09 需要调用此设置,07 就需要注释掉         */         MultipleOutputs.addNamedOutput(job,"abc",TextOutputFormat.class,Text.class,IntWritable.class);                //通过此配置可以不再产生默认的空文件【part-*-00000】        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);                    return job.waitForCompletion(true) ? 1 : 0;    }    //map阶段    public static class IPCountryMapper    extends Mapper
{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] splited = value.toString().split("\t"); context.write(new Text(splited[1]), new IntWritable(1)); } } //reduce阶段 public static class IPCountryReducer extends Reducer
{ //1.定义多文件输出类MultipleOutputs private MultipleOutputs
mos; @Override protected void setup(Context context ) throws IOException, InterruptedException { //2.MultipleOutputs初始化 mos = new MultipleOutputs
(context); } @Override protected void reduce(Text key, Iterable
values, Context context ) throws IOException, InterruptedException { int total = 0; for(IntWritable value: values) { total += value.get(); }        //3.调用MultipleOutputs中的write()方法   //07-输出 mos.write(/*"abc",*/ key, new IntWritable(total),key.toString()); //08-输出 mos.write("abc", key, new IntWritable(total)/*,key.toString()*/); //09-输出 mos.write("abc", key, new IntWritable(total),key.toString()); } @Override protected void cleanup(Context context ) throws IOException, InterruptedException { //4.关闭流资源 mos.close(); } }}

代码解读:

1).输出-07所调用的方法和对应的输出结果:

/** * @ 输出的key类型  * @ 输出的value类型 * @ 输出的基路径,实际输出结果为:'基路径-r-00000' */MultipleOutputs.write(KEYOUT key, VALUEOUT value, String baseOutputPath)

2).输出-08所调用的方法和对应的输出结果:

/** * @ 自定义的输出.对于不指定'基路径',则结果为:'自定义的输出-r-00000' * @ 输出的key类型 * @ 输出的value类型 */MultipleOutputs.write(String namedOutput, K key, V value)

3).输出-09所调用的方法和对应的输出结果:

/** * @ 自定义的输出. * @ 输出的key类型 * @ 输出的value类型 * @ 输出的基路径,指定输出'基路径',则结果为:'基路径-r-00000' */MultipleOutputs.write(String namedOutput, K key, V value, String baseOutputPath)

用法总结:

  1. 在Mapper或Reducer类中创建 MultipleOutputs 成员变量 mos
  2. setup()方法中初始化 mos 变量,
  3. map()或reduce()方法中调用 mos.write() 方法输出数据,代替context.write()
  4. mos.write() 方法具有三个重载,对于 输出-08-09 还需Job配置中指定输出格式
  5. cleanup()方法中调用 mos.close() 方法关闭输出流

转载于:https://www.cnblogs.com/skyl/p/4732300.html

你可能感兴趣的文章
个人作业-最长英语链
查看>>
JMeter-性能测试之报表设定的注意事项
查看>>
1066-堆排序
查看>>
仿面包旅行个人中心下拉顶部背景放大高斯模糊效果
查看>>
强大的css3
查看>>
c#中的组件拖拽和MouseMove事件
查看>>
C# 小叙 Encoding (二)
查看>>
python创建对象数组避免浅拷贝
查看>>
CSS自学笔记(14):CSS3动画效果
查看>>
项目应用1
查看>>
Ubuntu下配置jdk和tomcat
查看>>
大型网站的演变升级
查看>>
图片延迟加载的实现
查看>>
C# 委托链(多播委托)
查看>>
解密个推持续集成
查看>>
基本SCTP套接字编程常用函数
查看>>
C 编译程序步骤
查看>>
页面抓取匹配时,万恶的\r,\n,\t 要先替换掉为空,出现匹配有问题,都是这个引起的...
查看>>
利用Node.js调用Elasticsearch
查看>>
构造函数
查看>>