Skip to content

Commit

Permalink
Issue 556
Browse files Browse the repository at this point in the history
  • Loading branch information
shamblett committed Aug 6, 2024
1 parent 8ebb127 commit a62ad41
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,10 @@ class MqttConnectionKeepAlive {
// Calculate latencies
lastCycleLatency = DateTime.now().millisecondsSinceEpoch - _lastPingTime;
_cycleCount++;
averageCycleLatency =
(averageCycleLatency + lastCycleLatency) ~/ _cycleCount;
// Average latency calculation is
// new_avg = prev_avg + ((new_value − prev_avg) ~/ n + 1)
averageCycleLatency +=
(lastCycleLatency - averageCycleLatency) ~/ _cycleCount;

// Call the pong callback if not null
if (pongCallback != null) {
Expand Down
17 changes: 11 additions & 6 deletions test/mqtt_client_keep_alive_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ void main() {
expect(ka.disconnectTimer, isNull);
});
test('Latency counts', () async {
final latencies = <int>[0, 0];
final latencies = <int>[0, 0, 0];
final clientEventBus = events.EventBus();
var disconnect = false;
void disconnectOnNoPingResponse(DisconnectOnNoPingResponse event) {
Expand All @@ -260,22 +260,27 @@ void main() {
verify(() => ch.registerForAllSentMessages(ka.messageSent)).called(1);
expect(ka.pingTimer?.isActive, isTrue);
expect(ka.disconnectTimer, isNull);
await MqttUtilities.asyncSleep(3);
await MqttUtilities.asyncSleep(4);
verify(() => ch.sendMessage(any())).called(1);
await MqttUtilities.asyncSleep(1);
final pingMessageRx = MqttPingResponseMessage();
ka.pingResponseReceived(pingMessageRx);
latencies[0] = ka.lastCycleLatency;
expect(ka.lastCycleLatency > 1000, isTrue);
expect(ka.averageCycleLatency > 1000, isTrue);
await MqttUtilities.asyncSleep(2);
await MqttUtilities.asyncSleep(3);
verify(() => ch.sendMessage(any())).called(1);
await MqttUtilities.asyncSleep(1);
ka.pingResponseReceived(pingMessageRx);
latencies[1] = ka.lastCycleLatency;
expect(ka.lastCycleLatency > 1000, isTrue);
expect(ka.averageCycleLatency > 1000, isTrue);
expect(ka.averageCycleLatency, (latencies[0] + latencies[1]) ~/ 2);
await MqttUtilities.asyncSleep(3);
verify(() => ch.sendMessage(any())).called(1);
ka.pingResponseReceived(pingMessageRx);
latencies[2] = ka.lastCycleLatency;
expect(ka.lastCycleLatency > 1000, isTrue);
expect(ka.averageCycleLatency > 1000, isTrue);
expect(ka.averageCycleLatency,
(latencies[0] + latencies[1] + latencies[2]) ~/ 3);
expect(disconnect, isFalse);
ka.stop();
expect(ka.averageCycleLatency, 0);
Expand Down

0 comments on commit a62ad41

Please sign in to comment.