Skip to content

Commit

Permalink
module distributedlog-common/distributedlog-protocol: refactor ByteBu…
Browse files Browse the repository at this point in the history
…f release usage (#3693)

Co-authored-by: lushiji <[email protected]>
(cherry picked from commit a71c7a7)
  • Loading branch information
StevenLuMT authored and hangc0276 committed Dec 8, 2022
1 parent bf18fc5 commit 8e352d4
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import java.nio.ByteBuffer;
import org.junit.Test;

Expand Down Expand Up @@ -70,9 +71,9 @@ private void testCompressionCodec(CompressionCodec codec) throws Exception {
decompressedBuf.readBytes(decompressedData);
assertArrayEquals("The decompressed bytes should be same as the original bytes",
data, decompressedData);
buf.release();
compressedBuf.release();
decompressedBuf.release();
ReferenceCountUtil.safeRelease(buf);
ReferenceCountUtil.safeRelease(compressedBuf);
ReferenceCountUtil.safeRelease(decompressedBuf);
}

private void testCompressionCodec2(CompressionCodec codec) throws Exception {
Expand All @@ -93,9 +94,9 @@ private void testCompressionCodec2(CompressionCodec codec) throws Exception {
byte[] decompressedData = new byte[decompressedBuf.readableBytes()];
decompressedBuf.slice().readBytes(decompressedData);

buffer.release();
compressedBuf.release();
decompressedBuf.release();
ReferenceCountUtil.safeRelease(buffer);
ReferenceCountUtil.safeRelease(compressedBuf);
ReferenceCountUtil.safeRelease(decompressedBuf);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.distributedlog.LogRecordSet.VERSION;

import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.io.CompressionCodec;
Expand Down Expand Up @@ -80,10 +81,10 @@ class EnvelopedRecordSetReader implements LogRecordSet.Reader {
CompressionCodec codec = CompressionUtils.getCompressionCodec(Type.of(codecCode));
this.reader = codec.decompress(compressedBuf, decompressedDataLen);
} finally {
compressedBuf.release();
ReferenceCountUtil.safeRelease(compressedBuf);
}
if (numRecords == 0) {
this.reader.release();
ReferenceCountUtil.safeRelease(this.reader);
}
}

Expand All @@ -110,7 +111,7 @@ public LogRecordWithDLSN nextRecord() throws IOException {

// release the record set buffer when exhausting the reader
if (0 == numRecords) {
this.reader.release();
ReferenceCountUtil.safeRelease(this.reader);
}

return record;
Expand All @@ -120,7 +121,7 @@ public LogRecordWithDLSN nextRecord() throws IOException {
public void release() {
if (0 != numRecords) {
numRecords = 0;
reader.release();
ReferenceCountUtil.safeRelease(reader);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,14 @@ ByteBuf createBuffer() {
@Override
public synchronized void completeTransmit(long lssn, long entryId, long startSlotId) {
satisfyPromises(lssn, entryId, startSlotId);
buffer.release();
ReferenceCountUtil.release(recordSetBuffer);
ReferenceCountUtil.safeRelease(buffer);
ReferenceCountUtil.safeRelease(recordSetBuffer);
}

@Override
public synchronized void abortTransmit(Throwable reason) {
cancelPromises(reason);
buffer.release();
ReferenceCountUtil.release(recordSetBuffer);
ReferenceCountUtil.safeRelease(buffer);
ReferenceCountUtil.safeRelease(recordSetBuffer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -225,7 +226,7 @@ public ByteBuf getPayloadBuf() {

void setPayloadBuf(ByteBuf payload, boolean copyData) {
if (null != this.payload) {
this.payload.release();
ReferenceCountUtil.safeRelease(this.payload);
}
if (copyData) {
this.payload = Unpooled.copiedBuffer(payload);
Expand Down

0 comments on commit 8e352d4

Please sign in to comment.