From f8fa305fe16bc2079f22780727e8bc936aaf32bf Mon Sep 17 00:00:00 2001 From: Adrian Sobolewski Date: Wed, 18 Oct 2023 12:45:27 +0200 Subject: [PATCH 1/4] #1569 | Changed default value of schema.repository.deleteSchemaPathSuffix to empty string. This has been done to make requests to default Confluent schema registry match its delete url. --- docs/docs/configuration/schema-repository.md | 18 +++++++++--------- .../config/SchemaRepositoryProperties.java | 2 +- .../SchemaRegistryRawSchemaClient.java | 4 ++-- .../SchemaRegistryRawSchemaClientTest.groovy | 8 ++++++-- 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/docs/docs/configuration/schema-repository.md b/docs/docs/configuration/schema-repository.md index 72ebbd1fe1..04fd61849c 100644 --- a/docs/docs/configuration/schema-repository.md +++ b/docs/docs/configuration/schema-repository.md @@ -37,13 +37,13 @@ Option | Description ### Management -Option | Description | Default value ----------------------------------------- | ------------------------------------------------------------------ | ------------- -schema.repository.serverUrl | URL of a repository | `http://localhost:8888/` -schema.repository.validationEnabled | Allows to use validation API in schema repository | `false` -schema.repository.connectionTimeoutMillis| Connection timeout used in http client (specified in milliseconds) | 1000 -schema.repository.socketTimeoutMillis | Read socket timeout used in http client (specified in milliseconds)| 3000 -schema.repository.deleteSchemaPathSuffix | A suffix of the URL to delete all schema versions: `/subjects/{subject}/{deleteSchemaPathSuffix}| `versions` -schema.repository.subjectSuffixEnabled | Add `-value` suffix to every subject name | `false` -schema.repository.subjectNamespaceEnabled| Add `kafka.defaultNamespace` property value as a prefix to every subject name | `false` +Option | Description | Default value +---------------------------------------- |----------------------------------------------------------------------------------------------------| ------------- +schema.repository.serverUrl | URL of a repository | `http://localhost:8888/` +schema.repository.validationEnabled | Allows to use validation API in schema repository | `false` +schema.repository.connectionTimeoutMillis| Connection timeout used in http client (specified in milliseconds) | 1000 +schema.repository.socketTimeoutMillis | Read socket timeout used in http client (specified in milliseconds) | 3000 +schema.repository.deleteSchemaPathSuffix | A suffix of the URL to delete all schema versions: `/subjects/{subject}/{deleteSchemaPathSuffix}` | `""` +schema.repository.subjectSuffixEnabled | Add `-value` suffix to every subject name | `false` +schema.repository.subjectNamespaceEnabled| Add `kafka.defaultNamespace` property value as a prefix to every subject name | `false` diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/SchemaRepositoryProperties.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/SchemaRepositoryProperties.java index 085cfac8c9..68a7bd6107 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/SchemaRepositoryProperties.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/SchemaRepositoryProperties.java @@ -12,7 +12,7 @@ public class SchemaRepositoryProperties { private int connectionTimeoutMillis = 1000; private int socketTimeoutMillis = 3000; - private String deleteSchemaPathSuffix = "versions"; + private String deleteSchemaPathSuffix = ""; private boolean subjectSuffixEnabled = false; diff --git a/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaClient.java b/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaClient.java index ced558137c..b33e4b7575 100644 --- a/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaClient.java +++ b/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaClient.java @@ -30,7 +30,7 @@ /** * This implementation of RawSchemaClient is compatible with Confluent Schema Registry API - * except for the deleteAllSchemaVersions and validation endpoint which are not fully supported by the Confluent project. + * except for the validation endpoint which is not fully supported by the Confluent project. */ public class SchemaRegistryRawSchemaClient implements RawSchemaClient { @@ -48,7 +48,7 @@ public class SchemaRegistryRawSchemaClient implements RawSchemaClient { public SchemaRegistryRawSchemaClient(SchemaRepositoryInstanceResolver schemaRepositoryInstanceResolver, ObjectMapper objectMapper, SubjectNamingStrategy subjectNamingStrategy) { - this(schemaRepositoryInstanceResolver, objectMapper, false, "versions", subjectNamingStrategy); + this(schemaRepositoryInstanceResolver, objectMapper, false, "", subjectNamingStrategy); } public SchemaRegistryRawSchemaClient(SchemaRepositoryInstanceResolver schemaRepositoryInstanceResolver, diff --git a/hermes-schema/src/test/groovy/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaClientTest.groovy b/hermes-schema/src/test/groovy/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaClientTest.groovy index 9f45f674d5..3db9686bb8 100644 --- a/hermes-schema/src/test/groovy/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaClientTest.groovy +++ b/hermes-schema/src/test/groovy/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaClientTest.groovy @@ -244,13 +244,13 @@ class SchemaRegistryRawSchemaClientTest extends Specification { def "should delete all schema versions"() { given: - wireMock.stubFor(WireMock.delete(versionsUrl(topicName, subjectNamingStrategy)).willReturn(okResponse())) + wireMock.stubFor(WireMock.delete(deleteUrl(topicName, subjectNamingStrategy)).willReturn(okResponse())) when: client.deleteAllSchemaVersions(topicName) then: - wireMock.verify(1, deleteRequestedFor(versionsUrl(topicName, subjectNamingStrategy))) + wireMock.verify(1, deleteRequestedFor(deleteUrl(topicName, subjectNamingStrategy))) where: client << clients @@ -409,6 +409,10 @@ class SchemaRegistryRawSchemaClientTest extends Specification { urlEqualTo("/subjects/${subjectNamingStrategy.apply(topic)}/versions") } + private UrlPattern deleteUrl(TopicName topic, SubjectNamingStrategy subjectNamingStrategy) { + urlEqualTo("/subjects/${subjectNamingStrategy.apply(topic)}") + } + private UrlPattern schemaVersionUrl(TopicName topic, int version, SubjectNamingStrategy subjectNamingStrategy) { urlEqualTo("/subjects/${subjectNamingStrategy.apply(topic)}/versions/$version") } From 23b4ba86bba3429ecb59a7d01a7d96d6a877af0f Mon Sep 17 00:00:00 2001 From: Adrian Sobolewski Date: Wed, 18 Oct 2023 14:41:28 +0200 Subject: [PATCH 2/4] #1569 | Introduced RawSchemaAdminClient interface, which provides methods to read and modify the schema. Modified RawSchemaClient to provide only read methods. --- .../environment/HermesServerFactory.java | 21 +- ...nt.java => InMemorySchemaAdminClient.java} | 6 +- .../AvroCompiledSchemaRepositoryFactory.java | 6 +- .../common/schema/RawSchemaClientFactory.java | 11 +- ...adMetricsTrackingRawSchemaAdminClient.java | 32 +++ .../ReadMetricsTrackingRawSchemaClient.java | 37 +-- .../SchemaVersionsRepositoryFactory.java | 8 +- ...csTrackingRawSchemaAdminClientTest.groovy} | 8 +- .../consumers/config/SchemaConfiguration.java | 13 +- .../frontend/config/SchemaConfiguration.java | 2 - .../config/SchemaRepositoryConfiguration.java | 18 +- .../domain/topic/schema/SchemaService.java | 20 +- .../hermes/schema/RawSchemaAdminClient.java | 13 + .../tech/hermes/schema/RawSchemaClient.java | 12 +- .../SchemaRegistryRawSchemaAdminClient.java | 190 ++++++++++++++ .../SchemaRegistryRawSchemaClient.java | 247 +++--------------- .../CachedSchemaVersionsRepositoryTest.groovy | 2 +- .../DirectCompiledSchemaRepositoryTest.groovy | 2 +- ...maRegistryRawSchemaAdminClientTest.groovy} | 14 +- 19 files changed, 355 insertions(+), 307 deletions(-) rename hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/{InMemorySchemaClient.java => InMemorySchemaAdminClient.java} (89%) create mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/ReadMetricsTrackingRawSchemaAdminClient.java rename hermes-common/src/test/groovy/pl/allegro/tech/hermes/common/schema/{ReadMetricsTrackingRawSchemaClientTest.groovy => ReadMetricsTrackingRawSchemaAdminClientTest.groovy} (91%) create mode 100644 hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/RawSchemaAdminClient.java create mode 100644 hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaAdminClient.java rename hermes-schema/src/test/groovy/pl/allegro/tech/hermes/schema/confluent/{SchemaRegistryRawSchemaClientTest.groovy => SchemaRegistryRawSchemaAdminClientTest.groovy} (96%) diff --git a/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java b/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java index 356fe6afb1..20b9df303f 100644 --- a/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java +++ b/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java @@ -30,7 +30,7 @@ import pl.allegro.tech.hermes.metrics.PathsCompiler; import pl.allegro.tech.hermes.schema.DirectCompiledSchemaRepository; import pl.allegro.tech.hermes.schema.DirectSchemaVersionsRepository; -import pl.allegro.tech.hermes.schema.RawSchemaClient; +import pl.allegro.tech.hermes.schema.RawSchemaAdminClient; import pl.allegro.tech.hermes.schema.SchemaCompilersFactory; import pl.allegro.tech.hermes.schema.SchemaRepository; import pl.allegro.tech.hermes.tracker.frontend.Trackers; @@ -55,11 +55,16 @@ static HermesServer provideHermesServer() throws IOException { MetricsFacade metricsFacade = new MetricsFacade(new SimpleMeterRegistry(), hermesMetrics); TopicsCache topicsCache = new InMemoryTopicsCache(metricsFacade, topic); BrokerMessageProducer brokerMessageProducer = new InMemoryBrokerMessageProducer(); - RawSchemaClient rawSchemaClient = new InMemorySchemaClient(topic.getName(), loadMessageResource("schema"), 1, 1); + RawSchemaAdminClient rawSchemaAdminClient = new InMemorySchemaAdminClient(topic.getName(), loadMessageResource("schema"), 1, 1); Trackers trackers = new Trackers(Collections.emptyList()); AvroMessageContentWrapper avroMessageContentWrapper = new AvroMessageContentWrapper(Clock.systemDefaultZone()); - HttpHandler httpHandler = provideHttpHandler(throughputLimiter, topicsCache, - brokerMessageProducer, rawSchemaClient, trackers, avroMessageContentWrapper); + HttpHandler httpHandler = provideHttpHandler( + throughputLimiter, + topicsCache, + brokerMessageProducer, + rawSchemaAdminClient, + trackers, + avroMessageContentWrapper); SslProperties sslProperties = new SslProperties(); HermesServerProperties hermesServerProperties = new HermesServerProperties(); hermesServerProperties.setGracefulShutdownEnabled(false); @@ -79,8 +84,8 @@ static HermesServer provideHermesServer() throws IOException { } private static HttpHandler provideHttpHandler(ThroughputLimiter throughputLimiter, - TopicsCache topicsCache, BrokerMessageProducer brokerMessageProducer, - RawSchemaClient rawSchemaClient, Trackers trackers, AvroMessageContentWrapper avroMessageContentWrapper) { + TopicsCache topicsCache, BrokerMessageProducer brokerMessageProducer, + RawSchemaAdminClient rawSchemaAdminClient, Trackers trackers, AvroMessageContentWrapper avroMessageContentWrapper) { HeaderPropagationProperties headerPropagationProperties = new HeaderPropagationProperties(); HandlersChainProperties handlersChainProperties = new HandlersChainProperties(); TrackingHeadersExtractor trackingHeadersExtractor = new DefaultTrackingHeaderExtractor(); @@ -94,8 +99,8 @@ private static HttpHandler provideHttpHandler(ThroughputLimiter throughputLimite new MessageValidators(Collections.emptyList()), new MessageContentTypeEnforcer(), new SchemaRepository( - new DirectSchemaVersionsRepository(rawSchemaClient), - new DirectCompiledSchemaRepository<>(rawSchemaClient, SchemaCompilersFactory.avroSchemaCompiler()) + new DirectSchemaVersionsRepository(rawSchemaAdminClient), + new DirectCompiledSchemaRepository<>(rawSchemaAdminClient, SchemaCompilersFactory.avroSchemaCompiler()) ), new DefaultHeadersPropagator(headerPropagationProperties.isEnabled(), headerPropagationProperties.getAllowFilter()), new BenchmarkMessageContentWrapper(avroMessageContentWrapper), diff --git a/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/InMemorySchemaClient.java b/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/InMemorySchemaAdminClient.java similarity index 89% rename from hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/InMemorySchemaClient.java rename to hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/InMemorySchemaAdminClient.java index 1c2ec6ec38..feb8f5bedf 100644 --- a/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/InMemorySchemaClient.java +++ b/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/InMemorySchemaAdminClient.java @@ -4,7 +4,7 @@ import pl.allegro.tech.hermes.api.RawSchema; import pl.allegro.tech.hermes.api.RawSchemaWithMetadata; import pl.allegro.tech.hermes.api.TopicName; -import pl.allegro.tech.hermes.schema.RawSchemaClient; +import pl.allegro.tech.hermes.schema.RawSchemaAdminClient; import pl.allegro.tech.hermes.schema.SchemaId; import pl.allegro.tech.hermes.schema.SchemaVersion; @@ -12,12 +12,12 @@ import java.util.Objects; import java.util.Optional; -public class InMemorySchemaClient implements RawSchemaClient { +public class InMemorySchemaAdminClient implements RawSchemaAdminClient { private final TopicName schemaTopicName; private final RawSchemaWithMetadata rawSchemaWithMetadata; - public InMemorySchemaClient(TopicName schemaTopicName, String schemaSource, int id, int version) { + public InMemorySchemaAdminClient(TopicName schemaTopicName, String schemaSource, int id, int version) { this.schemaTopicName = schemaTopicName; rawSchemaWithMetadata = RawSchemaWithMetadata.of(schemaSource, id, version); } diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/AvroCompiledSchemaRepositoryFactory.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/AvroCompiledSchemaRepositoryFactory.java index a3e8a49fe7..85035fe190 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/AvroCompiledSchemaRepositoryFactory.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/AvroCompiledSchemaRepositoryFactory.java @@ -27,8 +27,10 @@ public AvroCompiledSchemaRepositoryFactory(RawSchemaClient rawSchemaClient, } public CompiledSchemaRepository provide() { - CompiledSchemaRepository repository = new DirectCompiledSchemaRepository<>(rawSchemaClient, - SchemaCompilersFactory.avroSchemaCompiler()); + CompiledSchemaRepository repository = new DirectCompiledSchemaRepository<>( + rawSchemaClient, + SchemaCompilersFactory.avroSchemaCompiler() + ); if (cacheEnabled) { return new CachedCompiledSchemaRepository<>(repository, diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/RawSchemaClientFactory.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/RawSchemaClientFactory.java index e4bf8933cc..8a949376de 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/RawSchemaClientFactory.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/RawSchemaClientFactory.java @@ -1,7 +1,5 @@ package pl.allegro.tech.hermes.common.schema; -import com.fasterxml.jackson.databind.ObjectMapper; -import pl.allegro.tech.hermes.common.metric.HermesMetrics; import pl.allegro.tech.hermes.common.metric.MetricsFacade; import pl.allegro.tech.hermes.schema.RawSchemaClient; import pl.allegro.tech.hermes.schema.SubjectNamingStrategy; @@ -13,7 +11,6 @@ public class RawSchemaClientFactory { private final String kafkaNamespace; private final String kafkaNamespaceSeparator; private final MetricsFacade metricsFacade; - private final ObjectMapper objectMapper; private final SchemaRepositoryInstanceResolver resolver; private final boolean subjectSuffixEnabled; private final boolean subjectNamespaceEnabled; @@ -21,19 +18,17 @@ public class RawSchemaClientFactory { public RawSchemaClientFactory(String kafkaNamespace, String kafkaNamespaceSeparator, MetricsFacade metricsFacade, - ObjectMapper objectMapper, SchemaRepositoryInstanceResolver resolver, boolean subjectSuffixEnabled, boolean subjectNamespaceEnabled) { this.kafkaNamespace = kafkaNamespace; this.kafkaNamespaceSeparator = kafkaNamespaceSeparator; this.metricsFacade = metricsFacade; - this.objectMapper = objectMapper; this.resolver = resolver; this.subjectSuffixEnabled = subjectSuffixEnabled; this.subjectNamespaceEnabled = subjectNamespaceEnabled; } - + public RawSchemaClient provide() { SubjectNamingStrategy subjectNamingStrategy = SubjectNamingStrategy.qualifiedName .withValueSuffixIf(subjectSuffixEnabled) @@ -45,11 +40,11 @@ public RawSchemaClient provide() { ) ); return createMetricsTrackingClient( - new SchemaRegistryRawSchemaClient(resolver, objectMapper, subjectNamingStrategy) + new SchemaRegistryRawSchemaClient(resolver, subjectNamingStrategy) ); } private RawSchemaClient createMetricsTrackingClient(RawSchemaClient rawSchemaClient) { - return new ReadMetricsTrackingRawSchemaClient(rawSchemaClient, metricsFacade); + return new ReadMetricsTrackingRawSchemaClient(metricsFacade, rawSchemaClient); } } diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/ReadMetricsTrackingRawSchemaAdminClient.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/ReadMetricsTrackingRawSchemaAdminClient.java new file mode 100644 index 0000000000..6bd7c294d2 --- /dev/null +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/ReadMetricsTrackingRawSchemaAdminClient.java @@ -0,0 +1,32 @@ +package pl.allegro.tech.hermes.common.schema; + +import pl.allegro.tech.hermes.api.RawSchema; +import pl.allegro.tech.hermes.api.TopicName; +import pl.allegro.tech.hermes.common.metric.MetricsFacade; +import pl.allegro.tech.hermes.schema.RawSchemaAdminClient; + +public class ReadMetricsTrackingRawSchemaAdminClient extends ReadMetricsTrackingRawSchemaClient implements RawSchemaAdminClient { + private final RawSchemaAdminClient rawSchemaAdminClient; + + public ReadMetricsTrackingRawSchemaAdminClient( + RawSchemaAdminClient rawSchemaAdminClient, + MetricsFacade metricsFacade) { + super(metricsFacade, rawSchemaAdminClient); + this.rawSchemaAdminClient = rawSchemaAdminClient; + } + + @Override + public void registerSchema(TopicName topic, RawSchema rawSchema) { + rawSchemaAdminClient.registerSchema(topic, rawSchema); + } + + @Override + public void deleteAllSchemaVersions(TopicName topic) { + rawSchemaAdminClient.deleteAllSchemaVersions(topic); + } + + @Override + public void validateSchema(TopicName topic, RawSchema rawSchema) { + rawSchemaAdminClient.validateSchema(topic, rawSchema); + } +} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/ReadMetricsTrackingRawSchemaClient.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/ReadMetricsTrackingRawSchemaClient.java index 10b792c9c7..16fa089de8 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/ReadMetricsTrackingRawSchemaClient.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/ReadMetricsTrackingRawSchemaClient.java @@ -1,6 +1,8 @@ package pl.allegro.tech.hermes.common.schema; -import pl.allegro.tech.hermes.api.RawSchema; +import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; import pl.allegro.tech.hermes.api.RawSchemaWithMetadata; import pl.allegro.tech.hermes.api.TopicName; import pl.allegro.tech.hermes.common.metric.MetricsFacade; @@ -10,19 +12,15 @@ import pl.allegro.tech.hermes.schema.SchemaId; import pl.allegro.tech.hermes.schema.SchemaVersion; -import java.util.List; -import java.util.Optional; -import java.util.function.Supplier; - public class ReadMetricsTrackingRawSchemaClient implements RawSchemaClient { - private final RawSchemaClient rawSchemaClient; private final MetricsFacade metricsFacade; + protected final RawSchemaClient rawSchemaClient; public ReadMetricsTrackingRawSchemaClient( - RawSchemaClient rawSchemaClient, - MetricsFacade metricsFacade) { - this.rawSchemaClient = rawSchemaClient; + MetricsFacade metricsFacade, + RawSchemaClient rawSchemaClient) { this.metricsFacade = metricsFacade; + this.rawSchemaClient = rawSchemaClient; } @Override @@ -45,30 +43,15 @@ public List getVersions(TopicName topic) { return timedVersions(() -> rawSchemaClient.getVersions(topic)); } - @Override - public void registerSchema(TopicName topic, RawSchema rawSchema) { - rawSchemaClient.registerSchema(topic, rawSchema); - } - - @Override - public void deleteAllSchemaVersions(TopicName topic) { - rawSchemaClient.deleteAllSchemaVersions(topic); - } - - @Override - public void validateSchema(TopicName topic, RawSchema rawSchema) { - rawSchemaClient.validateSchema(topic, rawSchema); - } - - private T timedSchema(Supplier callable) { + protected T timedSchema(Supplier callable) { return timed(callable, metricsFacade.schemaClient().schemaTimer()); } - private T timedVersions(Supplier callable) { + protected T timedVersions(Supplier callable) { return timed(callable, metricsFacade.schemaClient().versionsTimer()); } - private T timed(Supplier callable, HermesTimer timer) { + protected T timed(Supplier callable, HermesTimer timer) { try (HermesTimerContext time = timer.time()) { return callable.get(); } diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/SchemaVersionsRepositoryFactory.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/SchemaVersionsRepositoryFactory.java index 90eaf20685..48c18ed503 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/SchemaVersionsRepositoryFactory.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/SchemaVersionsRepositoryFactory.java @@ -32,10 +32,10 @@ public SchemaVersionsRepositoryFactory(RawSchemaClient rawSchemaClient, public SchemaVersionsRepository provide() { if (schemaVersionsRepositoryParameters.isCacheEnabled()) { CachedSchemaVersionsRepository cachedSchemaVersionsRepository = new CachedSchemaVersionsRepository( - rawSchemaClient, - getVersionsReloader(), - schemaVersionsRepositoryParameters.getRefreshAfterWrite(), - schemaVersionsRepositoryParameters.getExpireAfterWrite()); + rawSchemaClient, + getVersionsReloader(), + schemaVersionsRepositoryParameters.getRefreshAfterWrite(), + schemaVersionsRepositoryParameters.getExpireAfterWrite()); notificationsBus.registerTopicCallback( new SchemaCacheRefresherCallback<>( diff --git a/hermes-common/src/test/groovy/pl/allegro/tech/hermes/common/schema/ReadMetricsTrackingRawSchemaClientTest.groovy b/hermes-common/src/test/groovy/pl/allegro/tech/hermes/common/schema/ReadMetricsTrackingRawSchemaAdminClientTest.groovy similarity index 91% rename from hermes-common/src/test/groovy/pl/allegro/tech/hermes/common/schema/ReadMetricsTrackingRawSchemaClientTest.groovy rename to hermes-common/src/test/groovy/pl/allegro/tech/hermes/common/schema/ReadMetricsTrackingRawSchemaAdminClientTest.groovy index 8e8d3d4e8a..56cb69c9f7 100644 --- a/hermes-common/src/test/groovy/pl/allegro/tech/hermes/common/schema/ReadMetricsTrackingRawSchemaClientTest.groovy +++ b/hermes-common/src/test/groovy/pl/allegro/tech/hermes/common/schema/ReadMetricsTrackingRawSchemaAdminClientTest.groovy @@ -11,14 +11,14 @@ import pl.allegro.tech.hermes.common.metric.HermesMetrics import pl.allegro.tech.hermes.common.metric.MetricsFacade import pl.allegro.tech.hermes.common.metric.Timers import pl.allegro.tech.hermes.metrics.PathsCompiler -import pl.allegro.tech.hermes.schema.RawSchemaClient +import pl.allegro.tech.hermes.schema.RawSchemaAdminClient import pl.allegro.tech.hermes.schema.SchemaVersion import pl.allegro.tech.hermes.test.helper.metrics.MicrometerUtils import spock.lang.Shared import spock.lang.Specification import spock.lang.Subject -class ReadMetricsTrackingRawSchemaClientTest extends Specification { +class ReadMetricsTrackingRawSchemaAdminClientTest extends Specification { @Shared TopicName topicName = TopicName.fromQualifiedName("someGroup.someTopic") @@ -33,10 +33,10 @@ class ReadMetricsTrackingRawSchemaClientTest extends Specification { MetricsFacade metricsFacade = new MetricsFacade(meterRegistry, hermesMetrics) - RawSchemaClient rawSchemaClient = Mock() + RawSchemaAdminClient rawSchemaClient = Mock() @Subject - RawSchemaClient readMetricsTrackingClient = new ReadMetricsTrackingRawSchemaClient(rawSchemaClient, metricsFacade) + RawSchemaAdminClient readMetricsTrackingClient = new ReadMetricsTrackingRawSchemaAdminClient(rawSchemaClient, metricsFacade) def "should track latency metrics for schema retrieval"() { expect: diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/SchemaConfiguration.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/SchemaConfiguration.java index fb3bcc9c82..dbd68f3aad 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/SchemaConfiguration.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/SchemaConfiguration.java @@ -49,14 +49,17 @@ public CompiledSchemaRepository avroCompiledSchemaRepository(RawSchemaCl @Bean public RawSchemaClient rawSchemaClient(MetricsFacade metricsFacade, - ObjectMapper objectMapper, SchemaRepositoryInstanceResolver resolver, SchemaProperties schemaProperties, KafkaClustersProperties kafkaProperties) { - return new RawSchemaClientFactory(kafkaProperties.getNamespace(), kafkaProperties.getNamespaceSeparator(), metricsFacade, - objectMapper, resolver, - schemaProperties.getRepository().isSubjectSuffixEnabled(), - schemaProperties.getRepository().isSubjectNamespaceEnabled()).provide(); + return new RawSchemaClientFactory( + kafkaProperties.getNamespace(), + kafkaProperties.getNamespaceSeparator(), + metricsFacade, + resolver, + schemaProperties.getRepository().isSubjectSuffixEnabled(), + schemaProperties.getRepository().isSubjectNamespaceEnabled() + ).provide(); } @Bean diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/SchemaConfiguration.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/SchemaConfiguration.java index 59f6dd5163..7c512d0b38 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/SchemaConfiguration.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/SchemaConfiguration.java @@ -50,14 +50,12 @@ public CompiledSchemaRepository avroCompiledSchemaRepository(RawSchemaCl @Bean public RawSchemaClient rawSchemaClient(KafkaClustersProperties kafkaClustersProperties, MetricsFacade metricsFacade, - ObjectMapper objectMapper, SchemaRepositoryInstanceResolver resolver, SchemaProperties schemaProperties) { return new RawSchemaClientFactory( kafkaClustersProperties.getNamespace(), kafkaClustersProperties.getNamespaceSeparator(), metricsFacade, - objectMapper, resolver, schemaProperties.getRepository().isSubjectSuffixEnabled(), schemaProperties.getRepository().isSubjectNamespaceEnabled() diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/SchemaRepositoryConfiguration.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/SchemaRepositoryConfiguration.java index ccbd520dc1..81d8c45929 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/SchemaRepositoryConfiguration.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/SchemaRepositoryConfiguration.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.jakarta.rs.json.JacksonJsonProvider; import jakarta.ws.rs.client.Client; import jakarta.ws.rs.client.ClientBuilder; +import java.net.URI; import org.apache.avro.Schema; import org.glassfish.jersey.client.ClientConfig; import org.glassfish.jersey.client.ClientProperties; @@ -19,17 +20,16 @@ import pl.allegro.tech.hermes.schema.CompiledSchemaRepository; import pl.allegro.tech.hermes.schema.DirectCompiledSchemaRepository; import pl.allegro.tech.hermes.schema.DirectSchemaVersionsRepository; +import pl.allegro.tech.hermes.schema.RawSchemaAdminClient; import pl.allegro.tech.hermes.schema.RawSchemaClient; import pl.allegro.tech.hermes.schema.SchemaCompilersFactory; import pl.allegro.tech.hermes.schema.SchemaRepository; import pl.allegro.tech.hermes.schema.SchemaVersionsRepository; import pl.allegro.tech.hermes.schema.SubjectNamingStrategy; -import pl.allegro.tech.hermes.schema.confluent.SchemaRegistryRawSchemaClient; +import pl.allegro.tech.hermes.schema.confluent.SchemaRegistryRawSchemaAdminClient; import pl.allegro.tech.hermes.schema.resolver.DefaultSchemaRepositoryInstanceResolver; import pl.allegro.tech.hermes.schema.resolver.SchemaRepositoryInstanceResolver; -import java.net.URI; - import static pl.allegro.tech.hermes.schema.SubjectNamingStrategy.qualifiedName; @Configuration @@ -64,15 +64,15 @@ public SubjectNamingStrategy subjectNamingStrategy(KafkaClustersProperties kafka } @Bean - @ConditionalOnMissingBean(RawSchemaClient.class) - public RawSchemaClient schemaRegistryRawSchemaClient( + @ConditionalOnMissingBean(RawSchemaAdminClient.class) + public RawSchemaAdminClient schemaRegistryRawSchemaAdminClient( SchemaRepositoryInstanceResolver schemaRepositoryInstanceResolver, ObjectMapper objectMapper, SubjectNamingStrategy subjectNamingStrategy ) { - return new SchemaRegistryRawSchemaClient(schemaRepositoryInstanceResolver, objectMapper, - schemaRepositoryProperties.isValidationEnabled(), schemaRepositoryProperties.getDeleteSchemaPathSuffix(), - subjectNamingStrategy); + return new SchemaRegistryRawSchemaAdminClient(schemaRepositoryInstanceResolver, objectMapper, + schemaRepositoryProperties.isValidationEnabled(), schemaRepositoryProperties.getDeleteSchemaPathSuffix(), + subjectNamingStrategy); } @Bean @@ -85,7 +85,7 @@ public SchemaRepositoryInstanceResolver defaultSchemaRepositoryInstanceResolver( public SchemaRepository aggregateSchemaRepository(RawSchemaClient rawSchemaClient) { SchemaVersionsRepository versionsRepository = new DirectSchemaVersionsRepository(rawSchemaClient); CompiledSchemaRepository avroSchemaRepository = new DirectCompiledSchemaRepository<>( - rawSchemaClient, SchemaCompilersFactory.avroSchemaCompiler()); + rawSchemaClient, SchemaCompilersFactory.avroSchemaCompiler()); return new SchemaRepository(versionsRepository, avroSchemaRepository); } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/schema/SchemaService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/schema/SchemaService.java index 315cdd4cb1..020b01241b 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/schema/SchemaService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/schema/SchemaService.java @@ -8,7 +8,7 @@ import pl.allegro.tech.hermes.management.config.TopicProperties; import pl.allegro.tech.hermes.management.infrastructure.schema.validator.SchemaValidator; import pl.allegro.tech.hermes.management.infrastructure.schema.validator.SchemaValidatorProvider; -import pl.allegro.tech.hermes.schema.RawSchemaClient; +import pl.allegro.tech.hermes.schema.RawSchemaAdminClient; import pl.allegro.tech.hermes.schema.SchemaId; import pl.allegro.tech.hermes.schema.SchemaVersion; @@ -20,33 +20,33 @@ @Component public class SchemaService { - private final RawSchemaClient rawSchemaClient; + private final RawSchemaAdminClient rawSchemaAdminClient; private final SchemaValidatorProvider validatorProvider; private final TopicProperties topicProperties; @Autowired - public SchemaService(RawSchemaClient rawSchemaClient, + public SchemaService(RawSchemaAdminClient rawSchemaAdminClient, SchemaValidatorProvider validatorProvider, TopicProperties topicProperties) { - this.rawSchemaClient = rawSchemaClient; + this.rawSchemaAdminClient = rawSchemaAdminClient; this.validatorProvider = validatorProvider; this.topicProperties = topicProperties; } public Optional getSchema(String qualifiedTopicName) { - return rawSchemaClient + return rawSchemaAdminClient .getLatestRawSchemaWithMetadata(fromQualifiedName(qualifiedTopicName)) .map(RawSchemaWithMetadata::getSchema); } public Optional getSchema(String qualifiedTopicName, SchemaVersion version) { - return rawSchemaClient + return rawSchemaAdminClient .getRawSchemaWithMetadata(fromQualifiedName(qualifiedTopicName), version) .map(RawSchemaWithMetadata::getSchema); } public Optional getSchema(String qualifiedTopicName, SchemaId id) { - return rawSchemaClient + return rawSchemaAdminClient .getRawSchemaWithMetadata(fromQualifiedName(qualifiedTopicName), id) .map(RawSchemaWithMetadata::getSchema); } @@ -61,14 +61,14 @@ public void registerSchema(Topic topic, String schema, boolean validate) { SchemaValidator validator = validatorProvider.provide(topic.getContentType()); validator.check(schema); } - rawSchemaClient.registerSchema(topic.getName(), RawSchema.valueOf(schema)); + rawSchemaAdminClient.registerSchema(topic.getName(), RawSchema.valueOf(schema)); } public void deleteAllSchemaVersions(String qualifiedTopicName) { if (!topicProperties.isRemoveSchema()) { throw new SchemaRemovalDisabledException(); } - rawSchemaClient.deleteAllSchemaVersions(fromQualifiedName(qualifiedTopicName)); + rawSchemaAdminClient.deleteAllSchemaVersions(fromQualifiedName(qualifiedTopicName)); } public void validateSchema(Topic topic, String schema) { @@ -76,6 +76,6 @@ public void validateSchema(Topic topic, String schema) { SchemaValidator validator = validatorProvider.provide(AVRO); validator.check(schema); } - rawSchemaClient.validateSchema(topic.getName(), RawSchema.valueOf(schema)); + rawSchemaAdminClient.validateSchema(topic.getName(), RawSchema.valueOf(schema)); } } diff --git a/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/RawSchemaAdminClient.java b/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/RawSchemaAdminClient.java new file mode 100644 index 0000000000..8c54947956 --- /dev/null +++ b/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/RawSchemaAdminClient.java @@ -0,0 +1,13 @@ +package pl.allegro.tech.hermes.schema; + +import pl.allegro.tech.hermes.api.RawSchema; +import pl.allegro.tech.hermes.api.TopicName; + +public interface RawSchemaAdminClient extends RawSchemaClient { + + void registerSchema(TopicName topic, RawSchema rawSchema); + + void deleteAllSchemaVersions(TopicName topic); + + void validateSchema(TopicName topic, RawSchema rawSchema); +} diff --git a/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/RawSchemaClient.java b/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/RawSchemaClient.java index 4d012e0c1f..511b0a0015 100644 --- a/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/RawSchemaClient.java +++ b/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/RawSchemaClient.java @@ -1,11 +1,9 @@ package pl.allegro.tech.hermes.schema; -import pl.allegro.tech.hermes.api.RawSchema; -import pl.allegro.tech.hermes.api.RawSchemaWithMetadata; -import pl.allegro.tech.hermes.api.TopicName; - import java.util.List; import java.util.Optional; +import pl.allegro.tech.hermes.api.RawSchemaWithMetadata; +import pl.allegro.tech.hermes.api.TopicName; public interface RawSchemaClient { @@ -16,10 +14,4 @@ public interface RawSchemaClient { Optional getLatestRawSchemaWithMetadata(TopicName topic); List getVersions(TopicName topic); - - void registerSchema(TopicName topic, RawSchema rawSchema); - - void deleteAllSchemaVersions(TopicName topic); - - void validateSchema(TopicName topic, RawSchema rawSchema); } diff --git a/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaAdminClient.java b/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaAdminClient.java new file mode 100644 index 0000000000..32f98c786b --- /dev/null +++ b/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaAdminClient.java @@ -0,0 +1,190 @@ +package pl.allegro.tech.hermes.schema.confluent; + +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.ws.rs.client.Entity; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import pl.allegro.tech.hermes.api.RawSchema; +import pl.allegro.tech.hermes.api.TopicName; +import pl.allegro.tech.hermes.schema.BadSchemaRequestException; +import pl.allegro.tech.hermes.schema.InternalSchemaRepositoryException; +import pl.allegro.tech.hermes.schema.RawSchemaAdminClient; +import pl.allegro.tech.hermes.schema.SubjectNamingStrategy; +import pl.allegro.tech.hermes.schema.resolver.SchemaRepositoryInstanceResolver; + +import static jakarta.ws.rs.core.Response.Status.BAD_REQUEST; +import static jakarta.ws.rs.core.Response.Status.Family.SUCCESSFUL; + +/** + * This implementation of RawSchemaClient is compatible with Confluent Schema Registry API + * except for the validation endpoint which is not fully supported by the Confluent project. + */ +public final class SchemaRegistryRawSchemaAdminClient extends SchemaRegistryRawSchemaClient implements RawSchemaAdminClient { + + private static final Logger logger = LoggerFactory.getLogger(SchemaRegistryRawSchemaAdminClient.class); + + private static final String SCHEMA_REPO_CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"; + + private final ObjectMapper objectMapper; + private final boolean validationEndpointEnabled; + private final String deleteSchemaPathSuffix; + + public SchemaRegistryRawSchemaAdminClient(SchemaRepositoryInstanceResolver schemaRepositoryInstanceResolver, + ObjectMapper objectMapper, + SubjectNamingStrategy subjectNamingStrategy) { + this(schemaRepositoryInstanceResolver, objectMapper, false, "", subjectNamingStrategy); + } + + public SchemaRegistryRawSchemaAdminClient(SchemaRepositoryInstanceResolver schemaRepositoryInstanceResolver, + ObjectMapper objectMapper, + boolean validationEndpointEnabled, + String deleteSchemaPathSuffix, + SubjectNamingStrategy subjectNamingStrategy) { + super(schemaRepositoryInstanceResolver, subjectNamingStrategy); + this.validationEndpointEnabled = validationEndpointEnabled; + this.deleteSchemaPathSuffix = deleteSchemaPathSuffix; + this.objectMapper = objectMapper; + } + + @Override + public void registerSchema(TopicName topic, RawSchema rawSchema) { + String subject = subjectNamingStrategy.apply(topic); + Response response = schemaRepositoryInstanceResolver.resolve(subject) + .path("subjects") + .path(subject) + .path("versions") + .request() + .accept(MediaType.APPLICATION_JSON_TYPE) + .post(Entity.entity(SchemaRegistryRequestResponse.fromRawSchema(rawSchema), SCHEMA_REPO_CONTENT_TYPE)); + checkSchemaRegistration(subject, response); + } + + private void checkSchemaRegistration(String subject, Response response) { + switch (response.getStatusInfo().getFamily()) { + case SUCCESSFUL -> logger.info("Successful write to schema registry for subject {}", subject); + case CLIENT_ERROR -> throw new BadSchemaRequestException(subject, response); + default -> throw new InternalSchemaRepositoryException(subject, response); + } + } + + @Override + public void deleteAllSchemaVersions(TopicName topic) { + String subject = subjectNamingStrategy.apply(topic); + Response response = schemaRepositoryInstanceResolver.resolve(subject) + .path("subjects") + .path(subject) + .path(deleteSchemaPathSuffix) + .request() + .delete(); + checkSchemaRemoval(subject, response); + } + + private void checkSchemaRemoval(String subject, Response response) { + switch (response.getStatusInfo().getFamily()) { + case SUCCESSFUL -> logger.info("Successful removed schema subject {}", subject); + case CLIENT_ERROR -> throw new BadSchemaRequestException(subject, response); + default -> { + int statusCode = response.getStatus(); + String responseBody = response.readEntity(String.class); + logger.warn("Could not remove schema of subject {}. Reason: {} {}", subject, statusCode, responseBody); + throw new InternalSchemaRepositoryException(subject, statusCode, responseBody); + } + } + } + + @Override + public void validateSchema(TopicName topic, RawSchema schema) { + String subject = subjectNamingStrategy.apply(topic); + checkCompatibility(subject, schema); + if (validationEndpointEnabled) { + checkValidation(subject, schema); + } + } + + private void checkCompatibility(String subject, RawSchema schema) { + Response response = schemaRepositoryInstanceResolver.resolve(subject) + .path("compatibility") + .path("subjects") + .path(subject) + .path("versions") + .path("latest") + .request() + .accept(MediaType.APPLICATION_JSON_TYPE) + .post(Entity.entity(SchemaRegistryRequestResponse.fromRawSchema(schema), SCHEMA_REPO_CONTENT_TYPE)); + checkSchemaCompatibilityResponse(subject, response); + } + + private void checkValidation(String subject, RawSchema schema) { + Response response = schemaRepositoryInstanceResolver.resolve(subject) + .path("subjects") + .path(subject) + .path("validation") + .request() + .accept(MediaType.APPLICATION_JSON_TYPE) + .post(Entity.entity(SchemaRegistryRequestResponse.fromRawSchema(schema), SCHEMA_REPO_CONTENT_TYPE)); + + checkValidationResponse(subject, response); + } + + private void checkValidationResponse(String subject, Response response) { + if (response.getStatusInfo().getFamily() == SUCCESSFUL) { + validateSuccessfulValidationResult(subject, response); + } else { + handleErrorResponse(subject, response); + } + } + + private void validateSuccessfulValidationResult(String subject, Response response) { + SchemaRegistryValidationResponse validationResponse = response.readEntity(SchemaRegistryValidationResponse.class); + + if (!validationResponse.isValid()) { + throw new BadSchemaRequestException(subject, BAD_REQUEST.getStatusCode(), + validationResponse.getErrorsMessage()); + } + } + + private void checkSchemaCompatibilityResponse(String subject, Response response) { + if (response.getStatusInfo().getFamily() == SUCCESSFUL) { + validateSuccessfulCompatibilityResult(subject, response); + } else { + handleErrorResponse(subject, response); + } + } + + private void handleErrorResponse(String subject, Response response) { + switch (response.getStatusInfo().getFamily()) { + case CLIENT_ERROR -> { + if (response.getStatus() == 422) { // for other cases we assume the schema is valid + throw new BadSchemaRequestException(subject, response); + } + } + default -> { + int statusCode = response.getStatus(); + String responseBody = response.readEntity(String.class); + logger.warn("Could not validate schema of subject {}. Reason: {} {}", subject, statusCode, responseBody); + throw new InternalSchemaRepositoryException(subject, statusCode, responseBody); + } + } + } + + private void validateSuccessfulCompatibilityResult(String subject, Response response) { + String validationResultStr = response.readEntity(String.class); + SchemaRegistryCompatibilityResponse validationResponse = + toSchemaRegistryValidationResponse(subject, validationResultStr, response.getStatus()); + if (!validationResponse.isCompatible()) { + throw new BadSchemaRequestException(subject, response.getStatus(), validationResultStr); + } + } + + private SchemaRegistryCompatibilityResponse toSchemaRegistryValidationResponse(String subject, String validationResultStr, int status) { + try { + return objectMapper.readValue(validationResultStr, SchemaRegistryCompatibilityResponse.class); + } catch (IOException e) { + logger.error("Could not parse schema validation response from schema registry", e); + throw new InternalSchemaRepositoryException(subject, status, validationResultStr); + } + } +} diff --git a/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaClient.java b/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaClient.java index b33e4b7575..3b3fc7e01c 100644 --- a/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaClient.java +++ b/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaClient.java @@ -1,15 +1,19 @@ package pl.allegro.tech.hermes.schema.confluent; -import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.ws.rs.client.Entity; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.RawSchema; import pl.allegro.tech.hermes.api.RawSchemaWithMetadata; import pl.allegro.tech.hermes.api.TopicName; -import pl.allegro.tech.hermes.schema.BadSchemaRequestException; import pl.allegro.tech.hermes.schema.InternalSchemaRepositoryException; import pl.allegro.tech.hermes.schema.RawSchemaClient; import pl.allegro.tech.hermes.schema.SchemaId; @@ -17,20 +21,8 @@ import pl.allegro.tech.hermes.schema.SubjectNamingStrategy; import pl.allegro.tech.hermes.schema.resolver.SchemaRepositoryInstanceResolver; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - -import static jakarta.ws.rs.core.Response.Status.BAD_REQUEST; -import static jakarta.ws.rs.core.Response.Status.Family.SUCCESSFUL; - /** * This implementation of RawSchemaClient is compatible with Confluent Schema Registry API - * except for the validation endpoint which is not fully supported by the Confluent project. */ public class SchemaRegistryRawSchemaClient implements RawSchemaClient { @@ -38,28 +30,12 @@ public class SchemaRegistryRawSchemaClient implements RawSchemaClient { private static final String SCHEMA_REPO_CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"; - private final SchemaRepositoryInstanceResolver schemaRepositoryInstanceResolver; - - private final ObjectMapper objectMapper; - private final boolean validationEndpointEnabled; - private final String deleteSchemaPathSuffix; - private final SubjectNamingStrategy subjectNamingStrategy; + protected final SchemaRepositoryInstanceResolver schemaRepositoryInstanceResolver; + protected final SubjectNamingStrategy subjectNamingStrategy; public SchemaRegistryRawSchemaClient(SchemaRepositoryInstanceResolver schemaRepositoryInstanceResolver, - ObjectMapper objectMapper, - SubjectNamingStrategy subjectNamingStrategy) { - this(schemaRepositoryInstanceResolver, objectMapper, false, "", subjectNamingStrategy); - } - - public SchemaRegistryRawSchemaClient(SchemaRepositoryInstanceResolver schemaRepositoryInstanceResolver, - ObjectMapper objectMapper, - boolean validationEndpointEnabled, - String deleteSchemaPathSuffix, SubjectNamingStrategy subjectNamingStrategy) { this.schemaRepositoryInstanceResolver = schemaRepositoryInstanceResolver; - this.validationEndpointEnabled = validationEndpointEnabled; - this.deleteSchemaPathSuffix = deleteSchemaPathSuffix; - this.objectMapper = objectMapper; this.subjectNamingStrategy = subjectNamingStrategy; } @@ -124,56 +100,62 @@ public Optional getRawSchema(String subject, SchemaId schemaId) { private Optional extractSchema(Response response, String subject, SchemaId schemaId) { switch (response.getStatusInfo().getFamily()) { - case SUCCESSFUL: + case SUCCESSFUL -> { logger.info("Found schema for subject {} and id {}", subject, schemaId.value()); SchemaRegistryRequestResponse schemaRegistryResponse = response.readEntity(SchemaRegistryRequestResponse.class); return Optional.of(RawSchema.valueOf(schemaRegistryResponse.getSchema())); - case CLIENT_ERROR: + } + case CLIENT_ERROR -> { logger.error("Could not find schema for subject {} and id {}, reason: {}", subject, schemaId.value(), response.getStatus()); return Optional.empty(); - case SERVER_ERROR: - default: + } + default -> { logger.error("Could not find schema for subject {} and id {}, reason: {}", subject, schemaId.value(), response.getStatus()); throw new InternalSchemaRepositoryException(subject, response); + } } } private Optional extractRawSchemaWithMetadata(String subject, String version, Response response) { switch (response.getStatusInfo().getFamily()) { - case SUCCESSFUL: + case SUCCESSFUL -> { logger.info("Found schema metadata for subject {} at version {}", subject, version); SchemaRegistryResponse schemaRegistryResponse = response.readEntity(SchemaRegistryResponse.class); return Optional.of(schemaRegistryResponse.toRawSchemaWithMetadata()); - case CLIENT_ERROR: + } + case CLIENT_ERROR -> { logger.error("Could not find schema metadata for subject {} at version {}, reason: {}", - subject, - version, - response.getStatus()); + subject, + version, + response.getStatus()); return Optional.empty(); - case SERVER_ERROR: - default: + } + default -> { logger.error("Could not find schema metadata for subject {} at version {}, reason: {}", - subject, - version, - response.getStatus()); + subject, + version, + response.getStatus()); throw new InternalSchemaRepositoryException(subject, response); + } } } private Optional extractRawSchemaWithMetadata(String subject, SchemaId schemaId, Response response) { Integer id = schemaId.value(); switch (response.getStatusInfo().getFamily()) { - case SUCCESSFUL: + case SUCCESSFUL -> { logger.info("Found schema metadata for subject {} and id {}", subject, id); SchemaRegistryResponse schemaRegistryResponse = response.readEntity(SchemaRegistryResponse.class); return Optional.of(schemaRegistryResponse.toRawSchemaWithMetadata()); - case CLIENT_ERROR: + } + case CLIENT_ERROR -> { logger.error("Could not find schema metadata for subject {} and id {}, reason: {}", subject, id, response.getStatus()); return Optional.empty(); - case SERVER_ERROR: - default: + } + default -> { logger.error("Could not find schema metadata for subject {} and id {}, reason: {}", subject, id, response.getStatus()); throw new InternalSchemaRepositoryException(subject, response); + } } } @@ -191,165 +173,18 @@ public List getVersions(TopicName topic) { private List extractSchemaVersions(String subject, Response response) { switch (response.getStatusInfo().getFamily()) { - case SUCCESSFUL: + case SUCCESSFUL -> { return Arrays.stream(response.readEntity(Integer[].class)) - .sorted(Comparator.reverseOrder()) - .map(SchemaVersion::valueOf) - .collect(Collectors.toList()); - case CLIENT_ERROR: + .sorted(Comparator.reverseOrder()) + .map(SchemaVersion::valueOf) + .collect(Collectors.toList()); + } + case CLIENT_ERROR -> { logger.error("Could not find schema versions for subject {}, reason: {} {}", subject, response.getStatus(), - response.readEntity(String.class)); + response.readEntity(String.class)); return Collections.emptyList(); - case SERVER_ERROR: - default: - throw new InternalSchemaRepositoryException(subject, response); - } - } - - @Override - public void registerSchema(TopicName topic, RawSchema rawSchema) { - String subject = subjectNamingStrategy.apply(topic); - Response response = schemaRepositoryInstanceResolver.resolve(subject) - .path("subjects") - .path(subject) - .path("versions") - .request() - .accept(MediaType.APPLICATION_JSON_TYPE) - .post(Entity.entity(SchemaRegistryRequestResponse.fromRawSchema(rawSchema), SCHEMA_REPO_CONTENT_TYPE)); - checkSchemaRegistration(subject, response); - } - - private void checkSchemaRegistration(String subject, Response response) { - switch (response.getStatusInfo().getFamily()) { - case SUCCESSFUL: - logger.info("Successful write to schema registry for subject {}", subject); - break; - case CLIENT_ERROR: - throw new BadSchemaRequestException(subject, response); - case SERVER_ERROR: - default: - throw new InternalSchemaRepositoryException(subject, response); - } - } - - @Override - public void deleteAllSchemaVersions(TopicName topic) { - String subject = subjectNamingStrategy.apply(topic); - Response response = schemaRepositoryInstanceResolver.resolve(subject) - .path("subjects") - .path(subject) - .path(deleteSchemaPathSuffix) - .request() - .delete(); - checkSchemaRemoval(subject, response); - } - - private void checkSchemaRemoval(String subject, Response response) { - switch (response.getStatusInfo().getFamily()) { - case SUCCESSFUL: - logger.info("Successful removed schema subject {}", subject); - break; - case CLIENT_ERROR: - throw new BadSchemaRequestException(subject, response); - case SERVER_ERROR: - default: - int statusCode = response.getStatus(); - String responseBody = response.readEntity(String.class); - logger.warn("Could not remove schema of subject {}. Reason: {} {}", subject, statusCode, responseBody); - throw new InternalSchemaRepositoryException(subject, statusCode, responseBody); - } - } - - @Override - public void validateSchema(TopicName topic, RawSchema schema) { - String subject = subjectNamingStrategy.apply(topic); - checkCompatibility(subject, schema); - if (validationEndpointEnabled) { - checkValidation(subject, schema); - } - } - - private void checkCompatibility(String subject, RawSchema schema) { - Response response = schemaRepositoryInstanceResolver.resolve(subject) - .path("compatibility") - .path("subjects") - .path(subject) - .path("versions") - .path("latest") - .request() - .accept(MediaType.APPLICATION_JSON_TYPE) - .post(Entity.entity(SchemaRegistryRequestResponse.fromRawSchema(schema), SCHEMA_REPO_CONTENT_TYPE)); - checkSchemaCompatibilityResponse(subject, response); - } - - private void checkValidation(String subject, RawSchema schema) { - Response response = schemaRepositoryInstanceResolver.resolve(subject) - .path("subjects") - .path(subject) - .path("validation") - .request() - .accept(MediaType.APPLICATION_JSON_TYPE) - .post(Entity.entity(SchemaRegistryRequestResponse.fromRawSchema(schema), SCHEMA_REPO_CONTENT_TYPE)); - - checkValidationResponse(subject, response); - } - - private void checkValidationResponse(String subject, Response response) { - if (response.getStatusInfo().getFamily() == SUCCESSFUL) { - validateSuccessfulValidationResult(subject, response); - } else { - handleErrorResponse(subject, response); - } - } - - private void validateSuccessfulValidationResult(String subject, Response response) { - SchemaRegistryValidationResponse validationResponse = response.readEntity(SchemaRegistryValidationResponse.class); - - if (!validationResponse.isValid()) { - throw new BadSchemaRequestException(subject, BAD_REQUEST.getStatusCode(), - validationResponse.getErrorsMessage()); - } - } - - private void checkSchemaCompatibilityResponse(String subject, Response response) { - if (response.getStatusInfo().getFamily() == SUCCESSFUL) { - validateSuccessfulCompatibilityResult(subject, response); - } else { - handleErrorResponse(subject, response); - } - } - - private void handleErrorResponse(String subject, Response response) { - switch (response.getStatusInfo().getFamily()) { - case CLIENT_ERROR: - if (response.getStatus() == 422) { // for other cases we assume the schema is valid - throw new BadSchemaRequestException(subject, response); - } - break; - case SERVER_ERROR: - default: - int statusCode = response.getStatus(); - String responseBody = response.readEntity(String.class); - logger.warn("Could not validate schema of subject {}. Reason: {} {}", subject, statusCode, responseBody); - throw new InternalSchemaRepositoryException(subject, statusCode, responseBody); - } - } - - private void validateSuccessfulCompatibilityResult(String subject, Response response) { - String validationResultStr = response.readEntity(String.class); - SchemaRegistryCompatibilityResponse validationResponse = - toSchemaRegistryValidationResponse(subject, validationResultStr, response.getStatus()); - if (!validationResponse.isCompatible()) { - throw new BadSchemaRequestException(subject, response.getStatus(), validationResultStr); - } - } - - private SchemaRegistryCompatibilityResponse toSchemaRegistryValidationResponse(String subject, String validationResultStr, int status) { - try { - return objectMapper.readValue(validationResultStr, SchemaRegistryCompatibilityResponse.class); - } catch (IOException e) { - logger.error("Could not parse schema validation response from schema registry", e); - throw new InternalSchemaRepositoryException(subject, status, validationResultStr); + } + default -> throw new InternalSchemaRepositoryException(subject, response); } } } diff --git a/hermes-schema/src/test/groovy/pl/allegro/tech/hermes/schema/CachedSchemaVersionsRepositoryTest.groovy b/hermes-schema/src/test/groovy/pl/allegro/tech/hermes/schema/CachedSchemaVersionsRepositoryTest.groovy index f02b605e7e..9c95b2d52e 100644 --- a/hermes-schema/src/test/groovy/pl/allegro/tech/hermes/schema/CachedSchemaVersionsRepositoryTest.groovy +++ b/hermes-schema/src/test/groovy/pl/allegro/tech/hermes/schema/CachedSchemaVersionsRepositoryTest.groovy @@ -19,7 +19,7 @@ class CachedSchemaVersionsRepositoryTest extends Specification { static final SchemaVersion v1 = SchemaVersion.valueOf(1) static final SchemaVersion v0 = SchemaVersion.valueOf(0) - def rawSchemaClient = Stub(RawSchemaClient) + def rawSchemaClient = Stub(RawSchemaAdminClient) def ticker = new FakeTicker() def versionsRepository = new CachedSchemaVersionsRepository(rawSchemaClient, MoreExecutors.newDirectExecutorService(), REFRESH_TIME, EXPIRE_TIME, ticker) diff --git a/hermes-schema/src/test/groovy/pl/allegro/tech/hermes/schema/DirectCompiledSchemaRepositoryTest.groovy b/hermes-schema/src/test/groovy/pl/allegro/tech/hermes/schema/DirectCompiledSchemaRepositoryTest.groovy index 848f82bf25..d59465bfcb 100644 --- a/hermes-schema/src/test/groovy/pl/allegro/tech/hermes/schema/DirectCompiledSchemaRepositoryTest.groovy +++ b/hermes-schema/src/test/groovy/pl/allegro/tech/hermes/schema/DirectCompiledSchemaRepositoryTest.groovy @@ -11,7 +11,7 @@ class DirectCompiledSchemaRepositoryTest extends Specification { static final SchemaVersion v1 = SchemaVersion.valueOf(1) static final SchemaId id1 = SchemaId.valueOf(13) - def rawSchemaClient = Stub(RawSchemaClient) + def rawSchemaClient = Stub(RawSchemaAdminClient) def schemaCompiler = { it.value().toUpperCase() } diff --git a/hermes-schema/src/test/groovy/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaClientTest.groovy b/hermes-schema/src/test/groovy/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaAdminClientTest.groovy similarity index 96% rename from hermes-schema/src/test/groovy/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaClientTest.groovy rename to hermes-schema/src/test/groovy/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaAdminClientTest.groovy index 3db9686bb8..f6c337ce7d 100644 --- a/hermes-schema/src/test/groovy/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaClientTest.groovy +++ b/hermes-schema/src/test/groovy/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaAdminClientTest.groovy @@ -10,7 +10,7 @@ import pl.allegro.tech.hermes.api.RawSchema import pl.allegro.tech.hermes.api.TopicName import pl.allegro.tech.hermes.schema.BadSchemaRequestException import pl.allegro.tech.hermes.schema.InternalSchemaRepositoryException -import pl.allegro.tech.hermes.schema.RawSchemaClient +import pl.allegro.tech.hermes.schema.RawSchemaAdminClient import pl.allegro.tech.hermes.schema.SchemaVersion import pl.allegro.tech.hermes.schema.SubjectNamingStrategy import pl.allegro.tech.hermes.schema.resolver.DefaultSchemaRepositoryInstanceResolver @@ -29,8 +29,8 @@ import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo import static pl.allegro.tech.hermes.schema.SubjectNamingStrategy.qualifiedName -@Subject(SchemaRegistryRawSchemaClient.class) -class SchemaRegistryRawSchemaClientTest extends Specification { +@Subject(SchemaRegistryRawSchemaAdminClient.class) +class SchemaRegistryRawSchemaAdminClientTest extends Specification { @Shared String schemaRegistryContentType = "application/vnd.schemaregistry.v1+json" @@ -46,7 +46,7 @@ class SchemaRegistryRawSchemaClientTest extends Specification { @Shared SubjectNamingStrategy[] subjectNamingStrategies - @Shared RawSchemaClient[] clients + @Shared RawSchemaAdminClient[] clients def setupSpec() { port = Ports.nextAvailable() @@ -60,7 +60,7 @@ class SchemaRegistryRawSchemaClientTest extends Specification { qualifiedName.withNamespacePrefixIf(true, namespace), qualifiedName.withValueSuffixIf(true).withNamespacePrefixIf(true, namespace) ] - clients = subjectNamingStrategies.collect { new SchemaRegistryRawSchemaClient(resolver, new ObjectMapper(), it) } + clients = subjectNamingStrategies.collect { new SchemaRegistryRawSchemaAdminClient(resolver, new ObjectMapper(), it) } } def cleanupSpec() { @@ -347,7 +347,7 @@ class SchemaRegistryRawSchemaClientTest extends Specification { boolean validationEnabled = true String deleteSchemaPathSuffix = "" resolver = new DefaultSchemaRepositoryInstanceResolver(ClientBuilder.newClient(), URI.create("http://localhost:$port")) - def client = new SchemaRegistryRawSchemaClient(resolver, new ObjectMapper(), validationEnabled, deleteSchemaPathSuffix, subjectNamingStrategy) + def client = new SchemaRegistryRawSchemaAdminClient(resolver, new ObjectMapper(), validationEnabled, deleteSchemaPathSuffix, subjectNamingStrategy) wireMock.stubFor(post(schemaCompatibilityUrl(topicName, subjectNamingStrategy)) .willReturn(okResponse() @@ -374,7 +374,7 @@ class SchemaRegistryRawSchemaClientTest extends Specification { boolean validationEnabled = true String deleteSchemaPathSuffix = "" resolver = new DefaultSchemaRepositoryInstanceResolver(ClientBuilder.newClient(), URI.create("http://localhost:$port")) - def client = new SchemaRegistryRawSchemaClient(resolver, new ObjectMapper(), validationEnabled, deleteSchemaPathSuffix, subjectNamingStrategy) + def client = new SchemaRegistryRawSchemaAdminClient(resolver, new ObjectMapper(), validationEnabled, deleteSchemaPathSuffix, subjectNamingStrategy) wireMock.stubFor(post(schemaCompatibilityUrl(topicName, subjectNamingStrategy)) .willReturn(okResponse() From 6867e5f4a2c6bcb5c2edd539159cd4a32ea8dd75 Mon Sep 17 00:00:00 2001 From: Adrian Sobolewski Date: Wed, 18 Oct 2023 14:58:36 +0200 Subject: [PATCH 3/4] 1569 | Import order fix. --- .../environment/HermesServerFactory.java | 20 +++++++++++++++---- .../ReadMetricsTrackingRawSchemaClient.java | 7 ++++--- .../config/SchemaRepositoryConfiguration.java | 9 ++++++--- .../tech/hermes/schema/RawSchemaClient.java | 5 +++-- .../SchemaRegistryRawSchemaAdminClient.java | 3 ++- .../SchemaRegistryRawSchemaClient.java | 15 +++++++------- 6 files changed, 39 insertions(+), 20 deletions(-) diff --git a/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java b/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java index 20b9df303f..ac8ad395bb 100644 --- a/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java +++ b/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java @@ -55,7 +55,12 @@ static HermesServer provideHermesServer() throws IOException { MetricsFacade metricsFacade = new MetricsFacade(new SimpleMeterRegistry(), hermesMetrics); TopicsCache topicsCache = new InMemoryTopicsCache(metricsFacade, topic); BrokerMessageProducer brokerMessageProducer = new InMemoryBrokerMessageProducer(); - RawSchemaAdminClient rawSchemaAdminClient = new InMemorySchemaAdminClient(topic.getName(), loadMessageResource("schema"), 1, 1); + RawSchemaAdminClient rawSchemaAdminClient = new InMemorySchemaAdminClient( + topic.getName(), + loadMessageResource("schema"), + 1, + 1 + ); Trackers trackers = new Trackers(Collections.emptyList()); AvroMessageContentWrapper avroMessageContentWrapper = new AvroMessageContentWrapper(Clock.systemDefaultZone()); HttpHandler httpHandler = provideHttpHandler( @@ -85,7 +90,8 @@ static HermesServer provideHermesServer() throws IOException { private static HttpHandler provideHttpHandler(ThroughputLimiter throughputLimiter, TopicsCache topicsCache, BrokerMessageProducer brokerMessageProducer, - RawSchemaAdminClient rawSchemaAdminClient, Trackers trackers, AvroMessageContentWrapper avroMessageContentWrapper) { + RawSchemaAdminClient rawSchemaAdminClient, Trackers trackers, + AvroMessageContentWrapper avroMessageContentWrapper) { HeaderPropagationProperties headerPropagationProperties = new HeaderPropagationProperties(); HandlersChainProperties handlersChainProperties = new HandlersChainProperties(); TrackingHeadersExtractor trackingHeadersExtractor = new DefaultTrackingHeaderExtractor(); @@ -100,9 +106,15 @@ private static HttpHandler provideHttpHandler(ThroughputLimiter throughputLimite new MessageContentTypeEnforcer(), new SchemaRepository( new DirectSchemaVersionsRepository(rawSchemaAdminClient), - new DirectCompiledSchemaRepository<>(rawSchemaAdminClient, SchemaCompilersFactory.avroSchemaCompiler()) + new DirectCompiledSchemaRepository<>( + rawSchemaAdminClient, + SchemaCompilersFactory.avroSchemaCompiler() + ) + ), + new DefaultHeadersPropagator( + headerPropagationProperties.isEnabled(), + headerPropagationProperties.getAllowFilter() ), - new DefaultHeadersPropagator(headerPropagationProperties.isEnabled(), headerPropagationProperties.getAllowFilter()), new BenchmarkMessageContentWrapper(avroMessageContentWrapper), Clock.systemDefaultZone(), schemaProperties.isIdHeaderEnabled() diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/ReadMetricsTrackingRawSchemaClient.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/ReadMetricsTrackingRawSchemaClient.java index 16fa089de8..21c6f709be 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/ReadMetricsTrackingRawSchemaClient.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/schema/ReadMetricsTrackingRawSchemaClient.java @@ -1,8 +1,5 @@ package pl.allegro.tech.hermes.common.schema; -import java.util.List; -import java.util.Optional; -import java.util.function.Supplier; import pl.allegro.tech.hermes.api.RawSchemaWithMetadata; import pl.allegro.tech.hermes.api.TopicName; import pl.allegro.tech.hermes.common.metric.MetricsFacade; @@ -12,6 +9,10 @@ import pl.allegro.tech.hermes.schema.SchemaId; import pl.allegro.tech.hermes.schema.SchemaVersion; +import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; + public class ReadMetricsTrackingRawSchemaClient implements RawSchemaClient { private final MetricsFacade metricsFacade; protected final RawSchemaClient rawSchemaClient; diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/SchemaRepositoryConfiguration.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/SchemaRepositoryConfiguration.java index 81d8c45929..35d861057a 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/SchemaRepositoryConfiguration.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/SchemaRepositoryConfiguration.java @@ -4,7 +4,6 @@ import com.fasterxml.jackson.jakarta.rs.json.JacksonJsonProvider; import jakarta.ws.rs.client.Client; import jakarta.ws.rs.client.ClientBuilder; -import java.net.URI; import org.apache.avro.Schema; import org.glassfish.jersey.client.ClientConfig; import org.glassfish.jersey.client.ClientProperties; @@ -30,6 +29,8 @@ import pl.allegro.tech.hermes.schema.resolver.DefaultSchemaRepositoryInstanceResolver; import pl.allegro.tech.hermes.schema.resolver.SchemaRepositoryInstanceResolver; +import java.net.URI; + import static pl.allegro.tech.hermes.schema.SubjectNamingStrategy.qualifiedName; @Configuration @@ -70,8 +71,10 @@ public RawSchemaAdminClient schemaRegistryRawSchemaAdminClient( ObjectMapper objectMapper, SubjectNamingStrategy subjectNamingStrategy ) { - return new SchemaRegistryRawSchemaAdminClient(schemaRepositoryInstanceResolver, objectMapper, - schemaRepositoryProperties.isValidationEnabled(), schemaRepositoryProperties.getDeleteSchemaPathSuffix(), + return new SchemaRegistryRawSchemaAdminClient(schemaRepositoryInstanceResolver, + objectMapper, + schemaRepositoryProperties.isValidationEnabled(), + schemaRepositoryProperties.getDeleteSchemaPathSuffix(), subjectNamingStrategy); } diff --git a/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/RawSchemaClient.java b/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/RawSchemaClient.java index 511b0a0015..fe86d3fd14 100644 --- a/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/RawSchemaClient.java +++ b/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/RawSchemaClient.java @@ -1,10 +1,11 @@ package pl.allegro.tech.hermes.schema; -import java.util.List; -import java.util.Optional; import pl.allegro.tech.hermes.api.RawSchemaWithMetadata; import pl.allegro.tech.hermes.api.TopicName; +import java.util.List; +import java.util.Optional; + public interface RawSchemaClient { Optional getRawSchemaWithMetadata(TopicName topic, SchemaVersion version); diff --git a/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaAdminClient.java b/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaAdminClient.java index 32f98c786b..d439e9f68f 100644 --- a/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaAdminClient.java +++ b/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaAdminClient.java @@ -4,7 +4,6 @@ import jakarta.ws.rs.client.Entity; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; -import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.RawSchema; @@ -15,6 +14,8 @@ import pl.allegro.tech.hermes.schema.SubjectNamingStrategy; import pl.allegro.tech.hermes.schema.resolver.SchemaRepositoryInstanceResolver; +import java.io.IOException; + import static jakarta.ws.rs.core.Response.Status.BAD_REQUEST; import static jakarta.ws.rs.core.Response.Status.Family.SUCCESSFUL; diff --git a/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaClient.java b/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaClient.java index 3b3fc7e01c..0e0d6ab074 100644 --- a/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaClient.java +++ b/hermes-schema/src/main/java/pl/allegro/tech/hermes/schema/confluent/SchemaRegistryRawSchemaClient.java @@ -3,12 +3,6 @@ import jakarta.ws.rs.client.Entity; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.RawSchema; @@ -21,8 +15,15 @@ import pl.allegro.tech.hermes.schema.SubjectNamingStrategy; import pl.allegro.tech.hermes.schema.resolver.SchemaRepositoryInstanceResolver; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + /** - * This implementation of RawSchemaClient is compatible with Confluent Schema Registry API + * This implementation of RawSchemaClient is compatible with Confluent Schema Registry API. */ public class SchemaRegistryRawSchemaClient implements RawSchemaClient { From 930693c06949f280f16043723f42ec18d55a0632 Mon Sep 17 00:00:00 2001 From: Adrian Sobolewski Date: Tue, 5 Dec 2023 16:16:59 +0100 Subject: [PATCH 4/4] Import orders fix. --- .../hermes/benchmark/environment/HermesServerFactory.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java b/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java index d19f4cc6b2..98c10cc1af 100644 --- a/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java +++ b/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java @@ -4,9 +4,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.undertow.server.HttpHandler; -import java.io.IOException; -import java.time.Clock; -import java.util.Collections; import pl.allegro.tech.hermes.api.Topic; import pl.allegro.tech.hermes.common.message.wrapper.AvroMessageContentWrapper; import pl.allegro.tech.hermes.common.metric.HermesMetrics; @@ -38,6 +35,10 @@ import pl.allegro.tech.hermes.schema.SchemaRepository; import pl.allegro.tech.hermes.tracker.frontend.Trackers; +import java.io.IOException; +import java.time.Clock; +import java.util.Collections; + import static pl.allegro.tech.hermes.api.ContentType.AVRO; import static pl.allegro.tech.hermes.benchmark.environment.HermesServerEnvironment.loadMessageResource; import static pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter.QuotaInsight.quotaConfirmed;