
Hadoop MapReduce 04: Client-Side Source Code Analysis

Author: Java高工P7 · November 11, 2021

public void submit()
    throws IOException, InterruptedException, ClassNotFoundException {
  // Re-check that the job is still in the DEFINE state
  ensureState(JobState.DEFINE);
  // Use the new APIs by default
  setUseNewAPI();
  // Initialize the cluster object
  connect();
  // Build a JobSubmitter from the cluster's file system and client
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
        ClassNotFoundException {
      // Step into submitJobInternal
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  // Set the job state to RUNNING
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
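For context, everything above runs inside the client when the driver calls Job#waitForCompletion, which first calls submit() and then polls the job status. Below is a minimal driver sketch showing where that call sits; the class name WordCountDriver is illustrative, and the mapper/reducer are Hadoop's built-in TokenCounterMapper and IntSumReducer so the sketch stays self-contained.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCountDriver.class);
    // Hadoop's library mapper/reducer, used only to keep the sketch compilable
    job.setMapperClass(TokenCounterMapper.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // waitForCompletion(true) calls submit() -- the method walked through above -- then monitors progress
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}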

3.3 submitJobInternal

/**
 * Internal method for submitting the job to the system:
 *   1. Check the input and output specifications of the job
 *   2. Compute the InputSplits for the job
 *   3. Set up the requisite accounting information for the job's DistributedCache, if necessary
 *   4. Copy the job's jar and configuration to the MapReduce system directory on the distributed file system
 *   5. Submit the job and monitor its status
 */
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {

  // Validate the job's output specification
  checkSpecs(job);

  Configuration conf = job.getConfiguration();
  // Add the MapReduce framework archive to the distributed cache
  addMRFrameworkToDistributedCache(conf);

  // Initialize the job staging area and return its path
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  // configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  // Ask the cluster for a new JobID
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  // Build the job submit directory: a subdirectory of jobStagingArea named after the jobId
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  // Configure the submission
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }

    // Upload the job jar, libjars, files and archives to the submit directory
    copyAndConfigureFiles(job, submitJobDir);

    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    // Depending on the earlier setUseNewAPI() call, splits are computed with either the new or the old API
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Write the job configuration (job.xml); the submit dir now also holds job.split, the job jar, etc.
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}
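Everything submitJobInternal stages ends up under submitJobDir (the jobId-named folder inside the staging area): typically the job jar, the job.xml written by writeConf, and the job.split / job.splitmetainfo files produced during split planning. As a quick way to confirm this on your own cluster, the hedged sketch below simply lists that directory; pass in the submit dir path printed in the client log (the ListSubmitDir class name is made up for this illustration).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListSubmitDir {
  public static void main(String[] args) throws Exception {
    // args[0]: the submit dir printed in the client log, i.e. <staging-area>/<job-id>
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Print each staged file and its size (job.jar, job.xml, job.split, job.splitmetainfo, ...)
    for (FileStatus st : fs.listStatus(new Path(args[0]))) {
      System.out.println(st.getPath().getName() + "\t" + st.getLen() + " bytes");
    }
  }
}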

3.4 writeSplits

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
    Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  JobConf jConf = (JobConf)job.getConfiguration();
  int maps;
  if (jConf.getUseNewMapper()) {
    // Step into writeNewSplits
    maps = writeNewSplits(job, jobSubmitDir);
  } else {
    maps = writeOldSplits(jConf, jobSubmitDir);
  }
  return maps;
}
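Which branch runs here is controlled by the new-API flags that setUseNewAPI() wrote at the start of submit(). As a small sanity-check sketch (assuming JobConf's standard getUseNewMapper() accessor; the class name is illustrative), you can read the flag back from a driver:

import org.apache.hadoop.mapred.JobConf;

public class NewApiFlagCheck {
  public static void main(String[] args) {
    JobConf jConf = new JobConf();
    // getUseNewMapper() reports whether the new mapper API was selected,
    // i.e. the flag Job.setUseNewAPI() sets during submit()
    System.out.println("use new mapper API: " + jConf.getUseNewMapper());
  }
}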

3.5 writeNewSplits

private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  // Instantiate the configured InputFormat class via reflection
  InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
  // Compute the splits
  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  // Write the split metadata into the job.split file under jobSubmitDir
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}
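The InputFormat instantiated by reflection above is whatever the driver configured; if setInputFormatClass() is never called, it defaults to TextInputFormat, whose getSplits() is inherited from FileInputFormat and analyzed next in 3.6. A minimal sketch of that driver-side choice (the class name InputFormatConfigSketch is illustrative only):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class InputFormatConfigSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "splits demo");
    // This decides which getSplits() runs; TextInputFormat is also the default
    // when setInputFormatClass() is never called.
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    // writeNewSplits() will instantiate TextInputFormat via reflection and call
    // its getSplits(job), which it inherits from FileInputFormat (section 3.6).
  }
}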

3.6 getSplits

public List<InputSplit> getSplits(JobContext job) throws IOException {
  Stopwatch sw = new Stopwatch().start();
  // Lower bound on the split size
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  // Upper bound on the split size
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);
  for (FileStatus file: files) {
    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0) {
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      if (isSplitable(job, path)) {
        // Block size of the file
        long blockSize = file.getBlockSize();
        // Compute the split size from blockSize, minSize and maxSize
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
              blkLocations[blkIndex].getHosts(),
              blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
              blkLocations[blkIndex].getHosts(),
              blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splitable
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
            blkLocations[0].getCachedHosts()));
      }
    } else {
      //Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // Save the number of input files for metrics/loadgen
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.elapsedMillis());
  }
  return splits;
}
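Because minSize and maxSize come from configuration, the split size, and hence the number of map tasks, can be tuned from the driver without touching any of this code. A minimal sketch using the standard FileInputFormat helpers (the 64 MB / 256 MB values are arbitrary examples):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeTuning {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "split size tuning");
    // Feeds getMinSplitSize(job)
    FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);   // 64 MB
    // Feeds getMaxSplitSize(job)
    FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);  // 256 MB
    // With a 128 MB block size, computeSplitSize(128MB, 64MB, 256MB) = 128 MB,
    // i.e. one split per block (see 3.7).
  }
}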

3.7 computeSplitSize

protected long computeSplitSize(long blockSize, long minSize,
                                long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}
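To make the formula concrete, here is a small worked check with a 128 MB block size under the defaults (minSize = 1, maxSize = Long.MAX_VALUE) and under two illustrative overrides:

public class SplitSizeMath {
  public static void main(String[] args) {
    long MB = 1024L * 1024;
    // Defaults: splitSize equals the block size
    System.out.println(Math.max(1L, Math.min(Long.MAX_VALUE, 128 * MB)));        // 134217728 (128 MB)
    // maxSize below the block size caps the split: more, smaller splits
    System.out.println(Math.max(1L, Math.min(64 * MB, 128 * MB)));               // 67108864 (64 MB)
    // minSize above the block size raises the split: fewer, larger splits
    System.out.println(Math.max(256 * MB, Math.min(Long.MAX_VALUE, 128 * MB)));  // 268435456 (256 MB)
  }
}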

3.8 submitJobInternal

Back in the submitJobInternal method, the remaining work is to write out the job configuration and actually submit the job:


// Write the job configuration (job.xml); the submit dir now also holds job.split, the job jar, etc.
writeConf(conf, submitJobFile);

//
// Now, actually submit the job (using the submit name)
//
printTokens(jobId, job.getCredentials());
status = submitClient.submitJob(
    jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) {
  return status;
} else {
  throw new IOException("Could not launch job");
}

At this point the client-side work is done: submitClient.submitJob hands the staged job (submit dir plus credentials) over to the cluster (via YARNRunner to the ResourceManager on YARN, or to LocalJobRunner in local mode), and the returned JobStatus flows back up to Job.submit().
