Source connector fails to write data to topic saying schema is not matching when using avro converters. #538

Open
sujit-warrier-maersk opened this issue Nov 19, 2023 · 2 comments
Labels
bug Something isn't working

sujit-warrier-maersk commented Nov 19, 2023

Description

  • When creating a new source connector instance with the value converter set to io.confluent.connect.avro.AvroConverter, the lease container is created, a new schema is created, and sometimes a few documents are written to the topic before the connector fails.

  • No schema is defined before the connector instance is created, but we still get the error below.
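
One way to check the claim that no schema exists beforehand is to ask Schema Registry which versions are registered under the subject, once before the connector is created and once after it fails. The sketch below is only an illustration. It assumes the registry is reachable at http://schema-registry:8081 (the URL from the connector config in this issue) and that the default subject naming is in use, so the subject is `<topic>-value`, which matches the subject named in the error. The helper names are made up for this example.

```python
import json
import urllib.error
import urllib.parse
import urllib.request

# Assumption: registry URL taken from the connector config in this issue.
REGISTRY_URL = "http://schema-registry:8081"


def value_subject(topic: str) -> str:
    """Subject name for record values under the default naming (assumed)."""
    return f"{topic}-value"


def versions_url(registry_url: str, subject: str) -> str:
    """URL of the Schema Registry endpoint GET /subjects/{subject}/versions."""
    quoted = urllib.parse.quote(subject, safe="")
    return f"{registry_url.rstrip('/')}/subjects/{quoted}/versions"


def list_versions(registry_url: str, subject: str) -> list:
    """Return the registered version numbers, or [] if the subject is unknown."""
    try:
        with urllib.request.urlopen(versions_url(registry_url, subject), timeout=10) as resp:
            return json.load(resp)
    except urllib.error.HTTPError as err:
        if err.code == 404:  # subject not found: nothing registered yet
            return []
        raise


# Example call (needs network access to the registry, so it is left commented out):
# print(list_versions(REGISTRY_URL, value_subject("transport-plan-raw1")))
```

An empty list before the connector starts and a single version after the failure would be consistent with the `"oldSchemaVersion": 1` entry in the error details.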

Kafka Connect env settings:

      CONNECT_BOOTSTRAP_SERVERS: server-1:39094
      CONNECT_GROUP_ID: "kc101-connect"
      CONNECT_CONFIG_STORAGE_TOPIC: "_kc101-connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "_kc101-connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "_kc101-connect-status"
      CONNECT_REPLICATION_FACTOR: 1
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: schema-registry:8081
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: schema-registry:8081
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect-1"
      CONNECT_LISTENERS: http://connect-1:8083
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'

Connector instance settings:

{
 "name": "cosmosdb-source-connector-tp-legs-raw",
 "config": {
   "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
   "tasks.max": "1",
   "key.converter": "org.apache.kafka.connect.storage.StringConverter",
   "value.converter": "io.confluent.connect.avro.AvroConverter",
   "connect.cosmos.task.poll.interval": "100",
   "connect.cosmos.connection.endpoint": "{{CosmosDbUrl}}",
   "connect.cosmos.master.key": "{{CosmosDnApiKey}}",
   "connect.cosmos.databasename": "Preplanning",
   "connect.cosmos.containers.topicmap": "transport-plan-raw1#transport-plan-raw-events",
   "connect.cosmos.offset.useLatest": false,
   "value.converter.schemas.enable": "true",
   "key.converter.schemas.enable": "false",
   "value.converter.schema.registry.url": "http://schema-registry:8081",
   "key.converter.schema.registry.url": "http://schema-registry:8081"
 }
}
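
Because converter settings appear both at the worker level (env settings) and in the connector config, it can help to see which ones the connector actually overrides. The sketch below is a rough aid, not part of the connector. It assumes the worker image maps `CONNECT_*` environment variables to worker properties by dropping the prefix, lowercasing, and turning underscores into dots (the convention the variable names above suggest). The function names are hypothetical, and only the converter-related values from this issue are copied in.

```python
# Converter-related values copied from the env settings and connector config in this issue.
WORKER_ENV = {
    "CONNECT_KEY_CONVERTER": "org.apache.kafka.connect.storage.StringConverter",
    "CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL": "schema-registry:8081",
    "CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE": "false",
    "CONNECT_VALUE_CONVERTER": "io.confluent.connect.avro.AvroConverter",
    "CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE": "true",
    "CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL": "schema-registry:8081",
}

CONNECTOR_CONFIG = {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schemas.enable": "true",
    "key.converter.schemas.enable": "false",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
}


def worker_property(env_name: str, prefix: str = "CONNECT_") -> str:
    """Map e.g. CONNECT_VALUE_CONVERTER to value.converter (assumed convention)."""
    if not env_name.startswith(prefix):
        raise ValueError(f"{env_name!r} does not start with {prefix!r}")
    return env_name[len(prefix):].lower().replace("_", ".")


def connector_overrides(worker_env: dict, connector_config: dict) -> dict:
    """Return {property: (worker_value, connector_value)} where the two differ."""
    worker_props = {worker_property(name): value for name, value in worker_env.items()}
    return {
        key: (worker_props[key], value)
        for key, value in connector_config.items()
        if key in worker_props and worker_props[key] != value
    }


OVERRIDES = connector_overrides(WORKER_ENV, CONNECTOR_CONFIG)
for key, (worker_value, connector_value) in sorted(OVERRIDES.items()):
    print(f"{key}: worker={worker_value!r} connector={connector_value!r}")
```

With the values from this issue, the only differences are the two schema registry URLs: the worker-level values have no `http://` scheme, while the connector-level values do.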

Error Message: org.apache.kafka.common.config.ConfigException: Failed to access Avro data from topic transport-plan-raw1 : Schema being registered is incompatible with an earlier schema for subject "transport-plan-raw1-value", details:

[
  {
    "errorType": "NAME_MISMATCH",
    "description": "The name of the schema has changed (path '/name')",
    "additionalInfo": "expected: inferred_name_2092778255"
  },
  {
    "errorType": "NAME_MISMATCH",
    "description": "The name of the schema has changed (path '/fields/11/type/name')",
    "additionalInfo": "expected: inferred_name__1187958375"
  },
  {
    "errorType": "NAME_MISMATCH",
    "description": "The name of the schema has changed (path '/fields/11/type/fields/6/type/name')",
    "additionalInfo": "expected: inferred_name_762902212"
  },
  {
    "errorType": "NAME_MISMATCH",
    "description": "The name of the schema has changed (path '/fields/11/type/fields/6/type/name')",
    "additionalInfo": "expected: inferred_name_762902212"
  },
  {
    "errorType": "NAME_MISMATCH",
    "description": "The name of the schema has changed (path '/fields/11/type/fields/6/type/name')",
    "additionalInfo": "expected: inferred_name_762902212"
  },
  {
    "errorType": "NAME_MISMATCH",
    "description": "The name of the schema has changed (path '/fields/11/type/fields/9/type/name')",
    "additionalInfo": "expected: inferred_name_762902212"
  },
  {
    "errorType": "MISSING_UNION_BRANCH",
    "description": "The new schema is missing a type inside a union field at path '/fields/11/type/fields/10/type/1' in the old schema",
    "additionalInfo": "reader union lacking writer type: ARRAY"
  },
  { "oldSchemaVersion": 1 },
  {
    "oldSchema": "{\"type\":\"record\",\"name\":\"inferred_name_2092778255\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"messageKey\",\"type\":\"string\"},{\"name\":\"shipmentNumber\",\"type\":\"string\"},{\"name\":\"shipmentIdentifier\",\"type\":\"string\"},{\"name\":\"shipmentVersionIdentifier\",\"type\":\"string\"},{\"name\":\"shipmentVersionCreatedDatetime\",\"type\":\"string\"},{\"name\":\"shipmentVersionUpdatedDatetime\",\"type\":\"string\"},{\"name\":\"shipmentStructureUpdateDatetime\",\"type\":\"string\"},{\"name\":\"containerServiceTypeLoadService\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_586916356\",\"fields\":[{\"name\":\"containerServiceTypeLoadService\",\"type\":\"string\"}],\"connect.name\":\"inferred_name_586916356\"}},{\"name\":\"containerServiceTypeDischargeService\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_1295212612\",\"fields\":[{\"name\":\"containerServiceTypeDischargeService\",\"type\":\"string\"}],\"connect.name\":\"inferred_name_1295212612\"}},{\"name\":\"cargoServiceType\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_1290169462\",\"fields\":[{\"name\":\"cargoServiceType\",\"type\":\"string\"}],\"connect.name\":\"inferred_name_1290169462\"}},{\"name\":\"shipmentRoute\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name__1187958375\",\"fields\":[{\"name\":\"shipmentRouteIdentifier\",\"type\":\"string\"},{\"name\":\"shipmentRouteTypeCode\",\"type\":\"string\"},{\"name\":\"shipmentRouteTypeName\",\"type\":\"string\"},{\"name\":\"tradeLaneName\",\"type\":\"string\"},{\"name\":\"shipmentRouteCreatedDatetime\",\"type\":\"string\"},{\"name\":\"shipmentRouteUpdatedDatetime\",\"type\":\"string\"},{\"name\":\"shipmentRoutePointPlaceOfReceipt\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_762902212\",\"fields\":[{\"name\":\"routePointIdentifier\",\"type\":\"string\"},{\"name\":\"site\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_1587501840\",\"fields\":[{\"name\":\"siteIde
ntifier\",\"type\":\"string\"},{\"name\":\"definedAreaCode\",\"type\":\"string\"},{\"name\":\"definedAreaName\",\"type\":\"string\"},{\"name\":\"definedAreaTypeCode\",\"type\":\"string\"},{\"name\":\"definedAreaRelationshipTypeCodeParentArea\",\"type\":\"string\"},{\"name\":\"operationalFacility\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name__1582750289\",\"fields\":[{\"name\":\"operationalFacilityIdentifier\",\"type\":\"string\"},{\"name\":\"facilityCode\",\"type\":\"string\"},{\"name\":\"facilityName\",\"type\":\"string\"},{\"name\":\"facilityType\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_1888608854\",\"fields\":[{\"name\":\"facilityTypeCode\",\"type\":\"string\"},{\"name\":\"isChildOf\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name__1616517423\",\"fields\":[{\"name\":\"facilityTypeCode\",\"type\":\"string\"},{\"name\":\"isChildOfL1\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name__2136641715\",\"fields\":[{\"name\":\"facilityTypeCode\",\"type\":\"string\"}],\"connect.name\":\"inferred_name__2136641715\"}}],\"connect.name\":\"inferred_name__1616517423\"}}],\"connect.name\":\"inferred_name_1888608854\"}}],\"connect.name\":\"inferred_name__1582750289\"}}],\"connect.name\":\"inferred_name_1587501840\"}},{\"name\":\"city\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name__666696926\",\"fields\":[{\"name\":\"cityIdentifier\",\"type\":\"string\"},{\"name\":\"definedAreaCode\",\"type\":\"string\"},{\"name\":\"definedAreaName\",\"type\":\"string\"},{\"name\":\"definedAreaTypeCode\",\"type\":\"string\"},{\"name\":\"definedAreaRelationshipTypeCodeParentArea\",\"type\":\"string\"}],\"connect.name\":\"inferred_name__666696926\"}},{\"name\":\"territorialUnit\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name__275936916\",\"fields\":[{\"name\":\"territorialUnitIdentifier\",\"type\":\"string\"},{\"name\":\"definedAreaCode\",\"type\":\"string\"},{\"name\":\"definedAreaName\",\"type\":\"string\"},{\"name\":\"definedAreaTypeCode
\",\"type\":\"string\"},{\"name\":\"isoSubdivisionCategory\",\"type\":\"string\"},{\"name\":\"definedAreaRelationshipTypeCodeParentArea\",\"type\":\"string\"}],\"connect.name\":\"inferred_name__275936916\"}},{\"name\":\"country\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_206982570\",\"fields\":[{\"name\":\"countryIdentifier\",\"type\":\"string\"},{\"name\":\"definedAreaCode\",\"type\":\"string\"},{\"name\":\"definedAreaName\",\"type\":\"string\"},{\"name\":\"definedAreaTypeCode\",\"type\":\"string\"}],\"connect.name\":\"inferred_name_206982570\"}},{\"name\":\"locationFunctionCode\",\"type\":\"string\"},{\"name\":\"shipmentRoutePointCreatedDatetime\",\"type\":\"string\"},{\"name\":\"shipmentRoutePointUpdatedDatetime\",\"type\":\"string\"}],\"connect.name\":\"inferred_name_762902212\"}},{\"name\":\"shipmentRoutePointFirstVesselLoad\",\"type\":\"inferred_name_762902212\"},{\"name\":\"shipmentRoutePointFinalVesselDischarge\",\"type\":\"inferred_name_762902212\"},{\"name\":\"shipmentRoutePointPlaceOfDelivery\",\"type\":\"inferred_name_762902212\"},{\"name\":\"shipmentRouteLinks\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"inferred_name__1469213405\",\"fields\":[{\"name\":\"shipmentRouteLinkIdentifier\",\"type\":\"string\"},{\"name\":\"shipmentRoutePointStartingPoint\",\"type\":\"inferred_name_762902212\"},{\"name\":\"shipmentRoutePointEndingPoint\",\"type\":\"inferred_name_762902212\"},{\"name\":\"shipmentRouteLinkSequenceNumber\",\"type\":\"string\"},{\"name\":\"routeLinkProductIdentifier\",\"type\":\"long\"},{\"name\":\"shipmentRoutingType\",\"type\":\"string\"},{\"name\":\"transportModeCode\",\"type\":\"string\"},{\"name\":\"transportModeName\",\"type\":\"string\"},{\"name\":\"waterAirLand\",\"type\":\"string\"},{\"name\":\"estimatedTimeOfDepartureLocal\",\"type\":\"string\"},{\"name\":\"estimatedTimeOfDeparture\",\"type\":\"string\"},{\"name\":\"estimatedTimeOfArrivalLocal\",\"type\":\"string\"},{\"name\":\"estimated
TimeOfArrival\",\"type\":\"string\"},{\"name\":\"voyageNumberStartsAt\",\"type\":\"string\"},{\"name\":\"voyageNumberEndsAt\",\"type\":\"string\"},{\"name\":\"vesselSchedule\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_1720401109\",\"fields\":[{\"name\":\"vesselCode\",\"type\":\"string\"},{\"name\":\"vesselName\",\"type\":\"string\"},{\"name\":\"alternativeCodes\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name__1505900296\",\"fields\":[{\"name\":\"rksT_CARRIER_ID\",\"type\":\"string\"},{\"name\":\"rksT_CARRIER_NAME\",\"type\":\"string\"},{\"name\":\"rksT_SERVICE_CODE\",\"type\":\"string\"},{\"name\":\"rksT_SERVICE_NAME\",\"type\":\"string\"}],\"connect.name\":\"inferred_name__1505900296\"}}],\"connect.name\":\"inferred_name_1720401109\"}},{\"name\":\"shipmentRouteLinkCreatedDatetime\",\"type\":\"string\"},{\"name\":\"shipmentRouteLinkUpdatedDatetime\",\"type\":\"string\"}],\"connect.name\":\"inferred_name__1469213405\"},\"connect.name\":\"inferred_name_726885652\"}],\"default\":null}],\"connect.name\":\"inferred_name__1187958375\"}},{\"name\":\"_rid\",\"type\":\"string\"},{\"name\":\"_self\",\"type\":\"string\"},{\"name\":\"_etag\",\"type\":\"string\"},{\"name\":\"_attachments\",\"type\":\"string\"},{\"name\":\"_ts\",\"type\":\"long\"},{\"name\":\"_lsn\",\"type\":\"long\"}],\"connect.name\":\"inferred_name_2092778255\"}\"}, {compatibility: \"BACKWARD"
  }
]

Error code: 409
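
The details list above is easier to read when grouped by error type. The following sketch does that for an abbreviated copy of the payload (the `oldSchema` entry is left out). The function name and the regular expression are assumptions made for this illustration and are not part of any Schema Registry client.

```python
import re
from collections import defaultdict

# Abbreviated copy of the error details above (oldSchema omitted for brevity).
DETAILS = [
    {"errorType": "NAME_MISMATCH",
     "description": "The name of the schema has changed (path '/name')",
     "additionalInfo": "expected: inferred_name_2092778255"},
    {"errorType": "NAME_MISMATCH",
     "description": "The name of the schema has changed (path '/fields/11/type/name')",
     "additionalInfo": "expected: inferred_name__1187958375"},
    {"errorType": "NAME_MISMATCH",
     "description": "The name of the schema has changed (path '/fields/11/type/fields/6/type/name')",
     "additionalInfo": "expected: inferred_name_762902212"},
    {"errorType": "NAME_MISMATCH",
     "description": "The name of the schema has changed (path '/fields/11/type/fields/6/type/name')",
     "additionalInfo": "expected: inferred_name_762902212"},
    {"errorType": "NAME_MISMATCH",
     "description": "The name of the schema has changed (path '/fields/11/type/fields/6/type/name')",
     "additionalInfo": "expected: inferred_name_762902212"},
    {"errorType": "NAME_MISMATCH",
     "description": "The name of the schema has changed (path '/fields/11/type/fields/9/type/name')",
     "additionalInfo": "expected: inferred_name_762902212"},
    {"errorType": "MISSING_UNION_BRANCH",
     "description": "The new schema is missing a type inside a union field at path "
                    "'/fields/11/type/fields/10/type/1' in the old schema",
     "additionalInfo": "reader union lacking writer type: ARRAY"},
    {"oldSchemaVersion": 1},
]

# Pulls the quoted schema path out of a description string.
PATH_RE = re.compile(r"path '([^']+)'")


def summarize(details: list) -> dict:
    """Group entries by errorType, keeping each distinct schema path once, in order."""
    summary = defaultdict(list)
    for entry in details:
        error_type = entry.get("errorType")
        if error_type is None:  # metadata entries such as oldSchemaVersion
            continue
        match = PATH_RE.search(entry.get("description", ""))
        path = match.group(1) if match else "<unknown>"
        if path not in summary[error_type]:
            summary[error_type].append(path)
    return dict(summary)


SUMMARY = summarize(DETAILS)
for error_type, paths in SUMMARY.items():
    print(error_type, paths)
```

Reading the indices against `oldSchema`, `/fields/11` is `shipmentRoute`, `/fields/11/type/fields/6` is `shipmentRoutePointPlaceOfReceipt`, and `/fields/11/type/fields/10` is `shipmentRouteLinks`, whose type in the old schema is a union of `null` and an array.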

Expected Behavior

  • The connector instance should start pushing data into the topic.

sujit-warrier-maersk added the bug label Nov 19, 2023

kushagraThapar (Collaborator) commented

@TheovanKraay can you please take a look at this issue, thanks!

@sujit-warrier-maersk - we will get back to you soon. Because of the holiday season, it might take a week, but we wanted to let you know that we have acknowledged this issue and will work on it. Thanks!

TheovanKraay commented Nov 27, 2023

@sujit-warrier-maersk do you need to use AvroConverter? Typically JsonConverter is used, as in the sample from the source connector documentation:

{
  "name": "cosmosdb-source-connector",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "connect.cosmos.task.poll.interval": "100",
    "connect.cosmos.connection.endpoint": "https://<cosmosinstance-name>.documents.azure.com:443/",
    "connect.cosmos.master.key": "<cosmosdbprimarykey>",
    "connect.cosmos.databasename": "kafkaconnect",
    "connect.cosmos.containers.topicmap": "apparels#kafka",
    "connect.cosmos.offset.useLatest": false,
    "value.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false"
  }
}
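
For comparison, the original connector config could be switched to JsonConverter with a small transformation like the one sketched here. This is an illustration only: the function name is hypothetical, only the converter-related keys from the issue are copied in, and the sketch changes just the value converter (the sample above also uses JsonConverter for keys).

```python
JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter"

# Converter-related keys copied from the connector config in the issue description.
ORIGINAL = {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schemas.enable": "true",
    "key.converter.schemas.enable": "false",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
}


def with_json_value_converter(config: dict) -> dict:
    """Return a copy of a connector config that uses JsonConverter for values."""
    updated = {
        key: value
        for key, value in config.items()
        # Registry URLs are dropped: JsonConverter and StringConverter do not use them.
        if not key.endswith(".schema.registry.url")
    }
    updated["value.converter"] = JSON_CONVERTER
    # Matches the documentation sample above, which disables embedded schemas.
    updated["value.converter.schemas.enable"] = "false"
    return updated


UPDATED = with_json_value_converter(ORIGINAL)
for key in sorted(UPDATED):
    print(f"{key} = {UPDATED[key]}")
```

The input dictionary is left unchanged, so the Avro and JSON variants can be compared side by side.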
