diff --git a/.github/workflows/run-lib-injection.yml b/.github/workflows/run-lib-injection.yml index 9b44d08997..d547471f3f 100644 --- a/.github/workflows/run-lib-injection.yml +++ b/.github/workflows/run-lib-injection.yml @@ -28,6 +28,7 @@ jobs: matrix: ${{ steps.compute-matrix.outputs.matrix }} matrix_supported_langs: ${{ steps.compute-matrix.outputs.matrix_supported_langs }} matrix_profiling_supported: ${{ steps.compute-matrix.outputs.matrix_profiling_supported }} + matrix_skip_basic: ${{ steps.compute-matrix.outputs.matrix_skip_basic }} init_image: ${{ steps.compute-matrix.outputs.init_image }} steps: - name: Compute matrix @@ -41,7 +42,9 @@ jobs: "cpp": [], "dotnet": [{"name":"dd-lib-dotnet-init-test-app","supported":"true"}], "golang": [], - "java": [{"name":"dd-lib-java-init-test-app","supported":"true"},{"name":"jdk7-app","supported":"false"}], + "java": [{"name":"dd-lib-java-init-test-app","supported":"true"}, + {"name":"jdk7-app","supported":"false"}, + {"name":"dd-djm-spark-test-app", "supported":"true", "skip-profiling":"true", "skip-basic":"true"}], "nodejs": [{"name":"sample-app","supported":"true"},{"name":"sample-app-node13","supported":"false"}], "php": [], "python": [{"name":"dd-lib-python-init-test-django","supported":"true"}, @@ -80,11 +83,14 @@ jobs: #Only supported weblog variants results_supported_langs = [] results_profiling_supported = [] + results_skip_basic = [] for weblog in weblogs["${{ inputs.library }}"]: if weblog["supported"] == "true": results_supported_langs.append(weblog["name"]) if "skip-profiling" not in weblog or weblog["skip-profiling"] != "true": results_profiling_supported.append(weblog["name"]) + if "skip-basic" in weblog and weblog["skip-basic"] == "true": + results_skip_basic.append(weblog["name"]) #Use the latest init image for prod version, latest_snapshot init image for dev version if "${{ inputs.version }}" == 'prod': @@ -97,11 +103,13 @@ jobs: print(f'init_image={json.dumps(result_init_image)}', file=fh)
print(f'matrix_supported_langs={json.dumps(results_supported_langs)}', file=fh) print(f'matrix_profiling_supported={json.dumps(results_profiling_supported)}', file=fh) + print(f'matrix_skip_basic={json.dumps(results_skip_basic)}', file=fh) print(json.dumps(result, indent=2)) print(json.dumps(result_init_image, indent=2)) print(json.dumps(results_supported_langs, indent=2)) print(json.dumps(results_profiling_supported, indent=2)) + print(json.dumps(results_skip_basic, indent=2)) lib-injection-init-image-validator: if: inputs.library == 'dotnet' || inputs.library == 'java' || inputs.library == 'python' || inputs.library == 'ruby' || inputs.library == 'nodejs' @@ -231,6 +239,7 @@ jobs: - name: Kubernetes lib-injection tests id: k8s-lib-injection-tests + if: ${{ !contains(fromJson(needs.compute-matrix.outputs.matrix_skip_basic), matrix.weblog) }} run: ./run.sh K8S_LIBRARY_INJECTION_BASIC - name: Kubernetes lib-injection profiling tests @@ -238,6 +247,11 @@ jobs: if: ${{ contains(fromJson(needs.compute-matrix.outputs.matrix_profiling_supported), matrix.weblog) }} run: ./run.sh K8S_LIBRARY_INJECTION_PROFILING + - name: Kubernetes lib-injection DJM tests + id: k8s-lib-injection-tests-djm + if: ${{ matrix.weblog == 'dd-djm-spark-test-app' }} + run: ./run.sh K8S_LIBRARY_INJECTION_DJM + - name: Compress logs id: compress_logs if: always() && steps.build.outcome == 'success' diff --git a/lib-injection/build/docker/java/dd-djm-spark-test-app/Dockerfile b/lib-injection/build/docker/java/dd-djm-spark-test-app/Dockerfile new file mode 100644 index 0000000000..96cc099bd5 --- /dev/null +++ b/lib-injection/build/docker/java/dd-djm-spark-test-app/Dockerfile @@ -0,0 +1,11 @@ +FROM apache/spark:3.4.4 + +WORKDIR /opt/spark/work-dir + +USER root +COPY launch.sh /opt/spark/work-dir/launch.sh +RUN chown spark:spark /opt/spark/work-dir/launch.sh +RUN chmod +x /opt/spark/work-dir/launch.sh +USER spark + +CMD ["/opt/spark/work-dir/launch.sh"] diff --git 
a/lib-injection/build/docker/java/dd-djm-spark-test-app/launch.sh b/lib-injection/build/docker/java/dd-djm-spark-test-app/launch.sh new file mode 100644 index 0000000000..cc18052139 --- /dev/null +++ b/lib-injection/build/docker/java/dd-djm-spark-test-app/launch.sh @@ -0,0 +1,22 @@ +#!/bin/bash + +# Submit an example spark job with DJM enabled +$SPARK_HOME/bin/spark-submit \ + --class org.apache.spark.examples.SparkPi \ + --master k8s://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT \ + --conf spark.kubernetes.container.image=apache/spark:3.4.4 \ + --deploy-mode cluster \ + --conf spark.kubernetes.namespace=default \ + --conf spark.kubernetes.executor.deleteOnTermination=false \ + --conf spark.kubernetes.driver.label.admission.datadoghq.com/enabled=true \ + --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \ + --conf spark.kubernetes.authenticate.executor.serviceAccountName=spark \ + --conf spark.kubernetes.driver.annotation.admission.datadoghq.com/java-lib.version=latest \ + --conf spark.kubernetes.driverEnv.DD_APM_INSTRUMENTATION_DEBUG=true \ + --conf spark.kubernetes.driver.annotation.admission.datadoghq.com/apm-inject.version=latest \ + --conf spark.driver.extraJavaOptions="-Ddd.integrations.enabled=false -Ddd.data.jobs.enabled=true -Ddd.service=spark-pi-example -Ddd.env=test -Ddd.version=0.1.0 -Ddd.tags=team:djm" \ + --conf spark.kubernetes.driverEnv.HADOOP_HOME=/opt/hadoop/ \ + local:///opt/spark/examples/jars/spark-examples.jar 20 + +# start a long running server to keep the web-log up.
+python3 -m http.server ${SERVER_PORT:-18080} diff --git a/tests/k8s_lib_injection/conftest.py b/tests/k8s_lib_injection/conftest.py index 235e1cf17b..b83f6e3789 100644 --- a/tests/k8s_lib_injection/conftest.py +++ b/tests/k8s_lib_injection/conftest.py @@ -96,12 +96,12 @@ def deploy_test_agent(self): def deploy_agent(self): self.test_agent.deploy_agent() - def deploy_weblog_as_pod(self, with_admission_controller=True, use_uds=False, env=None): + def deploy_weblog_as_pod(self, with_admission_controller=True, use_uds=False, env=None, service_account=None): if with_admission_controller: - self.test_weblog.install_weblog_pod_with_admission_controller(env=env) + self.test_weblog.install_weblog_pod_with_admission_controller(env=env, service_account=service_account) else: self.test_weblog.install_weblog_pod_without_admission_controller(use_uds, env=env) def export_debug_info(self): self.test_agent.export_debug_info() self.test_weblog.export_debug_info() diff --git a/tests/k8s_lib_injection/test_k8s_djm_with_ssi.py b/tests/k8s_lib_injection/test_k8s_djm_with_ssi.py new file mode 100644 index 0000000000..ade326cd26 --- /dev/null +++ b/tests/k8s_lib_injection/test_k8s_djm_with_ssi.py @@ -0,0 +1,61 @@ +import time + +import requests +import json + +from utils import scenarios, features, context, irrelevant +from utils.tools import logger +from utils.k8s_lib_injection.k8s_command_utils import execute_command_sync + +@features.k8s_admission_controller +@scenarios.k8s_library_injection_djm +@irrelevant(condition=(context.library!="java"), reason="Data Jobs Monitoring requires Java library only.") +@irrelevant(condition=(context.weblog_variant!="dd-djm-spark-test-app"), reason="Data Jobs Monitoring tests are only applicable when using dd-djm-spark-test-app web-log variant.") +class TestK8sDJMWithSSI: + """ This test case validates java lib injection for Data Jobs Monitoring on k8s.
+ The tracer is injected using admission controller via annotations on submitted Spark application. + We then use the dev test agent to check if the Spark application is instrumented. + """ + + def _get_dev_agent_traces(self, k8s_kind_cluster, retry=10): + for _ in range(retry): + logger.info("[Check traces] Checking traces:") + response = requests.get( + f"http://{k8s_kind_cluster.cluster_host_name}:{k8s_kind_cluster.get_agent_port()}/test/traces" + ) + traces_json = response.json() + if len(traces_json) > 0: + logger.debug(f"Test traces response: {traces_json}") + return traces_json + time.sleep(2) + return [] + + def _get_spark_application_traces(self, test_k8s_instance): + traces_json = self._get_dev_agent_traces(test_k8s_instance.k8s_kind_cluster) + logger.debug(f"Traces received: {traces_json}") + return [ + trace for trace in traces_json + if any(span.get("name") == "spark.application" and span.get("type") == "spark" for span in trace) + ] + + def test_spark_instrumented_with_ssi(self, test_k8s_instance): + logger.info( + f"Launching test test_spark_instrumented_with_ssi: Weblog: [{test_k8s_instance.k8s_kind_cluster.get_weblog_port()}] Agent: [{test_k8s_instance.k8s_kind_cluster.get_agent_port()}]" + ) + + # create service account for launching spark application in k8s + execute_command_sync("kubectl create serviceaccount spark", test_k8s_instance.k8s_kind_cluster) + execute_command_sync("kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark --namespace=default", test_k8s_instance.k8s_kind_cluster) + + test_k8s_instance.deploy_test_agent() + test_k8s_instance.deploy_datadog_cluster_agent() + test_k8s_instance.deploy_weblog_as_pod(service_account="spark") + + spark_traces = self._get_spark_application_traces(test_k8s_instance) + + logger.info(f"Spark application traces received: {spark_traces}") + with open(f"{test_k8s_instance.output_folder}/spark_traces.json", "w") as f: + f.write(json.dumps(spark_traces,
 indent=4)) + assert len(spark_traces) > 0, "No Data Jobs Monitoring Spark application traces found" + + logger.info("Test test_spark_instrumented_with_ssi finished") diff --git a/utils/_context/_scenarios/__init__.py b/utils/_context/_scenarios/__init__.py index 37d4cacaf8..28993c17b3 100644 --- a/utils/_context/_scenarios/__init__.py +++ b/utils/_context/_scenarios/__init__.py @@ -698,6 +698,13 @@ def all_endtoend_scenarios(test_object): github_workflow="libinjection", scenario_groups=[ScenarioGroup.ALL, ScenarioGroup.LIB_INJECTION], ) + + k8s_library_injection_djm = KubernetesScenario( + "K8S_LIBRARY_INJECTION_DJM", + doc="Kubernetes Instrumentation with Data Jobs Monitoring", + github_workflow="libinjection", + scenario_groups=[ScenarioGroup.ALL, ScenarioGroup.LIB_INJECTION], + ) k8s_library_injection_profiling = KubernetesScenario( "K8S_LIBRARY_INJECTION_PROFILING", diff --git a/utils/k8s_lib_injection/k8s_weblog.py b/utils/k8s_lib_injection/k8s_weblog.py index 946b087c6b..eddc9641b9 100644 --- a/utils/k8s_lib_injection/k8s_weblog.py +++ b/utils/k8s_lib_injection/k8s_weblog.py @@ -36,7 +36,7 @@ def configure(self, k8s_kind_cluster, k8s_wrapper): self.k8s_wrapper = k8s_wrapper self.logger = k8s_logger(self.output_folder, self.test_name, "k8s_logger") - def _get_base_weblog_pod(self, env=None): + def _get_base_weblog_pod(self, env=None, service_account=None): """ Installs a target app for manual library injection testing.
It returns when the app pod is ready.""" @@ -106,15 +106,17 @@ def _get_base_weblog_pod(self, env=None): containers.append(container1) - pod_spec = client.V1PodSpec(containers=containers) + pod_spec = client.V1PodSpec( + containers=containers, + service_account=service_account) pod_body = client.V1Pod(api_version="v1", kind="Pod", metadata=pod_metadata, spec=pod_spec) self.logger.info("[Deploy weblog] Weblog pod configuration done.") return pod_body - def install_weblog_pod_with_admission_controller(self, env=None): + def install_weblog_pod_with_admission_controller(self, env=None, service_account=None): self.logger.info("[Deploy weblog] Installing weblog pod using admission controller") - pod_body = self._get_base_weblog_pod(env=env) + pod_body = self._get_base_weblog_pod(env=env, service_account=service_account) self.k8s_wrapper.create_namespaced_pod(body=pod_body) self.logger.info("[Deploy weblog] Weblog pod using admission controller created. Waiting for it to be ready!") self.wait_for_weblog_ready_by_label_app("my-app", timeout=200)