@@ -43,6 +43,7 @@ import org.springframework.kafka.test.rule.KafkaEmbedded
4343import org.springframework.kafka.test.utils.ContainerTestUtils
4444import org.springframework.kafka.test.utils.KafkaTestUtils
4545import spock.lang.Shared
46+ import spock.lang.RepeatUntilFailure
4647
4748import java.util.concurrent.ExecutionException
4849import java.util.concurrent.Future
@@ -104,6 +105,32 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
104105 }
105106 }
106107
/**
 * Deterministic ordering for the traces reported by the batch-consume test:
 * the producer ("parent") trace always sorts first, and each consumer trace
 * follows in the order of the producer span it was delivered from.
 * Ordering is delegated entirely to {@link #batchSortKey}.
 */
private static class SortBatchKafkaTraces implements Comparator<List<DDSpan>> {
  @Override
  int compare(List<DDSpan> left, List<DDSpan> right) {
    long leftKey = batchSortKey(left)
    long rightKey = batchSortKey(right)
    return Long.compare(leftKey, rightKey)
  }
}
114+
/**
 * Sort key for one reported trace.
 *
 * The producer trace (local root operation "parent") gets {@code Long.MIN_VALUE}
 * so it always sorts first. Consumer traces are keyed by the parent ID that was
 * propagated from the producer span: taken from the "kafka.deliver" span when
 * one is present, otherwise from the first span of the trace.
 */
private static long batchSortKey(List<DDSpan> trace) {
  assert !trace.isEmpty()
  DDSpan head = trace.get(0)
  if (head.localRootSpan.operationName.toString() == "parent") {
    // Producer trace: pin it to the front of the sort order.
    return Long.MIN_VALUE
  }
  def deliverSpan = trace.find { it.operationName.toString() == "kafka.deliver" }
  return deliverSpan != null ? deliverSpan.parentId : head.parentId
}
123+
/**
 * Locates the producer ("parent") trace among the reported traces and returns
 * its producer-kind spans ordered by span ID, so they line up with send order
 * for per-record assertions.
 */
