Motivation
Spark currently has several 2G limits:
- When reading a data block that is cached on the local disk, the following code fragment is called:
```scala
val iterToReturn: Iterator[Any] = {
  val diskBytes = diskStore.getBytes(blockId)
  if (level.deserialized) {
    val diskValues = serializerManager.dataDeserializeStream(
      blockId,
      diskBytes.toInputStream(dispose = true))(info.classTag)
    maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
  } else {
    val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)
      .map { _.toInputStream(dispose = false) }
      .getOrElse { diskBytes.toInputStream(dispose = true) }
    serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
  }
}
```
```scala
def getBytes(blockId: BlockId): ChunkedByteBuffer = {
  val file = diskManager.getFile(blockId.name)
  val channel = new RandomAccessFile(file, "r").getChannel
  Utils.tryWithSafeFinally {
    // For small files, directly read rather than memory map
    if (file.length < minMemoryMapBytes) {
      val buf = ByteBuffer.allocate(file.length.toInt)
      channel.position(0)
      while (buf.remaining() != 0) {
        if (channel.read(buf) == -1) {
          throw new IOException("Reached EOF before filling buffer\n" +
            s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
        }
      }
      buf.flip()
      new ChunkedByteBuffer(buf)
    } else {
      new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
    }
  } {
    channel.close()
  }
}
```
The above code has the following problems:
* channel.map(MapMode.READ_ONLY, 0, file.length) returns a MappedByteBuffer, and the size of a MappedByteBuffer cannot exceed 2G.
* Building the Iterator[Any] requires loading all of the data into memory, which can consume a large amount of memory.
* MappedByteBuffer maps the file into memory managed by the operating system, so the JVM cannot control that memory.
- When serializing data with Kryo, the following code fragment is called:
```scala
override def serialize[T: ClassTag](t: T): ByteBuffer = {
  output.clear()
  val kryo = borrowKryo()
  try {
    kryo.writeClassAndObject(output, t)
  } catch {
    case e: KryoException if e.getMessage.startsWith("Buffer overflow") =>
      throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " +
        "increase spark.kryoserializer.buffer.max value.")
  } finally {
    releaseKryo(kryo)
  }
  ByteBuffer.wrap(output.toBytes)
}
```
The above code has the following problems:
* The serialized data is stored in the output object's internal byte[], and the size of a byte[] cannot exceed 2G.
- When the RPC layer writes outgoing data to a Channel, the following code fragment is called:
```java
public long transferTo(final WritableByteChannel target, final long position) throws IOException {
  Preconditions.checkArgument(position == totalBytesTransferred, "Invalid position.");
  // Bytes written for header in this call.
  long writtenHeader = 0;
  if (header.readableBytes() > 0) {
    writtenHeader = copyByteBuf(header, target);
    totalBytesTransferred += writtenHeader;
    if (header.readableBytes() > 0) {
      return writtenHeader;
    }
  }
  // Bytes written for body in this call.
  long writtenBody = 0;
  if (body instanceof FileRegion) {
    writtenBody = ((FileRegion) body).transferTo(target, totalBytesTransferred - headerLength);
  } else if (body instanceof ByteBuf) {
    writtenBody = copyByteBuf((ByteBuf) body, target);
  }
  totalBytesTransferred += writtenBody;
  return writtenHeader + writtenBody;
}
```
The above code has the following problems:
* The size of a ByteBuf cannot exceed 2G.
* In-memory data larger than 2G cannot be transferred.
- When decoding a received RPC message, the following code fragment is called:
```java
public final class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
  private static final Logger logger = LoggerFactory.getLogger(MessageDecoder.class);

  @Override
  public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    Message.Type msgType = Message.Type.decode(in);
    Message decoded = decode(msgType, in);
    assert decoded.type() == msgType;
    logger.trace("Received message {}: {}", msgType, decoded);
    out.add(decoded);
  }

  private Message decode(Message.Type msgType, ByteBuf in) {
    switch (msgType) {
      case ChunkFetchRequest:
        return ChunkFetchRequest.decode(in);
      case ChunkFetchSuccess:
        return ChunkFetchSuccess.decode(in);
      case ChunkFetchFailure:
        return ChunkFetchFailure.decode(in);
      default:
        throw new IllegalArgumentException("Unexpected message type: " + msgType);
    }
  }
}
```
The above code has the following problems:
* The size of a ByteBuf cannot exceed 2G.
* A message can only be decoded after all of its data has been received.
Goals
- Eliminate the various 2G limits in Spark. (The 2G limits 1, 2, 3, 4)
- Support back-pressure flow control for remote data reads (experimental goal). (The 2G limit 4)
- Add a buffer pool (long-range goal).
Design
Eliminating the various 2G limits in Spark
Replace ByteBuffer with ChunkedByteBuffer. (The 2G limits 1, 2)
ChunkedByteBuffer introduction (a minimal sketch follows the list below):
- Stores data in multiple ByteBuffer instances.
- Supports reference counting (see Netty's "Reference counted objects"), a necessary condition for the buffer pool feature.
- Supports serialization for easy transport.
- Supports slice, duplicate and copy operations, similar to ByteBuffer.
- Can be efficiently converted to InputStream, ByteBuffer, byte[], ByteBuf, etc., making it easy to interoperate with other interfaces.
- Can conveniently write its data to an OutputStream.
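A minimal sketch of the idea, using illustrative names rather than the final API (the real class is Spark's ChunkedByteBuffer); only the multi-chunk storage and the InputStream conversion are shown:

```scala
// Minimal sketch only: data is held as several ByteBuffer chunks, so the total size is a
// Long and is not capped at Integer.MAX_VALUE; the chunks can be exposed as one stream.
import java.io.{InputStream, SequenceInputStream}
import java.nio.ByteBuffer
import scala.collection.JavaConverters._

class ChunkedByteBufferSketch(val chunks: Array[ByteBuffer]) {
  // The size is a sum over chunks and can therefore exceed 2G.
  def size: Long = chunks.map(_.limit().toLong).sum

  // Read the chunks back to back without first copying them into a single byte[].
  def toInputStream: InputStream = {
    val streams: Iterator[InputStream] = chunks.iterator.map { chunk =>
      val dup = chunk.duplicate() // independent position/limit, shared content
      new InputStream {
        override def read(): Int = if (dup.hasRemaining) dup.get() & 0xFF else -1
      }
    }
    new SequenceInputStream(streams.asJavaEnumeration)
  }
}
```

Conversions to ByteBuffer, byte[] and ByteBuf, plus the reference-counting hooks, would live on the same class and are omitted here.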
- Move the ChunkedByteBuffer class to common/network-common/src/main/java/org/apache/spark/network/buffer/.
- Modify ManagedBuffer.nioByteBuffer to return a ChunkedByteBuffer instance. (The 2G limit 1)
- Further standardize the use of ManagedBuffer and ChunkedByteBuffer:
  - Data from memory, the network, disk and other sources is represented by ManagedBuffer.
  - ChunkedByteBuffer only represents data in memory.
  - ManagedBuffer.nioByteBuffer is called only when there is enough memory to hold the data.
- Modify the parameter of SerializerInstance.deserialize and the return value of SerializerInstance.serialize to ChunkedByteBuffer instances (The 2G limit 2), for example:
```scala
def serialize[T: ClassTag](t: T): ChunkedByteBuffer = {
  output.clear()
  val out = ChunkedByteBufferOutputStream.newInstance()
  // The data is written to the OutputStream, rather than to the output object's internal byte[].
  output.setOutputStream(out)
  val kryo = borrowKryo()
  kryo.writeClassAndObject(output, t)
  output.close()
  out.toChunkedByteBuffer
}
```
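The deserialize direction can then read straight from the chunked buffer's stream, so no single contiguous byte[] is needed. A hedged sketch, reusing the illustrative ChunkedByteBufferSketch from above (the real change would be in KryoSerializerInstance against ChunkedByteBuffer):

```scala
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Input

object KryoDeserializeSketch {
  // Read an object back from chunked data without materializing one contiguous buffer.
  def deserialize[T](kryo: Kryo, bytes: ChunkedByteBufferSketch): T = {
    val input = new Input(bytes.toInputStream) // Kryo's Input reads from any InputStream
    try {
      kryo.readClassAndObject(input).asInstanceOf[T]
    } finally {
      input.close()
    }
  }
}
```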
- Other changes.
Replace ByteBuf with InputStream.
1. Add an InputStreamManagedBuffer class, used to convert an InputStream instance into a ManagedBuffer instance. (The 2G limit 4)
2. Modify NioManagedBuffer.convertToNetty to return an InputStream instance when the data size is larger than Integer.MAX_VALUE. (The 2G limit 3)
3. Modify the MessageWithHeader class to support an InputStream body; a sketch of writing such a body to the channel follows the note after item 5. (The 2G limit 3)
   - Changes 2 and 3 combined support transferring in-memory data larger than 2G.
4. Modify the parameters of the Encodable.encode method to OutputStream instances. (The 2G limit 4)
5. Add a toInputStream method to UploadBlock so that it can handle mixed-storage data, part in memory and part on disk (The 2G limit 3):
```java
public InputStream toInputStream() throws IOException {
  ChunkedByteBufferOutputStream out = ChunkedByteBufferOutputStream.newInstance();
  Encoders.Bytes.encode(out, type().id());
  encodeWithoutBlockData(out);
  // out.toChunkedByteBuffer().toInputStream(): data in memory
  // blockData.createInputStream(): data on disk (FileInputStream)
  return new SequenceInputStream(out.toChunkedByteBuffer().toInputStream(),
    blockData.createInputStream());
}
```
   - Changes 2, 3, 4 and 5 combined resolve the 2G limit in the RPC message encoding and sending path.
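As mentioned in item 3, here is a hedged sketch of how an InputStream body could be copied to the target channel in bounded chunks, so no in-memory buffer ever has to hold more than 2G at once (the object name, helper signature and chunk size are illustrative, not the actual MessageWithHeader code):

```scala
import java.io.InputStream
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

object StreamBodyTransferSketch {
  // Copy the body stream to the channel through a small reusable buffer.
  def transferStream(body: InputStream, target: WritableByteChannel,
                     chunkSize: Int = 64 * 1024): Long = {
    val bytes = new Array[Byte](chunkSize)
    var total = 0L
    var n = body.read(bytes)
    while (n != -1) {
      val buf = ByteBuffer.wrap(bytes, 0, n)
      while (buf.hasRemaining) {
        target.write(buf) // a non-blocking channel may need several writes per chunk
      }
      total += n
      n = body.read(bytes)
    }
    total
  }
}
```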
6. Modify the parameters of the decode methods of classes that implement the Encodable interface to InputStream instances. (The 2G limit 4)
7. Modify the TransportFrameDecoder class to represent a frame as a LinkedList<ByteBuf>, removing the size limit on a frame. (The 2G limit 4)
8. Add a ByteBufInputStream class, used to wrap a LinkedList<ByteBuf> as an InputStream; each ByteBuf is released (ByteBuf.release) as soon as its data has been fully read. A sketch follows this list. (The 2G limit 4)
9. Modify the parameters of the RpcHandler.receive method to InputStream instances. (The 2G limit 4)
   - Changes 6, 7, 8 and 9 combined resolve the 2G limit in the RPC message receiving and decoding path.
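A hedged sketch of the ByteBufInputStream idea from item 8: walk a LinkedList of frame chunks as one stream and release each ByteBuf as soon as it has been drained (the class name and error handling are simplified):

```scala
import java.io.InputStream
import java.util.LinkedList
import io.netty.buffer.ByteBuf

class ByteBufInputStreamSketch(buffers: LinkedList[ByteBuf]) extends InputStream {
  override def read(): Int = {
    val current = buffers.peekFirst()
    if (current == null) {
      -1                              // no more chunks: end of stream
    } else if (current.isReadable) {
      current.readByte() & 0xFF       // read one byte from the current chunk
    } else {
      buffers.removeFirst().release() // chunk fully consumed: release it eagerly
      read()                          // continue with the next chunk
    }
  }
}
```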
Read data
Local data
- Only data stored in memory is represented by ChunkedByteBuffer; data from all other sources is represented by ManagedBuffer. (The 2G limit 1)
- Modify DiskStore.getBytes to return a ManagedBuffer instance; ManagedBuffer.nioByteBuffer is then called only when there is enough memory to hold the ManagedBuffer's data. A sketch follows.
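A hedged sketch of the changed signature, assuming the existing FileSegmentManagedBuffer from network-common as the file-backed buffer; the surrounding wiring is illustrative:

```scala
import java.io.File
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.util.TransportConf
import org.apache.spark.storage.{BlockId, DiskBlockManager}

class DiskStoreSketch(conf: TransportConf, diskManager: DiskBlockManager) {
  // Return a buffer that only references the on-disk segment; nothing is mapped or
  // loaded here, so the block size is not limited by MappedByteBuffer's 2G cap.
  def getBytes(blockId: BlockId): ManagedBuffer = {
    val file: File = diskManager.getFile(blockId.name)
    new FileSegmentManagedBuffer(conf, file, 0, file.length())
  }
}
```

The caller decides when, and whether, to call nioByteBuffer on the returned buffer, after confirming that enough memory is available.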
Remote Data (The 2G limit 4)
There are three options:
- Add an InputStreamInterceptor to propagate back-pressure to the shuffle server (this option has been implemented); see the sketch after this list.
  - When the number of ByteBufs in the cache exceeds a certain amount, call channel.config().setAutoRead(false) to disable AUTO_READ, so channel.read() is no longer called automatically.
  - When the number of ByteBufs in the cache drops below a certain amount, call channel.config().setAutoRead(true) to enable AUTO_READ again.
  - The advantage of this option is that it propagates back-pressure; the drawback is that it changes the semantics of existing APIs, and in some cases the IO retry mechanism no longer works.
  - References:
    - Netty的read事件與AUTO_READ模式
    - TCP/IP詳解--舉例明白發(fā)送/接收緩沖區(qū)、滑動(dòng)窗口協(xié)議之間的關(guān)系
    - TCP 滑動(dòng)窗口協(xié)議 詳解
  - InputStreamInterceptor design:
    - Create a fixed-size, thread-safe buffer pool.
    - The Netty thread puts each received ByteBuf into the pool; when the buffered ByteBufs exceed 90% of the pool's capacity, call channel.config().setAutoRead(false) so data is no longer read automatically and the peer's writes block.
    - The data-processing thread takes ByteBufs out of the pool; when the buffered ByteBufs fall below 10% of the pool's capacity, call channel.config().setAutoRead(true) to resume automatic reads.
    - When a ByteBuf has been fully processed, release it and call channel.read() to receive more data.
- When the size of a message exceeds a certain value, write the message to disk instead of keeping it in memory.
  - The advantage of this option is that it uses very little memory; the disadvantage is that it adds disk IO.
- Combine the previous option with a buffer pool, keeping data in memory as far as possible.
  - Write messages to the buffer pool while it has enough memory; spill to disk only when memory is insufficient.
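A hedged sketch of option 1's interceptor, with illustrative names and the 90% / 10% thresholds described above (the implemented version is wired into the transport layer's handler pipeline):

```scala
import java.util.concurrent.LinkedBlockingQueue
import io.netty.buffer.ByteBuf
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}

class BackPressureInterceptorSketch(capacity: Int) extends ChannelInboundHandlerAdapter {
  private val buffered = new LinkedBlockingQueue[ByteBuf]()

  override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = msg match {
    case buf: ByteBuf =>
      buffered.put(buf)
      // Over 90% full: stop automatic reads; the peer's writes eventually block.
      if (buffered.size() > capacity * 0.9) {
        ctx.channel().config().setAutoRead(false)
      }
    case other =>
      ctx.fireChannelRead(other)
  }

  // Called by the data-processing thread to fetch the next chunk; the caller releases
  // the ByteBuf when done. Below 10% of capacity, automatic reads are re-enabled.
  def nextChunk(ctx: ChannelHandlerContext): ByteBuf = {
    val buf = buffered.take()
    if (buffered.size() < capacity * 0.1) {
      ctx.channel().config().setAutoRead(true)
      ctx.channel().read()
    }
    buf
  }
}
```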
Add buffer pool
A buffer pool can reduce memory allocation and GC time, improving the performance of Spark core.
- Reduce the number of large objects created in the Eden area; in Twitter's experience, using buffer pools can significantly reduce the number of GCs (see "Netty 4 Reduces GC Overhead by 5x at Twitter").
- Use a buffer pool to reduce the number of memory allocations and zero-fill operations (see Netty's "Using as a generic library").
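For reference, Netty's pooled allocator already provides pooling with reference-counted release, which the long-range goal could build on; a small illustrative example (not part of the proposed changes):

```scala
import io.netty.buffer.{ByteBuf, PooledByteBufAllocator}

object BufferPoolSketch {
  private val allocator = PooledByteBufAllocator.DEFAULT

  // Borrow a pooled direct buffer, use it, and return it to the pool via release().
  def withPooledBuffer[A](size: Int)(f: ByteBuf => A): A = {
    val buf = allocator.directBuffer(size) // allocated from the pool, not the GC-managed heap
    try {
      f(buf)
    } finally {
      buf.release() // reference count reaches 0: the memory goes back to the pool
    }
  }
}
```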
The difficulties in implementing this are:
- Spark currently does not release ByteBuffers explicitly; they are reclaimed by the Java GC.
- Adding reference counting for eager release would reduce GC pressure, but requires adding reference-counting and memory-leak-detection code throughout, which is a large change.
- Reuse Netty's buffer code, which already supports leak detection and dynamic resizing.