需求:
利用mapReduce实现类似微博中查找共同粉丝的功能。如下:
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

求出哪些人两两之间有共同粉丝,及他俩的共同粉丝都是谁。
比如:A,B [C,E],表示A和B的共同粉丝是C和E。

分析:
在利用MapReduce程序解答之前,我们不妨先用单机程序练习一下。思路很简单:用两层for循环遍历所有两两组合,分别找出每两个人之间的共同粉丝;如果有,就存到一个list中。再设一个map,key是两个人的ID,value是存放共同粉丝的list,最后就能求得任意两个人之间的共同粉丝。程序如下:
package com.darrenchan.test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Single-machine reference implementation of the "common fans" exercise.
 *
 * <p>Reads {@code data.txt}, whose lines have the form {@code person:fan1,fan2,...}, and prints
 * one line {@code personI,personJ:[fan, ...]} for every pair of people (in file order) that share
 * at least one fan.
 */
public class Test {

    public static void main(String[] args) throws Exception {
        // Read the whole file; try-with-resources closes it even if reading fails.
        List<String> lines = new ArrayList<>();
        try (BufferedReader br = new BufferedReader(new InputStreamReader(
                new FileInputStream(new File("data.txt")), StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                lines.add(line);
            }
        }

        for (Map.Entry<String, List<String>> entry : commonFans(lines).entrySet()) {
            System.out.println(entry.getKey() + ":" + entry.getValue());
        }
    }

    /**
     * Finds the common fans of every pair of people.
     *
     * @param lines input lines of the form {@code person:fan1,fan2,...}; blank lines are ignored,
     *     whitespace around names is trimmed and empty names are skipped
     * @return map (in pair order) from {@code "personI,personJ"}, where person i precedes person j
     *     in the input, to the fans they share, listed in the order they appear in person i's
     *     line; pairs without any common fan are omitted
     * @throws IllegalArgumentException if a non-blank line contains no {@code ':'}
     */
    static Map<String, List<String>> commonFans(List<String> lines) {
        // Parse every line exactly once instead of re-splitting it for each pair.
        List<String> persons = new ArrayList<>(lines.size());
        List<Set<String>> fanSets = new ArrayList<>(lines.size());
        for (String line : lines) {
            if (line.trim().isEmpty()) {
                continue;
            }
            int colon = line.indexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException(
                        "Expected 'person:fan1,fan2,...' but got: " + line);
            }
            persons.add(line.substring(0, colon).trim());
            // LinkedHashSet keeps the original order and drops duplicated fans.
            Set<String> fans = new LinkedHashSet<>();
            for (String fan : line.substring(colon + 1).split(",")) {
                String name = fan.trim();
                if (!name.isEmpty()) {
                    fans.add(name);
                }
            }
            fanSets.add(fans);
        }

        Map<String, List<String>> result = new LinkedHashMap<>();
        for (int i = 0; i < persons.size(); i++) {
            Set<String> fansOfI = fanSets.get(i);
            for (int j = i + 1; j < persons.size(); j++) {
                Set<String> fansOfJ = fanSets.get(j);
                List<String> common = new ArrayList<>();
                for (String fan : fansOfI) {
                    // Exact membership test: String.contains would also match substrings,
                    // e.g. fan "A" inside "AB,C".
                    if (fansOfJ.contains(fan)) {
                        common.add(fan);
                    }
                }
                if (!common.isEmpty()) {
                    result.put(persons.get(i) + "," + persons.get(j), common);
                }
            }
        }
        return result;
    }
}
求得结果:
A,B [C, E]
A,C [D, F]
A,D [F, E]
A,E [B, C, D]
A,F [B, C, D, E, O]
A,G [C, D, F, E]
A,H [C, D, E, O]
A,I [O]
A,J [B, O]
A,K [C, D]
A,L [D, F, E]
A,M [F, E]
B,C [A]
B,D [A, E]
B,E [C]
B,F [A, C, E]
B,G [A, C, E]
B,H [A, C, E]
B,I [A]
B,K [A, C]
B,L [E]
B,M [E]
B,O [A]
C,D [F, A]
C,E [D]
C,F [A, D]
C,G [F, A, D]
C,H [A, D]
C,I [A]
C,K [A, D]
C,L [F, D]
C,M [F]
C,O [A, I]
D,E [L]
D,F [A, E]
D,G [A, E, F]
D,H [A, E]
D,I [A]
D,K [A]
D,L [E, F]
D,M [E, F]
D,O [A]
E,F [B, C, D, M]
E,G [C, D]
E,H [C, D]
E,J [B]
E,K [C, D]
E,L [D]
F,G [A, C, D, E]
F,H [A, C, D, E, O]
F,I [A, O]
F,J [B, O]
F,K [A, C, D]
F,L [D, E]
F,M [E]
F,O [A]
G,H [A, C, D, E]
G,I [A]
G,K [A, C, D]
G,L [D, E, F]
G,M [E, F]
G,O [A]
H,I [A, O]
H,J [O]
H,K [A, C, D]
H,L [D, E]
H,M [E]
H,O [A]
I,J [O]
I,K [A]
I,O [A]
K,L [D]
K,O [A]
L,M [E, F]

