Skip to content

Commit

Permalink
[eclipse-hono#3659] Fix Prometheus limit check to use read timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
harism committed Oct 9, 2024
1 parent 4a52110 commit 886b1c2
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;

import org.eclipse.hono.client.registry.TenantClient;
Expand Down Expand Up @@ -113,32 +114,28 @@ public Future<Boolean> isConnectionLimitReached(final TenantObject tenant, final

final var value = connectionCountCache.get(new LimitedResourceKey(tenant.getTenantId(), tenantClient::get));

if (value.isDone()) {
try {
final var limitedResource = value.get();
TracingHelper.TAG_CACHE_HIT.set(span, Boolean.TRUE);
span.log(Map.of(
TenantConstants.FIELD_MAX_CONNECTIONS, Optional.ofNullable(limitedResource.getCurrentLimit())
.map(String::valueOf)
.orElse("N/A"),
"current-connections", limitedResource.getCurrentValue()));
final boolean isExceeded = Optional.ofNullable(limitedResource.getCurrentLimit())
.map(limit -> limitedResource.getCurrentValue() >= limit)
.orElse(false);
result.complete(isExceeded);
} catch (InterruptedException | ExecutionException e) {
// this means that the query could not be run successfully
TracingHelper.logError(span, e);
// fall back to default value
result.complete(Boolean.FALSE);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
} else {
LOG.trace("Prometheus query [tenant: {}] still running, using default value", tenant.getTenantId());
span.log(EVENT_QUERY_STILL_RUNNING);
try {
final var limitedResource = value.get();
TracingHelper.TAG_CACHE_HIT.set(span, Boolean.TRUE);
span.log(Map.of(
TenantConstants.FIELD_MAX_CONNECTIONS, Optional.ofNullable(limitedResource.getCurrentLimit())
.map(String::valueOf)
.orElse("N/A"),
"current-connections", limitedResource.getCurrentValue()));
final boolean isExceeded = Optional.ofNullable(limitedResource.getCurrentLimit())
.map(limit -> limitedResource.getCurrentValue() >= limit)
.orElse(false);
result.complete(isExceeded);
} catch (CancellationException | ExecutionException | InterruptedException e) {
// this means that the query could not be run successfully
TracingHelper.logError(span, e);
// fall back to default value
result.complete(Boolean.FALSE);
if (e instanceof InterruptedException) {
LOG.trace("Prometheus query [tenant: {}] still running, using default value", tenant.getTenantId());
span.log(EVENT_QUERY_STILL_RUNNING);
Thread.currentThread().interrupt();
}
}

return result.future()
Expand Down Expand Up @@ -201,32 +198,28 @@ public Future<Boolean> isMessageLimitReached(

final var value = dataVolumeCache.get(new LimitedResourceKey(tenant.getTenantId(), tenantClient::get));

if (value.isDone()) {
try {
final var limitedResource = value.get();
TracingHelper.TAG_CACHE_HIT.set(span, true);
span.log(Map.of(
"current period bytes limit", Optional.ofNullable(limitedResource.getCurrentLimit())
.map(String::valueOf)
.orElse("N/A"),
"current period bytes consumed", limitedResource.getCurrentValue()));
final boolean isExceeded = Optional.ofNullable(limitedResource.getCurrentLimit())
.map(limit -> (limitedResource.getCurrentValue() + payloadSize) > limit)
.orElse(false);
result.complete(isExceeded);
} catch (InterruptedException | ExecutionException e) {
// this means that the query could not be run successfully
TracingHelper.logError(span, e);
// fall back to default value
result.complete(Boolean.FALSE);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
} else {
LOG.trace("Prometheus query [tenant: {}] still running, using default value", tenant.getTenantId());
span.log(Map.of(Fields.MESSAGE, EVENT_QUERY_STILL_RUNNING));
try {
final var limitedResource = value.get();
TracingHelper.TAG_CACHE_HIT.set(span, true);
span.log(Map.of(
"current period bytes limit", Optional.ofNullable(limitedResource.getCurrentLimit())
.map(String::valueOf)
.orElse("N/A"),
"current period bytes consumed", limitedResource.getCurrentValue()));
final boolean isExceeded = Optional.ofNullable(limitedResource.getCurrentLimit())
.map(limit -> (limitedResource.getCurrentValue() + payloadSize) > limit)
.orElse(false);
result.complete(isExceeded);
} catch (CancellationException | ExecutionException | InterruptedException e) {
// this means that the query could not be run successfully
TracingHelper.logError(span, e);
// fall back to default value
result.complete(Boolean.FALSE);
if (e instanceof InterruptedException) {
LOG.trace("Prometheus query [tenant: {}] still running, using default value", tenant.getTenantId());
span.log(Map.of(Fields.MESSAGE, EVENT_QUERY_STILL_RUNNING));
Thread.currentThread().interrupt();
}
}
}
return result.future()
Expand All @@ -253,32 +246,28 @@ public Future<Boolean> isConnectionDurationLimitReached(
final var key = new LimitedResourceKey(tenant.getTenantId(), tenantClient::get);
final var value = connectionDurationCache.get(key);

if (value.isDone()) {
try {
final var limitedResource = value.get();
TracingHelper.TAG_CACHE_HIT.set(span, true);
span.log(Map.of(
"current period's connection duration limit", Optional.ofNullable(limitedResource.getCurrentLimit())
.map(String::valueOf)
.orElse("N/A"),
"current period's connection duration consumed", limitedResource.getCurrentValue()));
final boolean isExceeded = Optional.ofNullable(limitedResource.getCurrentLimit())
.map(limit -> limitedResource.getCurrentValue().compareTo(limit) >= 0)
.orElse(false);
result.complete(isExceeded);
} catch (InterruptedException | ExecutionException e) {
// this means that the query could not be run successfully
TracingHelper.logError(span, e);
// fall back to default value
result.complete(Boolean.FALSE);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
} else {
LOG.trace("Prometheus query [tenant: {}] still running, using default value", tenant.getTenantId());
span.log(Map.of(Fields.MESSAGE, EVENT_QUERY_STILL_RUNNING));
try {
final var limitedResource = value.get();
TracingHelper.TAG_CACHE_HIT.set(span, true);
span.log(Map.of(
"current period's connection duration limit", Optional.ofNullable(limitedResource.getCurrentLimit())
.map(String::valueOf)
.orElse("N/A"),
"current period's connection duration consumed", limitedResource.getCurrentValue()));
final boolean isExceeded = Optional.ofNullable(limitedResource.getCurrentLimit())
.map(limit -> limitedResource.getCurrentValue().compareTo(limit) >= 0)
.orElse(false);
result.complete(isExceeded);
} catch (CancellationException | ExecutionException | InterruptedException e) {
// this means that the query could not be run successfully
TracingHelper.logError(span, e);
// fall back to default value
result.complete(Boolean.FALSE);
if (e instanceof InterruptedException) {
LOG.trace("Prometheus query [tenant: {}] still running, using default value", tenant.getTenantId());
span.log(Map.of(Fields.MESSAGE, EVENT_QUERY_STILL_RUNNING));
Thread.currentThread().interrupt();
}
}
return result.future()
.onSuccess(b -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,9 @@ public void testMessageLimitNotExceededIfNoResourceLimitsFound(final VertxTestCo
@Test
public void testMessageLimitFallsBackToDefaultValueIfQueryStillRunning(final VertxTestContext ctx) {

when(dataVolumeCache.get(any(LimitedResourceKey.class))).thenReturn(new CompletableFuture<LimitedResource<Long>>());
when(dataVolumeCache.get(any(LimitedResourceKey.class)))
.thenReturn(new CompletableFuture<LimitedResource<Long>>()
.orTimeout(2, TimeUnit.SECONDS));
final long incomingMessageSize = 20;
final TenantObject tenant = TenantObject.from(Constants.DEFAULT_TENANT);

Expand Down Expand Up @@ -388,23 +390,23 @@ public void testConnectionDurationLimitCheckSucceedsIfNoResourceLimitsFound(fina
private void givenCurrentConnections(final Long maxConnections, final Long currentConnections) {
when(connectionCountCache.get(any(LimitedResourceKey.class))).thenAnswer(i -> {
final var count = new CompletableFuture<LimitedResource<Long>>();
count.complete(new LimitedResource<>(maxConnections, currentConnections));
count.completeOnTimeout(new LimitedResource<>(maxConnections, currentConnections), 100, TimeUnit.MILLISECONDS);
return count;
});
}

private void givenDataVolumeUsageInBytes(final Long maxBytes, final Long consumedBytes) {
when(dataVolumeCache.get(any(LimitedResourceKey.class))).thenAnswer(i -> {
final var count = new CompletableFuture<LimitedResource<Long>>();
count.complete(new LimitedResource<>(maxBytes, consumedBytes.longValue()));
count.completeOnTimeout(new LimitedResource<>(maxBytes, consumedBytes.longValue()), 100, TimeUnit.MILLISECONDS);
return count;
});
}

private void givenDeviceConnectionDuration(final Duration maxDuration, final Duration consumedConnectionDuration) {
when(connectionDurationCache.get(any(LimitedResourceKey.class))).thenAnswer(i -> {
final var count = new CompletableFuture<LimitedResource<Duration>>();
count.complete(new LimitedResource<>(maxDuration, consumedConnectionDuration));
count.completeOnTimeout(new LimitedResource<>(maxDuration, consumedConnectionDuration), 100, TimeUnit.MILLISECONDS);
return count;
});
}
Expand Down

0 comments on commit 886b1c2

Please sign in to comment.