Draft: Producer json & base64 support & better test cases #295

Open
wants to merge 10 commits into
base: master
from

Conversation

JakkuSakura

With this change, the -J option works in producer mode. -B k applies base64 to keys, -B v to payloads, and -B a to both keys and payloads. Base64 encoding/decoding works for both plain stdin input and JSON input.
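To illustrate what base64-encoding a binary key or payload involves, here is a minimal C sketch of a standard-alphabet encoder and decoder. It is not the public-domain library used in this PR; `b64_encode` and `b64_decode` are hypothetical names chosen for the example.

```c
#include <stdlib.h>
#include <string.h>

static const char B64_ALPHABET[] =
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/* Encode len bytes; returns a NUL-terminated malloc'd string, or NULL. */
char *b64_encode(const unsigned char *in, size_t len) {
    size_t out_len = 4 * ((len + 2) / 3);
    char *out = malloc(out_len + 1);
    size_t i, o = 0;
    if (!out) return NULL;
    for (i = 0; i < len; i += 3) {
        /* Pack up to three input bytes into a 24-bit group. */
        unsigned long v = (unsigned long)in[i] << 16;
        if (i + 1 < len) v |= (unsigned long)in[i + 1] << 8;
        if (i + 2 < len) v |= in[i + 2];
        out[o++] = B64_ALPHABET[(v >> 18) & 0x3F];
        out[o++] = B64_ALPHABET[(v >> 12) & 0x3F];
        out[o++] = (i + 1 < len) ? B64_ALPHABET[(v >> 6) & 0x3F] : '=';
        out[o++] = (i + 2 < len) ? B64_ALPHABET[v & 0x3F] : '=';
    }
    out[o] = '\0';
    return out;
}

/* Decode a padded base64 string; returns malloc'd bytes and sets *out_len,
 * or returns NULL on malformed input. */
unsigned char *b64_decode(const char *in, size_t *out_len) {
    size_t len = strlen(in), i, o = 0;
    unsigned char *out;
    if (len % 4 != 0) return NULL;
    out = malloc(len / 4 * 3 + 1);
    if (!out) return NULL;
    for (i = 0; i < len; i += 4) {
        unsigned long v = 0;
        int pad = 0, j;
        for (j = 0; j < 4; j++) {
            char c = in[i + j];
            const char *p;
            if (c == '=') {
                /* Padding is only valid at the end of the final group. */
                if (i + 4 != len || j < 2) { free(out); return NULL; }
                pad++;
                v <<= 6;
                continue;
            }
            p = strchr(B64_ALPHABET, c);
            if (!p || pad) { free(out); return NULL; }
            v = (v << 6) | (unsigned long)(p - B64_ALPHABET);
        }
        out[o++] = (unsigned char)((v >> 16) & 0xFF);
        if (pad < 2) out[o++] = (unsigned char)((v >> 8) & 0xFF);
        if (pad < 1) out[o++] = (unsigned char)(v & 0xFF);
    }
    *out_len = o;
    return out;
}
```

Because the encoded form contains only printable ASCII, a key or payload with NUL bytes or newlines survives a trip through stdin or a JSON string unchanged; both functions return heap memory that the caller must free().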

I wrote this producer JSON and base64 support in order to copy binary topics. There is no licensing issue, since the new base64 library is in the public domain (and I have modified it a bit).

There are still some issues, though. I can't figure out the memory management of this project, so there may be some memory leaks.

I also wrote 2 new test cases and tidied up 2 of the existing ones. I removed the assumption that the default number of partitions is at least 3.

@masoncj

masoncj commented Mar 2, 2021

This would be really helpful to copy messages between topics or brokers! Nice work!

Another feature that would be amazingly helpful in this scenario would be to preserve headers. -J seems to output these under a headers property in the output JSON. If these could be preserved (or if there were a command line option that would select out certain headers to preserve), then we could transparently copy data between brokers or topics.

WDYT?

@JakkuSakura
Author

It's a good idea to be able to keep the headers. I'll have a look at it when I have time. (Haven't used headers before)

@masoncj

masoncj commented Mar 3, 2021

Hi @qiujiangkun

I've implemented the header production from JSON over in this branch: https://github.com/masoncj/kafkacat/tree/producer_json_headers

However, I'm seeing an issue with this PR (absent any of my code above) where the produced data seems to be corrupted:

cat > in_cmason.json <<END
{"topic":"content","partition":0,"offset":0,"tstype":"create","ts":1602611620140,"broker":-1,"headers":["kafka_correlationId","9df423b7-811d-4cd5-a8c4-b02600a414e7","CMason_Wire_Format","JSON","CMason_Sending_App","apollo-pollster-app","CMason_Event_Type","MeasureUpdated","CMason_Event_Stream_Version","1","X-B3-TraceId","2013a2d6e02fa87f","X-B3-SpanId","7a7d5892dad76b89","X-B3-Sampled","1"],"key":"1f92adc2-c355-429b-93ef-c06ca235423a","payload":"{\n  \"eventType\" : \"MeasureUpdated\",\n  \"eventId\" : \"18278fa2-d7c1-4bdf-9f47-aaa657ccc10e\",\n  \"id\" : \"7c96f915-9b4d-334a-8c33-edf0d1d991ba\",\n  \"name\" : \"Wellbeing Check-in\",\n  \"description\" : \"Survey: Wellbeing Check-in (ID: 292368082 version: 2a24f6c2-7ba6-41f8-95ee-0fd7d90896f5)\",\n  \"contentDocIds\" : [ ],\n  \"contentMediaIds\" : [ ],\n  \"parentMeasureId\" : null,\n  \"measureType\" : null,\n  \"units\" : null,\n  \"choices\" : null,\n  \"validation\" : null,\n  \"correlationId\" : \"9df423b7-811d-4cd5-a8c4-b02600a414e7\",\n  \"createdAt\" : \"2020-10-13T17:53:38.464101Z\",\n  \"createdBy\" : \"b56fb618-a654-3d71-86f1-73a4bf5b1538\",\n  \"source\" : \"EXTERNAL\",\n  \"eventType\" : \"MeasureUpdated\"\n}"}
{"topic":"content","partition":0,"offset":1,"tstype":"create","ts":1602611624213,"broker":-1,"headers":["kafka_correlationId","9df423b7-811d-4cd5-a8c4-b02600a414e7","CMason_Wire_Format","JSON","CMason_Sending_App","apollo-pollster-app","CMason_Event_Type","MeasureUpdated","CMason_Event_Stream_Version","1","X-B3-TraceId","2013a2d6e02fa87f","X-B3-SpanId","49e4f775786d9807","X-B3-Sampled","1"],"key":"1f92adc2-c355-429b-93ef-c06ca235423a","payload":"{\n  \"eventType\" : \"MeasureUpdated\",\n  \"eventId\" : \"43ca5fe2-3cde-4cff-a271-a88695d78196\",\n  \"id\" : \"e67afd11-07cf-463a-a18d-226f309dee60\",\n  \"name\" : \"How was your day?\",\n  \"description\" : \"How was your day?\",\n  \"contentDocIds\" : [ ],\n  \"contentMediaIds\" : [ ],\n  \"parentMeasureId\" : \"7c96f915-9b4d-334a-8c33-edf0d1d991ba\",\n  \"measureType\" : \"ENUMERATED\",\n  \"units\" : null,\n  \"choices\" : [ {\n    \"id\" : \"4c985675-ff68-456b-ba81-0ee81c87c240\",\n    \"value\" : \"Bad\"\n  }, {\n    \"id\" : \"b55fbb21-5ee7-4283-a15f-00cc0c0ec366\",\n    \"value\" : \"OK\"\n  }, {\n    \"id\" : \"7a6a1407-1bc5-4475-bfe2-a75fef8c342a\",\n    \"value\" : \"Good\"\n  } ],\n  \"validation\" : null,\n  \"correlationId\" : \"9df423b7-811d-4cd5-a8c4-b02600a414e7\",\n  \"createdAt\" : \"2020-10-13T17:53:38.464689Z\",\n  \"createdBy\" : \"b56fb618-a654-3d71-86f1-73a4bf5b1538\",\n  \"source\" : \"EXTERNAL\",\n  \"eventType\" : \"MeasureUpdated\"\n}"}
{"topic":"content","partition":0,"offset":2,"tstype":"create","ts":1602611624390,"broker":-1,"headers":["kafka_correlationId","9df423b7-811d-4cd5-a8c4-b02600a414e7","CMason_Wire_Format","JSON","CMason_Sending_App","apollo-pollster-app","CMason_Event_Type","MeasureUpdated","CMason_Event_Stream_Version","1","X-B3-TraceId","2013a2d6e02fa87f","X-B3-SpanId","532fcb40e28072e3","X-B3-Sampled","1"],"key":"1f92adc2-c355-429b-93ef-c06ca235423a","payload":"{\n  \"eventType\" : \"MeasureUpdated\",\n  \"eventId\" : \"df987555-f991-4340-b391-cd0e37424baa\",\n  \"id\" : \"cb94d22c-1701-4ec5-998a-1a882863509c\",\n  \"name\" : \"What was your physical activity level today?\",\n  \"description\" : \"What was your physical activity level today?\",\n  \"contentDocIds\" : [ ],\n  \"contentMediaIds\" : [ ],\n  \"parentMeasureId\" : \"7c96f915-9b4d-334a-8c33-edf0d1d991ba\",\n  \"measureType\" : \"ENUMERATED\",\n  \"units\" : null,\n  \"choices\" : [ {\n    \"id\" : \"30c354b9-a7f3-4b27-85d8-31d1494605b9\",\n    \"value\" : \"Inactive\"\n  }, {\n    \"id\" : \"12035e8c-f011-4030-8d07-3d6053f300c6\",\n    \"value\" : \"Light\"\n  }, {\n    \"id\" : \"6d955cc7-c056-4520-b4ed-2d59e38abe34\",\n    \"value\" : \"Moderate\"\n  }, {\n    \"id\" : \"bde446eb-5cd0-4677-ba1d-ec3ff75c4406\",\n    \"value\" : \"Vigorous\"\n  }, {\n    \"id\" : \"dc4d44ef-1aae-477e-9a3d-9e4b44be5759\",\n    \"value\" : \"Intense\"\n  } ],\n  \"validation\" : null,\n  \"correlationId\" : \"9df423b7-811d-4cd5-a8c4-b02600a414e7\",\n  \"createdAt\" : \"2020-10-13T17:53:38.464734Z\",\n  \"createdBy\" : \"b56fb618-a654-3d71-86f1-73a4bf5b1538\",\n  \"source\" : \"EXTERNAL\",\n  \"eventType\" : \"MeasureUpdated\"\n}"}
{"topic":"content","partition":0,"offset":3,"tstype":"create","ts":1602611624518,"broker":-1,"headers":["kafka_correlationId","9df423b7-811d-4cd5-a8c4-b02600a414e7","CMason_Wire_Format","JSON","CMason_Sending_App","apollo-pollster-app","CMason_Event_Type","MeasureUpdated","CMason_Event_Stream_Version","1","X-B3-TraceId","2013a2d6e02fa87f","X-B3-SpanId","57230c6ddc10789b","X-B3-Sampled","1"],"key":"1f92adc2-c355-429b-93ef-c06ca235423a","payload":"{\n  \"eventType\" : \"MeasureUpdated\",\n  \"eventId\" : \"ec3363b5-34d2-4dae-9327-7a1b9923ba32\",\n  \"id\" : \"40bb07dd-fa16-4c92-a898-df3a2b52faa8\",\n  \"name\" : \"About how many hours did you sleep last night?\",\n  \"description\" : \"About how many hours did you sleep last night?\",\n  \"contentDocIds\" : [ ],\n  \"contentMediaIds\" : [ ],\n  \"parentMeasureId\" : \"7c96f915-9b4d-334a-8c33-edf0d1d991ba\",\n  \"measureType\" : \"NUMERIC\",\n  \"units\" : null,\n  \"choices\" : [ ],\n  \"validation\" : {\n    \"minValue\" : 0.0,\n    \"maxValue\" : 12.0\n  },\n  \"correlationId\" : \"9df423b7-811d-4cd5-a8c4-b02600a414e7\",\n  \"createdAt\" : \"2020-10-13T17:53:38.537644Z\",\n  \"createdBy\" : \"b56fb618-a654-3d71-86f1-73a4bf5b1538\",\n  \"source\" : \"EXTERNAL\",\n  \"eventType\" : \"MeasureUpdated\"\n}"}
{"topic":"content","partition":0,"offset":4,"tstype":"create","ts":1602611624644,"broker":-1,"headers":["kafka_correlationId","9df423b7-811d-4cd5-a8c4-b02600a414e7","CMason_Wire_Format","JSON","CMason_Sending_App","apollo-pollster-app","CMason_Event_Type","MeasureUpdated","CMason_Event_Stream_Version","1","X-B3-TraceId","2013a2d6e02fa87f","X-B3-SpanId","6cc095cfedb4e4d9","X-B3-Sampled","1"],"key":"1f92adc2-c355-429b-93ef-c06ca235423a","payload":"{\n  \"eventType\" : \"MeasureUpdated\",\n  \"eventId\" : \"cc89283a-e034-4ba4-a502-4c08d3aa9b01\",\n  \"id\" : \"88e240e9-d343-4c3d-94f5-0733827330e4\",\n  \"name\" : \"In the last 24 hours, did you take all of your medication exactly as your healthcare provider recommended?\",\n  \"description\" : \"In the last 24 hours, did you take all of your medication exactly as your healthcare provider recommended?\",\n  \"contentDocIds\" : [ ],\n  \"contentMediaIds\" : [ ],\n  \"parentMeasureId\" : \"7c96f915-9b4d-334a-8c33-edf0d1d991ba\",\n  \"measureType\" : \"ENUMERATED\",\n  \"units\" : null,\n  \"choices\" : [ {\n    \"id\" : \"495b1dc0-ef24-4477-a373-e33d28b8bcf5\",\n    \"value\" : \"I took all of them as recommended\"\n  }, {\n    \"id\" : \"19f8e435-14a4-4401-8558-d9460cad5546\",\n    \"value\" : \"I missed some of them\"\n  }, {\n    \"id\" : \"6c0fdaf2-f9d6-44c9-9f9c-538a7c194394\",\n    \"value\" : \"I took them at a different time\"\n  }, {\n    \"id\" : \"09e74b51-bcd1-4f61-9737-55b17b571005\",\n    \"value\" : \"I forgot to take them\"\n  }, {\n    \"id\" : \"f58cdd13-5d80-4c1e-bcdf-94b2fea7454e\",\n    \"value\" : \"I didn't because I felt better\"\n  }, {\n    \"id\" : \"c6d26769-202b-4a69-aa1e-ea93228287ae\",\n    \"value\" : \"I didn't because they make me feel worse\"\n  }, {\n    \"id\" : \"d96de9a5-9c00-42fc-ab98-41fff829f663\",\n    \"value\" : \"I didn't for a different reason\"\n  } ],\n  \"validation\" : null,\n  \"correlationId\" : \"9df423b7-811d-4cd5-a8c4-b02600a414e7\",\n  \"createdAt\" : 
\"2020-10-13T17:53:38.537674Z\",\n  \"createdBy\" : \"b56fb618-a654-3d71-86f1-73a4bf5b1538\",\n  \"source\" : \"EXTERNAL\",\n  \"eventType\" : \"MeasureUpdated\"\n}"}
{"topic":"content","partition":0,"offset":5,"tstype":"create","ts":1602611624719,"broker":-1,"headers":["kafka_correlationId","9df423b7-811d-4cd5-a8c4-b02600a414e7","CMason_Wire_Format","JSON","CMason_Sending_App","apollo-pollster-app","CMason_Event_Type","MeasureUpdated","CMason_Event_Stream_Version","1","X-B3-TraceId","2013a2d6e02fa87f","X-B3-SpanId","3315c1ff34b847b5","X-B3-Sampled","1"],"key":"1f92adc2-c355-429b-93ef-c06ca235423a","payload":"{\n  \"eventType\" : \"MeasureUpdated\",\n  \"eventId\" : \"c3ce5bfa-cae0-4e6f-b9ed-0a8894707ec0\",\n  \"id\" : \"a5c3dad6-a3a2-4b09-b762-d834e40189ed\",\n  \"name\" : \"Comments\",\n  \"description\" : \"Comments\",\n  \"contentDocIds\" : [ ],\n  \"contentMediaIds\" : [ ],\n  \"parentMeasureId\" : \"88e240e9-d343-4c3d-94f5-0733827330e4\",\n  \"measureType\" : \"TEXT\",\n  \"units\" : null,\n  \"choices\" : [ ],\n  \"validation\" : null,\n  \"correlationId\" : \"9df423b7-811d-4cd5-a8c4-b02600a414e7\",\n  \"createdAt\" : \"2020-10-13T17:53:38.537683Z\",\n  \"createdBy\" : \"b56fb618-a654-3d71-86f1-73a4bf5b1538\",\n  \"source\" : \"EXTERNAL\",\n  \"eventType\" : \"MeasureUpdated\"\n}"}
{"topic":"content","partition":0,"offset":6,"tstype":"create","ts":1602611624773,"broker":-1,"headers":["kafka_correlationId","9df423b7-811d-4cd5-a8c4-b02600a414e7","CMason_Wire_Format","JSON","CMason_Sending_App","apollo-pollster-app","CMason_Event_Type","MeasureUpdated","CMason_Event_Stream_Version","1","X-B3-TraceId","2013a2d6e02fa87f","X-B3-SpanId","1335e863fc3de15","X-B3-Sampled","1"],"key":"1f92adc2-c355-429b-93ef-c06ca235423a","payload":"{\n  \"eventType\" : \"MeasureUpdated\",\n  \"eventId\" : \"1331848f-7655-40e0-ab0f-0b88305dd33d\",\n  \"id\" : \"15b044cf-1eec-43ac-b35d-ffa1ab9ac197\",\n  \"name\" : \"Any thoughts to record?\",\n  \"description\" : \"Any thoughts to record?\",\n  \"contentDocIds\" : [ ],\n  \"contentMediaIds\" : [ ],\n  \"parentMeasureId\" : \"7c96f915-9b4d-334a-8c33-edf0d1d991ba\",\n  \"measureType\" : \"ENUMERATED\",\n  \"units\" : null,\n  \"choices\" : [ {\n    \"id\" : \"fc584cf6-8ec8-45d8-8ae0-577073c5bb6e\",\n    \"value\" : \"Not today\"\n  } ],\n  \"validation\" : null,\n  \"correlationId\" : \"9df423b7-811d-4cd5-a8c4-b02600a414e7\",\n  \"createdAt\" : \"2020-10-13T17:53:38.537702Z\",\n  \"createdBy\" : \"b56fb618-a654-3d71-86f1-73a4bf5b1538\",\n  \"source\" : \"EXTERNAL\",\n  \"eventType\" : \"MeasureUpdated\"\n}"}
{"topic":"content","partition":0,"offset":7,"tstype":"create","ts":1602611624857,"broker":-1,"headers":["kafka_correlationId","9df423b7-811d-4cd5-a8c4-b02600a414e7","CMason_Wire_Format","JSON","CMason_Sending_App","apollo-pollster-app","CMason_Event_Type","MeasureUpdated","CMason_Event_Stream_Version","1","X-B3-TraceId","2013a2d6e02fa87f","X-B3-SpanId","42bbb93e06c4244d","X-B3-Sampled","1"],"key":"1f92adc2-c355-429b-93ef-c06ca235423a","payload":"{\n  \"eventType\" : \"MeasureUpdated\",\n  \"eventId\" : \"423752e8-34ac-45e0-8ddc-b27920e98244\",\n  \"id\" : \"fbda8ebc-d417-432d-b7ad-bff7c68c61b1\",\n  \"name\" : \"My notes for today:\",\n  \"description\" : \"My notes for today:\",\n  \"contentDocIds\" : [ ],\n  \"contentMediaIds\" : [ ],\n  \"parentMeasureId\" : \"15b044cf-1eec-43ac-b35d-ffa1ab9ac197\",\n  \"measureType\" : \"TEXT\",\n  \"units\" : null,\n  \"choices\" : [ ],\n  \"validation\" : null,\n  \"correlationId\" : \"9df423b7-811d-4cd5-a8c4-b02600a414e7\",\n  \"createdAt\" : \"2020-10-13T17:53:38.537711Z\",\n  \"createdBy\" : \"b56fb618-a654-3d71-86f1-73a4bf5b1538\",\n  \"source\" : \"EXTERNAL\",\n  \"eventType\" : \"MeasureUpdated\"\n}"}
{"topic":"content","partition":0,"offset":8,"tstype":"create","ts":1602611624970,"broker":-1,"headers":["kafka_correlationId","9df423b7-811d-4cd5-a8c4-b02600a414e7","CMason_Wire_Format","JSON","CMason_Sending_App","apollo-provider-app","CMason_Event_Type","MeasureUpdated","CMason_Event_Stream_Version","1","X-B3-TraceId","ef85f342d5c07d64","X-B3-SpanId","35830a429a65d44b","X-B3-Sampled","1"],"key":"1f92adc2-c355-429b-93ef-c06ca235423a","payload":"{\n  \"eventType\" : \"MeasureUpdated\",\n  \"eventId\" : \"a5650bba-0db1-4196-af05-036688625237\",\n  \"correlationId\" : \"9df423b7-811d-4cd5-a8c4-b02600a414e7\",\n  \"id\" : \"7c96f915-9b4d-334a-8c33-edf0d1d991ba\",\n  \"eventType\" : \"MeasureUpdated\",\n  \"createdAt\" : \"2020-10-13T17:53:44.918700Z\",\n  \"createdBy\" : \"b56fb618-a654-3d71-86f1-73a4bf5b1538\",\n  \"source\" : \"ENTITY\",\n  \"name\" : \"Wellbeing Check-in\",\n  \"description\" : \"Survey: Wellbeing Check-in (ID: 292368082 version: 2a24f6c2-7ba6-41f8-95ee-0fd7d90896f5)\",\n  \"timeDescription\" : null,\n  \"version\" : 0\n}"}
{"topic":"content","partition":0,"offset":9,"tstype":"create","ts":1602611625143,"broker":-1,"headers":["kafka_correlationId","9df423b7-811d-4cd5-a8c4-b02600a414e7","CMason_Wire_Format","JSON","CMason_Sending_App","apollo-patient-app","CMason_Event_Type","MeasureUpdated","CMason_Event_Stream_Version","1","X-B3-TraceId","45a3bb3569ad77ba","X-B3-SpanId","6bde2c3399974fda","X-B3-Sampled","1"],"key":"1f92adc2-c355-429b-93ef-c06ca235423a","payload":"{\n  \"eventType\" : \"MeasureUpdated\",\n  \"eventId\" : \"5c4f3da0-410c-437d-9110-c935e7ffa8e5\",\n  \"correlationId\" : \"9df423b7-811d-4cd5-a8c4-b02600a414e7\",\n  \"id\" : \"7c96f915-9b4d-334a-8c33-edf0d1d991ba\",\n  \"eventType\" : \"MeasureUpdated\",\n  \"createdAt\" : \"2020-10-13T17:53:45.062533Z\",\n  \"createdBy\" : \"b56fb618-a654-3d71-86f1-73a4bf5b1538\",\n  \"source\" : \"ENTITY\",\n  \"parentMeasureId\" : null,\n  \"contentDocs\" : [ ],\n  \"name\" : \"Wellbeing Check-in\",\n  \"description\" : \"Survey: Wellbeing Check-in (ID: 292368082 version: 2a24f6c2-7ba6-41f8-95ee-0fd7d90896f5)\",\n  \"contentMedia\" : [ ],\n  \"units\" : null,\n  \"choices\" : null,\n  \"version\" : 0,\n  \"validation\" : null,\n  \"measureType\" : null\n}"}
END
kafka-topics --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 1 --config retention.ms=-1 --config retention.bytes=-1 --topic content-test
cat in_cmason.json | jq -c 'del(.headers)' | ./kafkacat  -b localhost -P -t content-test -J
kafkacat -o beginning -e -t content-test -b localhost -J > out_cmason.json

Results in some messages like:

"payload":"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0004tType\" \u0000\u0000\u0000\u0000\....

Any thoughts on where this corruption might be coming from?

@JakkuSakura
Author

@masoncj It's probably because the \n is processed by jq. Currently my version splits the input into messages on the \n at the end of each line.
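To make the line-splitting point concrete, here is a minimal C sketch of splitting input into records on raw newline bytes. It is not the PR's code, and `count_records` is a hypothetical helper. An escaped \n inside a JSON string is two characters (a backslash and an n), so it does not end a record; only a raw newline byte does.

```c
#include <stddef.h>

/* Count newline-delimited records in buf. Empty lines are skipped, and a
 * final record without a trailing '\n' still counts. */
size_t count_records(const char *buf, size_t len) {
    size_t count = 0, line_len = 0, i;
    for (i = 0; i < len; i++) {
        if (buf[i] == '\n') {
            if (line_len > 0) count++;
            line_len = 0;
        } else {
            line_len++;
        }
    }
    if (line_len > 0) count++;
    return count;
}
```

With this kind of splitting, two one-line JSON objects whose payloads contain escaped newlines yield two records, while a single object with a raw newline inside a string is cut into two fragments, neither of which is valid JSON.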

@masoncj

masoncj commented Mar 10, 2021

@qiujiangkun, I was able to identify and fix the source of the corruption I was seeing above. It didn't have anything to do with jq or line breaks (jq -c correctly preserves line breaks in JSON-lines files).

I used valgrind. It identified a number of use-after-free issues, such as (just one example, there are many similar ones):

==95246== Invalid read of size 8
==95246==    at 0x10015154E: crc32c (in /usr/local/Cellar/librdkafka/1.2.2/lib/librdkafka.1.dylib)
==95246==    by 0x100177DB7: rd_slice_crc32c (in /usr/local/Cellar/librdkafka/1.2.2/lib/librdkafka.1.dylib)
==95246==    by 0x10015F337: rd_kafka_msgset_create_ProduceRequest (in /usr/local/Cellar/librdkafka/1.2.2/lib/librdkafka.1.dylib)
==95246==    by 0x1001371F1: rd_kafka_ProduceRequest (in /usr/local/Cellar/librdkafka/1.2.2/lib/librdkafka.1.dylib)
==95246==    by 0x100114095: rd_kafka_broker_serve (in /usr/local/Cellar/librdkafka/1.2.2/lib/librdkafka.1.dylib)
==95246==    by 0x100111260: rd_kafka_broker_thread_main (in /usr/local/Cellar/librdkafka/1.2.2/lib/librdkafka.1.dylib)
==95246==    by 0x1001534E5: _thrd_wrapper_function (in /usr/local/Cellar/librdkafka/1.2.2/lib/librdkafka.1.dylib)
==95246==    by 0x10064A2EA: _pthread_body (in /usr/lib/system/libsystem_pthread.dylib)
==95246==    by 0x10064D248: _pthread_start (in /usr/lib/system/libsystem_pthread.dylib)
==95246==    by 0x10064940C: thread_start (in /usr/lib/system/libsystem_pthread.dylib)
==95246==  Address 0x1016a69c0 is 768 bytes inside a block of size 2,048 free'd
==95246==    at 0x1000EB0CD: free (in /usr/local/Cellar/valgrind/HEAD-0b5ae2f/lib/valgrind/vgpreload_memcheck-amd64-darwin.so)
==95246==    by 0x10000BC78: yajlTestFree (json.c:436)
==95246==    by 0x1000F7208: yajl_buf_free (in /usr/local/Cellar/yajl/2.1.0/lib/libyajl.2.1.0.dylib)
==95246==    by 0x1000F5285: yajl_free (in /usr/local/Cellar/yajl/2.1.0/lib/libyajl.2.1.0.dylib)
==95246==    by 0x10000BBFE: parse_json_message (json.c:573)
==95246==    by 0x100003502: producer_run (kafkacat.c:419)
==95246==    by 0x10000107E: main (kafkacat.c:2562)
==95246==  Block was alloc'd at
==95246==    at 0x1000EACF5: malloc (in /usr/local/Cellar/valgrind/HEAD-0b5ae2f/lib/valgrind/vgpreload_memcheck-amd64-darwin.so)
==95246==    by 0x10000BC28: yajlTestMalloc (json.c:440)
==95246==    by 0x1000F7250: yajl_buf_append (in /usr/local/Cellar/yajl/2.1.0/lib/libyajl.2.1.0.dylib)
==95246==    by 0x1000F74A8: yajl_string_decode (in /usr/local/Cellar/yajl/2.1.0/lib/libyajl.2.1.0.dylib)
==95246==    by 0x1000F6E86: yajl_do_parse (in /usr/local/Cellar/yajl/2.1.0/lib/libyajl.2.1.0.dylib)
==95246==    by 0x10000BB87: parse_json_message (json.c:564)
==95246==    by 0x100003502: producer_run (kafkacat.c:419)
==95246==    by 0x10000107E: main (kafkacat.c:2562)

(I ❤️ valgrind.)

I think the issue here is that the yajl parser is allocated and freed in parse_json_message(), but we store references to data returned by the parser in the kafkacatMessageContext. At least some of these references seem to point into memory allocated by the parser, in the pool allocated by yajl_alloc(), and we use those references after the parser is freed.
I believe we sometimes even pass references to this memory into librdkafka (see above), which may access it even after produce() has returned. This may only be an issue with larger messages, where we don't request that librdkafka copy the message internally.
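The lifetime problem described here can be sketched in isolation. The following C example uses a hypothetical `toy_parser` as a stand-in for a parser that owns the strings it hands out (it is not yajl's API and not the PR's code), and shows one safe pattern: copy the value before the parser is freed.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for a parser that owns the strings it returns. */
typedef struct {
    char *pool; /* memory owned by the parser */
} toy_parser;

toy_parser *toy_parser_new(const char *decoded) {
    toy_parser *p = malloc(sizeof(*p));
    size_t n;
    if (!p) return NULL;
    n = strlen(decoded) + 1;
    p->pool = malloc(n);
    if (!p->pool) { free(p); return NULL; }
    memcpy(p->pool, decoded, n);
    return p;
}

/* Returns a pointer INTO parser-owned memory: it is only valid until
 * toy_parser_free() is called. */
const char *toy_parser_value(const toy_parser *p) { return p->pool; }

void toy_parser_free(toy_parser *p) {
    if (!p) return;
    free(p->pool);
    free(p);
}

/* Safe pattern: take a private copy before the parser goes away. */
char *extract_payload(const char *json_value) {
    toy_parser *p = toy_parser_new(json_value);
    char *copy = NULL;
    if (p) {
        const char *v = toy_parser_value(p);
        size_t n = strlen(v) + 1;
        copy = malloc(n);
        if (copy) memcpy(copy, v, n);
        toy_parser_free(p); /* v dangles from here on; copy does not */
    }
    return copy; /* caller owns this and must free() it */
}
```

Copying costs an extra allocation per message; the alternative is to keep the parser's memory alive until the consumer of the pointer is done with it.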

I have a branch here that fixes these issues. It's a bit messy because I have to manage both the buffer and the parser memory pool, so I use a small allocated stub to store both of these references and free them in the message sent callback from librdkafka. I'd welcome stylistic comments here.
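As a rough illustration of the "small allocated stub" idea, the C sketch below bundles the two things that must outlive the produce call and frees them together from a completion hook. All names (`msg_stub`, `msg_stub_new`, `msg_stub_on_delivery`) are hypothetical and this is not the code from the branch; in a real producer the hook would presumably be invoked from librdkafka's delivery report callback with the per-message opaque pointer.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical per-message stub: everything that must stay alive until
 * the message has actually been delivered (or has failed). */
typedef struct {
    char *input_buf;   /* raw input line the message was parsed from */
    void *parser_pool; /* parser-owned memory the key/payload point into */
} msg_stub;

static int live_stubs = 0; /* bookkeeping for this demo only */

msg_stub *msg_stub_new(const char *line, size_t pool_size) {
    msg_stub *s = calloc(1, sizeof(*s));
    size_t n;
    if (!s) return NULL;
    n = strlen(line) + 1;
    s->input_buf = malloc(n);
    s->parser_pool = malloc(pool_size ? pool_size : 1);
    if (!s->input_buf || !s->parser_pool) {
        free(s->input_buf);
        free(s->parser_pool);
        free(s);
        return NULL;
    }
    memcpy(s->input_buf, line, n);
    live_stubs++;
    return s;
}

/* Completion hook: releases both resources in one place, exactly once. */
void msg_stub_on_delivery(void *opaque) {
    msg_stub *s = opaque;
    if (!s) return;
    free(s->parser_pool);
    free(s->input_buf);
    free(s);
    live_stubs--;
}

/* Number of stubs created but not yet released. */
int msg_stub_live_count(void) { return live_stubs; }
```

Tying both frees to a single opaque pointer keeps the ownership rule simple: whoever hands the stub to the producer gives up ownership, and the completion hook is the only place that releases it.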

With these changes valgrind reports no errors from kafkacat code.

Support headers in JSON production
@masoncj

masoncj commented Mar 11, 2021

@edenhill this is proving really useful for me in copying data between topics/servers. For example:

./kafkacat -C -F ~/.kafka-dev -o beginning -e -t patient -J | ./kafkacat -P -b localhost -t patient -J -X message.max.bytes=80000

(Note that message.max.bytes is required; see #137 (comment).)

Could we please discuss what it might take to get this merged? Thanks so much.

@AlexeiZenin

@edenhill this is proving really useful for me in copying data between topics/servers. For example:

./kafkacat -C -F ~/.kafka-dev -o beginning -e -t patient -J | ./kafkacat -P -b localhost -t patient -J -X message.max.bytes=80000

(Note that message.max.bytes is required; see #137 (comment).)

Could we please discuss what it might take to get this merged? Thanks so much.

This would be very useful!

@JakkuSakura
Author

A little ad, you can try out my kafcat

reddit

@Tommmster

@edenhill this is proving really useful for me in copying data between topics/servers. For example:

./kafkacat -C -F ~/.kafka-dev -o beginning -e -t patient -J | ./kafkacat -P -b localhost -t patient -J -X message.max.bytes=80000

(Note that message.max.bytes is required; see #137 (comment).)

Could we please discuss what it might take to get this merged? Thanks so much.

This would definitely be a very welcome feature!

@vozzen

vozzen commented Nov 6, 2023

That'd be a very nice addition, but there has been no response for the last 2 years. Is the project somehow on hold?

@Hubbitus

@JakkuSakura, very nice work! Thank you.
But could you please rebase your work on top of master and resolve conflicts?

@tmancill

I'm seeing malloc assertions when trying to produce more than a few messages using this branch.

First I tried using a pipe.

./kafkacat -b broker:9092 -C -t topic1 -p 0 -J -B a | ./kafkacat -b localhost:9092 -t topic2 -P -p 0 -J -X -B a
Fatal glibc error: malloc.c:2496 (sysmalloc): assertion failed: (old_top == initial_top (av) && old_size == 0) || ((unsigned long) (old_size) >= MINSIZE && prev_inuse (old_top) && ((unsigned long) old_end & (pagesize - 1)) == 0)
Aborted (core dumped)

When that failed, I consumed a large number of messages into a file and then tried to produce by reading from stdin, but I'm not able to get more than 9 messages (about 25kB) before I see the same assertion.

Consume some messages

./kafkacat -b broker:9092 -C -t topic1 -p 0 -J -B a -o beginning -c 4000000 > /tmp/msgs

Try to produce a few to the topic; this is successful

head -5 /tmp/msgs | ./kafkacat -b localhost:9092 -t topic2 -P -p 0 -J -B a

Try to produce them all with -l

./kafkacat -b localhost:9092 -t topic2 -P -p 0 -J -B a -l /tmp/msgs
Fatal glibc error: malloc.c:2496 (sysmalloc): assertion failed: (old_top == initial_top (av) && old_size == 0) || ((unsigned long) (old_size) >= MINSIZE && prev_inuse (old_top) && ((unsigned long) old_end & (pagesize - 1)) == 0)
Aborted (core dumped)

Search for the first message that fails

head -10 /tmp/msgs | ./kafkacat -b localhost:9092 -t topic2 -P -p 0 -J -B a
Fatal glibc error: malloc.c:2496 (sysmalloc): assertion failed: (old_top == initial_top (av) && old_size == 0) || ((unsigned long) (old_size) >= MINSIZE && prev_inuse (old_top) && ((unsigned long) old_end & (pagesize - 1)) == 0)
Aborted (core dumped)

Is the 10th message corrupt? No, this is also successful

head -15 | tail -6 /tmp/msgs | ./kafkacat -b localhost:9092 -t topic2 -P -p 0 -J -B a

How much data are we talking about?

head -10 /tmp/msgs | wc
     10      10   28066

head -9 /tmp/msgs | wc
      9       9   25521

Concatenating @masoncj's test data together three times didn't generate the error. I don't have a reproducible test case I can share (yet). Any ideas on how this could be data-dependent (so I can generate a test case to share)?

For those interested in the core dump, the binary was built with ./bootstrap.sh on Amazon Linux 2023. Here's partial output of coredumpctl dump:

       Message: Process 2673238 (kafkacat) of user 1000 dumped core.

                Module libpcre2-8.so.0 from rpm pcre2-10.40-1.amzn2023.0.3.x86_64
                Module libselinux.so.1 from rpm libselinux-3.4-5.amzn2023.0.2.x86_64
                Module libkeyutils.so.1 from rpm keyutils-1.6.3-1.amzn2023.0.1.x86_64
                Module libkrb5support.so.0 from rpm krb5-1.21-3.amzn2023.0.3.x86_64
                Module libcom_err.so.2 from rpm e2fsprogs-1.46.5-2.amzn2023.0.2.x86_64
                Module libk5crypto.so.3 from rpm krb5-1.21-3.amzn2023.0.3.x86_64
                Module libkrb5.so.3 from rpm krb5-1.21-3.amzn2023.0.3.x86_64
                Module libunistring.so.2 from rpm libunistring-0.9.10-10.amzn2023.0.2.x86_64
                Module libgssapi_krb5.so.2 from rpm krb5-1.21-3.amzn2023.0.3.x86_64
                Module libpsl.so.5 from rpm libpsl-0.21.1-3.amzn2023.0.2.x86_64
                Module libidn2.so.0 from rpm libidn2-2.3.2-1.amzn2023.0.5.x86_64
                Module libnghttp2.so.14 from rpm nghttp2-1.57.0-1.amzn2023.0.1.x86_64
                Module libcurl.so.4 from rpm curl-8.5.0-1.amzn2023.0.2.x86_64
                Module libz.so.1 from rpm zlib-1.2.11-33.amzn2023.0.5.x86_64
                Module libcrypto.so.3 from rpm openssl-3.0.8-1.amzn2023.0.11.x86_64
                Module libssl.so.3 from rpm openssl-3.0.8-1.amzn2023.0.11.x86_64
                Stack trace of thread 2673238:
                #0  0x00007f421c8a153c __pthread_kill_implementation (libc.so.6 + 0xa153c)
                #1  0x00007f421c854d26 raise (libc.so.6 + 0x54d26)
                #2  0x00007f421c8287f3 abort (libc.so.6 + 0x287f3)
                #3  0x00007f421c829130 __libc_message.cold (libc.so.6 + 0x29130)
                #4  0x00007f421c84db17 __libc_assert_fail (libc.so.6 + 0x4db17)
                #5  0x00007f421c8adbfa sysmalloc (libc.so.6 + 0xadbfa)
                #6  0x00007f421c8aeb4b _int_malloc (libc.so.6 + 0xaeb4b)
                #7  0x00007f421c8aee7a _int_realloc (libc.so.6 + 0xaee7a)
                #8  0x00007f421c8afc1b realloc (libc.so.6 + 0xafc1b)
                #9  0x000000000040b212 n/a (/home/ec2-user/kafkacat/kcat-295/kafkacat + 0xb212)
                ELF object binary architecture: AMD x86-64

@JakkuSakura
Author

Thanks for testing and reporting. It could be that the wrong pointer is passed when a memory chunk is reallocated, so that once the input grows beyond a certain number of bytes, the reallocation fails.
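For reference, here is a minimal C sketch of a growable buffer that avoids the usual realloc() pitfalls. It is not the PR's code; `growbuf` and its functions are hypothetical names. The pointer passed to realloc() is always the one previously returned by the allocator, the result goes into a temporary so the old block is not lost on failure, and the capacity is checked before every write. glibc assertions like the sysmalloc one above are commonly a symptom of heap metadata having been overwritten earlier, for example by writing past the end of a block, though that is only a guess for this case.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical growable byte buffer. Initialise with {NULL, 0, 0}. */
typedef struct {
    char *data;
    size_t len; /* bytes in use */
    size_t cap; /* bytes allocated */
} growbuf;

/* Append n bytes from src; returns 0 on success, -1 on allocation failure. */
int growbuf_append(growbuf *b, const char *src, size_t n) {
    if (b->len + n > b->cap) {
        size_t new_cap = b->cap ? b->cap : 64;
        char *tmp;
        while (new_cap < b->len + n) new_cap *= 2;
        /* Pass the base pointer, never an offset into the block. */
        tmp = realloc(b->data, new_cap);
        if (!tmp) return -1; /* b->data is still valid and still owned */
        b->data = tmp;
        b->cap = new_cap;
    }
    memcpy(b->data + b->len, src, n); /* in bounds: len + n <= cap */
    b->len += n;
    return 0;
}

/* Release the buffer and reset it to the empty state. */
void growbuf_free(growbuf *b) {
    free(b->data);
    b->data = NULL;
    b->len = 0;
    b->cap = 0;
}
```

Any pointer saved from b->data before a call to growbuf_append() may be stale afterwards, since realloc() is free to move the block.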

I've become really busy recently. I will have a look, fix it, and resolve the conflicts when I have time.

@Hubbitus

Hubbitus commented Jun 4, 2024

Hello. Sorry, could you please continue and resolve outstanding issues in this PR?

# Conflicts:
#	Makefile
#	json.c
#	kcat.c
#	tests/0000-unit.sh
#	tests/0002-delim.sh
#	tests/helpers.sh
@JakkuSakura JakkuSakura changed the title Producer json & base64 support & better test cases Drift: Producer json & base64 support & better test cases Jul 2, 2024
@JakkuSakura JakkuSakura changed the title Drift: Producer json & base64 support & better test cases Draft: Producer json & base64 support & better test cases Jul 2, 2024
@JakkuSakura
Author

I just merged the code with the master branch. However, I don't have a test environment at the moment.


7 participants