问题描述:现有 ip-to-hosts.txt 数据文件,文件中每行数据有两个字段:分别是ip地址和该ip地址对应的国家,以'\t'分隔。要求汇总不同国家的IP数,并以国家名为文件名将其输出。
测试数据:ip-to-hosts.txt
18.217.167.70 United States206.96.54.107 United States196.109.151.139 Mauritius174.52.58.113 United States142.111.216.8 Canada
代码实现:
package country;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class Ip2Hosts { public static void main(String[] args) throws Exception { //指定输入输出路径 args =new String[] {"hdfs://10.16.17.182:9000/test/in/ip-to-hosts.txt","hdfs://10.16.17.182:9000/test/out/0821/09"}; System.exit(run(args)); } public static int run(String[] args) throws Exception { Job job = Job.getInstance(new Configuration()); job.setJarByClass(Ip2Hosts.class); job.setMapperClass(IPCountryMapper.class); job.setReducerClass(IPCountryReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); /** * 输出 08 和 09 需要调用此设置,07 就需要注释掉 */ MultipleOutputs.addNamedOutput(job,"abc",TextOutputFormat.class,Text.class,IntWritable.class); //通过此配置可以不再产生默认的空文件【part-*-00000】 LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); return job.waitForCompletion(true) ? 1 : 0; } //map阶段 public static class IPCountryMapper extends Mapper{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] splited = value.toString().split("\t"); context.write(new Text(splited[1]), new IntWritable(1)); } } //reduce阶段 public static class IPCountryReducer extends Reducer { //1.定义多文件输出类MultipleOutputs private MultipleOutputs mos; @Override protected void setup(Context context ) throws IOException, InterruptedException { //2.MultipleOutputs初始化 mos = new MultipleOutputs (context); } @Override protected void reduce(Text key, Iterable values, Context context ) throws IOException, InterruptedException { int total = 0; for(IntWritable value: values) { total += value.get(); } //3.调用MultipleOutputs中的write()方法 //07-输出 mos.write(/*"abc",*/ key, new IntWritable(total),key.toString()); //08-输出 mos.write("abc", key, new IntWritable(total)/*,key.toString()*/); //09-输出 mos.write("abc", key, new IntWritable(total),key.toString()); } @Override protected void cleanup(Context context ) throws IOException, InterruptedException { //4.关闭流资源 mos.close(); } }}
代码解读:
1).输出-07所调用的方法和对应的输出结果:
/** * @ 输出的key类型 * @ 输出的value类型 * @ 输出的基路径,实际输出结果为:'基路径-r-00000' */MultipleOutputs.write(KEYOUT key, VALUEOUT value, String baseOutputPath)
2).输出-08所调用的方法和对应的输出结果:
/** * @ 自定义的输出.对于不指定'基路径',则结果为:'自定义的输出-r-00000' * @ 输出的key类型 * @ 输出的value类型 */MultipleOutputs.write(String namedOutput, K key, V value)
3).输出-09所调用的方法和对应的输出结果:
/** * @ 自定义的输出. * @ 输出的key类型 * @ 输出的value类型 * @ 输出的基路径,指定输出'基路径',则结果为:'基路径-r-00000' */MultipleOutputs.write(String namedOutput, K key, V value, String baseOutputPath)
用法总结:
- 在Mapper或Reducer类中创建 MultipleOutputs 成员变量 mos
- 在setup()方法中初始化 mos 变量,
- 在map()或reduce()方法中调用 mos.write() 方法输出数据,代替context.write()
- mos.write() 方法具有三个重载,对于 输出-08-09 还需在Job配置中指定输出格式
- 在cleanup()方法中调用 mos.close() 方法关闭输出流