写点什么

12.3 大数据计算框架 MapReduce- 编程框架

用户头像
张荣召
关注
发布于: 2020 年 12 月 14 日

1.MapReduce:大规模数据处理

  • 处理海量数据(>1TB)

  • 上百上千CPU实现并行处理



简单得实现以上目的

  • 移动计算比移动数据更划算

           把计算程序分发到分块数据存储上,进行计算



  • 分而治之(Divide and Conquer)

          对分块数据,同时实时计算

          分块计算,然后合并



2.MapReduce特性

  • 自动实现分布式并行计算

  • 容错

  • 提供状态监控工具

  • 模型抽象简洁,程序员易用

3.WordCount举例

##词频统计:单词在文章中出现的次数。

##场景:Google的万亿级网页存储后,词频统计。

#文本前期处理

str1_ist=str.replace('\n',' ').lower().split(' ');

count_dict={ }



#如果该字典里有该单词则加1,否则添加入字典

for str in str1_ist:

if str in count_dict.keys():

     count_dict[str]=count_dict[str]+1

     else:

            count_dict[str]=1

特点:传统方式。读取文本到内存,统一处理。

4.MapReduce的WordCount

特点:分片计算,聚合

public class WordCount{

     public static class TokenizerMapper extends Mapper<Object,Text,Text,IntWritable>{

           private final static IntWritable one=new IntWritable(1);

           private Text word=new Text();

           /**key:偏移量(行号); value:文件中的一行文本**/

           /**输入(key,value),     输出(key,value)含义**/

           public void map(Object key,Text value,Contetxt context)throws IOException,InterruptedException{

                 /*分词器**/

                 StringTokenizer itr=new StringTokenizer(vlaue.toString);

                 /**迭代遍历单词**/

                 while(itr.hasMoreTokens()){

                        /**设置单词到word中**/

                        word.set(itr.nextToken());

                        /**输出keyValue对{'hello',1}**/

                        context.write(word,one);

                 }

           }

     }



     public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>{

           private IntWritable result=new IntWritable();

           /**输入(key,values):MapReduce框架将相同的key交给reduce函数比如({'hello',{1,1}})**/

           public void reduce(Text key,Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{

                int sum=0;

                /**对相同key的values求和**/

                for(IntWritable val:values){

                       sum+=val.get();

                }

                result.set(sum);

                /**输出求和结果{'hello',2}**/

                context.write(key,result);

           }

     }



     public static void main(String[] args)throws Exception{

           Configuration conf=new Configuration();//得到集群配置参数

           Job job=Job.getInstance(conf,"wordCount");//设置到本次的Job实例中

           job.setJarByClass(WordCount.class);//指定本次执行的主类是WordCount

           job.setMapperClass(TokenizerMapper.class);//指定Map类

           job.setCombineClass(IntSumReducer.class);//指定Reducer类,Map输出的时候,

                                                                                 //本地数据块的Reduce计算(先在本地聚合计算一次,{'hello',3000})

           job.setReducerClass(IntSumReducer.class);//指定Reducer类

           job.setOutputKeyClass(Text.class);

           job.setOutputValueClass(IntWritable.class);

           FileInputFormat.addInputPath(job,new Path(args[0]));//指定输入数据的路径

           FileOutputFormat.addOutputPath(job,new Path(args[1]));//指定输出路径

           System.exit(job.waitCompletion(true)?0:1);//指定Job执行模式,等待任务执行完成后,提交任务的客户端才会退出。  

     }

}

########################################################################











用户头像

张荣召

关注

还未添加个人签名 2018.05.02 加入

还未添加个人简介

评论

发布
暂无评论
12.3大数据计算框架MapReduce-编程框架