EditsDoubleBuffer是为edits准备的双缓冲区。新的编辑被写入第一个缓冲区,同时第二个缓冲区可以被flush。为edits准备的双缓冲区。新的编辑被写入第一个缓冲区,同时第二个缓冲区可以被flush。在其内部,有两个重要的缓冲区成员变量,如下:
// 当前被写入的缓冲区bufCurrent
private TxnBuffer bufCurrent; // current buffer for writing
// 正在进行flush的缓冲区bufReady
private TxnBuffer bufReady; // buffer ready for flushing
// 初始化缓冲区大小initBufferSize
private final int initBufferSize;
其中,bufCurrent是当前被写入的缓冲区,当前被写入的缓冲区是正在进行flush的缓冲区,而initBufferSize则是初始化缓冲区大小。我们再看下EditsDoubleBuffer的构造函数,如下:
// 构造函数
public EditsDoubleBuffer(int defaultBufferSize) {
// 根据入参赋值initBufferSizeinitBufferSize
initBufferSize = defaultBufferSize;
// 创建当前被写入的缓冲区bufCurrent
bufCurrent = new TxnBuffer(initBufferSize);
// 创建正在进行flush的缓冲区bufReady
bufReady = new TxnBuffer(initBufferSize);
}
根据入参赋值initBufferSizeinitBufferSize,然后分别创建上述两个缓冲区:创建当前被写入的缓冲区bufCurrent、创建正在进行flush的缓冲区bufReady。
而EditsDoubleBuffer最基本的写入功能有两个,一个是用于写入操作符的writeOp()方法,另外一个就是用于写入事务的writeRaw()方法,代码分别如下:
// 写入操作符至bufCurrent
public void writeOp(FSEditLogOp op) throws IOException {
bufCurrent.writeOp(op);
}
// 写入事务至bufCurrent
public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
bufCurrent.write(bytes, offset, length);
}
均是将操作符或事物写入bufCurrent缓冲区。而在准备flush前,需要先调用setReadyToFlush()方法,设置缓冲区可以进行flush,代码如下:
// 设置双缓冲区为可以进行flsuh
public void setReadyToFlush() {
// 确保之前的数据已经被flush完毕,调用isFlushed()方法判断bufReady的大小是否为0即可
assert isFlushed() : "previous data not flushed yet";
// 交换bufReady、bufCurrent
TxnBuffer tmp = bufReady;
bufReady = bufCurrent;
bufCurrent = tmp;
}
它首先会确保之前的数据已经被flush完毕,调用isFlushed()方法判断bufReady的大小是否为0即可,然后交换bufReady、bufCurrent。
接着,我们需要调用flushTo()方法,将bufReady的内容写入指定输出流,并清空bufReady。此时不交换任何缓冲区,代码如下:
/**
* Writes the content of the "ready" buffer to the given output stream,
* and resets it. Does not swap any buffers.
* 将bufReady的内容写入指定输出流,并清空bufReady。此时不交换任何缓冲区。
*/
public void flushTo(OutputStream out) throws IOException {
bufReady.writeTo(out); // write data to file
bufReady.reset(); // erase all data in the buffer
}
而bufCurrent、bufReady都是一个TxnBuffer类型的缓冲区,这个TxnBuffer是对DataOutputBuffer的一个封装,保存了第一个事务艾迪firstTxId、事务数量numTxns、写入者writer等变量,它主要的两个方法,一个是写入操作符的writeOp()方法,实现如下:
// 写入操作符
public void writeOp(FSEditLogOp op) throws IOException {
// 首次事务艾迪firstTxId被赋值为操作符的事务ID
if (firstTxId == HdfsConstants.INVALID_TXID) {
firstTxId = op.txid;
} else {
// 之后确保操作符的事务ID永远大于首次事务ID
assert op.txid > firstTxId;
}
// 调用writer写入操作符
writer.writeOp(op);
// 事务数量numTxns累加
numTxns++;
}
首次事务艾迪firstTxId被赋值为操作符的事务ID,之后确保操作符的事务ID永远大于首次事务ID,然后调用writer写入操作符,并将事务数量numTxns累加。