Performance[MQB,BMQ]: return constructed blobs as pointers #471
base: main
Conversation
Force-pushed from 84e6047 to 8bb562c
Force-pushed from a72440a to d7ba3ef
Force-pushed from d7ba3ef to 8f53a30
Build 344 of commit 8f53a30 has completed with FAILURE
There is `bsl::deque<bdlbb::Blob> d_channelBufferQueue;` in `ClientSession`. That is not in a performance-critical path, but we could change that as well (does not have to be in the same PR).
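For context, a minimal sketch of what such a follow-up could look like, using a stand-in struct rather than the real `ClientSession` (the `*Sp` member name is hypothetical):

```cpp
#include <bdlbb_blob.h>
#include <bsl_deque.h>
#include <bsl_memory.h>

// Stand-in struct contrasting the current member with a possible follow-up
// that stores shared pointers instead of Blob values.
struct ClientSessionSketch {
    // Current form: every push_back copies the Blob and its BlobBuffer vector.
    bsl::deque<bdlbb::Blob>                   d_channelBufferQueue;

    // Possible follow-up: only a shared pointer is copied on push_back.
    bsl::deque<bsl::shared_ptr<bdlbb::Blob> > d_channelBufferQueueSp;
};
```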
src/groups/bmq/bmqa/bmqa_session.h (outdated)

    @@ -679,6 +680,13 @@ class Session : public AbstractSession {
      public:
        // TYPES

        /// Pool of shared pointers to Blobs
Why not `typedef bmqp::BlobPoolUtil::BlobSpPool`?
We don't expose the `bmqp` package in `bmqa`, since `bmqa` is a client package. And we need to declare `BlobSpPool` somewhere to be used in `bmqa::MockSession`. I moved it directly to the private types in `bmqa::MockSession`.
            d_blob_sp->buffer(0).data());
        eh.setLength(d_blob_sp->length());

        return *d_blob_sp;
`AckEventBuilder::blob()` can call `AckEventBuilder::blob_sp()` to avoid duplicate code.
I got rid of `blob_sp()`; we only have `blob()` now.
        return *d_blob_sp;
    }

    bsl::shared_ptr<bdlbb::Blob> AckEventBuilder::blob_sp() const
Can you check what the guideline is for naming an accessor that returns a `shared_ptr`? `blob_sp` does not sound right.
There are basically 2 common things that could be done in a similar situation.

- Rename `shared_ptr<...> blob_sp()` to `shared_ptr<...> blob()` and remove `Blob& blob()`. It is okay since `bmqp` builders are not exposed to SDK clients. But there will be a difference with `bmqa` builders, which return `Blob& blob()`. We don't enforce interface compatibility between `bmqp` and `bmqa` builders; we just use the `bmqp` ones as a pointer to the implementation.
- Use a meaningful name for the operation that returns a shared pointer, like `shared_ptr<> bookClientInfo()`. This is not applicable to our use case.

So I will go with the 1st way.

UPD: I also revisited the idea of providing a copy of the shared pointer and decided to provide a reference to the internal pointer; it's the user's responsibility to make a copy if needed.
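A minimal sketch of the resulting accessor shape, on a stand-in class rather than the actual `bmqp` builders:

```cpp
#include <bdlbb_blob.h>
#include <bsl_memory.h>

// Stand-in class showing the single accessor after the rename.
class EventBuilderSketch {
  private:
    bsl::shared_ptr<bdlbb::Blob> d_blob_sp;  // built blob, obtained from the pool

  public:
    /// Return a reference to the built blob's shared pointer.  The caller
    /// makes its own copy of the shared pointer if the blob must outlive
    /// this builder (or its next 'reset()').
    const bsl::shared_ptr<bdlbb::Blob>& blob() const { return d_blob_sp; }
};
```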
        // skip writing the length until the blob
        // is retrieved.

    /// Blob pool to use. Held, not owned.
    BlobSpPool* d_blobSpPool_p;
I guess this does not introduce any more dependencies than the previous use of `bufferFactory`.
    @@ -114,7 +114,7 @@ AdminSessionState::AdminSessionState(BlobSpPool* blobSpPool,
        , d_dispatcherClientData()
        , d_bufferFactory_p(bufferFactory)
It looks like we do not need d_bufferFactory_p
    @@ -306,9 +306,9 @@ ClientSessionState::ClientSessionState(
        , d_statContext_mp(clientStatContext)
        , d_bufferFactory_p(bufferFactory)
It does not look like we can get rid of `d_bufferFactory_p` until we retire the old code converting `MessageProperties` (which we should retire). Could you leave a comment, please, reminding us to remove it?
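One possible wording for that reminder, sketched on a stand-in struct (the comment text and placement are hypothetical):

```cpp
#include <bdlbb_blob.h>

// Stand-in struct carrying the requested reminder comment.
struct ClientSessionStateSketch {
    /// Buffer factory to use.  Held, not owned.
    /// TODO: remove this member once the legacy 'MessageProperties'
    /// conversion code is retired; that code is its only remaining user.
    bdlbb::BlobBufferFactory *d_bufferFactory_p;
};
```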
    bmqp::SchemaEventBuilder builder(d_bufferFactory_p,
                                     &localAllocator,
                                     encodingType);
    bmqp::SchemaEventBuilder builder(d_blobSpPool_p,
Same comment about removing SessionNegotiator::d_bufferFactory_p
    @@ -1178,11 +1184,13 @@ IncoreClusterStateLedger::IncoreClusterStateLedger(
        ClusterData*              clusterData,
        ClusterState*             clusterState,
        bdlbb::BlobBufferFactory* bufferFactory,
        BlobSpPool*               blobSpPool_p,
        bslma::Allocator*         allocator)
    : d_allocator_p(allocator)
    , d_isFirstLeaderAdvisory(true)
    , d_isOpen(false)
    , d_bufferFactory_p(bufferFactory)
Do we still need `IncoreClusterStateLedger::d_bufferFactory_p`?
Possible to remove; removed.
                            dsCfg,
                            recoveryManagerAllocator),
        d_recoveryManager_mp.load(new (*recoveryManagerAllocator)
                                      RecoveryManager(d_blobSpPool_p,
Seems like we have two `d_blobSpPool_p` members: one in `StorageManager` and another in `ClusterData::d_resources`. Probably an oversight in one of the previous commits; maybe we can fix it now?
Removed the `d_blobSpPool_p` field. In a future PR we need to revisit this if we introduce per-thread resources; we might then need to remove it from `ClusterData` and cache it as a field here instead.
Signed-off-by: Evgeny Malygin <[email protected]>
Force-pushed from 8f53a30 to d330e16
Let's take a look at `bmqp::Event::d_clonedBlob_sp`. What do you think about the possibility of removing the `clone` functionality in `bmqp::Event`? Perhaps in another PR.
    @@ -1010,6 +1027,9 @@ class MockSession : public AbstractSession {
        /// Buffer factory
        bdlbb::PooledBlobBufferFactory d_blobBufferFactory;
Can we remove `d_blobBufferFactory` now?
    @@ -259,7 +264,9 @@ class Application {
        /// instance.
        bdlbb::BlobBufferFactory* bufferFactory();
Can we remove `Application::bufferFactory()` now?
                        bslma::Allocator* allocator)
    : d_blobSpPool_p(blobSpPool_p)
    , d_blob_sp(0, allocator)  // initialized in `reset()`
    , d_emptyBlob_sp(blobSpPool_p->getObject())
This can SEGFAULT before the `BSLS_ASSERT_SAFE(blobSpPool_p)`.
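To illustrate the point on a stand-in class (not the real builder code): member initializers run before the constructor body, so the pool pointer is dereferenced before any assert placed in the body can fire.

```cpp
#include <bdlbb_blob.h>
#include <bsl_memory.h>
#include <bsls_assert.h>

class BuilderSketch {
  public:
    // Hypothetical pool interface mirroring the 'getObject()' calls in the diff.
    struct BlobSpPool {
        bsl::shared_ptr<bdlbb::Blob> getObject();
    };

  private:
    BlobSpPool                   *d_blobSpPool_p;
    bsl::shared_ptr<bdlbb::Blob>  d_emptyBlob_sp;

  public:
    explicit BuilderSketch(BlobSpPool *blobSpPool_p)
    : d_blobSpPool_p(blobSpPool_p)
    , d_emptyBlob_sp(blobSpPool_p->getObject())  // dereferences the pointer here ...
    {
        BSLS_ASSERT_SAFE(blobSpPool_p);  // ... but the null check only runs here
    }
};
```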
    {
        if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(messageCount() == 0)) {
            BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
            return ProtocolUtil::emptyBlob();  // RETURN
            return d_emptyBlob_sp;             // RETURN
This `d_emptyBlob_sp` looks strange. It seems like `d_blob_sp` is never null, so why not return it?
                        bslma::Allocator* allocator)
    : d_blobSpPool_p(blobSpPool_p)
    , d_blob_sp(0, allocator)  // initialized in `reset()`
    , d_emptyBlob_sp(blobSpPool_p->getObject())
Same comment about `d_emptyBlob_sp`. Do we really need it if `d_blob_sp` is always valid?
                        bslma::Allocator* allocator)
    : d_blobSpPool_p(blobSpPool_p)
    , d_blob_sp(0, allocator)  // initialized in `reset()`
    , d_emptyBlob_sp(blobSpPool_p->getObject())
same comment about d_emptyBlob_sp
    : d_allocator_p(allocator)
    , d_blob(bufferFactory, allocator)
    PushEventBuilder::PushEventBuilder(BlobSpPool* blobSpPool_p,
                                       bslma::Allocator* allocator)
same comment about d_emptyBlob_sp
                        bslma::Allocator* allocator)
    : d_blobSpPool_p(blobSpPool_p)
    , d_blob_sp(0, allocator)  // initialized in `reset()`
    , d_emptyBlob_sp(blobSpPool_p->getObject())
same comment about d_emptyBlob_sp
    , d_eventType(eventType)
    , d_blob(bufferFactory, allocator)
    , d_blob_sp(0, allocator)  // initialized in `reset()`
    , d_emptyBlob_sp(blobSpPool_p->getObject())
Same comment about d_emptyBlob_sp
    // it's not safe to modify or replace the blob under this pointer.
    // Instead, we get another shared pointer to another blob.
    nodeContext->d_blob_sp = d_blobSpPool_p->getObject();
    bmqp::ProtocolUtil::buildReceipt(nodeContext->d_blob_sp.get(),
`bmqp::ProtocolUtil::buildReceipt` can now assert that it receives a fresh blob (no need for `removeAll`).
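A sketch of what that could look like, as a stand-in function rather than the actual `bmqp::ProtocolUtil` implementation:

```cpp
#include <bdlbb_blob.h>
#include <bsls_assert.h>

// Since the target blob now comes freshly from the pool, the function could
// assert emptiness instead of clearing the blob itself.
void buildReceiptSketch(bdlbb::Blob *blob /* , receipt fields ... */)
{
    BSLS_ASSERT_SAFE(blob && blob->length() == 0);  // fresh blob expected,
                                                    // no 'removeAll()' needed
    (void)blob;  // silence unused-parameter warnings when safe asserts are off
    // ... append the receipt header and payload to '*blob' ...
}
```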
Overview
In some code paths, we need to write similar Blobs of data to multiple cluster nodes at once (replication, for example). Previously, we copied these Blobs for every `mqbnet::Channel` holding a single connection to another cluster node. The Blob class has a vector of BlobBuffer objects, and when we copy a Blob, we reallocate a new vector of BlobBuffers using the provided allocator. We spend a lot of time on this, and the situation becomes even worse when there are more nodes in the cluster or the throughput is high. Note that even if we write data to only one node, we still made a copy of the Blob when passing it to `mqbnet::Channel`.

The main change in this PR is that we store shared pointers to the same Blob when we enqueue Items to `mqbnet::Channel` for writing. By doing this, we avoid making a vector copy for every Blob and do not allocate without need. However, to make this change possible, we need to provide shared pointers to Blobs from the very level where these Blobs are constructed: in the event builders.

Another change that affects performance is using Blob shared pointer pools for the event builders. By reusing already built Blobs from these pools, we make sure that, after some warmup, the vectors containing BlobBuffers most likely have enough capacity to hold all the BlobBuffers needed for a write.
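To illustrate the core idea with stand-in types (this is not the real `mqbnet::Channel` API):

```cpp
#include <bdlbb_blob.h>
#include <bsl_cstddef.h>
#include <bsl_memory.h>
#include <bsl_vector.h>

// Stand-in 'channel' type: its write queue holds shared pointers, so
// enqueueing copies a pointer, not a vector of BlobBuffers.
struct ChannelSketch {
    bsl::vector<bsl::shared_ptr<bdlbb::Blob> > d_writeQueue;

    void writeBlob(const bsl::shared_ptr<bdlbb::Blob>& blob_sp)
    {
        d_writeQueue.push_back(blob_sp);  // cheap: bumps a reference count
    }
};

// Replicating one event to every node: all channels share the same Blob
// instance instead of each receiving its own copy.
void replicateSketch(bsl::vector<ChannelSketch>          *nodes,
                     const bsl::shared_ptr<bdlbb::Blob>&  event_sp)
{
    for (bsl::size_t i = 0; i < nodes->size(); ++i) {
        (*nodes)[i].writeBlob(event_sp);  // no per-node BlobBuffer vector copy
    }
}
```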
Changes
- `bmqp::BlobPoolUtil`: defines the `BlobSpPool` type and provides a utility function `createPool` that simplifies `BlobSpPool` construction.
- Event builders: accept `BlobSpPool` as an argument, store the built Blob as a shared pointer, use the `BlobSpPool` to get new Blob shared pointers on `reset()`, and provide a copy of the Blob shared pointer via the new `blob_sp()` accessor.
- Other components: accept `BlobSpPool` as an argument where needed.
- `mqbnet::Channel`: change the class API so it now only accepts shared pointers to Blobs to enqueue; add an independent `BlobSpPool` for each `mqbnet::Channel` to use in its builders (we do not want to share the global `BlobSpPool` between too many threads); remove the legacy flavours of how we store Items in Channel (before, we had Blobs by value, by weak pointer, and by shared pointer; now we only store them by shared pointer).
- `mqbnet::Channel` unit test: removed the test for a weak pointer Blob passed to Channel. This feature is not used in the code at all, so it is removed.
- `bmqp::SchemaEventBuilder`: reorder arguments to meet allocator usage guidelines, so the allocator argument is now the last one, as expected; had to explicitly add the default encoding type where it is needed. Also, got rid of the temporary `MemOutStream` and cache it as a field to reduce allocations: `bmqu::MemOutStream d_errorStream`.
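Putting the pieces together, a hedged usage sketch based on the signatures visible in this PR's diffs (the exact constructor and accessor signatures, and the `bmqp` header names, should be treated as assumptions rather than the final merged code):

```cpp
#include <bdlbb_blob.h>
#include <bsl_memory.h>
#include <bslma_allocator.h>
// Assumed header names for the bmqp components referenced in this PR:
#include <bmqp_ackeventbuilder.h>
#include <bmqp_blobpoolutil.h>

// Builders now take the pool plus an allocator (allocator last), and expose
// the built event as a shared pointer.
void buildAndEnqueueSketch(bmqp::BlobPoolUtil::BlobSpPool *blobSpPool_p,
                           bslma::Allocator               *allocator)
{
    bmqp::AckEventBuilder builder(blobSpPool_p, allocator);

    // ... appendMessage(...) calls elided ...

    // The built event is exposed as a shared pointer, so it can be handed to
    // one or more mqbnet::Channel objects without copying BlobBuffers.
    const bsl::shared_ptr<bdlbb::Blob>& event_sp = builder.blob();
    (void)event_sp;
}
```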
.Stress test
Priority queue, 100k msgs/s, 1 consumer, 1 producer, 1 queue in a strong consistency domain, 6-node cluster (3 data centers) with 2 client proxies.
The graph shows the number of unconfirmed messages stored in the queue over a 10-minute run. The blue line is this PR's revision (close to 0 pending messages at every moment); the orange line shows the commit just before this PR.
This PR behaves much better at 100k msgs/s than the previous revision.
Profiler
In the same stress scenario, 3.6% of samples across all threads were taken within Blob constructor calls. With this change, these 3.6% are freed. Since they were within the queue dispatcher thread, that thread will be less busy and able to process a higher throughput.
Before: (profiler screenshot)
After: (profiler screenshot)
Allocator stats
This PR reduces the number of unnecessary allocations by many millions (see the `Channel` rows in the before and after stats). Since we use a counting allocator with a tree structure to report updates down to the root allocator, we save CPU on these updates.

The only remaining place within `TransportManager` with many allocations is `TCPSessionFactory`.
Before: (allocator statistics)
After: (allocator statistics)