Skip to content

Commit 1d32f71

Browse files
fix: Add ability to add headers
1 parent 0b0e73c commit 1d32f71

File tree

2 files changed

+27
-2
lines changed

2 files changed

+27
-2
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@
7373
<dependency>
7474
<groupId>org.apache.kafka</groupId>
7575
<artifactId>kafka-clients</artifactId>
76-
<version>0.9.0.1</version>
76+
<version>3.0.0</version>
7777
<exclusions>
7878
<!-- XXX Transitively includes log4j 1.2.15 which has bad metadata
7979
http://stackoverflow.com/a/9047963 -->

src/main/java/co/signal/kafkameter/KafkaProducerSampler.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import java.io.IOException;
1919
import java.io.PrintWriter;
2020
import java.io.StringWriter;
21+
import java.util.LinkedHashMap;
22+
import java.util.Map;
2123
import java.util.Properties;
2224

2325
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -33,6 +35,7 @@
3335
import org.apache.jmeter.samplers.SampleResult;
3436
import org.apache.jorphan.logging.LoggingManager;
3537
import org.apache.kafka.clients.producer.ProducerRecord;
38+
import org.apache.kafka.common.header.internals.RecordHeader;
3639
import org.apache.log.Logger;
3740

3841
import co.signal.handlebars.CustomHandlebars;
@@ -121,6 +124,11 @@ public class KafkaProducerSampler extends AbstractJavaSamplerClient {
121124
*/
122125
private static final String PARAMETER_KAFKA_PARTITION = "kafka_partition";
123126

127+
/**
128+
* Parameter for setting the headers. It is optional.
129+
*/
130+
private static final String PARAMETER_KAFKA_HEADERS = "kafka_headers";
131+
124132
// private Producer<Long, byte[]> producer;
125133

126134
private KafkaProducer<String, String> producer;
@@ -178,6 +186,7 @@ public Arguments getDefaultParameters() {
178186
defaultParameters.addArgument(PARAMETER_KAFKA_USE_SSL, "${PARAMETER_KAFKA_USE_SSL}");
179187
defaultParameters.addArgument(PARAMETER_KAFKA_COMPRESSION_TYPE, null);
180188
defaultParameters.addArgument(PARAMETER_KAFKA_PARTITION, null);
189+
defaultParameters.addArgument(PARAMETER_KAFKA_HEADERS, null);
181190
return defaultParameters;
182191
}
183192

@@ -197,6 +206,19 @@ public SampleResult runTest(JavaSamplerContext context) {
197206
} catch (IOException e1) {
198207
e1.printStackTrace();
199208
}
209+
Map<String, String> headers = new LinkedHashMap<String, String>();
210+
try {
211+
String headerString = handlebars.compileInline(context.getParameter(PARAMETER_KAFKA_HEADERS)).apply("");
212+
if (!Strings.isNullOrEmpty(headerString)) {
213+
String[] pairs = headerString.split("&");
214+
for (String pair : pairs) {
215+
int idx = pair.indexOf("=");
216+
headers.put(pair.substring(0, idx), pair.substring(idx + 1));
217+
}
218+
}
219+
} catch (IOException e1) {
220+
e1.printStackTrace();
221+
}
200222
sampleResultStart(result, message);
201223

202224
final ProducerRecord<String, String> producerRecord;
@@ -207,6 +229,9 @@ public SampleResult runTest(JavaSamplerContext context) {
207229
final int partitionNumber = Integer.parseInt(partitionString);
208230
producerRecord = new ProducerRecord<String, String>(topic, partitionNumber, key, message);
209231
}
232+
for (Map.Entry<String, String> entry : headers.entrySet()) {
233+
producerRecord.headers().add(new RecordHeader(entry.getKey(), entry.getValue().getBytes()));
234+
}
210235

211236
try {
212237
producer.send(producerRecord);
@@ -301,4 +326,4 @@ private String getStackTrace(Exception exception) {
301326
exception.printStackTrace(new PrintWriter(stringWriter));
302327
return stringWriter.toString();
303328
}
304-
}
329+
}

0 commit comments

Comments
 (0)