12.3 大数据计算框架 MapReduce- 编程框架
1.MapReduce:大规模数据处理
处理海量数据(>1TB)
上百上千CPU实现并行处理
简单得实现以上目的
移动计算比移动数据更划算
把计算程序分发到分块数据存储上,进行计算
分而治之(Divide and Conquer)
对分块数据,同时实时计算
分块计算,然后合并
2.MapReduce特性
自动实现分布式并行计算
容错
提供状态监控工具
模型抽象简洁,程序员易用
3.WordCount举例
##词频统计:单词在文章中出现的次数。
##场景:Google的万亿级网页存储后,词频统计。
#文本前期处理
str1_ist=str.replace('\n',' ').lower().split(' ');
count_dict={ }
#如果该字典里有该单词则加1,否则添加入字典
for str in str1_ist:
if str in count_dict.keys():
count_dict[str]=count_dict[str]+1
else:
count_dict[str]=1
特点:传统方式。读取文本到内存,统一处理。
4.MapReduce的WordCount
特点:分片计算,聚合
public class WordCount{
public static class TokenizerMapper extends Mapper<Object,Text,Text,IntWritable>{
private final static IntWritable one=new IntWritable(1);
private Text word=new Text();
/**key:偏移量(行号); value:文件中的一行文本**/
/**输入(key,value), 输出(key,value)含义**/
public void map(Object key,Text value,Contetxt context)throws IOException,InterruptedException{
/*分词器**/
StringTokenizer itr=new StringTokenizer(vlaue.toString);
/**迭代遍历单词**/
while(itr.hasMoreTokens()){
/**设置单词到word中**/
word.set(itr.nextToken());
/**输出keyValue对{'hello',1}**/
context.write(word,one);
}
}
}
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
private IntWritable result=new IntWritable();
/**输入(key,values):MapReduce框架将相同的key交给reduce函数比如({'hello',{1,1}})**/
public void reduce(Text key,Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{
int sum=0;
/**对相同key的values求和**/
for(IntWritable val:values){
sum+=val.get();
}
result.set(sum);
/**输出求和结果{'hello',2}**/
context.write(key,result);
}
}
public static void main(String[] args)throws Exception{
Configuration conf=new Configuration();//得到集群配置参数
Job job=Job.getInstance(conf,"wordCount");//设置到本次的Job实例中
job.setJarByClass(WordCount.class);//指定本次执行的主类是WordCount
job.setMapperClass(TokenizerMapper.class);//指定Map类
job.setCombineClass(IntSumReducer.class);//指定Reducer类,Map输出的时候,
//本地数据块的Reduce计算(先在本地聚合计算一次,{'hello',3000})
job.setReducerClass(IntSumReducer.class);//指定Reducer类
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job,new Path(args[0]));//指定输入数据的路径
FileOutputFormat.addOutputPath(job,new Path(args[1]));//指定输出路径
System.exit(job.waitCompletion(true)?0:1);//指定Job执行模式,等待任务执行完成后,提交任务的客户端才会退出。
}
}
########################################################################
评论