Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proposal for platform feature #340

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioServerSocketChannel
import pl.allegro.tech.servicemesh.envoycontrol.groups.EmptyCustomGroupDataMapper
import pl.allegro.tech.servicemesh.envoycontrol.groups.Group
import pl.allegro.tech.servicemesh.envoycontrol.groups.GroupChangeWatcher
import pl.allegro.tech.servicemesh.envoycontrol.groups.MetadataNodeGroup
Expand Down Expand Up @@ -54,21 +55,21 @@ import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

class ControlPlane private constructor(
class ControlPlane<T> private constructor(
val grpcServer: Server,
val snapshotUpdater: SnapshotUpdater,
val nodeGroup: NodeGroup<Group>,
val cache: SnapshotCache<Group, Snapshot>,
val nodeGroup: NodeGroup<Group<T>>,
val cache: SnapshotCache<Group<T>, Snapshot>,
private val changes: Flux<MultiClusterState>
) : AutoCloseable {

private var servicesDisposable: Disposable? = null

companion object {
fun builder(
fun <T> builder(
properties: EnvoyControlProperties,
meterRegistry: MeterRegistry
) = ControlPlaneBuilder(properties, meterRegistry)
) = ControlPlaneBuilder<T>(properties, meterRegistry)
}

fun start() {
Expand All @@ -84,7 +85,7 @@ class ControlPlane private constructor(
grpcServer.awaitTermination()
}

class ControlPlaneBuilder(
class ControlPlaneBuilder<T>(
val properties: EnvoyControlProperties,
val meterRegistry: MeterRegistry
) {
Expand All @@ -99,11 +100,12 @@ class ControlPlane private constructor(
var envoyHttpFilters: EnvoyHttpFilters = EnvoyHttpFilters.emptyFilters
var snapshotChangeAuditor: SnapshotChangeAuditor = NoopSnapshotChangeAuditor

var nodeGroup: NodeGroup<Group> = MetadataNodeGroup(
properties = properties.envoy.snapshot
var nodeGroup: NodeGroup<Group<T>> = MetadataNodeGroup(
properties = properties.envoy.snapshot,
EmptyCustomGroupDataMapper()
)

fun build(changes: Flux<MultiClusterState>): ControlPlane {
fun build(changes: Flux<MultiClusterState>): ControlPlane<T> {
if (grpcServerExecutor == null) {
grpcServerExecutor = buildThreadPoolExecutor()
}
Expand Down Expand Up @@ -242,8 +244,8 @@ class ControlPlane private constructor(
}

private fun buildSnapshotCollectingCallback(
cache: SimpleCache<Group>
): SnapshotCollectingCallback<Group, Snapshot> {
cache: SimpleCache<Group<T>>
): SnapshotCollectingCallback<Group<T>, Snapshot> {
val cleanupProperties = properties.server.snapshotCleanup
return SnapshotCollectingCallback(
cache,
Expand Down Expand Up @@ -298,57 +300,57 @@ class ControlPlane private constructor(
)
}

fun withNodeGroup(nodeGroup: NodeGroup<Group>): ControlPlaneBuilder {
fun withNodeGroup(nodeGroup: NodeGroup<Group<T>>): ControlPlaneBuilder<T> {
this.nodeGroup = nodeGroup
return this
}

fun withGrpcServerExecutor(executor: Executor): ControlPlaneBuilder {
fun withGrpcServerExecutor(executor: Executor): ControlPlaneBuilder<T> {
grpcServerExecutor = executor
return this
}

fun withNioEventLoopExecutor(executor: Executor): ControlPlaneBuilder {
fun withNioEventLoopExecutor(executor: Executor): ControlPlaneBuilder<T> {
nioEventLoopExecutor = executor
return this
}

fun withNioBossEventLoopExecutor(executor: Executor): ControlPlaneBuilder {
fun withNioBossEventLoopExecutor(executor: Executor): ControlPlaneBuilder<T> {
nioBossEventLoopExecutor = executor
return this
}

fun withExecutorGroup(executor: ExecutorGroup): ControlPlaneBuilder {
fun withExecutorGroup(executor: ExecutorGroup): ControlPlaneBuilder<T> {
executorGroup = executor
return this
}

fun withGlobalSnapshotExecutor(executor: Executor): ControlPlaneBuilder {
fun withGlobalSnapshotExecutor(executor: Executor): ControlPlaneBuilder<T> {
globalSnapshotExecutor = executor
return this
}

fun withGlobalSnapshotAuditExecutor(executor: Executor): ControlPlaneBuilder {
fun withGlobalSnapshotAuditExecutor(executor: Executor): ControlPlaneBuilder<T> {
globalSnapshotAuditExecutor = executor
return this
}

fun withSnapshotChangeAuditor(snapshotChangeAuditor: SnapshotChangeAuditor): ControlPlaneBuilder {
fun withSnapshotChangeAuditor(snapshotChangeAuditor: SnapshotChangeAuditor): ControlPlaneBuilder<T> {
this.snapshotChangeAuditor = snapshotChangeAuditor
return this
}

fun withGroupSnapshotParallelExecutor(executorSupplier: () -> Executor): ControlPlaneBuilder {
fun withGroupSnapshotParallelExecutor(executorSupplier: () -> Executor): ControlPlaneBuilder<T> {
groupSnapshotParallelExecutorSupplier = executorSupplier
return this
}

fun withMetrics(metrics: EnvoyControlMetrics): ControlPlaneBuilder {
fun withMetrics(metrics: EnvoyControlMetrics): ControlPlaneBuilder<T> {
this.metrics = metrics
return this
}

fun withEnvoyHttpFilters(envoyHttpFilters: EnvoyHttpFilters): ControlPlaneBuilder {
fun withEnvoyHttpFilters(envoyHttpFilters: EnvoyHttpFilters): ControlPlaneBuilder<T> {
this.envoyHttpFilters = envoyHttpFilters
return this
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,31 @@
package pl.allegro.tech.servicemesh.envoycontrol.groups

sealed class Group {
abstract val communicationMode: CommunicationMode
abstract val serviceName: String
abstract val discoveryServiceName: String?
abstract val proxySettings: ProxySettings
abstract val listenersConfig: ListenersConfig?
sealed interface Group<T> {
val communicationMode: CommunicationMode
val serviceName: String
val discoveryServiceName: String?
val proxySettings: ProxySettings
val listenersConfig: ListenersConfig?
val customData: T?
}

data class ServicesGroup(
data class ServicesGroup<T>(
override val communicationMode: CommunicationMode,
override val serviceName: String = "",
override val discoveryServiceName: String? = null,
override val proxySettings: ProxySettings = ProxySettings(),
override val listenersConfig: ListenersConfig? = null
) : Group()
override val listenersConfig: ListenersConfig? = null,
override val customData: T?
) : Group<T>

data class AllServicesGroup(
data class AllServicesGroup<T>(
override val communicationMode: CommunicationMode,
override val serviceName: String = "",
override val discoveryServiceName: String? = null,
override val proxySettings: ProxySettings = ProxySettings(),
override val listenersConfig: ListenersConfig? = null
) : Group()
override val listenersConfig: ListenersConfig? = null,
override val customData: T?
) : Group<T>

data class ListenersConfig(
val ingressHost: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,30 @@ package pl.allegro.tech.servicemesh.envoycontrol.groups
import com.google.protobuf.Struct
import com.google.protobuf.Value
import io.envoyproxy.controlplane.cache.NodeGroup
import io.envoyproxy.envoy.config.core.v3.Node
import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import io.envoyproxy.envoy.config.core.v3.Node as NodeV3

class MetadataNodeGroup(
val properties: SnapshotProperties
) : NodeGroup<Group> {
interface CustomGroupDataMapper<T> {
fun map(node: NodeV3): T?

}

class EmptyCustomGroupDataMapper<T>: CustomGroupDataMapper<T> {
override fun map(node: Node): T? {
return null
}
}


class MetadataNodeGroup<T>(
val properties: SnapshotProperties,
val customGroupDataMapper: CustomGroupDataMapper<T>
) : NodeGroup<Group<T>> {
private val logger by logger()

override fun hash(node: NodeV3): Group {
override fun hash(node: NodeV3): Group<T> {
return createV3Group(node)
}

Expand Down Expand Up @@ -128,7 +142,7 @@ class MetadataNodeGroup(
)
}

private fun createV3Group(node: NodeV3): Group {
private fun createV3Group(node: NodeV3): Group<T> {
val nodeMetadata = NodeMetadata(node.metadata, properties)
val serviceName = serviceName(nodeMetadata)
val discoveryServiceName = nodeMetadata.discoveryServiceName
Expand All @@ -142,15 +156,17 @@ class MetadataNodeGroup(
serviceName,
discoveryServiceName,
proxySettings,
listenersConfig
listenersConfig,
customGroupDataMapper.map(node)
)
else ->
ServicesGroup(
nodeMetadata.communicationMode,
serviceName,
discoveryServiceName,
proxySettings,
listenersConfig
listenersConfig,
customGroupDataMapper.map(node)
)
}
}
Expand Down