Skip to content

Commit

Permalink
Implement KeyStrategy interface
Browse files Browse the repository at this point in the history
  • Loading branch information
wavejumper committed Nov 12, 2024
1 parent ca8cb95 commit dd4dc7b
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 34 deletions.
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,25 @@ In your application, just before you start your KafkaStreams instance:

```java
import io.factorhouse.kpow.StreamsRegistry;
import io.factorhouse.kpow.key_strategies.ClusterIDKeyStrategy;

// Your Kafka Streams topology
Topology topology = createMyTopology();
Topology topology = createMyTopology();

// Your Kafka Streams config
Properties props = new createMyStreamProperties();

// Your Kafka Streams instance
KafkaStreams streams = new KafkaStreams(topology, props);
KafkaStreams streams = new KafkaStreams(topology, props);

// Create a Kpow StreamsRegistry
StreamsRegistry registry = new StreamsRegistry(props);

// Specify the key strategy when writing metrics to the internal Kafka topic
KeyStrategy keyStrategy = new ClusterIDKeyStrategy(props);

// Register your KafkaStreams and Topology instances with the StreamsRegistry
registry.register(streams, topology);
registry.register(streams, topology, keyStrategy);

// Start your Kafka Streams application
streams.start();
Expand Down
1 change: 1 addition & 0 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
:dependencies [[org.clojure/clojure "1.11.3"]
[com.cognitect/transit-clj "1.0.333"]
[org.clojure/tools.logging "1.3.0"]
[org.apache.kafka/kafka-clients "3.6.1" :scope "provided"]
[org.apache.kafka/kafka-streams "3.6.1" :scope "provided"]]
:uberjar {:prep-tasks ["clean" "javac" "compile"]
:aot :all}
Expand Down
53 changes: 33 additions & 20 deletions src/clojure/io/factorhouse/kpow/agent.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
(:require [clojure.string :as str]
[clojure.tools.logging :as log]
[clojure.core.protocols :as p])
(:import (java.util UUID)
(:import (io.factorhouse.kpow.key_strategies KeyStrategy Taxon)
(java.util UUID)
(java.util.concurrent Executors TimeUnit ThreadFactory)
(org.apache.kafka.clients.producer Producer ProducerRecord)
(org.apache.kafka.common MetricName)
Expand All @@ -16,6 +17,15 @@
{:topic "__oprtr_snapshot_state"})

(extend-protocol p/Datafiable
Taxon
(datafy [v]
(into []
(filter identity)
[(keyword (.getDomain v))
(.getId v)
(keyword "kafka" (.getObject v))
(.getObjectId v)]))

KeyValue
(datafy [kv]
{:key (.key kv)
Expand Down Expand Up @@ -109,46 +119,46 @@
metrics)))

(defn snapshot-send
[{:keys [snapshot-topic ^Producer producer snapshot-id application-id job-id client-id captured]} data]
(let [snapshot {:type :kafka/streams-agent
[{:keys [snapshot-topic ^Producer producer taxon application-id job-id client-id captured]} data]
(let [taxon (p/datafy taxon)
snapshot {:type :kafka/streams-agent
:application-id application-id
:client-id client-id
:captured captured
:data data
:job/id job-id
:snapshot/id snapshot-id}
taxon [(:domain snapshot-id) (:id snapshot-id) :kafka/streams-agent]
:snapshot/id {:domain (first taxon) :id (second taxon)}}
record (ProducerRecord. (:topic snapshot-topic) taxon snapshot)]
(.get (.send producer record))))

(defn metrics-send
[{:keys [snapshot-topic producer snapshot-id application-id job-id client-id captured]} metrics]
(let [taxon [(:domain snapshot-id) (:id snapshot-id) :kafka/streams-agent]]
[{:keys [snapshot-topic producer taxon application-id job-id client-id captured]} metrics]
(let [taxon (p/datafy taxon)]
(doseq [data (partition-all 50 metrics)]
(let [value {:type :kafka/streams-agent-metrics
:application-id application-id
:client-id client-id
:captured captured
:data (vec data)
:job/id job-id
:snapshot/id snapshot-id}
:snapshot/id {:domain (first taxon) :id (second taxon)}}
record (ProducerRecord. (:topic snapshot-topic) taxon value)]
(.get (.send producer record))))
(log/infof "Kpow: sent [%s] streams metrics for application.id %s" (count metrics) application-id)))

(defn plan-send
[{:keys [snapshot-topic producer snapshot-id job-id captured]}]
(let [taxon [(:domain snapshot-id) (:id snapshot-id) :kafka/streams-agent]
[{:keys [snapshot-topic producer job-id captured taxon]}]
(let [taxon (p/datafy taxon)
plan {:type :observation/plan
:captured captured
:snapshot/id snapshot-id
:snapshot/id {:domain (first taxon) :id (second taxon)}
:job/id job-id
:data {:type :observe/streams-agent}}
record (ProducerRecord. (:topic snapshot-topic) taxon plan)]
(.get (.send producer record))))

(defn snapshot-telemetry
[{:keys [streams ^Topology topology metrics-filter] :as ctx}]
[{:keys [streams ^Topology topology metrics-filter ^KeyStrategy key-strategy] :as ctx}]
(let [metrics (metrics streams)]
(if (empty? metrics)
(log/warn "KafkStreams .metrics() method returned an empty collection, no telemetry was sent. Has something mutated the global metrics registry?")
Expand All @@ -157,11 +167,12 @@
snapshot {:topology topology :state state}
client-id (client-id metrics)
application-id (application-id metrics)
taxon (.getTaxon key-strategy client-id application-id)
ctx (assoc ctx
:captured (System/currentTimeMillis)
:client-id client-id
:application-id application-id
:snapshot-id {:domain :streams :id client-id})]
:taxon taxon)]
(when (nil? application-id)
(throw (Exception. "Cannot infer application id from metrics returned from KafkaStreams instance. Expected metric \"application-id\" in the metrics registry.")))
(when (nil? client-id)
Expand All @@ -181,8 +192,8 @@
:snapshot-topic snapshot-topic
:producer producer
:metrics-filter metrics-filter}]
(doseq [[id [streams topology]] @registered-topologies]
(try (when-let [next-ctx (snapshot-telemetry (assoc ctx :streams streams :topology topology))]
(doseq [[id [streams topology key-strategy]] @registered-topologies]
(try (when-let [next-ctx (snapshot-telemetry (assoc ctx :streams streams :topology topology :key-strategy key-strategy))]
(Thread/sleep 2000)
(plan-send next-ctx))
(catch Throwable e
Expand All @@ -202,9 +213,9 @@
(log/info "Kpow: starting registry")
(let [registered-topologies (atom {})
pool (Executors/newSingleThreadScheduledExecutor thread-factory)
register-fn (fn [streams topology]
register-fn (fn [streams topology key-strategy]
(let [id (str (UUID/randomUUID))]
(swap! registered-topologies assoc id [streams topology])
(swap! registered-topologies assoc id [streams topology key-strategy])
id))
latch (promise)
task (snapshot-task snapshot-topic producer registered-topologies metrics-filter latch)
Expand All @@ -227,12 +238,14 @@

(defn init-registry
[producer metrics-filter]
(start-registry {:snapshot-topic kpow-snapshot-topic :producer producer :metrics-filter metrics-filter}))
(start-registry {:snapshot-topic kpow-snapshot-topic
:producer producer
:metrics-filter metrics-filter}))

(defn register
[agent streams topology]
[agent streams topology key-strategy]
(when-let [register-fn (:register agent)]
(let [id (register-fn streams topology)]
(let [id (register-fn streams topology key-strategy)]
(log/infof "Kpow: registring new streams agent %s" id)
id)))

Expand Down
15 changes: 8 additions & 7 deletions src/java/io/factorhouse/kpow/StreamsRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import clojure.java.api.Clojure;
import clojure.lang.IFn;
import io.factorhouse.kpow.key_strategies.KeyStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
Expand Down Expand Up @@ -86,21 +87,21 @@ public StreamsRegistry(Properties props, MetricFilter metricsFilter) {
agent = agentFn.invoke(producer, metricsFilter);
}

public StreamsRegistry(Properties props) {
this(props, StreamsRegistry.defaultMetricFilter());
}

public static MetricFilter defaultMetricFilter() {
return new MetricFilter()
.acceptNameStartsWith("foo")
.deny();
}

public StreamsRegistry(Properties props) {
this(props, StreamsRegistry.defaultMetricFilter());
}

public StreamsAgent register(KafkaStreams streams, Topology topology) {
public StreamsAgent register(KafkaStreams streams, Topology topology, KeyStrategy keyStrategy) {
IFn require = Clojure.var("clojure.core", "require");
require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
IFn registerFn = Clojure.var("io.factorhouse.kpow.agent", "register");
String id = (String) registerFn.invoke(agent, streams, topology);
String id = (String) registerFn.invoke(agent, streams, topology, keyStrategy);
if (id != null) {
return new StreamsAgent(id);
} else {
Expand All @@ -124,4 +125,4 @@ public void close() {
IFn closeFn = Clojure.var("io.factorhouse.kpow.agent", "close-registry");
closeFn.invoke(agent);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.factorhouse.kpow.key_strategies;

public class ClientIDKeyStrategy implements KeyStrategy {
public ClientIDKeyStrategy() {}

@Override
public Taxon getTaxon(String clientId, String applicationId) {
return new Taxon("streams", clientId, "streams-agent", null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.factorhouse.kpow.key_strategies;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ClusterIDKeyStrategy implements KeyStrategy {
private final String clusterID;

public ClusterIDKeyStrategy(Properties props) throws InterruptedException, ExecutionException {
try (AdminClient adminClient = AdminClient.create(props)) {
DescribeClusterResult describeClusterResult = adminClient.describeCluster();
this.clusterID = describeClusterResult.clusterId().get();
}
}

@Override
public Taxon getTaxon(String clientId, String applicationId) {
return new Taxon("streams", clusterID, "streams-agent-cid", clientId);
}
}
5 changes: 5 additions & 0 deletions src/java/io/factorhouse/kpow/key_strategies/KeyStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.factorhouse.kpow.key_strategies;

public interface KeyStrategy {
Taxon getTaxon(String clientId, String applicationId);
}
14 changes: 14 additions & 0 deletions src/java/io/factorhouse/kpow/key_strategies/ManualKeyStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.factorhouse.kpow.key_strategies;

public class ManualKeyStrategy implements KeyStrategy {
private final String envName;

public ManualKeyStrategy(String envName) {
this.envName = envName;
}

@Override
public Taxon getTaxon(String clientId, String applicationId) {
return new Taxon("streams", envName, "streams-agent-m", clientId);
}
}
32 changes: 32 additions & 0 deletions src/java/io/factorhouse/kpow/key_strategies/Taxon.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.factorhouse.kpow.key_strategies;

public class Taxon {
private final String domain;
private final String id;

private final String object;
private final String objectId;

public Taxon(String domain, String id, String object, String objectId) {
this.id = id;
this.domain = domain;
this.object = object;
this.objectId = objectId;
}

public String getDomain() {
return domain;
}

public String getObject() {
return object;
}

public String getObjectId() {
return objectId;
}

public String getId() {
return id;
}
}
8 changes: 5 additions & 3 deletions test/io/factorhouse/agent_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
(java.util Properties)
(org.apache.kafka.clients.producer Producer)
(org.apache.kafka.common Metric MetricName)
(org.apache.kafka.streams KafkaStreams$State StreamsBuilder Topology)))
(org.apache.kafka.streams KafkaStreams$State StreamsBuilder Topology)
(io.factorhouse.kpow.key_strategies ClientIDKeyStrategy)))

(defn ^Properties ->props [m]
(let [props (Properties.)]
Expand Down Expand Up @@ -86,7 +87,8 @@
(mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0)
(mock-metric "application-id" "first" "mock metric" {"client-id" "abc123"} "xxx")
(mock-metric "second.metric" "first" "mock metric" {"client-id" "abc123"} 2.0)])
(test-topology))]
(test-topology)
(ClientIDKeyStrategy.))]

(is agent)

Expand Down Expand Up @@ -123,4 +125,4 @@

(is (agent/unregister registry agent))

(is (empty? (agent/close-registry registry)))))
(is (empty? (agent/close-registry registry)))))

0 comments on commit dd4dc7b

Please sign in to comment.