`
flyfoxs
  • 浏览: 294568 次
  • 性别: Icon_minigender_1
  • 来自: 合肥
社区版块
存档分类
最新评论

【大数据笔记】--浅谈WordCount的Bug

 
阅读更多

最近精读Hadoop WordCount的示例,发现里面应该存在一个"可能的"Bug,现斗胆发出,希望有高人指点.

 

Bug描述:

WordCount数单词的时候,如果遇到大文件会对文件进行切分.但是切分是按照字节来进行的,完全有可能会将一个单词切分成2个单词,这样也就可能会创造2个不存在的单词.

 

相关代码:

  • WordCount main 函数

   ( FileInputFormat.addInputPath(job, new Path(otherArgs[0]));)

 

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    //FileInputFormat定义了大的输入文件,如何拆分
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

 

  •  具体拆分代码:
  public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    FileStatus[] files = listStatus(job);
    
    // Save the number of input files in the job-conf
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDir()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
                            minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    //可以很清楚的看出来,这里的拆分全部是基于字节的.
    for (FileStatus file: files) {
      Path path = file.getPath();
      FileSystem fs = path.getFileSystem(job);
      long length = file.getLen();
      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
      if ((length != 0) && isSplitable(fs, path)) { 
        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(goalSize, minSize, blockSize);

        long bytesRemaining = length;
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          String[] splitHosts = getSplitHosts(blkLocations, 
              length-bytesRemaining, splitSize, clusterMap);
          splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
              splitHosts));
          bytesRemaining -= splitSize;
        }
        
        if (bytesRemaining != 0) {
          splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
                     blkLocations[blkLocations.length-1].getHosts()));
        }
      } else if (length != 0) {
        String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
        splits.add(new FileSplit(path, 0, length, splitHosts));
      } else { 
        //Create empty hosts array for zero length files
        splits.add(new FileSplit(path, 0, length, new String[0]));
      }
    }
    LOG.debug("Total # of splits: " + splits.size());
    return splits.toArray(new FileSplit[splits.size()]);
  }

 

 

原因猜想:

Hadoop处理的是大数据,拆分中出现几个单词错误是可以接受的,所以Hadoop就没有对这个进行特别严格的处理.

 

解决方法:

也就是在后期读取文件时,做一些微调. 具体详情可以参考Blog: 大数据笔记--续谈WordCount的Bug  

 

 

 

 

0
0
分享到:
评论
2 楼 flyfoxs 2014-09-09  
为善去恶王守仁 写道
http://blog.csdn.net/wanghai__/article/details/6583364


多谢指点,我自己搜索了好久都没找到答案.
1 楼 为善去恶王守仁 2014-09-09  
http://blog.csdn.net/wanghai__/article/details/6583364

相关推荐

Global site tag (gtag.js) - Google Analytics