Skip to content

Commit

Permalink
Add streaming BigQuery Loader modules v2
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Jun 4, 2024
1 parent f548de3 commit acccb2f
Show file tree
Hide file tree
Showing 10 changed files with 214 additions and 334 deletions.
42 changes: 15 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

# terraform-google-bigquery-loader-pubsub-ce

A Terraform module which deploys the requisite micro-services for loading BigQuery on Google running on top of Compute Engine. If you want to use a custom image for this deployment you will need to ensure it is based on top of Ubuntu 20.04.
A Terraform module which deploys the BigQuery Loader application on Google running on top of Compute Engine. If you want to use a custom image for this deployment you will need to ensure it is based on top of Ubuntu 20.04.

## Telemetry

Expand All @@ -20,13 +20,7 @@ For details on what information is collected please see this module: https://git

## Usage

This module will deploy three seperate instance groups:

1. `mutator`: Attempts to create the events table if it doesn't exist and then listens for new `types` to update the table with as custom events and entities are tracked
2. `repeater`: Events that were sent with custom `events` and `entities` that have not yet been added to the events table will be re-tried later by the repeater
3. `streamloader`: Core application which pulls data from an Enriched events topic and loads into BigQuery

The mutator is deployed as a `singleton` instance but both the `repeater` and `streamloader` can be scaled horizontally if higher throughput is needed.
The BigQuery Loader reads data from a Snowplow Enriched output PubSub topic and writes in realtime to BigQuery events table.

