Skip to content

Commit

Permalink
Update frame size in codec to avoid unnecessary frame size errors
Browse files Browse the repository at this point in the history
  • Loading branch information
thedodd committed Jun 7, 2024
1 parent 535c454 commit 91016e3
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
8 changes: 4 additions & 4 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ impl Admin {
let cluster = self._client.get_cluster_metadata_cache().await?;
let (tx, rx) = oneshot::channel();

return if let Some(leader) = &cluster.controller {
if let Some(leader) = &cluster.controller {
let uid = uuid::Uuid::new_v4();
leader.conn.create_topics(uid, request, tx).await;
unpack_broker_response(rx).await.and_then(|(_, res)| {
Expand All @@ -529,7 +529,7 @@ impl Admin {
})
} else {
Err(ClientError::NoControllerFound)
};
}
}

/// Delete topics from the Kafka Cluster.
Expand All @@ -540,7 +540,7 @@ impl Admin {
let cluster = self._client.get_cluster_metadata_cache().await?;
let (tx, rx) = oneshot::channel();

return if let Some(leader) = &cluster.controller {
if let Some(leader) = &cluster.controller {
let uid = uuid::Uuid::new_v4();
leader.conn.delete_topics(uid, request, tx).await;
unpack_broker_response(rx).await.and_then(|(_, res)| {
Expand All @@ -552,6 +552,6 @@ impl Admin {
})
} else {
Err(ClientError::NoControllerFound)
};
}
}
}
5 changes: 4 additions & 1 deletion src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use tokio::net::TcpStream;
use tokio_util::codec::{Decoder, Encoder, FramedRead, FramedWrite, LengthDelimitedCodec};

/// The default max size for API messages sent to Kafka.
const DEFAULT_MAX_SIZE: usize = 1024usize.pow(2) * 16; // 16MiB.
///
/// This is set to 32Mi to allow for some overhead when the broker returns a payload slight larger
/// than the requested max, due to large batches and the like.
const DEFAULT_MAX_SIZE: usize = 1024usize.pow(2) * 32;

pub(crate) type KafkaReader = FramedRead<OwnedReadHalf, KafkaCodecReader>;
pub(crate) type KafkaWriter = FramedWrite<OwnedWriteHalf, KafkaCodecWriter>;
Expand Down

0 comments on commit 91016e3

Please sign in to comment.