Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1569 | Changed default value of schema.repository.deleteSchemaPathSuffix to empty string. #1754

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
18 changes: 9 additions & 9 deletions docs/docs/configuration/schema-repository.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ Option | Description

### Management

Option | Description | Default value
---------------------------------------- | ------------------------------------------------------------------ | -------------
schema.repository.serverUrl | URL of a repository | `http://localhost:8888/`
schema.repository.validationEnabled | Allows to use validation API in schema repository | `false`
schema.repository.connectionTimeoutMillis| Connection timeout used in http client (specified in milliseconds) | 1000
schema.repository.socketTimeoutMillis | Read socket timeout used in http client (specified in milliseconds)| 3000
schema.repository.deleteSchemaPathSuffix | A suffix of the URL to delete all schema versions: `/subjects/{subject}/{deleteSchemaPathSuffix}| `versions`
schema.repository.subjectSuffixEnabled | Add `-value` suffix to every subject name | `false`
schema.repository.subjectNamespaceEnabled| Add `kafka.defaultNamespace` property value as a prefix to every subject name | `false`
Option | Description | Default value
---------------------------------------- |----------------------------------------------------------------------------------------------------| -------------
schema.repository.serverUrl | URL of a repository | `http://localhost:8888/`
schema.repository.validationEnabled | Allows to use validation API in schema repository | `false`
schema.repository.connectionTimeoutMillis| Connection timeout used in http client (specified in milliseconds) | 1000
schema.repository.socketTimeoutMillis | Read socket timeout used in http client (specified in milliseconds) | 3000
schema.repository.deleteSchemaPathSuffix | A suffix of the URL to delete all schema versions: `/subjects/{subject}/{deleteSchemaPathSuffix}` | `""`
schema.repository.subjectSuffixEnabled | Add `-value` suffix to every subject name | `false`
schema.repository.subjectNamespaceEnabled| Add `kafka.defaultNamespace` property value as a prefix to every subject name | `false`

Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import pl.allegro.tech.hermes.metrics.PathsCompiler;
import pl.allegro.tech.hermes.schema.DirectCompiledSchemaRepository;
import pl.allegro.tech.hermes.schema.DirectSchemaVersionsRepository;
import pl.allegro.tech.hermes.schema.RawSchemaClient;
import pl.allegro.tech.hermes.schema.RawSchemaAdminClient;
import pl.allegro.tech.hermes.schema.SchemaCompilersFactory;
import pl.allegro.tech.hermes.schema.SchemaRepository;
import pl.allegro.tech.hermes.tracker.frontend.Trackers;
Expand All @@ -56,11 +56,21 @@ static HermesServer provideHermesServer() throws IOException {
MetricsFacade metricsFacade = new MetricsFacade(new SimpleMeterRegistry(), hermesMetrics);
TopicsCache topicsCache = new InMemoryTopicsCache(metricsFacade, topic);
BrokerMessageProducer brokerMessageProducer = new InMemoryBrokerMessageProducer();
RawSchemaClient rawSchemaClient = new InMemorySchemaClient(topic.getName(), loadMessageResource("schema"), 1, 1);
RawSchemaAdminClient rawSchemaAdminClient = new InMemorySchemaAdminClient(
topic.getName(),
loadMessageResource("schema"),
1,
1
);
Trackers trackers = new Trackers(Collections.emptyList());
AvroMessageContentWrapper avroMessageContentWrapper = new AvroMessageContentWrapper(Clock.systemDefaultZone());
HttpHandler httpHandler = provideHttpHandler(throughputLimiter, topicsCache,
brokerMessageProducer, rawSchemaClient, trackers, avroMessageContentWrapper);
HttpHandler httpHandler = provideHttpHandler(
throughputLimiter,
topicsCache,
brokerMessageProducer,
rawSchemaAdminClient,
trackers,
avroMessageContentWrapper);
SslProperties sslProperties = new SslProperties();
HermesServerProperties hermesServerProperties = new HermesServerProperties();
hermesServerProperties.setGracefulShutdownEnabled(false);
Expand All @@ -81,7 +91,7 @@ static HermesServer provideHermesServer() throws IOException {

private static HttpHandler provideHttpHandler(ThroughputLimiter throughputLimiter,
TopicsCache topicsCache, BrokerMessageProducer brokerMessageProducer,
RawSchemaClient rawSchemaClient, Trackers trackers, AvroMessageContentWrapper avroMessageContentWrapper) {
RawSchemaAdminClient rawSchemaAdminClient, Trackers trackers, AvroMessageContentWrapper avroMessageContentWrapper) {
HTTPHeadersProperties httpHeadersProperties = new HTTPHeadersProperties();
HandlersChainProperties handlersChainProperties = new HandlersChainProperties();
TrackingHeadersExtractor trackingHeadersExtractor = new DefaultTrackingHeaderExtractor();
Expand All @@ -98,8 +108,11 @@ private static HttpHandler provideHttpHandler(ThroughputLimiter throughputLimite
new MessageValidators(Collections.emptyList()),
new MessageContentTypeEnforcer(),
new SchemaRepository(
new DirectSchemaVersionsRepository(rawSchemaClient),
new DirectCompiledSchemaRepository<>(rawSchemaClient, SchemaCompilersFactory.avroSchemaCompiler())
new DirectSchemaVersionsRepository(rawSchemaAdminClient),
new DirectCompiledSchemaRepository<>(
rawSchemaAdminClient,
SchemaCompilersFactory.avroSchemaCompiler()
)
),
new DefaultHeadersPropagator(httpHeadersProperties),
new BenchmarkMessageContentWrapper(avroMessageContentWrapper),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@
import pl.allegro.tech.hermes.api.RawSchema;
import pl.allegro.tech.hermes.api.RawSchemaWithMetadata;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.schema.RawSchemaClient;
import pl.allegro.tech.hermes.schema.RawSchemaAdminClient;
import pl.allegro.tech.hermes.schema.SchemaId;
import pl.allegro.tech.hermes.schema.SchemaVersion;

import java.util.List;
import java.util.Objects;
import java.util.Optional;

public class InMemorySchemaClient implements RawSchemaClient {
public class InMemorySchemaAdminClient implements RawSchemaAdminClient {

private final TopicName schemaTopicName;
private final RawSchemaWithMetadata rawSchemaWithMetadata;

public InMemorySchemaClient(TopicName schemaTopicName, String schemaSource, int id, int version) {
public InMemorySchemaAdminClient(TopicName schemaTopicName, String schemaSource, int id, int version) {
this.schemaTopicName = schemaTopicName;
rawSchemaWithMetadata = RawSchemaWithMetadata.of(schemaSource, id, version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ public AvroCompiledSchemaRepositoryFactory(RawSchemaClient rawSchemaClient,
}

public CompiledSchemaRepository<Schema> provide() {
CompiledSchemaRepository<Schema> repository = new DirectCompiledSchemaRepository<>(rawSchemaClient,
SchemaCompilersFactory.avroSchemaCompiler());
CompiledSchemaRepository<Schema> repository = new DirectCompiledSchemaRepository<>(
rawSchemaClient,
SchemaCompilersFactory.avroSchemaCompiler()
);

if (cacheEnabled) {
return new CachedCompiledSchemaRepository<>(repository,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package pl.allegro.tech.hermes.common.schema;

import com.fasterxml.jackson.databind.ObjectMapper;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.schema.RawSchemaClient;
import pl.allegro.tech.hermes.schema.SubjectNamingStrategy;
Expand All @@ -13,27 +11,24 @@ public class RawSchemaClientFactory {
private final String kafkaNamespace;
private final String kafkaNamespaceSeparator;
private final MetricsFacade metricsFacade;
private final ObjectMapper objectMapper;
private final SchemaRepositoryInstanceResolver resolver;
private final boolean subjectSuffixEnabled;
private final boolean subjectNamespaceEnabled;

public RawSchemaClientFactory(String kafkaNamespace,
String kafkaNamespaceSeparator,
MetricsFacade metricsFacade,
ObjectMapper objectMapper,
SchemaRepositoryInstanceResolver resolver,
boolean subjectSuffixEnabled,
boolean subjectNamespaceEnabled) {
this.kafkaNamespace = kafkaNamespace;
this.kafkaNamespaceSeparator = kafkaNamespaceSeparator;
this.metricsFacade = metricsFacade;
this.objectMapper = objectMapper;
this.resolver = resolver;
this.subjectSuffixEnabled = subjectSuffixEnabled;
this.subjectNamespaceEnabled = subjectNamespaceEnabled;
}

public RawSchemaClient provide() {
SubjectNamingStrategy subjectNamingStrategy = SubjectNamingStrategy.qualifiedName
.withValueSuffixIf(subjectSuffixEnabled)
Expand All @@ -45,11 +40,11 @@ public RawSchemaClient provide() {
)
);
return createMetricsTrackingClient(
new SchemaRegistryRawSchemaClient(resolver, objectMapper, subjectNamingStrategy)
new SchemaRegistryRawSchemaClient(resolver, subjectNamingStrategy)
);
}

private RawSchemaClient createMetricsTrackingClient(RawSchemaClient rawSchemaClient) {
return new ReadMetricsTrackingRawSchemaClient(rawSchemaClient, metricsFacade);
return new ReadMetricsTrackingRawSchemaClient(metricsFacade, rawSchemaClient);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package pl.allegro.tech.hermes.common.schema;

import pl.allegro.tech.hermes.api.RawSchema;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.schema.RawSchemaAdminClient;

public class ReadMetricsTrackingRawSchemaAdminClient extends ReadMetricsTrackingRawSchemaClient implements RawSchemaAdminClient {
private final RawSchemaAdminClient rawSchemaAdminClient;

public ReadMetricsTrackingRawSchemaAdminClient(
RawSchemaAdminClient rawSchemaAdminClient,
MetricsFacade metricsFacade) {
super(metricsFacade, rawSchemaAdminClient);
this.rawSchemaAdminClient = rawSchemaAdminClient;
}

@Override
public void registerSchema(TopicName topic, RawSchema rawSchema) {
rawSchemaAdminClient.registerSchema(topic, rawSchema);
}

@Override
public void deleteAllSchemaVersions(TopicName topic) {
rawSchemaAdminClient.deleteAllSchemaVersions(topic);
}

@Override
public void validateSchema(TopicName topic, RawSchema rawSchema) {
rawSchemaAdminClient.validateSchema(topic, rawSchema);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package pl.allegro.tech.hermes.common.schema;

import pl.allegro.tech.hermes.api.RawSchema;
import pl.allegro.tech.hermes.api.RawSchemaWithMetadata;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
Expand All @@ -15,14 +14,14 @@
import java.util.function.Supplier;

public class ReadMetricsTrackingRawSchemaClient implements RawSchemaClient {
private final RawSchemaClient rawSchemaClient;
private final MetricsFacade metricsFacade;
protected final RawSchemaClient rawSchemaClient;

public ReadMetricsTrackingRawSchemaClient(
RawSchemaClient rawSchemaClient,
MetricsFacade metricsFacade) {
this.rawSchemaClient = rawSchemaClient;
MetricsFacade metricsFacade,
RawSchemaClient rawSchemaClient) {
this.metricsFacade = metricsFacade;
this.rawSchemaClient = rawSchemaClient;
}

@Override
Expand All @@ -45,30 +44,15 @@ public List<SchemaVersion> getVersions(TopicName topic) {
return timedVersions(() -> rawSchemaClient.getVersions(topic));
}

@Override
public void registerSchema(TopicName topic, RawSchema rawSchema) {
rawSchemaClient.registerSchema(topic, rawSchema);
}

@Override
public void deleteAllSchemaVersions(TopicName topic) {
rawSchemaClient.deleteAllSchemaVersions(topic);
}

@Override
public void validateSchema(TopicName topic, RawSchema rawSchema) {
rawSchemaClient.validateSchema(topic, rawSchema);
}

private <T> T timedSchema(Supplier<? extends T> callable) {
protected <T> T timedSchema(Supplier<? extends T> callable) {
return timed(callable, metricsFacade.schemaClient().schemaTimer());
}

private <T> T timedVersions(Supplier<? extends T> callable) {
protected <T> T timedVersions(Supplier<? extends T> callable) {
return timed(callable, metricsFacade.schemaClient().versionsTimer());
}

private <T> T timed(Supplier<? extends T> callable, HermesTimer timer) {
protected <T> T timed(Supplier<? extends T> callable, HermesTimer timer) {
try (HermesTimerContext time = timer.time()) {
return callable.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ public SchemaVersionsRepositoryFactory(RawSchemaClient rawSchemaClient,
public SchemaVersionsRepository provide() {
if (schemaVersionsRepositoryParameters.isCacheEnabled()) {
CachedSchemaVersionsRepository cachedSchemaVersionsRepository = new CachedSchemaVersionsRepository(
rawSchemaClient,
getVersionsReloader(),
schemaVersionsRepositoryParameters.getRefreshAfterWrite(),
schemaVersionsRepositoryParameters.getExpireAfterWrite());
rawSchemaClient,
getVersionsReloader(),
schemaVersionsRepositoryParameters.getRefreshAfterWrite(),
schemaVersionsRepositoryParameters.getExpireAfterWrite());

notificationsBus.registerTopicCallback(
new SchemaCacheRefresherCallback<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import pl.allegro.tech.hermes.common.metric.HermesMetrics
import pl.allegro.tech.hermes.common.metric.MetricsFacade
import pl.allegro.tech.hermes.common.metric.Timers
import pl.allegro.tech.hermes.metrics.PathsCompiler
import pl.allegro.tech.hermes.schema.RawSchemaClient
import pl.allegro.tech.hermes.schema.RawSchemaAdminClient
import pl.allegro.tech.hermes.schema.SchemaVersion
import pl.allegro.tech.hermes.test.helper.metrics.MicrometerUtils
import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Subject

class ReadMetricsTrackingRawSchemaClientTest extends Specification {
class ReadMetricsTrackingRawSchemaAdminClientTest extends Specification {
@Shared
TopicName topicName = TopicName.fromQualifiedName("someGroup.someTopic")

Expand All @@ -33,10 +33,10 @@ class ReadMetricsTrackingRawSchemaClientTest extends Specification {

MetricsFacade metricsFacade = new MetricsFacade(meterRegistry, hermesMetrics)

RawSchemaClient rawSchemaClient = Mock()
RawSchemaAdminClient rawSchemaClient = Mock()

@Subject
RawSchemaClient readMetricsTrackingClient = new ReadMetricsTrackingRawSchemaClient(rawSchemaClient, metricsFacade)
RawSchemaAdminClient readMetricsTrackingClient = new ReadMetricsTrackingRawSchemaAdminClient(rawSchemaClient, metricsFacade)

def "should track latency metrics for schema retrieval"() {
expect:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,17 @@ public CompiledSchemaRepository<Schema> avroCompiledSchemaRepository(RawSchemaCl

@Bean
public RawSchemaClient rawSchemaClient(MetricsFacade metricsFacade,
ObjectMapper objectMapper,
SchemaRepositoryInstanceResolver resolver,
SchemaProperties schemaProperties,
KafkaClustersProperties kafkaProperties) {
return new RawSchemaClientFactory(kafkaProperties.getNamespace(), kafkaProperties.getNamespaceSeparator(), metricsFacade,
objectMapper, resolver,
schemaProperties.getRepository().isSubjectSuffixEnabled(),
schemaProperties.getRepository().isSubjectNamespaceEnabled()).provide();
return new RawSchemaClientFactory(
kafkaProperties.getNamespace(),
kafkaProperties.getNamespaceSeparator(),
metricsFacade,
resolver,
schemaProperties.getRepository().isSubjectSuffixEnabled(),
schemaProperties.getRepository().isSubjectNamespaceEnabled()
).provide();
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,12 @@ public CompiledSchemaRepository<Schema> avroCompiledSchemaRepository(RawSchemaCl
@Bean
public RawSchemaClient rawSchemaClient(KafkaClustersProperties kafkaClustersProperties,
MetricsFacade metricsFacade,
ObjectMapper objectMapper,
SchemaRepositoryInstanceResolver resolver,
SchemaProperties schemaProperties) {
return new RawSchemaClientFactory(
kafkaClustersProperties.getNamespace(),
kafkaClustersProperties.getNamespaceSeparator(),
metricsFacade,
objectMapper,
resolver,
schemaProperties.getRepository().isSubjectSuffixEnabled(),
schemaProperties.getRepository().isSubjectNamespaceEnabled()
Expand Down
Loading
Loading