Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow accessing send queue count from WebSocketConnection #78

Merged
merged 4 commits into from
Oct 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/GraphQL.AspNetCore3/WebSockets/AsyncMessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@ internal class AsyncMessagePump<T>
private readonly Func<T, Task> _callback;
private readonly Queue<ValueTask<T>> _queue = new();

/// <summary>
/// Returns the number of messages in the queue.
/// This count includes any message currently being processed.
/// </summary>
public int Count
{
get {
lock (_queue) {
return _queue.Count;
}
}
}

/// <summary>
/// Initializes a new instances with the specified asynchronous callback delegate.
/// </summary>
Expand Down
9 changes: 8 additions & 1 deletion src/GraphQL.AspNetCore3/WebSockets/WebSocketConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ public class WebSocketConnection : IWebSocketConnection
/// <inheritdoc/>
public HttpContext HttpContext { get; }

/// <summary>
/// Returns the number of packets waiting in the send queue, including
/// messages, keep-alive packets, and the close message.
/// This count includes any packet currently being processed.
/// </summary>
protected int SendQueueCount => _pump.Count;

/// <summary>
/// Initializes an instance with the specified parameters.
/// </summary>
Expand Down Expand Up @@ -167,7 +174,7 @@ public Task CloseAsync(int eventId, string? description)
}

/// <inheritdoc/>
public Task SendMessageAsync(OperationMessage message)
public virtual Task SendMessageAsync(OperationMessage message)
{
_pump.Post(new Message { OperationMessage = message });
return Task.CompletedTask;
Expand Down
3 changes: 2 additions & 1 deletion src/Tests.ApiApprovals/GraphQL.AspNetCore3.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -362,14 +362,15 @@ namespace GraphQL.AspNetCore3.WebSockets
public Microsoft.AspNetCore.Http.HttpContext HttpContext { get; }
public System.DateTime LastMessageSentAt { get; }
public System.Threading.CancellationToken RequestAborted { get; }
protected int SendQueueCount { get; }
public System.Threading.Tasks.Task CloseAsync() { }
public System.Threading.Tasks.Task CloseAsync(int eventId, string? description) { }
public virtual void Dispose() { }
public virtual System.Threading.Tasks.Task ExecuteAsync(GraphQL.AspNetCore3.WebSockets.IOperationMessageProcessor operationMessageProcessor) { }
protected virtual System.Threading.Tasks.Task OnCloseOutputAsync(System.Net.WebSockets.WebSocketCloseStatus closeStatus, string? closeDescription) { }
protected virtual System.Threading.Tasks.Task OnDispatchMessageAsync(GraphQL.AspNetCore3.WebSockets.IOperationMessageProcessor operationMessageProcessor, GraphQL.Transport.OperationMessage message) { }
protected virtual System.Threading.Tasks.Task OnSendMessageAsync(GraphQL.Transport.OperationMessage message) { }
public System.Threading.Tasks.Task SendMessageAsync(GraphQL.Transport.OperationMessage message) { }
public virtual System.Threading.Tasks.Task SendMessageAsync(GraphQL.Transport.OperationMessage message) { }
}
}
namespace GraphQL.AspNetCore3.WebSockets.GraphQLWs
Expand Down
2 changes: 2 additions & 0 deletions src/Tests/WebSockets/TestWebSocketConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ public Task Do_OnCloseOutputAsync(WebSocketCloseStatus closeStatus, string? clos

public TimeSpan Get_DefaultDisconnectionTimeout
=> DefaultDisconnectionTimeout;

public int Get_SendQueueCount => base.SendQueueCount;
}
26 changes: 26 additions & 0 deletions src/Tests/WebSockets/WebSocketConnectionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -459,18 +459,43 @@ public async Task CloseConnectionAsync_Specific()
public async Task SendMessageAsync()
{
var message = new OperationMessage();
_mockConnection.Setup(x => x.SendMessageAsync(It.IsAny<OperationMessage>())).CallBase().Verifiable();
_mockConnection.Protected().Setup<Task>("OnSendMessageAsync", message)
.Returns(Task.CompletedTask).Verifiable();
await _connection.SendMessageAsync(message);
_mockConnection.Verify();
}

[Fact]
public async Task MessageCountAsync()
{
var tc = new TaskCompletionSource<bool>();
var message = new OperationMessage();
_mockConnection.Setup(x => x.SendMessageAsync(It.IsAny<OperationMessage>())).CallBase().Verifiable();
_mockConnection.Protected().Setup<Task>("OnSendMessageAsync", message)
.Returns(tc.Task).Verifiable();
await _connection.SendMessageAsync(message);
_connection.Get_SendQueueCount.ShouldBe(1);
await _connection.SendMessageAsync(message);
_connection.Get_SendQueueCount.ShouldBe(2);
tc.SetResult(true);
for (int i = 0; i < 100; i++) {
if (_connection.Get_SendQueueCount != 0)
await Task.Delay(100);
else
break;
}
_connection.Get_SendQueueCount.ShouldBe(0);
_mockConnection.Verify();
}

[Fact]
public async Task LastMessageSentAt()
{
var oldTime = _connection.LastMessageSentAt;
await Task.Delay(100);
var message = new OperationMessage();
_mockConnection.Setup(x => x.SendMessageAsync(It.IsAny<OperationMessage>())).CallBase().Verifiable();
_mockConnection.Protected().Setup<Task>("OnSendMessageAsync", message)
.Returns(Task.CompletedTask).Verifiable();
await _connection.SendMessageAsync(message);
Expand All @@ -485,6 +510,7 @@ public async Task DoNotSendMessagesAfterOutputIsClosed()
{
// send a message
var message = new OperationMessage();
_mockConnection.Setup(x => x.SendMessageAsync(It.IsAny<OperationMessage>())).CallBase().Verifiable();
_mockConnection.Protected().SetupGet<TimeSpan>("DefaultDisconnectionTimeout").CallBase().Verifiable();
_mockConnection.Protected().Setup<Task>("OnSendMessageAsync", message)
.Returns(Task.CompletedTask).Verifiable();
Expand Down
Loading