Two different source connectors use the same continuation token in the Cosmos lease container #471

Open
webdirektindia opened this issue Aug 8, 2022 · 2 comments
Labels
bug Something isn't working

Comments

@webdirektindia

Description

When two source connectors run against the same Cosmos DB container (say, User), both use the same User-leases container and therefore the same continuation token. As a result, changes in the User container are processed by only one connector: the one whose worker ID is recorded as the owner in the User-leases container.

Expected Behavior

Each source connector should have its own lease container.

Reproduce

I created two Cosmos DB source connectors in a single Kafka Connect instance:

1. create-user-cosmosdb-source-connector
This connector fetches records from the User container in the Cosmos UserDb database and writes them to the "create_user" Kafka topic if the OperationType property is 1; otherwise it ignores them.

JSON config:

{
  "name": "create-user-cosmosdb-source-connector",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schemas.enable": "true",
    "connect.cosmos.task.poll.interval": "1000",
    "connect.cosmos.connection.endpoint": "https://cosmos-instance.documents.azure.com:443/",
    "connect.cosmos.master.key": "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==",
    "connect.cosmos.databasename": "UserDb",
    "connect.cosmos.containers.topicmap": "create_user#User",
    "connect.cosmos.offset.useLatest": true,
    "topic.creation.enable": "false",
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 1,
    "transforms": "createIncludeFilter,replacefield",
    "transforms.createIncludeFilter.type": "io.confluent.connect.transforms.Filter$Value",
    "transforms.createIncludeFilter.filter.condition": "$.[?(@.OperationType == 1)]",
    "transforms.createIncludeFilter.filter.type": "include", 
    "transforms.createIncludeFilter.missing.or.null.behavior": "fail",
    "transforms.replacefield.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.replacefield.exclude": "_rid,_self,_etag,_attachements,_ts",
    "transforms.replacefield.include": "id,Login,Email,Password,Name,Country,OperationType"
  }
}

2. update-user-cosmosdb-source-connector
This connector fetches records from the User container in the Cosmos UserDb database and writes them to the "update_user" Kafka topic if the OperationType property is 2; otherwise it ignores them.

JSON config:

{
  "name": "update-user-cosmosdb-source-connector",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schemas.enable": "true",
    "connect.cosmos.task.poll.interval": "1000",
    "connect.cosmos.connection.endpoint": "https://cosmos-instance.documents.azure.com:443/",
    "connect.cosmos.master.key": "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==",
    "connect.cosmos.databasename": "UserDb",
    "connect.cosmos.containers.topicmap": "update_user#User",
    "connect.cosmos.offset.useLatest": true,
    "topic.creation.enable": "false",
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 1,
    "transforms": "updateIncludeFilter,replacefield",
    "transforms.updateIncludeFilter.type": "io.confluent.connect.transforms.Filter$Value",
    "transforms.updateIncludeFilter.filter.condition": "$.[?(@.OperationType == 2)]",
    "transforms.updateIncludeFilter.filter.type": "include", 
    "transforms.updateIncludeFilter.missing.or.null.behavior": "fail",
    "transforms.replacefield.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.replacefield.exclude": "_rid,_self,_etag,_attachements,_ts",
    "transforms.replacefield.include": "id,Login,Email,Password,Name,Country,OperationType"
  }
}

When the two connectors run in parallel, however, both use the same continuation token from the Cosmos lease container (i.e. User-leases). Each change is therefore processed by only one connector: the one whose worker ID is recorded as the Owner in the User-leases container.

Record present in the User-leases container:

{
    "id": "UserUserDb.cosmos-instance.documents.azure.com_UserDb_User..0",
    "_etag": "\"00000000-0000-0000-9c3b-3307afb701d8\"",
    "LeaseToken": "0",
    "ContinuationToken": "\"77\"",
    "timestamp": "2022-07-20T13:18:28.053232Z",
    "Owner": "worker-3971065-0",
    "_rid": "9Y8kAO3czHkCAAAAAAAAAA==",
    "_self": "dbs/9Y8kAA==/colls/9Y8kAO3czHk=/docs/9Y8kAO3czHkCAAAAAAAAAA==/",
    "_attachments": "attachments/",
    "_ts": 1658323108
}
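
The effect of that single lease record can be illustrated with a small model. This is only a sketch under stated assumptions, not the connector's actual implementation: `LeaseContainer`, `poll`, and the worker names are hypothetical, the lease store is in memory, and the continuation token is simplified to an index into a list of changes. It shows that when two workers share one lease document, only the owner sees changes, whereas separate lease containers let both workers see every change.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Lease:
    """One lease document: a single owner and a single continuation token."""
    lease_token: str
    owner: Optional[str] = None
    continuation_token: int = 0  # simplified: index into the change list


class LeaseContainer:
    """Hypothetical in-memory stand-in for a lease container such as User-leases."""

    def __init__(self) -> None:
        self._leases: Dict[str, Lease] = {}

    def try_acquire(self, lease_token: str, worker_id: str) -> Optional[Lease]:
        # In this model the lease key carries no connector name, so every
        # worker reading the same monitored container competes for one document.
        lease = self._leases.setdefault(lease_token, Lease(lease_token))
        if lease.owner is None:
            lease.owner = worker_id
        return lease if lease.owner == worker_id else None


def poll(worker_id: str, leases: LeaseContainer, feed: List[dict]) -> List[dict]:
    """Return the changes this worker sees and advance the continuation token."""
    lease = leases.try_acquire("0", worker_id)
    if lease is None:
        return []  # another worker owns the lease
    changes = feed[lease.continuation_token:]
    lease.continuation_token = len(feed)
    return changes


feed = [{"id": "a", "OperationType": 1}, {"id": "b", "OperationType": 2}]

# Shared lease container: only the first worker to acquire the lease sees changes.
shared = LeaseContainer()
create_shared = poll("create-worker", shared, feed)
update_shared = poll("update-worker", shared, feed)

# One lease container per connector: both workers see every change.
create_own = poll("create-worker", LeaseContainer(), feed)
update_own = poll("update-worker", LeaseContainer(), feed)
```

In the shared case the second worker receives nothing, which matches the reported symptom; in the per-connector case each worker receives both changes and can apply its own OperationType filter.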

@webdirektindia added the bug (Something isn't working) label Aug 8, 2022

@kushagraThapar
Collaborator

@webdirektindia - thanks for reporting this issue, we are investigating it.

@kushagraThapar
Collaborator

@webdirektindia - I reviewed the code, and it intentionally creates only a single lease container so that the container can be shared across different change feed processor instances (in this case, different Kafka source connector instances). What is your use case for processing the same change twice in separate Kafka source connectors?
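
The sharing described above can be sketched as follows. This is a simplified illustration, not the connector's or the change feed processor's real balancing logic: `assign_leases` and the worker names are hypothetical, and a plain round-robin stands in for whatever dynamic balancing the library performs. The point is only that each lease (one per partition range) has exactly one owner, so instances sharing a lease container split the work rather than each seeing every change.

```python
from typing import Dict, List


def assign_leases(lease_tokens: List[str], workers: List[str]) -> Dict[str, str]:
    """Assign each lease to exactly one worker, round-robin (simplified model)."""
    return {token: workers[i % len(workers)] for i, token in enumerate(lease_tokens)}


# Four partition ranges split across two instances: each change is handled once.
assignment = assign_leases(["0", "1", "2", "3"], ["worker-a", "worker-b"])

# A single partition range, as in the lease record in this issue (LeaseToken "0"):
# one instance owns the only lease and the other stays idle.
single = assign_leases(["0"], ["worker-a", "worker-b"])
```

Under this model, two connectors that are meant to process the same change independently cannot both do so while they share one set of leases.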
