Skip to content

Commit

Permalink
Add KeepAliveMode and SupportedWebSocketSubProtocols options (#80)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shane32 authored Oct 27, 2024
1 parent 60a8415 commit d8d46a5
Show file tree
Hide file tree
Showing 13 changed files with 416 additions and 20 deletions.
57 changes: 56 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -631,10 +631,12 @@ endpoint.
| Property | Description | Default value |
|-----------------------------|----------------------|---------------|
| `ConnectionInitWaitTimeout` | The amount of time to wait for a GraphQL initialization packet before the connection is closed. | 10 seconds |
| `KeepAliveTimeout` | The amount of time to wait between sending keep-alive packets. | 30 seconds |
| `DisconnectionTimeout` | The amount of time to wait to attempt a graceful teardown of the WebSockets protocol. | 10 seconds |
| `DisconnectAfterErrorEvent` | Disconnects a subscription from the client if the subscription source dispatches an `OnError` event. | True |
| `DisconnectAfterAnyError` | Disconnects a subscription from the client there are any GraphQL errors during a subscription. | False |
| `KeepAliveMode` | The mode to use for sending keep-alive packets. | protocol-dependent |
| `KeepAliveTimeout` | The amount of time to wait between sending keep-alive packets. | disabled |
| `SupportedWebSocketSubProtocols` | A list of supported WebSocket sub-protocols. | `graphql-ws`, `graphql-transport-ws` |

### Multi-schema configuration

Expand Down Expand Up @@ -699,6 +701,59 @@ public class MySchema : Schema
}
```

### Keep-alive configuration

By default, the middleware will not send keep-alive packets to the client. As the underlying
operating system may not detect a disconnected client until a message is sent, you may wish to
enable keep-alive packets to be sent periodically. The default mode for keep-alive packets
differs depending on whether the client connected with the `graphql-ws` or `graphql-transport-ws`
sub-protocol. The `graphql-ws` sub-protocol will send a unidirectional keep-alive packet to the
client on a fixed schedule, while the `graphql-transport-ws` sub-protocol will only send
unidirectional keep-alive packets when the client has not sent a message within a certain time.
The differing behavior is due to the default implementation of the `graphql-ws` sub-protocol
client, which after receiving a single keep-alive packet, expects additional keep-alive packets
to be sent sooner than every 20 seconds, regardless of the client's activity.

To configure keep-alive packets, set the `KeepAliveMode` and `KeepAliveTimeout` properties
within the `GraphQLWebSocketOptions` object. Set the `KeepAliveTimeout` property to
enable keep-alive packets, or use `TimeSpan.Zero` or `Timeout.InfiniteTimeSpan` to disable it.

The `KeepAliveMode` property is only applicable to the `graphql-transport-ws` sub-protocol and
can be set to the options listed below:

| Keep-alive mode | Description |
|-----------------|-------------|
| `Default` | Same as `Timeout`. |
| `Timeout` | Sends a unidirectional keep-alive message when no message has been received within the specified timeout period. |
| `Interval` | Sends a unidirectional keep-alive message at a fixed interval, regardless of message activity. |
| `TimeoutWithPayload` | Sends a bidirectional keep-alive message with a payload on a fixed interval, and validates the payload matches in the response. |

The `TimeoutWithPayload` model is particularly useful when the server may send messages to the
client at a faster pace than the client can process them. In this case queued messages will be
limited to double the timeout period, as the keep-alive message is queued along with other
packets sent from the server to the client. The client will need to respond to process queued
messages and respond to the keep-alive message within the timeout period or the server will
disconnect the client. When the server forcibly disconnects the client, no graceful teardown
of the WebSocket protocol occurs, and any queued messages are discarded.

When using the `TimeoutWithPayload` keep-alive mode, you may wish to enforce that the
`graphql-transport-ws` sub-protocol is in use by the client, as the `graphql-ws` sub-protocol
does not support bidirectional keep-alive packets. This can be done by setting the
`SupportedWebSocketSubProtocols` property to only include the `graphql-transport-ws` sub-protocol.

```csharp
app.UseGraphQL("/graphql", options =>
{
// configure keep-alive packets
options.WebSockets.KeepAliveTimeout = TimeSpan.FromSeconds(10);
options.WebSockets.KeepAliveMode = KeepAliveMode.TimeoutWithPayload;
// set the supported sub-protocols to only include the graphql-transport-ws sub-protocol
options.WebSockets.SupportedWebSocketSubProtocols = [GraphQLWs.SubscriptionServer.SubProtocol];
});
```

Please note that the included UI packages are configured to use the `graphql-ws` sub-protocol.

### Customizing middleware behavior

GET/POST requests are handled directly by the `GraphQLHttpMiddleware`.
Expand Down
2 changes: 1 addition & 1 deletion src/GraphQL.AspNetCore3/GraphQLHttpMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ protected virtual Task WriteJsonResponseAsync<TResult>(HttpContext context, Http
/// <summary>
/// Gets a list of WebSocket sub-protocols supported.
/// </summary>
protected virtual IEnumerable<string> SupportedWebSocketSubProtocols => _supportedSubProtocols;
protected virtual IEnumerable<string> SupportedWebSocketSubProtocols => _options.WebSockets.SupportedWebSocketSubProtocols;

/// <summary>
/// Creates an <see cref="IWebSocketConnection"/>, a WebSocket message pump.
Expand Down
70 changes: 64 additions & 6 deletions src/GraphQL.AspNetCore3/WebSockets/BaseSubscriptionServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,32 @@ protected virtual Task OnNotAuthorizedPolicyAsync(OperationMessage message, Auth
/// <br/><br/>
/// Otherwise, the connection is acknowledged via <see cref="OnConnectionAcknowledgeAsync(OperationMessage)"/>,
/// <see cref="TryInitialize"/> is called to indicate that this WebSocket connection is ready to accept requests,
/// and keep-alive messages are sent via <see cref="OnSendKeepAliveAsync"/> if configured to do so.
/// Keep-alive messages are only sent if no messages have been sent over the WebSockets connection for the
/// length of time configured in <see cref="GraphQLWebSocketOptions.KeepAliveTimeout"/>.
/// and <see cref="OnSendKeepAliveAsync"/> is called to start sending keep-alive messages if configured to do so.
/// </summary>
protected virtual async Task OnConnectionInitAsync(OperationMessage message)
{
if (!await AuthorizeAsync(message)) {
return;
}
await OnConnectionAcknowledgeAsync(message);
if (!TryInitialize())
return;

_ = OnKeepAliveLoopAsync();
}

/// <summary>
/// Executes when the client is attempting to initialize the connection.
/// <br/><br/>
/// By default, this first checks <see cref="AuthorizeAsync(OperationMessage)"/> to validate that the
/// request has passed authentication. If validation fails, the connection is closed with an Access
/// Denied message.
/// <br/><br/>
/// Otherwise, the connection is acknowledged via <see cref="OnConnectionAcknowledgeAsync(OperationMessage)"/>,
/// <see cref="TryInitialize"/> is called to indicate that this WebSocket connection is ready to accept requests,
/// and <see cref="OnSendKeepAliveAsync"/> is called to start sending keep-alive messages if configured to do so.
/// </summary>
[Obsolete($"Please use the {nameof(OnConnectionInitAsync)}(message) and {nameof(OnKeepAliveLoopAsync)} methods instead. This method will be removed in a future version of this library.")]
protected virtual async Task OnConnectionInitAsync(OperationMessage message, bool smartKeepAlive)
{
if (!await AuthorizeAsync(message)) {
Expand All @@ -272,12 +294,48 @@ protected virtual async Task OnConnectionInitAsync(OperationMessage message, boo
var keepAliveTimeout = _options.KeepAliveTimeout ?? DefaultKeepAliveTimeout;
if (keepAliveTimeout > TimeSpan.Zero) {
if (smartKeepAlive)
_ = StartSmartKeepAliveLoopAsync();
_ = OnKeepAliveLoopAsync(keepAliveTimeout, KeepAliveMode.Timeout);
else
_ = StartKeepAliveLoopAsync();
_ = OnKeepAliveLoopAsync(keepAliveTimeout, KeepAliveMode.Interval);
}
}

/// <summary>
/// Starts sending keep-alive messages if configured to do so. Inspects the configured
/// <see cref="GraphQLWebSocketOptions"/> and passes control to <see cref="OnKeepAliveLoopAsync(TimeSpan, KeepAliveMode)"/>
/// if keep-alive messages are enabled.
/// </summary>
protected virtual Task OnKeepAliveLoopAsync()
{
return OnKeepAliveLoopAsync(
_options.KeepAliveTimeout ?? DefaultKeepAliveTimeout,
_options.KeepAliveMode);
}

/// <summary>
/// Sends keep-alive messages according to the specified timeout period and method.
/// See <see cref="KeepAliveMode"/> for implementation details for each supported mode.
/// </summary>
protected virtual async Task OnKeepAliveLoopAsync(TimeSpan keepAliveTimeout, KeepAliveMode keepAliveMode)
{
if (keepAliveTimeout <= TimeSpan.Zero)
return;

switch (keepAliveMode) {
case KeepAliveMode.Default:
case KeepAliveMode.Timeout:
await StartSmartKeepAliveLoopAsync();
break;
case KeepAliveMode.Interval:
await StartDumbKeepAliveLoopAsync();
break;
case KeepAliveMode.TimeoutWithPayload:
throw new NotImplementedException($"{nameof(KeepAliveMode.TimeoutWithPayload)} is not implemented within the {nameof(BaseSubscriptionServer)} class.");
default:
throw new ArgumentOutOfRangeException(nameof(keepAliveMode));
}

async Task StartKeepAliveLoopAsync()
async Task StartDumbKeepAliveLoopAsync()
{
while (!CancellationToken.IsCancellationRequested) {
await Task.Delay(keepAliveTimeout, CancellationToken);
Expand Down
19 changes: 19 additions & 0 deletions src/GraphQL.AspNetCore3/WebSockets/GraphQLWebSocketOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ public class GraphQLWebSocketOptions
/// </summary>
public TimeSpan? KeepAliveTimeout { get; set; }

/// <summary>
/// Gets or sets the keep-alive mode used for websocket subscriptions.
/// This property is only applicable when using the GraphQLWs protocol.
/// </summary>
public KeepAliveMode KeepAliveMode { get; set; } = KeepAliveMode.Default;

/// <summary>
/// The amount of time to wait to attempt a graceful teardown of the WebSockets protocol.
/// The default is 10 seconds.
Expand All @@ -38,4 +44,17 @@ public class GraphQLWebSocketOptions
/// Disconnects a subscription from the client there are any GraphQL errors during a subscription.
/// </summary>
public bool DisconnectAfterAnyError { get; set; }

/// <summary>
/// The list of supported WebSocket sub-protocols.
/// Defaults to <see cref="GraphQLWs.SubscriptionServer.SubProtocol"/> and <see cref="SubscriptionsTransportWs.SubscriptionServer.SubProtocol"/>.
/// Adding other sub-protocols require the <see cref="GraphQLHttpMiddleware.CreateMessageProcessor(IWebSocketConnection, string)"/> method
/// to be overridden to handle the new sub-protocol.
/// </summary>
/// <remarks>
/// When the <see cref="KeepAliveMode"/> is set to <see cref="KeepAliveMode.TimeoutWithPayload"/>, you may wish to remove
/// <see cref="SubscriptionsTransportWs.SubscriptionServer.SubProtocol"/> from this list to prevent clients from using
/// protocols which do not support the <see cref="KeepAliveMode.TimeoutWithPayload"/> keep-alive mode.
/// </remarks>
public List<string> SupportedWebSocketSubProtocols { get; set; } = [GraphQLWs.SubscriptionServer.SubProtocol, SubscriptionsTransportWs.SubscriptionServer.SubProtocol];
}
12 changes: 12 additions & 0 deletions src/GraphQL.AspNetCore3/WebSockets/GraphQLWs/PingPayload.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace GraphQL.AspNetCore3.WebSockets.GraphQLWs;

/// <summary>
/// The payload of the ping message.
/// </summary>
public class PingPayload
{
/// <summary>
/// The unique identifier of the ping message.
/// </summary>
public string? id { get; set; }
}
97 changes: 95 additions & 2 deletions src/GraphQL.AspNetCore3/WebSockets/GraphQLWs/SubscriptionServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ namespace GraphQL.AspNetCore3.WebSockets.GraphQLWs;
public class SubscriptionServer : BaseSubscriptionServer
{
private readonly IWebSocketAuthenticationService? _authenticationService;
private readonly IGraphQLSerializer _serializer;
private readonly GraphQLWebSocketOptions _options;
private DateTime _lastPongReceivedUtc;
private string? _lastPingId;
private readonly object _lastPingLock = new();

/// <summary>
/// The WebSocket sub-protocol used for this protocol.
Expand Down Expand Up @@ -69,6 +74,8 @@ public SubscriptionServer(
UserContextBuilder = userContextBuilder ?? throw new ArgumentNullException(nameof(userContextBuilder));
Serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
_authenticationService = authenticationService;
_serializer = serializer;
_options = options;
}

/// <inheritdoc/>
Expand All @@ -84,7 +91,9 @@ public override async Task OnMessageReceivedAsync(OperationMessage message)
if (Initialized) {
await ErrorTooManyInitializationRequestsAsync(message);
} else {
#pragma warning disable CS0618 // Type or member is obsolete
await OnConnectionInitAsync(message, true);
#pragma warning restore CS0618 // Type or member is obsolete
}
return;
}
Expand All @@ -105,6 +114,64 @@ public override async Task OnMessageReceivedAsync(OperationMessage message)
}
}

/// <inheritdoc/>
[Obsolete($"Please use the {nameof(OnConnectionInitAsync)} and {nameof(OnKeepAliveLoopAsync)} methods instead. This method will be removed in a future version of this library.")]
protected override Task OnConnectionInitAsync(OperationMessage message, bool smartKeepAlive)
{
if (smartKeepAlive)
return OnConnectionInitAsync(message);
else
return base.OnConnectionInitAsync(message, smartKeepAlive);
}

/// <inheritdoc/>
protected override Task OnKeepAliveLoopAsync(TimeSpan keepAliveTimeout, KeepAliveMode keepAliveMode)
{
if (keepAliveMode == KeepAliveMode.TimeoutWithPayload) {
if (keepAliveTimeout <= TimeSpan.Zero)
return Task.CompletedTask;
return SecureKeepAliveLoopAsync(keepAliveTimeout, keepAliveTimeout);
}
return base.OnKeepAliveLoopAsync(keepAliveTimeout, keepAliveMode);

// pingInterval is the time since the last pong was received before sending a new ping
// pongInterval is the time to wait for a pong after a ping was sent before forcibly closing the connection
async Task SecureKeepAliveLoopAsync(TimeSpan pingInterval, TimeSpan pongInterval)
{
lock (_lastPingLock)
_lastPongReceivedUtc = DateTime.UtcNow;
while (!CancellationToken.IsCancellationRequested) {
// Wait for the next ping interval
TimeSpan interval;
var now = DateTime.UtcNow;
DateTime lastPongReceivedUtc;
lock (_lastPingLock) {
lastPongReceivedUtc = _lastPongReceivedUtc;
}
var nextPing = lastPongReceivedUtc.Add(pingInterval);
interval = nextPing.Subtract(now);
if (interval > TimeSpan.Zero) // could easily be zero or less, if pongInterval is equal or greater than pingInterval
await Task.Delay(interval, CancellationToken);

// Send a new ping message
await OnSendKeepAliveAsync();

// Wait for the pong response
await Task.Delay(pongInterval, CancellationToken);
bool abort;
lock (_lastPingLock) {
abort = _lastPongReceivedUtc == lastPongReceivedUtc;
}
if (abort) {
// Forcibly close the connection if the client has not responded to the keep-alive message.
// Do not send a close message to the client or wait for a response.
Connection.HttpContext.Abort();
return;
}
}
}
}

/// <summary>
/// Pong is a required response to a ping, and also a unidirectional keep-alive packet,
/// whereas ping is a bidirectional keep-alive packet.
Expand All @@ -123,11 +190,37 @@ protected virtual Task OnPingAsync(OperationMessage message)
/// Executes when a pong message is received.
/// </summary>
protected virtual Task OnPongAsync(OperationMessage message)
=> Task.CompletedTask;
{
if (_options.KeepAliveMode == KeepAliveMode.TimeoutWithPayload) {
try {
var pingId = _serializer.ReadNode<PingPayload>(message.Payload)?.id;
lock (_lastPingLock) {
if (_lastPingId == pingId)
_lastPongReceivedUtc = DateTime.UtcNow;
}
} catch { } // ignore deserialization errors in case the pong message does not match the expected format
}
return Task.CompletedTask;
}

/// <inheritdoc/>
protected override Task OnSendKeepAliveAsync()
=> Connection.SendMessageAsync(_pongMessage);
{
if (_options.KeepAliveMode == KeepAliveMode.TimeoutWithPayload) {
var lastPingId = Guid.NewGuid().ToString("N");
lock (_lastPingLock) {
_lastPingId = lastPingId;
}
return Connection.SendMessageAsync(
new() {
Type = MessageType.Ping,
Payload = new PingPayload { id = lastPingId }
}
);
} else {
return Connection.SendMessageAsync(_pongMessage);
}
}

private static readonly OperationMessage _connectionAckMessage = new() { Type = MessageType.ConnectionAck };
/// <inheritdoc/>
Expand Down
36 changes: 36 additions & 0 deletions src/GraphQL.AspNetCore3/WebSockets/KeepAliveMode.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
namespace GraphQL.AspNetCore3.WebSockets;

/// <summary>
/// Specifies the mode of keep-alive behavior.
/// </summary>
public enum KeepAliveMode
{
/// <summary>
/// Same as <see cref="Timeout"/>: Sends a unidirectional keep-alive message when no message has been received within the specified timeout period.
/// </summary>
Default = 0,

/// <summary>
/// Sends a unidirectional keep-alive message when no message has been received within the specified timeout period.
/// </summary>
Timeout = 1,

/// <summary>
/// Sends a unidirectional keep-alive message at a fixed interval, regardless of message activity.
/// </summary>
Interval = 2,

/// <summary>
/// Sends a Ping message with a payload after the specified timeout from the last received Pong,
/// and waits for a corresponding Pong response. Requires that the client reflects the payload
/// in the response. Forcibly disconnects the client if the client does not respond with a Pong
/// message within the specified timeout. This means that a dead connection will be closed after
/// a maximum of double the <see cref="GraphQLWebSocketOptions.KeepAliveTimeout"/> period.
/// </summary>
/// <remarks>
/// This mode is particularly useful when backpressure causes subscription messages to be delayed
/// due to a slow or unresponsive client connection. The server can detect that the client is not
/// processing messages in a timely manner and disconnect the client to free up resources.
/// </remarks>
TimeoutWithPayload = 3,
}
Loading

0 comments on commit d8d46a5

Please sign in to comment.