partition的作用是把环形缓冲区中的map输出分区存储,以便分配给不同的reducer。

把内部的实现写下来,作为一个学习笔记

  1. 在map函数,调用context.write()时,会去调用分区函数,得到分区号,把分区号一块写进keyvalue的元数据。
  2. 当环形缓冲区达到溢写磁盘时
    • a) 对每个分区内的数据进行排序
    • b) 把每个分区内的数据写到磁盘

下面通过代码来说明

1

context.write(K,V) -> MapTask.NewOutputCollector.write(K, V) -> MapOutputBuffer.collect(K, V, partion)

void MapTask.NewOutputCollector.write(K key, V value) {
collector.collect(key, value,
partitioner.getPartition(key, value, partitions)); // 调用分区函数
} MapOutputBuffer.collect(K, V, partion) {
...
kvmeta.put(kvindex + PARTITION, partition); // 把分区号一块写进keyvalue元数据
...
}

2-a)

MapTask.MapOutputBuffer.flush()->MapTask.MapOutputBuffer.sortAndSpill()->IndexedSortable.compare(final int mi, final int mj)


void MapTask.MapOutputBuffer.sortAndSpill() {
...
sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); // 对数据进行排序,默认采用快速排序。调用了下面的compare()方法
...
} // 比较 mi和mj所对应的两个key,这个方法先比较分区号,如果分区号相同,才有必要比较key,实现了按各个分区内的key进行排序
public int MapTask.MapOutputBuffer.compare(final int mi, final int mj) {
final int kvi = offsetFor(mi % maxRec);
final int kvj = offsetFor(mj % maxRec);
final int kvip = kvmeta.get(kvi + PARTITION); // 从keyvalue元数据取出mi的分区号
final int kvjp = kvmeta.get(kvj + PARTITION); // 从keyvalue元数据取出mj的分区号
// sort by partition
if (kvip != kvjp) { // 如果分区号不相同,直接比较分区号:分区号的大小决定了写磁盘时的先后顺序
return kvip - kvjp;
}
// sort by key // 分区号相同,再比较key,这个方法调用RawComparator.compare(buffer, s1, l1, s2, l2);
return comparator.compare(kvbuffer,
kvmeta.get(kvi + KEYSTART), // key1的开始位置
kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART), // key1的结束位置
kvbuffer,
kvmeta.get(kvj + KEYSTART), //key2的开始位置
kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART)); // key2的开始位置
}
2-b)

a和b都是在sortAndSpill()中


void MapTask.MapOutputBuffer.sortAndSpill() {
...
sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); // 对数据进行排序,默认采用快速排序。调用了下面的compare()方法
... // 按分区号从小到大,一个分区一个分区写进磁盘
for (int i = 0; i < partitions; ++i) {
...
while (spindex < mend &&
kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) { // 从元数据读出kv分区号,如果是当前正在写磁盘的分区号,就把这个kv写到磁盘
final int kvoff = offsetFor(spindex % maxRec);
int keystart = kvmeta.get(kvoff + KEYSTART);
int valstart = kvmeta.get(kvoff + VALSTART);
key.reset(kvbuffer, keystart, valstart - keystart);
getVBytesForOffset(kvoff, value);
writer.append(key, value); // 把kv写到磁盘
++spindex;
}
} ...
}

经过上面这些步骤,环形缓冲区内的kv,就按分区写到磁盘,并且每个分区内的数据是有序的。

当然,这并不能保证同一个分区内,先后溢写的数据是有序的。后面使用归并排序对磁盘上的分区数据再做一轮排序,这个以后再做分析。

最新文章

  1. HTML行为元素和块级元素及语义化
  2. Makefile使用库
  3. JAVA多线程和并发基础面试问答(转载)
  4. ProgressBar.js – 漂亮的响应式 SVG 进度条
  5. Maven-setting.xml详解
  6. POJ 1061 同余方程
  7. vs2010 rdlc .net4.0 卸载 Appdomain 时出错。 (异常来自 HRESULT:0x80131015) 解决办法
  8. java数据类型总结
  9. Infragistics UltraGrid的使用
  10. Javascript模块规范(CommonJS规范&amp;&amp;AMD规范)
  11. Visual Prolog 的 Web 专家系统 (9)
  12. .NET获取英文月份缩写名(可获取其他国家)
  13. Visual Studio 2015 Owin+MVC+WebAPI+ODataV4+EntityFrawork+Identity+Oauth2.0+AngularJS 1.x 学习笔记之&quot;坑&quot;
  14. LeetCode OJ 153. Find Minimum in Rotated Sorted Array
  15. HDU 2167 Pebbles(状压DP)
  16. mongodb备份恢复,数据导入导出
  17. flask中jinjia2模板使用详解2
  18. C#中的匿名函数使用,类名&lt;T&gt;
  19. Java异常处理-----java异常体系
  20. 【JDBC】java.sql.SQLException: The server time zone value &#39;&#214;&#208;&#185;&#250;&#177;&#234;&#215;&#188;&#202;&#177;&#188;&#228;&#39; is unrecognized or represents more than one time zone.

热门文章

  1. FAQ: Automatic Statistics Collection (文档 ID 1233203.1)
  2. (转)由Uploadify插件想到的Flash无法传递Session和Cookie的问题解决
  3. 解决Git报错:The current branch is not configured for pull No value for key branch.master.merge found in configuration
  4. g++/gcc 链接头文件 库 PATH
  5. css+js+html基础知识总结
  6. python中operator.itemgetter
  7. No.007 Reverse Integer
  8. 【EF 4】ORM框架及其流行产品之一EF介绍
  9. windbg配置问题汇总
  10. jquery的end(),addBack()方法example