```hcl
# NOTE: Needs to be fed by the enrich module with valid Snowplow Events
Expand All @@ -49,12 +43,6 @@ resource "google_bigquery_dataset" "pipeline_db" {
location = var.region
}
resource "google_storage_bucket" "dead_letter_bucket" {
name = "bq-loader-dead-letter"
location = var.region
force_destroy = true
}
module "bigquery_loader_pubsub" {
source = "snowplow-devops/bigquery-loader-pubsub-ce/google"
Expand All @@ -69,7 +57,6 @@ module "bigquery_loader_pubsub" {
input_topic_name = module.enriched_topic.name
bad_rows_topic_name = module.bad_rows_topic.name
gcs_dead_letter_bucket_name = google_storage_bucket.dead_letter_bucket.name
bigquery_dataset_id = google_bigquery_dataset.pipeline_db.dataset_id
ssh_key_pairs = []
Expand Down Expand Up @@ -115,19 +102,14 @@ module "bigquery_loader_pubsub" {
| [google_bigquery_dataset_iam_member.dataset_bigquery_data_editor_binding](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/bigquery_dataset_iam_member) | resource |
| [google_compute_firewall.egress](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/compute_firewall) | resource |
| [google_compute_firewall.ingress_ssh](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/compute_firewall) | resource |
| [google_compute_firewall.ingress_health_check](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/compute_firewall) | resource |
| [google_project_iam_member.sa_bigquery_data_editor](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/project_iam_member) | resource |
| [google_project_iam_member.sa_logging_log_writer](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/project_iam_member) | resource |
| [google_project_iam_member.sa_pubsub_publisher](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/project_iam_member) | resource |
| [google_project_iam_member.sa_pubsub_subscriber](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/project_iam_member) | resource |
| [google_project_iam_member.sa_pubsub_viewer](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/project_iam_member) | resource |
| [google_project_iam_member.sa_storage_object_viewer](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/project_iam_member) | resource |
| [google_pubsub_subscription.failed_inserts](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/pubsub_subscription) | resource |
| [google_pubsub_subscription.input](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/pubsub_subscription) | resource |
| [google_pubsub_subscription.types](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/pubsub_subscription) | resource |
| [google_pubsub_topic.failed_inserts](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/pubsub_topic) | resource |
| [google_pubsub_topic.types](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/pubsub_topic) | resource |
| [google_service_account.sa](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/service_account) | resource |
| [google_storage_bucket_iam_binding.dead_letter_storage_object_admin_binding](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/storage_bucket_iam_binding) | resource |

## Inputs

Expand All @@ -147,31 +129,37 @@ module "bigquery_loader_pubsub" {
| <a name="input_bigquery_partition_column"></a> [bigquery\_partition\_column](#input\_bigquery\_partition\_column) | The partition column to use in the dataset | `string` | `"collector_tstamp"` | no |
| <a name="input_bigquery_require_partition_filter"></a> [bigquery\_require\_partition\_filter](#input\_bigquery\_require\_partition\_filter) | Whether to require a filter on the partition column in all queries | `bool` | `true` | no |
| <a name="input_bigquery_table_id"></a> [bigquery\_table\_id](#input\_bigquery\_table\_id) | The ID of the table within a dataset to load data into (will be created if it doesn't exist) | `string` | `"events"` | no |
| <a name="input_service_account_json_b64"></a> [bigquery\_service\_account\_json\_b64](#input\_bigquery\_service\_account\_json\_b64) | Custom credentials (as base64 encoded service account key) instead of default service account assigned to the loader's compute group | `string` | `""` | no |
| <a name="input_custom_iglu_resolvers"></a> [custom\_iglu\_resolvers](#input\_custom\_iglu\_resolvers) | The custom Iglu Resolvers that will be used by the loader to resolve and validate events | <pre>list(object({<br> name = string<br> priority = number<br> uri = string<br> api_key = string<br> vendor_prefixes = list(string)<br> }))</pre> | `[]` | no |
| <a name="input_default_iglu_resolvers"></a> [default\_iglu\_resolvers](#input\_default\_iglu\_resolvers) | The default Iglu Resolvers that will be used by the loader to resolve and validate events | <pre>list(object({<br> name = string<br> priority = number<br> uri = string<br> api_key = string<br> vendor_prefixes = list(string)<br> }))</pre> | <pre>[<br> {<br> "api_key": "",<br> "name": "Iglu Central",<br> "priority": 10,<br> "uri": "http://iglucentral.com",<br> "vendor_prefixes": []<br> },<br> {<br> "api_key": "",<br> "name": "Iglu Central - Mirror 01",<br> "priority": 20,<br> "uri": "http://mirror01.iglucentral.com",<br> "vendor_prefixes": []<br> }<br>]</pre> | no |
| <a name="input_gcp_logs_enabled"></a> [gcp\_logs\_enabled](#input\_gcp\_logs\_enabled) | Whether application logs should be reported to GCP Logging | `bool` | `true` | no |
| <a name="input_iglu_cache_size"></a> [iglu\_cache\_size](#input\_iglu\_cache\_size) | The size of cache used by Iglu Resolvers | `number` | `500` | no |
| <a name="input_iglu_cache_ttl_seconds"></a> [iglu\_cache\_ttl\_seconds](#input\_iglu\_cache\_ttl\_seconds) | Duration in seconds, how long should entries be kept in Iglu Resolvers cache before they expire | `number` | `600` | no |
| <a name="input_java_opts"></a> [java\_opts](#input\_java\_opts) | Custom JAVA Options | `string` | `"-XX:InitialRAMPercentage=75 -XX:MaxRAMPercentage=75"` | no |
| <a name="input_labels"></a> [labels](#input\_labels) | The labels to append to this resource | `map(string)` | `{}` | no |
| <a name="input_machine_type_mutator"></a> [machine\_type\_mutator](#input\_machine\_type\_mutator) | The machine type to use | `string` | `"e2-small"` | no |
| <a name="input_machine_type_repeater"></a> [machine\_type\_repeater](#input\_machine\_type\_repeater) | The machine type to use | `string` | `"e2-small"` | no |
| <a name="input_machine_type_streamloader"></a> [machine\_type\_streamloader](#input\_machine\_type\_streamloader) | The machine type to use | `string` | `"e2-small"` | no |
| <a name="input_machine_type"></a> [machine\_type](#input\_machine\_type) | The machine type to use | `string` | `"e2-small"` | no |
| <a name="input_ssh_block_project_keys"></a> [ssh\_block\_project\_keys](#input\_ssh\_block\_project\_keys) | Whether to block project wide SSH keys | `bool` | `true` | no |
| <a name="input_ssh_ip_allowlist"></a> [ssh\_ip\_allowlist](#input\_ssh\_ip\_allowlist) | The list of CIDR ranges to allow SSH traffic from | `list(any)` | <pre>[<br> "0.0.0.0/0"<br>]</pre> | no |
| <a name="input_ssh_key_pairs"></a> [ssh\_key\_pairs](#input\_ssh\_key\_pairs) | The list of SSH key-pairs to add to the servers | <pre>list(object({<br> user_name = string<br> public_key = string<br> }))</pre> | `[]` | no |
| <a name="input_subnetwork"></a> [subnetwork](#input\_subnetwork) | The name of the sub-network to deploy within; if populated will override the 'network' setting | `string` | `""` | no |
| <a name="input_target_size_repeater"></a> [target\_size\_repeater](#input\_target\_size\_repeater) | The number of servers to deploy | `number` | `1` | no |
| <a name="input_target_size_streamloader"></a> [target\_size\_streamloader](#input\_target\_size\_streamloader) | The number of servers to deploy | `number` | `1` | no |
| <a name="input_target_size"></a> [target\_size](#input\_target\_size) | The number of servers to deploy | `number` | `1` | no |
| <a name="input_telemetry_enabled"></a> [telemetry\_enabled](#input\_telemetry\_enabled) | Whether or not to send telemetry information back to Snowplow Analytics Ltd | `bool` | `true` | no |
| <a name="input_ubuntu_20_04_source_image"></a> [ubuntu\_20\_04\_source\_image](#input\_ubuntu\_20\_04\_source\_image) | The source image to use which must be based of of Ubuntu 20.04; by default the latest community version is used | `string` | `""` | no |
| <a name="input_user_provided_id"></a> [user\_provided\_id](#input\_user\_provided\_id) | An optional unique identifier to identify the telemetry events emitted by this stack | `string` | `""` | no |
| <a name="input_webhook_collector"></a> [webhook\_collector](#input\_webhook\_collector) | Collector address used to gather monitoring alerts | `string` | `""` | no |
| <a name="input_skip_schemas"></a> [skip\_schemas](#input\_skip\_schemas) | The list of schema keys which should be skipped (not loaded) to the warehouse | `list(string)` | `[]` | no |
| <a name="input_healthcheck_enabled"></a> [healthcheck\_enabled](#input\_healthcheck\_enabled) | Whether or not to enable health check probe for GCP instance group | `bool` | `true` | no |

## Outputs

| Name | Description |
|------|-------------|
| <a name="output_health_check_id"></a> [health\_check\_id](#output\_health\_check\_id) | Identifier for the health check on the instance group |
| <a name="output_health_check_self_link"></a> [health\_check\_self\_link](#output\_health\_check\_self\_link) | The URL for the health check on the instance group |
| <a name="output_instance_group_url"></a> [instance\_group\_url](#output\_instance\_group\_url) | The full URL of the instance group created by the manager |
| <a name="output_manager_id"></a> [manager\_id](#output\_manager\_id) | Identifier for the instance group manager |
| <a name="output_manager_self_link"></a> [manager\_self\_link](#output\_manager\_self\_link) | The URL for the instance group manager |
| <a name="output_named_port_http"></a> [named\_port\_http](#output\_named\_port\_http) | The name of the port exposed by the instance group |
| <a name="output_named_port_value"></a> [named\_port\_value](#output\_named\_port\_value) | The named port value (e.g. 8080) |

# Copyright and license

Expand Down
Loading

0 comments on commit acccb2f

Please sign in to comment.