接下来我们思考:如何用MapReduce的程序进行求解呢?
一般来说,一个步骤解决不了的问题,我们通常会拆成两个步骤来求解。在本题中,要求任意两个人的共同粉丝,我们不妨先求出某一个人是哪些人的粉丝,比如:B是A,E,F,J的粉丝,这是第一步要求的。第二步呢?我们把这些人两两配对:AE的共同粉丝有B,AF的共同粉丝有B,AJ的共同粉丝有B......然后在reduce中把同一对人的所有共同粉丝合并一下即可。
ShareFriendsStepOne.java:
package com.darrenchan.sharefriends;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class ShareFriendsStepOne { public static class ShareFriendsStepOneMapper extends Mapper{ Text keyText = new Text(); Text valueText = new Text(); /** * 拿到的数据格式是A:B,C,D,F,E,O */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); //按照":"进行分割 String person = line.split(":")[0]; String content = line.split(":")[1]; //该person下的所有fans String[] fans = content.split(","); valueText.set(person); for (int i = 0; i < fans.length; i++) { keyText.set(fans[i]); context.write(keyText, valueText); } } } public static class ShareFriendsStepOneReducer extends Reducer { /** * 拿到的数据格式是 ,即B是AEFG的粉丝 */ Text valueText = new Text(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { StringBuffer sb = new StringBuffer(); for (Text fan : values) { sb.append(fan).append(","); } //最后多了一个“,”,把它消掉 String outFans = sb.substring(0, sb.length()-1); valueText.set(outFans); context.write(key, valueText); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(ShareFriendsStepOne.class); job.setMapperClass(ShareFriendsStepOneMapper.class); job.setReducerClass(ShareFriendsStepOneReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); 
System.exit(job.waitForCompletion(true) ? 0 : 1); } }
ShareFriendsStepTwo.java:
package com.darrenchan.sharefriends;import java.io.IOException;import java.util.Arrays;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class ShareFriendsStepTwo { public static class ShareFriendsStepTwoMapper extends Mapper{ Text keyText = new Text(); Text valueText = new Text(); /** * 拿到的数据格式是A I,K,C,B,G,F,H,O,D 即A是I,K,C,B,G,F,H,O,D的粉丝,然后将后面的两两配对 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String fan = line.split("\t")[0]; String content = line.split("\t")[1]; String[] persons = content.split(","); // 将persons进行排序 Arrays.sort(persons); valueText.set(fan); for (int i = 0; i < persons.length; i++) { for (int j = i + 1; j < persons.length; j++) { keyText.set(persons[i] + "," + persons[j]); context.write(keyText, valueText); } } } } public static class ShareFriendsStepTwoReducer extends Reducer { /** * 拿到的数据格式是 ,即AB之间的共同粉丝有CE */ Text valueText = new Text(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { StringBuffer sb = new StringBuffer(); sb.append("["); for (Text fan : values) { sb.append(fan).append(","); } sb.append("]"); //去掉多余的“,” sb.deleteCharAt(sb.length()-2); valueText.set(sb.toString()); context.write(key, valueText); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(ShareFriendsStepTwo.class); job.setMapperClass(ShareFriendsStepTwoMapper.class); job.setReducerClass(ShareFriendsStepTwoReducer.class); 
job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }}
求得结果同上(只是每组共同粉丝的排列顺序可能与单机程序不同):
A,B [E,C]
A,C [D,F]
A,D [E,F]
A,E [D,B,C]
A,F [O,B,C,D,E]
A,G [F,E,C,D]
A,H [E,C,D,O]
A,I [O]
A,J [O,B]
A,K [D,C]
A,L [F,E,D]
A,M [E,F]
B,C [A]
B,D [A,E]
B,E [C]
B,F [E,A,C]
B,G [C,E,A]
B,H [A,E,C]
B,I [A]
B,K [C,A]
B,L [E]
B,M [E]
B,O [A]
C,D [A,F]
C,E [D]
C,F [D,A]
C,G [D,F,A]
C,H [D,A]
C,I [A]
C,K [A,D]
C,L [D,F]
C,M [F]
C,O [I,A]
D,E [L]
D,F [A,E]
D,G [E,A,F]
D,H [A,E]
D,I [A]
D,K [A]
D,L [E,F]
D,M [F,E]
D,O [A]
E,F [D,M,C,B]
E,G [C,D]
E,H [C,D]
E,J [B]
E,K [C,D]
E,L [D]
F,G [D,C,A,E]
F,H [A,D,O,E,C]
F,I [O,A]
F,J [B,O]
F,K [D,C,A]
F,L [E,D]
F,M [E]
F,O [A]
G,H [D,C,E,A]
G,I [A]
G,K [D,A,C]
G,L [D,F,E]
G,M [E,F]
G,O [A]
H,I [O,A]
H,J [O]
H,K [A,C,D]
H,L [D,E]
H,M [E]
H,O [A]
I,J [O]
I,K [A]
I,O [A]
K,L [D]
K,O [A]
L,M [E,F]