diff --git a/README.md b/README.md index 2d8d31a..e3cc698 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,18 @@ Learn more by following this sequence of Katas: 1. Third Kata: [Importing Pre-Existing Event Data](katas/003_import_preexisting_event_data.md) 1. Fourth Kata: [Build a Basic Dashboard](katas/004_build_a_basic_dashboard.md) +# Ports + +These are the ports that Chorus uses: + +* `4000` - Chorus UI +* `9090` - Search "middleware" that handles queries and User Behavior Insights (UBI) data. +* `2021` - Data Prepper - HTTP source for UBI events +* `4900` - Data Prepper +* `21890` - Data Prepper - OTEL trace source for UBI events +* `9200` - OpenSearch +* `5601` - OpenSearch Dashboards + # Useful Commands for Chorus To start your environment, but still run each command to set up the integrations manually, run: diff --git a/chorus_ui/src/App.js b/chorus_ui/src/App.js index 5f7ba64..5a88dfd 100644 --- a/chorus_ui/src/App.js +++ b/chorus_ui/src/App.js @@ -19,9 +19,8 @@ var UbiPosition = require('./ts/UbiEvent.ts').UbiPosition; //###################################### // global variables const event_server = "http://127.0.0.1:9090"; // Middleware -//const event_server = "http://localhost:2021"; // DataPrepper -//const search_server = "http://localhost:9200"; // OpenSearch const search_server = "http://localhost:9090"; // Proxy queries through Middleware + const search_credentials = "*:*"; const search_index = 'ecommerce' const object_id_field = 'primary_ean' diff --git a/docker-compose.yml b/docker-compose.yml index 3cd6dc4..078ee94 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,6 +16,8 @@ services: middleware: container_name: middleware build: ./middleware + environment: + OTEL_EXPORT_ENABLED: "false" ports: - "9090:9090" volumes: diff --git a/middleware/app.py b/middleware/app.py index 55ee0b3..14c917d 100644 --- a/middleware/app.py +++ b/middleware/app.py @@ -13,6 +13,7 @@ from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor +OTEL_EXPORT_ENABLED = os.getenv("OTEL_EXPORT_ENABLED", "false") OTEL_COLLECTOR_ENDPOINT = os.getenv("OTEL_COLLECTOR_ENDPOINT", "http://dataprepper:21890/opentelemetry.proto.collector.trace.v1.TraceService/Export") OPENSEARCH_ENDPOINT = os.getenv("OPENSEARCH_ENDPOINT", "http://opensearch:9200") OPENSEARCH_HOST = os.getenv("OPENSEARCH_HOST", "opensearch") @@ -177,58 +178,83 @@ def ubi_events(): # Index the UBI event to OpenSearch. client = OpenSearch(hosts=[{"host": OPENSEARCH_HOST, "port": 9200}]) - # Make OTel traces from UBI events in the request body. + for event in events: + + ubi_query_id = event["query_id"] + # Add back the sensitive information (cost) to the data being sent to the UBI Events datastore. + cost = None + + # couldn't get this to work so doing a more painful approach below. + # ean = event["event_attributes"]["object"]["object_id"] + + event_attributes = event["event_attributes"] + if event_attributes is not None: + obj = event_attributes["object"] + if obj is not None: + ean = obj["object_id"] + if ean is not None: + if f"{ubi_query_id}-{ean}" in cache: + cost = cache[f"{ubi_query_id}-{ean}"] + else: + cost = None - resource = Resource(attributes={ - "service.name": "ubi" - }) + if cost is not None: + event['event_attributes']['cost'] = cost - traceProvider = TracerProvider(resource=resource) - processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=OTEL_COLLECTOR_ENDPOINT)) - traceProvider.add_span_processor(processor) - trace.set_tracer_provider(traceProvider) + # Index directly into ubi_events index + client.index( + index="ubi_events", + body=event, + id=str(uuid.uuid4()), + refresh=True + ) + + # If OTel is enabled, send trace events. + if OTEL_EXPORT_ENABLED == "true": - tracer = trace.get_tracer(__name__) + # Make OTel traces from UBI events in the request body. - for event in events: - - ubi_query_id = event["query_id"] - # Add back the sensitive information (cost) to the data being sent to the UBI Events datastore. - cost = None - - # couldn't get this to work so doing a more painful approach below. - # ean = event["event_attributes"]["object"]["object_id"] - - event_attributes = event["event_attributes"] - if event_attributes is not None: - obj = event_attributes["object"] - if obj is not None: - ean = obj["object_id"] - if ean is not None: - if f"{ubi_query_id}-{ean}" in cache: - cost = cache[f"{ubi_query_id}-{ean}"] - else: - cost = None - - if cost is not None: - event['event_attributes']['cost'] = cost - - - # First we demonstrate indexing directly into ubi_events index - client.index( - index="ubi_events", - body=event, - id=str(uuid.uuid4()), - refresh=True - ) + resource = Resource(attributes={ + "service.name": "ubi" + }) + + traceProvider = TracerProvider(resource=resource) + processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=OTEL_COLLECTOR_ENDPOINT)) + traceProvider.add_span_processor(processor) + trace.set_tracer_provider(traceProvider) + + tracer = trace.get_tracer(__name__) + + for event in events: + + ubi_query_id = event["query_id"] + # Add back the sensitive information (cost) to the data being sent to the UBI Events datastore. + cost = None + + # couldn't get this to work so doing a more painful approach below. + # ean = event["event_attributes"]["object"]["object_id"] - # Now we demonstrate indexing via OTEL into otel_ubi_events index - with tracer.start_as_current_span("ubi_event") as span: + event_attributes = event["event_attributes"] + if event_attributes is not None: + obj = event_attributes["object"] + if obj is not None: + ean = obj["object_id"] + if ean is not None: + if f"{ubi_query_id}-{ean}" in cache: + cost = cache[f"{ubi_query_id}-{ean}"] + else: + cost = None + + if cost is not None: + event['event_attributes']['cost'] = cost + + # Now we demonstrate indexing via OTEL into otel_ubi_events index + with tracer.start_as_current_span("ubi_event") as span: - for key, value in event.items(): - if value is not None and key != "event_attributes": - span.set_attribute("ubi." + key, value) - span.set_attribute("ubi.event_attributes", json.dumps(event['event_attributes'])) + for key, value in event.items(): + if value is not None and key != "event_attributes": + span.set_attribute("ubi." + key, value) + span.set_attribute("ubi.event_attributes", json.dumps(event['event_attributes'])) return '{"status": "submitted"}' @@ -239,4 +265,4 @@ def dump_cache(): if __name__ == "__main__": - app.run(host="0.0.0.0", port=9090, debug=True) + app.run(host="0.0.0.0", port=9090, debug=True) \ No newline at end of file diff --git a/quickstart.sh b/quickstart.sh index 7d87bdb..860af8a 100755 --- a/quickstart.sh +++ b/quickstart.sh @@ -29,6 +29,8 @@ offline_lab=false local_deploy=true stop=false +hostname_or_public_ip="chorus-opensearch-edition.dev.o19s.com" + while [ ! $# -eq 0 ] do case "$1" in @@ -53,7 +55,7 @@ do ;; --online-deployment | -online) local_deploy=false - echo -e "${MAJOR}Configuring Chorus for chorus-opensearch-edition.dev.o19s.com environment\n${RESET}" + echo -e "${MAJOR}Configuring Chorus for ${hostname_or_public_ip} environment\n${RESET}" ;; esac shift @@ -67,9 +69,13 @@ fi if ! $local_deploy; then echo -e "${MAJOR}Updating configuration files for online deploy${RESET}" - sed -i.bu 's/localhost/chorus-opensearch-edition.dev.o19s.com/g' ./chorus_ui/src/Logs.js - sed -i.bu 's/localhost/chorus-opensearch-edition.dev.o19s.com/g' ./chorus_ui/src/App.js - sed -i.bu 's/localhost/chorus-opensearch-edition.dev.o19s.com/g' ./opensearch/wait-for-os.sh + sed -i.bu "s/localhost/${hostname_or_public_ip}/g" ./chorus_ui/src/Logs.js + sed -i.bu "s/localhost/${hostname_or_public_ip}/g" ./chorus_ui/src/App.js + sed -i.bu "s/127.0.0.1/${hostname_or_public_ip}/g" ./chorus_ui/src/App.js + sed -i.bu "s/localhost/${hostname_or_public_ip}/g" ./opensearch/wait-for-os.sh + sed -i.bu "s/localhost/${hostname_or_public_ip}/g" ./middleware/app.py +else + sed -i.bu "s/localhost/opensearch/g" ./middleware/app.py fi if $stop; then