本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark源码解读及商业实战指导,请持续关注本套博客。版权声明:本套Spark源码解读及商业实战归作者(秦凯新)所有,禁止转载,欢迎学习。
- Spark商业环境实战-Spark内置框架rpc通讯机制及RpcEnv基础设施
- Spark商业环境实战-Spark事件监听总线流程分析
- Spark商业环境实战-Spark存储体系底层架构剖析
- Spark商业环境实战-Spark底层多个MessageLoop循环线程执行流程分析
- Spark商业环境实战-Spark二级调度系统Stage划分算法和最佳任务调度细节剖析
- Spark商业环境实战-Spark任务延迟调度及调度池Pool架构剖析
- Spark商业环境实战-Task粒度的缓存聚合排序结构AppendOnlyMap详细剖析
- Spark商业环境实战-ExternalSorter 外部排序器在Spark Shuffle过程中设计思路剖析
- Spark商业环境实战-ShuffleExternalSorter外部排序器在Spark Shuffle过程中的设计思路剖析
- Spark商业环境实战-Spark ShuffleManager内存缓冲器SortShuffleWriter设计思路剖析
- Spark商业环境实战-Spark ShuffleManager内存缓冲器UnsafeShuffleWriter设计思路剖析
- Spark商业环境实战-Spark ShuffleManager内存缓冲器BypassMergeSortShuffleWriter设计思路剖析
- [Spark商业环境实战-Spark Shuffle 聚合拉取读数据(Reduce Task)过程深入剖析]
- Spark商业环境实战-StreamingContext启动流程及Dtream 模板源码剖析
- Spark商业环境实战-ReceiverTracker与BlockGenerator数据流接收过程剖析
1 从ShuffeManager讲起
一张图我已经用过多次了,不要见怪,因为毕竟都是一个主题,有关shuffle的。英文注释已经很详细了,这里简单介绍一下:
- 目前只有一个实现 SortShuffleManager。
- SortShuffleManager依赖于ShuffleWriter提供服务,通过ShuffleWriter定义的规范,可以将MapTask的任务中间结果按照约束的规范持久化到磁盘。
- SortShuffleManager总共有三个子类, UnsafeShuffleWriter,SortShuffleWriter ,BypassMergeSortShuffleWriter。
- SortShuffleManager依赖于ShuffleHandle样例类,主要还是负责向Task传递Shuffle信息。一个是序列化,一个是确定何时绕开合并和排序的Shuffle路径。
官方英文介绍如下:
* Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the
* driver and on each executor, based on the spark.shuffle.manager setting. The driver
* registers shuffles with it, and executors (or tasks running locally in the driver) can ask * to read and write data.
* NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
* boolean isDriver as parameters.
复制代码
2 UnsafeShuffleWriter的骨干成员
static final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096; static final int DEFAULT_INITIAL_SER_BUFFER_SIZE = 1024 * 1024;
-
private final BlockManager blockManager;
-
private final IndexShuffleBlockResolver shuffleBlockResolver;
-
private final TaskMemoryManager memoryManager;
-
private final SerializerInstance serializer;
-
private final Partitioner partitioner;
-
private final ShuffleWriteMetrics writeMetrics;
-
private final int shuffleId;
-
private final int mapId;
-
private final TaskContext taskContext;
-
private final SparkConf sparkConf;
-
private final boolean transferToEnabled => 是否采用NIO的从文件流待文件流的复制方式,spark.file.transferTo属性配置,默认是true。
-
private final int initialSortBufferSize =>初始化的排序缓冲大小,可以通过spark.shuffle.sort.initialBuffer.size属性设置,默认是4096
-
private final int inputBufferSizeInBytes;
-
private final int outputBufferSizeInBytes;
-
@Nullable private MapStatus mapStatus;
-
@Nullable private ShuffleExternalSorter sorter;
-
private long peakMemoryUsedBytes = 0; =>使用内存的峰值
3 UnsafeShuffleWriter核心实现方法Writer
看看精彩的代码段:
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
// Keep track of success so we know if we encountered an exception
// We do this rather than a standard try/catch/re-throw to handle
// generic throwables.
boolean success = false;
try {
while (records.hasNext()) {
insertRecordIntoSorter(records.next()); <=点睛之笔(将mapTask数据写入排序器)
}
closeAndWriteOutput(); <=点睛之笔(将mapTask数据持久化到磁盘)
success = true;
} finally {
if (sorter != null) {
try {
sorter.cleanupResources();
} catch (Exception e) {
// Only throw this error if we won't be masking another
// error.
if (success) {
throw e;
} else {
logger.error("In addition to a failure during writing, we failed during " +
"cleanup.", e);
}
}
}
}
}
复制代码
4 UnsafeShuffleWriter核心实现方法insertRecordIntoSorter
将mapTask数据写入排序器,实现内存中排序,但是无聚合
void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
assert(sorter != null);
final K key = record._1();
final int partitionId = partitioner.getPartition(key); <=点睛之笔
serBuffer.reset();
serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
serOutputStream.writeva lue(record._2(), OBJECT_CLASS_TAG);
serOutputStream.flush();
final int serializedRecordSize = serBuffer.size();
assert (serializedRecordSize > 0);
sorter.insertRecord( <=点睛之笔,将serBuffer字节数组写入Tungsten
serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
}
复制代码
5 UnsafeShuffleWriter核心实现方法closeAndWriteOutput
将mapTask数据持久化到磁盘
void closeAndWriteOutput() throws IOException {
assert(sorter != null);
updatePeakMemoryUsed();
serBuffer = null;
serOutputStream = null;
final SpillInfo[] spills = sorter.closeAndGetSpills();
sorter = null;
final long[] partitionLengths;
final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
final File tmp = Utils.tempFileWith(output);
try {
try {
partitionLengths = mergeSpills(spills, tmp); <=点睛之笔(合并所有溢出文件为正式Block文件)
} finally {
for (SpillInfo spill : spills) {
if (spill.file.exists() && ! spill.file.delete()) {
logger.error("Error while deleting spill file {}", spill.file.getPath());
}
}
}
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp); <=点睛之笔(写索引)
} finally {
if (tmp.exists() && !tmp.delete()) {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
复制代码
5 总结
UnsafeShuffleWriter内部主要使用Tungsten缓存,当然也可能使用JVM内存。和ExternalSortWriter有明显的区别。
秦凯新 于深圳 1:19