Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BigQuery sender #1814

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,7 @@ scripts/lib/
scripts/pip-selfcheck.json

.DS_Store
.bloop
.metals
.vim
bin
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"java.compile.nullAnalysis.mode": "automatic"
}
5 changes: 5 additions & 0 deletions hermes-consumers/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies {
}
api group: 'com.google.cloud', name: 'google-cloud-pubsub', version: '1.128.1'
api group: 'org.apache.httpcomponents.core5', name: 'httpcore5', version: '5.2.4'
api(group: 'com.google.cloud', name: 'google-cloud-bigquery', version: '2.27.0')

testImplementation project(':hermes-test-helper')
testImplementation group: 'org.apache.curator', name: 'curator-test', version: versions.curator
Expand All @@ -39,6 +40,10 @@ dependencies {
testImplementation project(':hermes-common')

testImplementation(group: 'org.awaitility', name: 'awaitility-groovy', version: '4.2.1')
testImplementation(group: 'com.jayway.awaitility', name: 'awaitility', version: '1.6.1') {
exclude group: 'com.jayway.jsonpath', module: 'json-path'
}

testImplementation group: 'tech.allegro.schema.json2avro', name: 'converter', version: versions.json2avro

testImplementation group: 'org.spockframework', name: 'spock-core', version: versions.spock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageBatchSenderFactory;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSenderFactory;
import pl.allegro.tech.hermes.consumers.consumer.sender.ProtocolMessageSenderProvider;
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.*;
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.avro.GoogleBigQueryAvroDataWriterPool;
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.avro.GoogleBigQueryAvroMessageTransformer;
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.json.GoogleBigQueryJsonDataWriterPool;
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.json.GoogleBigQueryJsonMessageTransformer;
import pl.allegro.tech.hermes.consumers.consumer.sender.googlepubsub.GooglePubSubMessageSenderProvider;
import pl.allegro.tech.hermes.consumers.consumer.sender.googlepubsub.GooglePubSubMessageTransformerCreator;
import pl.allegro.tech.hermes.consumers.consumer.sender.googlepubsub.GooglePubSubSenderTargetResolver;
Expand Down Expand Up @@ -242,6 +247,22 @@ public ProtocolMessageSenderProvider pubSubMessageSenderProvider(
);
}

@Bean(name = "defaultGoogleBigQueryMessageSenderProvider")
public ProtocolMessageSenderProvider googleBigQueryMessageSenderProvider(
        GoogleBigQuerySenderTargetResolver senderTargetResolver,
        GoogleBigQueryJsonMessageTransformer jsonMessageTransformer,
        GoogleBigQueryAvroMessageTransformer avroMessageTransformer,
        GoogleBigQueryJsonDataWriterPool jsonDataWriterPool,
        GoogleBigQueryAvroDataWriterPool avroDataWriterPool) {
    // Wires the BigQuery sender provider with both JSON and Avro pipelines;
    // the provider picks one per subscription based on its content type.
    GoogleBigQueryMessageSenderProvider provider = new GoogleBigQueryMessageSenderProvider(
            senderTargetResolver,
            jsonMessageTransformer,
            avroMessageTransformer,
            jsonDataWriterPool,
            avroDataWriterPool
    );
    return provider;
}

@Bean
@Conditional(OnGoogleDefaultCredentials.class)
public CredentialsProvider applicationDefaultCredentialsProvider() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package pl.allegro.tech.hermes.consumers.config;

import com.google.api.gax.core.CredentialsProvider;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.stub.BigQueryWriteStubSettings;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.*;
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.avro.GoogleBigQueryAvroDataWriterPool;
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.avro.GoogleBigQueryAvroMessageTransformer;
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.avro.GoogleBigQueryAvroStreamWriterFactory;
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.avro.GoogleBigQueryAvroToProtoConverter;
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.json.GoogleBigQueryJsonDataWriterPool;
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.json.GoogleBigQueryJsonMessageTransformer;
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.json.GoogleBigQueryJsonStreamWriterFactory;

import java.io.IOException;

@Configuration
public class GoogleBigQueryConfiguration {

    /**
     * Settings for the BigQuery Storage Write API client, backed by the
     * application's configured {@link CredentialsProvider}.
     */
    @Bean
    public BigQueryWriteSettings bigQueryWriteSettings(CredentialsProvider credentialsProvider) throws IOException {
        BigQueryWriteStubSettings stubSettings = BigQueryWriteStubSettings
                .newBuilder()
                .setCredentialsProvider(credentialsProvider)
                .build();
        return BigQueryWriteSettings.create(stubSettings);
    }

    /** Resolves subscription endpoints into BigQuery sender targets. */
    @Bean
    public GoogleBigQuerySenderTargetResolver bigQuerySenderTargetResolver() {
        return new GoogleBigQuerySenderTargetResolver();
    }