private static List<DDSpan> producerSpans(List<List<DDSpan>> traces) {
  def parentTrace = traces.find { candidate ->
    !candidate.isEmpty() && candidate.get(0).localRootSpan.operationName.toString() == "parent"
  }
  assert parentTrace != null
  def producerOnly = parentTrace.findAll { it.getTag(Tags.SPAN_KIND) == Tags.SPAN_KIND_PRODUCER }
  return producerOnly.sort { it.spanId }
}
133+
107134
108135 static {
109136 PRODUCER_PATHWAY_EDGE_TAGS = new LinkedHashMap<> (3 )
@@ -835,7 +862,9 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
835862 producer.close()
836863 }
837864
865+ // TODO remove both annotations before merge
838866 @Flaky("Repeatedly fails with a partition set to 1 but expects 0 https://github.com/DataDog/dd-trace-java/issues/3864")
867+ @RepeatUntilFailure (maxAttempts = 100 )
839868 def " test spring kafka template produce and batch consume" () {
840869 setup :
841870 def senderProps = KafkaTestUtils . senderProps(embeddedKafka. getBrokersAsString())
@@ -857,14 +886,14 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
857886 def container = new KafkaMessageListenerContainer<> (consumerFactory, containerProperties)
858887 def records = new LinkedBlockingQueue<ConsumerRecord<String , String > > ()
859888 container. setupMessageListener(new BatchMessageListener<String , String > () {
860- @Override
861- void onMessage (List<ConsumerRecord<String , String > > consumerRecords ) {
862- TEST_WRITER . waitForTraces(1 ) // ensure consistent ordering of traces
863- consumerRecords. each {
864- records. add(it)
865- }
889+ @Override
890+ void onMessage (List<ConsumerRecord<String , String > > consumerRecords ) {
891+ TEST_WRITER . waitForTraces(1 ) // ensure consistent ordering of traces
892+ consumerRecords. each {
893+ records. add(it)
866894 }
867- })
895+ }
896+ })
868897 container. start()
869898 ContainerTestUtils . waitForAssignment(container, embeddedKafka. getPartitionsPerTopic())
870899
@@ -874,7 +903,8 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
874903 for (g in greetings) {
875904 kafkaTemplate. send(SHARED_TOPIC , g). addCallback({
876905 runUnderTrace(" producer callback" ) {}
877- }, { ex ->
906+ }, {
907+ ex ->
878908 runUnderTrace(" producer exception: " + ex) {}
879909 })
880910 }
@@ -888,17 +918,31 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
888918
889919 then :
890920 def receivedSet = greetings. toSet()
891- greetings. eachWithIndex { g , i ->
921+ def receivedRecords = []
922+ greetings. eachWithIndex {
923+ g, i ->
892924 def received = records. poll(5 , TimeUnit . SECONDS )
893925 receivedSet. remove(received. value()) // maybe received out of order in case several partitions
894926 assert received. key() == null
895927
896928 def headers = received. headers()
897929 assert headers. iterator(). hasNext()
930+ receivedRecords. add(received)
898931 }
899932 assert receivedSet. isEmpty()
900933
901- assertTraces(4 , SORT_TRACES_BY_ID ) {
934+ TEST_WRITER . waitForTraces(4 )
935+ def traces = Arrays . asList(TEST_WRITER . toArray()) as List<List<DDSpan > >
936+ def produceSpans = producerSpans(traces)
937+ def spanIdToRecord = receivedRecords. collectEntries {
938+ record ->
939+ def header = record. headers(). headers(" x-datadog-parent-id" ). iterator()
940+ assert header. hasNext()
941+ [(Long . parseLong(new String (header. next(). value(), StandardCharsets . UTF_8 ))): record]
942+ }
943+
944+ // Batch listener delivery order can vary; match each consumer trace to its producer via the propagated parent ID.
945+ assertTraces(4 , new SortBatchKafkaTraces ()) {
902946 trace(7 ) {
903947 basicSpan(it, " parent" )
904948 basicSpan(it, " producer callback" , span(0 ))
@@ -910,46 +954,44 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
910954 }
911955
912956 if (hasQueueSpan()) {
913- trace(2 ) {
914- consumerSpan(it, consumerProperties, trace(1 )[1 ], 0 .. 0 )
915- queueSpan(it, trace(0 )[6 ])
916- }
917- trace(2 ) {
918- consumerSpan(it, consumerProperties, trace(2 )[1 ], 0 .. 1 )
919- queueSpan(it, trace(0 )[4 ])
920- }
921- trace(2 ) {
922- consumerSpan(it, consumerProperties, trace(3 )[1 ], 0 .. 1 )
923- queueSpan(it, trace(0 )[2 ])
957+ [0 , 1 , 2 ]. each {
958+ i ->
959+ def expectedOffset = spanIdToRecord[produceSpans[i]. spanId]. offset()
960+ trace(2 ) {
961+ consumerSpan(it, consumerProperties, span(1 ), expectedOffset.. expectedOffset)
962+ queueSpan(it, produceSpans[i])
963+ }
924964 }
925965 } else {
926- trace(1 ) {
927- consumerSpan(it, consumerProperties, trace(0 )[6 ], 0 .. 0 )
928- }
929- trace(1 ) {
930- consumerSpan(it, consumerProperties, trace(0 )[4 ], 0 .. 1 )
931- }
932- trace(1 ) {
933- consumerSpan(it, consumerProperties, trace(0 )[2 ], 0 .. 1 )
966+ [0 , 1 , 2 ]. each {
967+ i ->
968+ def expectedOffset = spanIdToRecord[produceSpans[i]. spanId]. offset()
969+ trace(1 ) {
970+ consumerSpan(it, consumerProperties, produceSpans[i], expectedOffset.. expectedOffset)
971+ }
934972 }
935973 }
936974 }
937975
938976 if (isDataStreamsEnabled()) {
939- StatsGroup first = TEST_DATA_STREAMS_WRITER . groups. find { it. parentHash == 0 }
977+ StatsGroup first = TEST_DATA_STREAMS_WRITER . groups. find {
978+ it. parentHash == 0
979+ }
940980 verifyAll(first) {
941981 tags. hasAllTags(" direction:out" , " kafka_cluster_id:$clusterId " , " topic:$SHARED_TOPIC " . toString(), " type:kafka" )
942982 }
943983
944- StatsGroup second = TEST_DATA_STREAMS_WRITER . groups. find { it. parentHash == first. hash }
984+ StatsGroup second = TEST_DATA_STREAMS_WRITER . groups. find {
985+ it. parentHash == first. hash
986+ }
945987 verifyAll(second) {
946988 tags. hasAllTags(
947- " direction:in" ,
948- " group:sender" ,
949- " kafka_cluster_id:$clusterId " ,
950- " topic:$SHARED_TOPIC " . toString(),
951- " type:kafka"
952- )
989+ " direction:in" ,
990+ " group:sender" ,
991+ " kafka_cluster_id:$clusterId " ,
992+ " topic:$SHARED_TOPIC " . toString(),
993+ " type:kafka"
994+ )
953995 }
954996 }
955997
@@ -981,16 +1023,16 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
9811023
9821024 // setup a Kafka message listener
9831025 container. setupMessageListener(new MessageListener<String , String > () {
984- @Override
985- void onMessage (ConsumerRecord<String , String > record ) {
986- TEST_WRITER . waitForTraces(1 ) // ensure consistent ordering of traces
987- records. add(record)
988- if (isDataStreamsEnabled()) {
989- // even if header propagation is disabled, we want data streams to work.
990- TEST_DATA_STREAMS_WRITER . waitForGroups(2 )
991- }
1026+ @Override
1027+ void onMessage (ConsumerRecord<String , String > record ) {
1028+ TEST_WRITER . waitForTraces(1 ) // ensure consistent ordering of traces
1029+ records. add(record)
1030+ if (isDataStreamsEnabled()) {
1031+ // even if header propagation is disabled, we want data streams to work.
1032+ TEST_DATA_STREAMS_WRITER . waitForGroups(2 )
9921033 }
993- })
1034+ }
1035+ })
9941036
9951037 // start the container and underlying message listener
9961038 container. start()
@@ -1028,9 +1070,9 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
10281070 def existingSpanId = 9876543210987654L
10291071 def headers = new RecordHeaders ()
10301072 headers. add(new RecordHeader (" x-datadog-trace-id" ,
1031- String . valueOf(existingTraceId). getBytes(StandardCharsets . UTF_8 )))
1073+ String . valueOf(existingTraceId). getBytes(StandardCharsets . UTF_8 )))
10321074 headers. add(new RecordHeader (" x-datadog-parent-id" ,
1033- String . valueOf(existingSpanId). getBytes(StandardCharsets . UTF_8 )))
1075+ String . valueOf(existingSpanId). getBytes(StandardCharsets . UTF_8 )))
10341076
10351077 when :
10361078 def record = new ProducerRecord (SHARED_TOPIC , 0 , null , " test-context-extraction" , headers)
@@ -1063,16 +1105,16 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
10631105 def oldExtractorsByType = extractorsByTypeField. get(TEST_DATA_STREAMS_MONITORING )
10641106
10651107 def extractor = new DataStreamsTransactionExtractor () {
1066- String getName () {
1067- return " kafka-produce-test"
1068- }
1069- DataStreamsTransactionExtractor.Type getType () {
1070- return DataStreamsTransactionExtractor.Type . KAFKA_PRODUCE_HEADERS
1071- }
1072- String getValue () {
1073- return " x-transaction-id"
1074- }
1108+ String getName () {
1109+ return " kafka-produce-test"
1110+ }
1111+ DataStreamsTransactionExtractor.Type getType () {
1112+ return DataStreamsTransactionExtractor.Type . KAFKA_PRODUCE_HEADERS
10751113 }
1114+ String getValue () {
1115+ return " x-transaction-id"
1116+ }
1117+ }
10761118 def extractorsByType = new EnumMap<> (DataStreamsTransactionExtractor.Type )
10771119 extractorsByType. put(DataStreamsTransactionExtractor.Type . KAFKA_PRODUCE_HEADERS , [extractor])
10781120 extractorsByTypeField. set(TEST_DATA_STREAMS_MONITORING , extractorsByType)
0 commit comments