ES Document version is set to Kafka record offset #499

Open
michaelfortin opened this issue Feb 23, 2021 · 6 comments
Comments

michaelfortin commented Feb 23, 2021

connector version: 11.0.0

When using "key.ignore": false and "write.method": "insert", the connector sets the document version (the version shown in Elasticsearch) to the record's offset in Kafka, for a reason that I cannot quite understand.

This sometimes leads to a version conflict error, and the message is not indexed in Elasticsearch:

[2021-02-12 02:18:07,133] WARN Ignoring version conflicts for items: [Key{index-000042/_doc/1355b1bc-2de2-4591-922a-890407bf3308}, Key{index-000042/_doc/c3d9f18e-91ab-49f0-9892-ffbd928dd783}, Key{index-000042/_doc/81928de3-0f77-4e22-a6f0-fd24088dceb6}, Key{index-000042/_doc/a5733fef-e8a6-4193-89cc-ef9b0f9c39d5}, Key{index-000042/_doc/f04a6eaa-a4d2-4707-b1ba-7f696a887251}, Key{index-000042/_doc/f2aa9c5c-52f3-4435-92b4-e7f77fe8a748}, Key{index-000042/_doc/610ab60f-2045-4238-83e9-e19991b7d732}, Key{index-000042/_doc/599d8291-bde2-4fd3-91a1-34706591f04b}, Key{index-000042/_doc/3767cc22-e894-4614-8e98-07de4becd170}, Key{index-000042/_doc/4ec40fff-707b-40f8-a308-d184be29b9e7}, Key{index-000042/_doc/78e113b1-f2fe-4348-9e99-afc926eb4caa}, Key{index-000042/_doc/9078d606-5280-47f0-9590-ae274693874e}, Key{index-000042/_doc/cd23d612-8013-4890-90df-d7b4a17bc80c}, Key{index-000042/_doc/4b34770f-6304-4beb-942d-dd98cf875ede}, Key{index-000042/_doc/bd2e8889-5597-4fa2-a210-c7483518ca27}, Key{index-000042/_doc/aabdbd33-742e-49bb-adef-cdc3d4f00b91}, Key{index-000042/_doc/03c52672-d98c-4f13-a4cc-b7d6c9bc6ef5}, Key{index-000042/_doc/1dde610d-2d52-403d-8559-2e83ba1024c5}, Key{index-000042/_doc/94db6a14-26d7-47ea-a80d-9fa41840a954}, Key{index-000042/_doc/07b97061-5842-4e77-b686-20e5af239e59}

This error occurs when the version provided by the connector is lower than (or equal to) the version of the document with the same _id in Elasticsearch. When this happens, it means the Kafka offset of the record was not higher than the offset of a previous record that generated the same _id. This situation is possible if the Kafka topic has multiple partitions, if partitions are created on the fly, or if partitions are deleted and then created again. In my case it happens quite often and it affects the connector's performance.
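To make the conflict concrete, here is a minimal sketch of the external-version check, assuming a simplified in-memory model. `VersionedStore` and its `index` method are hypothetical stand-ins for illustration, not the Elasticsearch client or the connector's code. The model accepts a write only when the supplied version is strictly greater than the stored one, which is how I understand external versioning to behave.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of external versioning; not the real Elasticsearch client. */
public class ExternalVersionDemo {

    /** Hypothetical in-memory stand-in for an index: _id -> last accepted version. */
    static final class VersionedStore {
        private final Map<String, Long> versions = new HashMap<>();

        /** Accepts the write only if the version is strictly greater than the stored one. */
        boolean index(String id, long externalVersion) {
            Long current = versions.get(id);
            if (current != null && externalVersion <= current) {
                return false; // version conflict: the document is left unchanged
            }
            versions.put(id, externalVersion);
            return true;
        }
    }

    public static void main(String[] args) {
        VersionedStore store = new VersionedStore();

        // A record with this _id arrives from partition 0 at offset 120.
        System.out.println(store.index("doc-1", 120L)); // true

        // A later record with the same _id arrives from partition 1 at offset 7.
        // Offsets are only ordered within a partition, so the "newer" record loses.
        System.out.println(store.index("doc-1", 7L));   // false

        // Only a higher offset is accepted again.
        System.out.println(store.index("doc-1", 121L)); // true
    }
}
```

The second write is the case reported in the log above: the record is newer in time, but its offset comes from a different partition and is lower, so it is rejected.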

The connector should have a new boolean parameter called version.ignore. When true, the connector would not attempt to add a version to the document under any circumstances. When false (the default), the current behavior would apply.

The new config

  public static final String IGNORE_VERSION_CONFIG = "version.ignore";
  private static final String IGNORE_VERSION_DOC =
      "Whether to ignore the record offset when forming the Elasticsearch document version."
          + " When this is set to ``true``, no document version will be provided to Elasticsearch."
          + " When this is set to ``false``"
          + " with ``" + IGNORE_KEY_CONFIG + "`` set to ``false``"
          + " and ``" + WRITE_METHOD_CONFIG + "`` set to ``" + WriteMethod.INSERT + "``,"
          + " the version provided will be the record offset in Kafka.";
  private static final String IGNORE_VERSION_DISPLAY = "Ignore version mode";
  private static final boolean IGNORE_VERSION_DEFAULT = false;

Its usage

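  private DocWriteRequest<?> maybeAddExternalVersioning(
      DocWriteRequest<?> request,
      SinkRecord record
  ) {
    // New: skip external versioning entirely when version.ignore is true.
    if (!config.ignoreVersion()) {
      // Existing behavior: when the key is not ignored, use the Kafka offset
      // as the external version of the document.
      if (!config.shouldIgnoreKey(record.topic())) {
        request.versionType(VersionType.EXTERNAL);
        request.version(record.kafkaOffset());
      }
    }
    return request;
  }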
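For readers who want to try the decision logic without building the connector, here is a self-contained sketch, assuming two plain booleans in place of the connector config. The `Request` class and the boolean parameters are hypothetical stand-ins for `DocWriteRequest`, `config.ignoreVersion()` and `config.shouldIgnoreKey(...)`; only the branching mirrors the method above.

```java
/** Stand-alone sketch of the proposed branching; the types here are illustrative only. */
public class VersionDecisionDemo {

    /** Hypothetical stand-in for a write request; holds only the fields that matter here. */
    static final class Request {
        boolean externalVersioning = false;
        long version = -1L; // -1 means "no version supplied" in this sketch
    }

    /** Applies the Kafka offset as external version unless the version or the key is ignored. */
    static Request maybeAddExternalVersioning(
            Request request, boolean ignoreVersion, boolean ignoreKey, long kafkaOffset) {
        if (!ignoreVersion && !ignoreKey) {
            request.externalVersioning = true;
            request.version = kafkaOffset;
        }
        return request;
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        for (boolean ignoreVersion : flags) {
            for (boolean ignoreKey : flags) {
                Request r = maybeAddExternalVersioning(new Request(), ignoreVersion, ignoreKey, 42L);
                System.out.println("version.ignore=" + ignoreVersion
                        + " key.ignore=" + ignoreKey
                        + " -> external versioning: " + r.externalVersioning);
            }
        }
    }
}
```

In this sketch only the combination version.ignore=false and key.ignore=false attaches the offset as a version, which matches the current default behavior; every other combination leaves the request unversioned.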

I built a new version this way and it works fine. How can I create a pull request?

jacobb35 commented Mar 25, 2021

I implemented the solution in my code and it works like a charm. It saved my life. Thanks a lot for this @michaelfortin.

@Harish-Sridhar

I am also facing the same issue. When can we expect a fix for this?

@yzia2000

Hi, I have written a PR for this feature and tested the package locally. This is the PR: #628

@ChrisIgel

What can be done to get this PR merged?

@yzia2000

> What can be done to get this PR merged?

I will probably have to create a Jira ticket for this to be noticed...

Mahanmmi commented Aug 17, 2023

+++
For PR to get merged