    /** Transformer for subscriptions delivering JSON payloads. */
    @Bean
    public GoogleBigQueryJsonMessageTransformer jsonBigQueryMessageTransformer() {
        return new GoogleBigQueryJsonMessageTransformer();
    }

    /** Transformer for subscriptions delivering Avro payloads. */
    @Bean
    public GoogleBigQueryAvroMessageTransformer avroGoogleBigQueryMessageTransformer() {
        return new GoogleBigQueryAvroMessageTransformer();
    }

    /** Pool of per-target JSON stream writers. */
    @Bean
    public GoogleBigQueryJsonDataWriterPool jsonDataWriterPool(GoogleBigQueryJsonStreamWriterFactory factory) {
        return new GoogleBigQueryJsonDataWriterPool(factory);
    }

    /** Pool of per-target Avro stream writers. */
    @Bean
    public GoogleBigQueryAvroDataWriterPool avroDataWriterPool(GoogleBigQueryAvroStreamWriterFactory factory) {
        return new GoogleBigQueryAvroDataWriterPool(factory);
    }

    /** Converter from Avro records to protobuf rows accepted by the Storage Write API. */
    @Bean
    public GoogleBigQueryAvroToProtoConverter avroToProtoConverter() {
        return new GoogleBigQueryAvroToProtoConverter();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package pl.allegro.tech.hermes.consumers.consumer.sender;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * A client capable of publishing a single, already-transformed message of type {@code T}
 * to an external system, reporting the outcome through a {@link CompletableFuture}.
 *
 * @param <T> the transformed message type this client accepts
 */
public interface SenderClient<T> {

    /**
     * Publishes {@code message}, completing {@code resultFuture} with the sending result.
     * Implementations may complete the future asynchronously.
     *
     * @throws IOException          on I/O failure while submitting the message
     * @throws ExecutionException   if an underlying asynchronous operation fails
     * @throws InterruptedException if the publishing thread is interrupted
     */
    void publish(T message, CompletableFuture<MessageSendingResult> resultFuture)
            throws IOException, ExecutionException, InterruptedException;

    /** Releases resources held by this client; it must not be used afterwards. */
    void shutdown();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package pl.allegro.tech.hermes.consumers.consumer.sender;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Reference-counted pool of {@link SenderClient}s, one per resolved {@link SenderTarget}.
 * A client is created lazily on first {@link #acquire} and shut down when its last
 * holder calls {@link #release}. All operations are synchronized on this instance.
 *
 * @param <T> the resolved target type used as the pool key
 * @param <C> the concrete client type managed by this pool
 */
public abstract class SenderClientsPool<T extends SenderTarget, C extends SenderClient<?>> {

    private static final Logger logger = LoggerFactory.getLogger(SenderClientsPool.class);

    // one shared client per target, plus the number of current holders
    private final Map<T, C> clients = new HashMap<>();
    private final Map<T, Integer> counters = new HashMap<>();

    /**
     * Returns the client for {@code resolvedTarget}, creating it if absent,
     * and increments its reference count.
     *
     * @throws IOException if a new client cannot be created
     */
    public synchronized C acquire(T resolvedTarget) throws IOException {
        C client = clients.get(resolvedTarget);
        if (client == null) {
            client = createClient(resolvedTarget);
            clients.put(resolvedTarget, client);
        }
        counters.merge(resolvedTarget, 1, Integer::sum);
        return client;
    }

    /**
     * Decrements the reference count for {@code resolvedTarget}; when it reaches
     * zero the client is removed from the pool and shut down. Releasing a target
     * that was never acquired only logs a warning.
     */
    public synchronized void release(T resolvedTarget) {
        Integer counter = counters.getOrDefault(resolvedTarget, 0);
        if (counter == 0) {
            logger.warn("Attempt to release client that is not acquired");
        } else if (counter == 1) {
            counters.remove(resolvedTarget);
            C client = clients.remove(resolvedTarget);
            client.shutdown();
        } else {
            counters.put(resolvedTarget, counter - 1);
        }
    }

    /** Shuts down every pooled client and clears all bookkeeping, regardless of holders. */
    public synchronized void shutdown() {
        clients.values().forEach(SenderClient::shutdown);
        clients.clear();
        counters.clear();
    }

    /** Creates a fresh client for {@code resolvedTarget}; called under the pool lock. */
    protected abstract C createClient(T resolvedTarget) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package pl.allegro.tech.hermes.consumers.consumer.sender;

/**
 * Marker interface for a resolved delivery destination (e.g. a BigQuery table stream).
 * Implementations are used as pool keys in {@link SenderClientsPool}, so they should
 * provide value-based {@code equals}/{@code hashCode}.
 */
public interface SenderTarget {
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ public SingleRecipientMessageSenderAdapter(CompletableFutureAwareMessageSender a
this.adaptee = adaptee;
}

    /** Returns the wrapped single-recipient sender this adapter delegates to. */
    public CompletableFutureAwareMessageSender getAdaptee() {
        return adaptee;
    }

@Override
public CompletableFuture<MessageSendingResult> send(Message message) {
return resilientMessageSender.send(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery;

import com.google.api.core.ApiFutureCallback;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/**
 * Bridges the BigQuery Storage Write API append future onto Hermes'
 * {@link MessageSendingResult} future: success maps to a succeeded result,
 * failure maps to a failed result carrying the storage-level exception when
 * one can be extracted from the raw throwable.
 */
public class GoogleBigQueryAppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {

    private final CompletableFuture<MessageSendingResult> resultFuture;

    public GoogleBigQueryAppendCompleteCallback(CompletableFuture<MessageSendingResult> resultFuture) {
        this.resultFuture = resultFuture;
    }

    @Override
    public void onFailure(Throwable throwable) {
        // Prefer the richer StorageException when the library can derive one.
        Exceptions.StorageException storageException = Exceptions.toStorageException(throwable);
        Throwable cause = Objects.requireNonNullElse(storageException, throwable);
        resultFuture.complete(MessageSendingResult.failedResult(cause));
    }

    @Override
    public void onSuccess(AppendRowsResponse response) {
        resultFuture.complete(MessageSendingResult.succeededResult());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult;
import pl.allegro.tech.hermes.consumers.consumer.sender.SenderClient;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * Base {@link SenderClient} that appends messages to a BigQuery write stream through
 * a stream writer created by the supplied factory. Subclasses define how a message of
 * type {@code T} is appended (JSON vs Avro/proto).
 *
 * @param <T> the transformed message type appended to the stream
 * @param <S> the underlying stream writer type (closed on {@link #shutdown()})
 * @param <F> the factory producing stream writers for a given stream name
 */
public abstract class GoogleBigQueryDataWriter<
        T,
        S extends AutoCloseable,
        F extends GoogleBigQueryStreamWriterFactory<S>> implements SenderClient<T> {

    private static final Logger logger = LoggerFactory.getLogger(GoogleBigQueryDataWriter.class);

    protected final S streamWriter;

    public GoogleBigQueryDataWriter(String streamName, F factory) {
        streamWriter = factory.getWriterForStream(streamName);
    }

    /** Submits {@code message} to the stream and returns the append future. */
    protected abstract ApiFuture<AppendRowsResponse> append(T message) throws Descriptors.DescriptorValidationException, IOException;

    /** Identifier of this writer, used in log messages. */
    protected abstract String getWriterId();

    /** Name of the stream this writer appends to, used in log messages. */
    protected abstract String getStreamName();

    /**
     * Appends {@code message} asynchronously; the outcome is delivered via
     * {@code resultFuture}. Synchronous append failures complete the future
     * with a failed result instead of propagating.
     */
    @Override
    public void publish(T message, CompletableFuture<MessageSendingResult> resultFuture) throws IOException, ExecutionException, InterruptedException {
        try {
            ApiFuture<AppendRowsResponse> appendFuture = append(message);
            ApiFutures.addCallback(appendFuture, new GoogleBigQueryAppendCompleteCallback(resultFuture), MoreExecutors.directExecutor());
        } catch (Exceptions.AppendSerializationError e) {
            // Row-level serialization errors get wrapped so the aggregated per-row
            // messages surface in the sending result.
            logger.warn("Writer {} has failed to append rows to stream {}", getWriterId(), getStreamName(), e);
            resultFuture.complete(MessageSendingResult.failedResult(new GoogleBigQueryFailedAppendException(e)));
        } catch (Exception e) {
            logger.warn("Writer {} has failed to append rows to stream {}", getWriterId(), getStreamName(), e);
            resultFuture.complete(MessageSendingResult.failedResult(e));
        }
    }

    @Override
    public void shutdown() {
        try {
            streamWriter.close();
        } catch (Exception e) {
            // Fixed: pass the exception to the logger so the stack trace is not lost.
            logger.error("Error during closing stream writer of id {} and name {}", getWriterId(), getStreamName(), e);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery;

import com.google.cloud.bigquery.storage.v1.Exceptions;

/**
 * Signals that BigQuery rejected an append because one or more rows failed serialization.
 * The exception message aggregates the per-row error messages reported by the Storage
 * Write API, one per line.
 */
public class GoogleBigQueryFailedAppendException extends RuntimeException {

    // NOTE(review): Exceptions.AppendSerializtionError is the library's original
    // (typo-spelled) class, while the catch site in GoogleBigQueryDataWriter uses
    // the correctly spelled AppendSerializationError — confirm the pinned library
    // version provides both spellings and that they are assignment-compatible,
    // or align the two references.
    public GoogleBigQueryFailedAppendException(Exceptions.AppendSerializtionError cause) {
        // Join every row-level error message into a single newline-separated message.
        super(String.join("\n", cause.getRowIndexToErrorMessage().values()), cause);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery;

import com.google.common.collect.ImmutableSet;
import pl.allegro.tech.hermes.api.ContentType;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.consumers.consumer.ResilientMessageSender;
import pl.allegro.tech.hermes.consumers.consumer.sender.CompletableFutureAwareMessageSender;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSender;
import pl.allegro.tech.hermes.consumers.consumer.sender.ProtocolMessageSenderProvider;
import pl.allegro.tech.hermes.consumers.consumer.sender.SingleRecipientMessageSenderAdapter;
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.avro.GoogleBigQueryAvroDataWriterPool;
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.avro.GoogleBigQueryAvroMessageTransformer;
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.avro.GoogleBigQueryAvroSender;
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.json.GoogleBigQueryJsonSender;
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.json.GoogleBigQueryJsonDataWriterPool;
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.json.GoogleBigQueryJsonMessageTransformer;

import java.util.Set;

/**
 * {@link ProtocolMessageSenderProvider} for the {@code googlebigquery} endpoint protocol.
 * Creates a per-subscription sender that appends messages to a BigQuery write stream,
 * choosing the JSON or Avro pipeline based on the subscription's content type.
 */
public class GoogleBigQueryMessageSenderProvider implements ProtocolMessageSenderProvider {

    public static final String SUPPORTED_PROTOCOL = "googlebigquery";

    private final GoogleBigQuerySenderTargetResolver targetResolver;
    private final GoogleBigQueryJsonMessageTransformer jsonMessageTransformer;
    private final GoogleBigQueryAvroMessageTransformer avroMessageTransformer;
    private final GoogleBigQueryAvroDataWriterPool avroDataWriterPool;
    private final GoogleBigQueryJsonDataWriterPool jsonDataWriterPool;

    public GoogleBigQueryMessageSenderProvider(GoogleBigQuerySenderTargetResolver targetResolver,
                                               GoogleBigQueryJsonMessageTransformer jsonMessageTransformer,
                                               GoogleBigQueryAvroMessageTransformer avroMessageTransformer,
                                               GoogleBigQueryJsonDataWriterPool jsonDataWriterPool,
                                               GoogleBigQueryAvroDataWriterPool avroDataWriterPool) {
        this.targetResolver = targetResolver;
        this.jsonMessageTransformer = jsonMessageTransformer;
        this.avroMessageTransformer = avroMessageTransformer;
        this.jsonDataWriterPool = jsonDataWriterPool;
        this.avroDataWriterPool = avroDataWriterPool;
    }

    /**
     * Builds a sender for {@code subscription}: JSON subscriptions get the JSON
     * pipeline, everything else falls back to the Avro pipeline.
     */
    @Override
    public MessageSender create(Subscription subscription, ResilientMessageSender resilientMessageSender) {
        GoogleBigQuerySenderTarget target = targetResolver.resolve(subscription.getEndpoint());
        CompletableFutureAwareMessageSender sender;
        // Enums are compared with == (null-safe and type-checked by the compiler).
        if (subscription.getContentType() == ContentType.JSON) {
            sender = new GoogleBigQueryJsonSender(jsonMessageTransformer, target, jsonDataWriterPool);
        } else {
            sender = new GoogleBigQueryAvroSender(avroMessageTransformer, target, avroDataWriterPool);
        }

        return new SingleRecipientMessageSenderAdapter(sender, resilientMessageSender);
    }

    @Override
    public Set<String> getSupportedProtocols() {
        return ImmutableSet.of(SUPPORTED_PROTOCOL);
    }

    @Override
    public void start() throws Exception {
        // No shared resources to initialize; writers are pooled lazily per target.
    }

    /** Shuts down both writer pools, closing every pooled stream writer. */
    @Override
    public void stop() throws Exception {
        jsonDataWriterPool.shutdown();
        avroDataWriterPool.shutdown();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery;

import org.json.JSONObject;
import pl.allegro.tech.hermes.consumers.consumer.Message;

/**
 * Transforms a Hermes {@link Message} into the payload type {@code T} that a
 * BigQuery data writer can append (e.g. a JSON object or a proto row).
 *
 * @param <T> the target payload type produced from a Hermes message
 */
public interface GoogleBigQueryMessageTransformer<T> {

    /** Converts {@code message} into the writer-specific payload representation. */
    T fromHermesMessage(Message message);
}
Loading
Loading