Aggregate Services into a Single Swagger
September 28th, 2017
Spring publishes a spring-kafka-test library that is promoted as a way to unit test with Kafka. The project I’m currently working on reads messages from a JMS queue, transforms the data, then produces the result onto a Kafka topic for downstream consumers to use as they see fit. Since we do functional tests in addition to unit tests, I needed to verify that certain transformations make it onto the topic in the expected format. The Spring Kafka Test library provides an embedded Kafka broker that works great for this. I’ll show you how I implemented it using Gradle and a Spring Boot application. A sample project is here.
To start the embedded Kafka instance and the Spring Boot application, I used SpawnProcessTask as the task type. To actually start the Kafka broker, just call the before() method on the KafkaEmbedded instance. This starts up an embedded ZooKeeper and Kafka at the same time. The KafkaEmbedded constructor takes only a few parameters (the number of brokers, whether to use controlled shutdown, the number of partitions per topic, and the topics to create), and there really isn’t much that can be configured beyond them. The same task also starts the Spring Boot application as an executable jar. You could run it through the bootRun Gradle task, but using the jar is closer to how it will run out in the wild.
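// Assumes kafkaEmbedded and bootPort are declared at script level and KafkaEmbedded is imported.
task startServer(type: SpawnProcessTask) {
    doFirst {
        // 1 broker, controlled shutdown, 1 partition per topic, topic 'output-topic' created up front
        kafkaEmbedded = new KafkaEmbedded(1, true, 1, 'output-topic')
        kafkaEmbedded.before() // starts the embedded ZooKeeper and Kafka broker
        bootPort = availablePort()
        command "java -Dserver.port=${bootPort} -Dspring.kafka.producer.bootstrap-servers=${kafkaEmbedded.getBrokersAsString()} -Dspring.kafka.consumer.group-id=func-test -jar ${projectDir}/build/libs/emb-kafka-${version}.jar"
        ready 'Started EmbKafkaApplication' // looks for this text to indicate process has started
    }
}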
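The availablePort() helper called in startServer is not shown in this post. Below is a minimal sketch of one common way to pick a free port, written in plain Java so it could live in buildSrc or be adapted to Groovy inside the build script. The class name PortFinder is hypothetical, and this is an assumption about what the helper does, not the sample project's actual code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

// Hypothetical helper; not taken from the sample project.
final class PortFinder {
    private PortFinder() {}

    /** Asks the OS for an ephemeral port by binding to port 0, then releases it. */
    static int availablePort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException("No free port available", e);
        }
    }
}
```

The port is released before the Spring Boot process binds to it, so another process could in principle grab it in between. For a local functional test run that window is usually small enough to ignore.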
Once the services are up and running, the functionalTest task finally takes over and runs the functional tests.
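task functionalTest(type: Test, dependsOn: [assemble, startServer]) {
    // testClassesDir and classesDir are the pre-Gradle 4 names; Gradle 4.0 deprecates them
    // in favor of testClassesDirs and classesDirs.
    testClassesDir = sourceSets.functionalTest.output.classesDir
    classpath = sourceSets.functionalTest.runtimeClasspath
    doFirst {
        // Hand the broker address and application port chosen by startServer to the test JVM
        systemProperty('spring.kafka.producer.bootstrap-servers', kafkaEmbedded.getBrokersAsString())
        systemProperty('server.port', bootPort)
    }
}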
functionalTest.finalizedBy('stopServer')
My functional tests are written using Spock, but any test framework could probably be used. I created a base functional test class to house some common functionality. The two main methods in this case send the request to the REST endpoint and attempt to consume the produced message from Kafka. The consume() method takes the topic to consume from, the key to look for, the maximum number of polls to attempt, and the timeout in milliseconds for each poll. If you don’t use keys in your system, you could probably match on some other value in the consumed message to identify the one you are looking for.
String consume(String topic, String key, int maxRetries, long pollMs) {
    int retry = 0
    String message = null
    KafkaConsumer<String, String> consumer = createKafkaConsumer(topic)
    try {
        // Poll until a record with the expected key arrives or the retries run out
        while (!message && retry < maxRetries) {
            retry++
            ConsumerRecords<String, String> consumerRecords = consumer.poll(pollMs)
            consumerRecords.each { ConsumerRecord<String, String> record ->
                if (record.key() == key) {
                    message = record.value()
                }
            }
            consumer.commitSync()
        }
    } finally {
        consumer.close() // release the consumer even if polling throws
    }
    return message // null if no matching record was seen
}
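The retry loop in consume() can also be separated from Kafka so that the match condition is pluggable, which helps when messages carry no key. The following is a sketch in plain Java, not the author's code: PollRetry and firstMatch are hypothetical names, and the Supplier stands in for a call such as consumer.poll(pollMs). Unlike consume(), it returns as soon as the first matching item is found.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper illustrating the poll-and-retry pattern; names are not from the sample project.
final class PollRetry {
    private PollRetry() {}

    /**
     * Calls poll up to maxRetries times and returns the first item that
     * satisfies matches, or Optional.empty() if none shows up.
     */
    static <T> Optional<T> firstMatch(Supplier<List<T>> poll, Predicate<T> matches, int maxRetries) {
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            for (T item : poll.get()) {
                if (matches.test(item)) {
                    return Optional.of(item);
                }
            }
        }
        return Optional.empty();
    }
}
```

With a real consumer, the supplier would wrap the poll call and the predicate could inspect the record value, for example checking that the JSON payload contains the identifier sent in the request.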
My simple test just sends a message to the web service and tries to consume it. Since a real service is probably transforming the message in some way, it would also make sense to verify those transforms are being performed correctly.
void 'testConsume'() {
    given:
    String key = '1'
    String json = """{"key":"${key}", "data": "foo"}"""

    when:
    postJson('/test', json)

    then:
    consume('output-topic', key, 5, 500)
}
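The postJson() helper lives in the base test class and is not shown here. Below is a rough sketch of what such a helper might look like in plain Java (11 or later, using java.net.http). It assumes the application listens on localhost at the port passed through the server.port system property set by the functionalTest task. JsonPoster and endpoint are hypothetical names.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper; the sample project's base class may do this differently.
final class JsonPoster {
    private JsonPoster() {}

    /** Builds the target URI from the server.port system property (assumed host: localhost). */
    static URI endpoint(String path) {
        String port = System.getProperty("server.port", "8080");
        return URI.create("http://localhost:" + port + path);
    }

    /** POSTs the JSON body to the given path and returns the HTTP status code. */
    static int postJson(String path, String json) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(endpoint(path))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
```

Returning the status code lets a test also assert that the request was accepted before it starts polling Kafka.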
When the tests have completed, the stopServer task runs. It shuts down the Spring Boot service and, through the after() method call, the embedded Kafka instance.
task stopServer(type: KillProcessTask) {
    doFirst {
        kafkaEmbedded?.after()
    }
}
With this basic framework, functional tests with Kafka are super easy and that makes developers want to write more tests!
Brendon has over 10 years of software development experience at organizations large and small. He craves learning new technologies and techniques and lives in and understands large enterprise application environments with complex software and hardware architectures.