How to Set Up Kafka Integration Test – Grape Up

By Freda D. Cuevas

Jul 28, 2022

Do you consider unit testing an insufficient solution for maintaining the application's reliability and security? Are you worried that somehow or somewhere there is a potential bug hiding in the assumption that unit tests should cover all cases? And is mocking Kafka not enough for project requirements? If even one answer is 'yes', then welcome to a nice and easy guide on how to set up integration tests for Kafka using TestContainers and Embedded Kafka for Spring!

What is TestContainers?

TestContainers is an open-source Java library specialized in providing all needed solutions for the integration and testing of external sources. It means that we are able to mimic an actual database, web server, or even an event bus environment and treat that as a reliable place to test app functionality. All these fancy features are hooked into Docker images, defined as containers. Do we need to test the database layer with an actual MongoDB? No worries, we have a test container for that. We also cannot forget about UI tests: the Selenium container will do anything that we actually need.
In our case, we will focus on the Kafka Testcontainer.

What is Embedded Kafka?

As the name suggests, we are going to deal with an in-memory Kafka instance, ready to be used as a normal broker with full functionality. It allows us to work with producers and consumers as usual, making our integration tests lightweight.

Before we start

The concept for our test is simple: I would like to test a Kafka consumer and producer using two different approaches and check how we can use them in actual cases.

Kafka Messages are serialized using Avro schemas.

Embedded Kafka – Producer Test

The concept is easy: let's create a simple project with a controller, which invokes a service method to push a Kafka Avro serialized message.


implementation "org.apache.avro:avro:1.10.1"
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'org.springframework.kafka:spring-kafka'

implementation 'org.projectlombok:lombok:1.18.16'

compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'

Also worth mentioning is the great plugin for Avro. Here is the plugins section:

id 'org.springframework.boot' version '2.6.8'
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'java'
id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0"

The Avro plugin supports schema auto-generation. This is a must-have.


Now let's define the Avro schema:

{
  "namespace": "com.grapeup.myawesome.myawesomeproducer",
  "type": "record",
  "name": "RegisterRequest",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "address", "type": "string", "avro.java.string": "String"}
  ]
}


Our ProducerService will be focused only on sending messages to Kafka using a template, nothing exciting about that part. The main functionality can be done with just this line:

ListenableFuture<SendResult<String, RegisterRequest>> future = this.kafkaTemplate.send("register-request", kafkaMessage);

We cannot forget about the test properties:

spring:
  main:
    allow-bean-definition-overriding: true
  kafka:
    consumer:
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroDeserializer
    producer:
      auto.register.schemas: true
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroSerializer
    properties:
      specific.avro.reader: true

As we see in the test properties above, we declare a custom deserializer/serializer for KafkaMessages. It is highly recommended to use Kafka with Avro: don't let JSONs maintain the object structure, let's use a civilized mapper and object definition like Avro.


public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {
    // Every constructor swaps in a mock registry, so no real Schema Registry is needed in tests.
    public CustomKafkaAvroSerializer() {
        super();
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}
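To make the role of the registry more tangible: a registry-backed serializer does not ship the schema with every message. It writes a magic byte and a 4-byte schema id in front of the payload, and the reader resolves that id through its own registry client. The following is a simplified stand-in in plain Java, not the Confluent implementation; `InMemoryRegistry` and `RegistryFramingSketch` are hypothetical names, and the payload is a plain string instead of Avro binary data.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for a schema registry: schema text <-> numeric id. */
final class InMemoryRegistry {
    private final Map<String, Integer> idsBySchema = new HashMap<>();
    private final Map<Integer, String> schemasById = new HashMap<>();

    /** Registers the schema if it is new and returns its id. */
    public int register(String schema) {
        Integer existing = idsBySchema.get(schema);
        if (existing != null) {
            return existing;
        }
        int id = schemasById.size() + 1;
        idsBySchema.put(schema, id);
        schemasById.put(id, schema);
        return id;
    }

    /** Returns the schema for the id or fails if this registry has never seen it. */
    public String lookup(int id) {
        String schema = schemasById.get(id);
        if (schema == null) {
            throw new IllegalStateException("Unknown schema id " + id);
        }
        return schema;
    }
}

final class RegistryFramingSketch {
    static final byte MAGIC = 0;

    /** Frame layout: magic byte, 4-byte schema id, payload bytes. */
    public static byte[] serialize(InMemoryRegistry registry, String schema, String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(1 + 4 + body.length)
                .put(MAGIC)
                .putInt(registry.register(schema))
                .put(body)
                .array();
    }

    /** Resolves the schema by id and returns "schema|payload" for demonstration. */
    public static String deserialize(InMemoryRegistry registry, byte[] frame) {
        ByteBuffer buffer = ByteBuffer.wrap(frame);
        if (buffer.get() != MAGIC) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        String schema = registry.lookup(buffer.getInt());
        byte[] body = new byte[buffer.remaining()];
        buffer.get(body);
        return schema + "|" + new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        InMemoryRegistry shared = new InMemoryRegistry();
        byte[] frame = serialize(shared, "RegisterRequest", "id=12");
        System.out.println(deserialize(shared, frame)); // prints "RegisterRequest|id=12"

        try {
            // A reader with an empty registry cannot resolve the id written by the producer.
            deserialize(new InMemoryRegistry(), frame);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "Unknown schema id 1"
        }
    }
}
```

The second call shows why the serializer and the deserializer used in a test have to agree on the registered schemas: an id is only meaningful to a registry that knows it.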


And we have everything we need to start writing our test.

@SpringBootTest
@AutoConfigureMockMvc
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@EmbeddedKafka(partitions = 1, topics = "register-request")
class ProducerControllerTest {

All we need to do is add the @EmbeddedKafka annotation with the listed topics and partitions. The application context will boot a Kafka broker with the provided configuration just like that. Keep in mind that @TestInstance should be used with special consideration. Lifecycle.PER_CLASS will avoid creating the same objects/context for each test method. It is worth checking if the tests are too time-consuming.

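Consumer<String, RegisterRequest> consumerServiceTest;

@BeforeEach
void setUp() {
    DefaultKafkaConsumerFactory<String, RegisterRequest> consumer =
            new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    consumerServiceTest = consumer.createConsumer();
    // KafkaTestUtils.getSingleRecord needs a consumer that is subscribed to the topic
    consumerServiceTest.subscribe(Collections.singletonList(TOPIC_NAME));
}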

Here we can declare the test consumer, based on the Avro schema return type. All Kafka properties are already provided in the .yml file. That consumer will be used to check whether the producer actually pushed a message.

Here is the actual test method:

@Test
void whenValidInput_thenReturns200() throws Exception {
        // Field values match the assertions at the end of the test
        RegisterRequestDto request = RegisterRequestDto.builder()
                .id(12)
                .address("tempAddress")
                .build();

        mockMvc.perform(
                post("/register-request")
                      .contentType("application/json")
                      .content(objectMapper.writeValueAsBytes(request)))
                .andExpect(status().isOk());

        ConsumerRecord<String, RegisterRequest> consumedRegisterRequest = KafkaTestUtils.getSingleRecord(consumerServiceTest, TOPIC_NAME);

        RegisterRequest valueReceived = consumedRegisterRequest.value();

        assertEquals(12, valueReceived.getId());
        assertEquals("tempAddress", valueReceived.getAddress());
}

First of all, we use MockMvc to perform an action on our endpoint. That endpoint uses ProducerService to push messages to Kafka. KafkaConsumer is used to verify that the producer worked as expected. And that's it: we have a fully working test with embedded Kafka.

Test Containers – Consumer Test

TestContainers are nothing more than independent Docker images ready to be run as containers. The following test scenario will be enhanced by a MongoDB image. Why not keep our data in the database right after anything happens in the Kafka flow?

Dependencies are not much different than in the previous example. The following additions are needed for test containers:

testImplementation 'org.testcontainers:junit-jupiter'
testImplementation 'org.testcontainers:kafka'
testImplementation 'org.testcontainers:mongodb'

ext {
    set('testcontainersVersion', "1.17.1")
}

dependencyManagement {
    imports {
        mavenBom "org.testcontainers:testcontainers-bom:$testcontainersVersion"
    }
}

Let's focus now on the Consumer part. The test case will be simple: one consumer service will be responsible for getting the Kafka message and storing the parsed payload in the MongoDB collection. All that we need to know about KafkaListeners, for now, is this annotation:

@KafkaListener(topics = "register-request")

Through the annotation processor, KafkaListenerContainerFactory will be responsible for creating a listener on our method. From this moment our method will react to any upcoming Kafka message on the mentioned topic.

The Avro serializer and deserializer configs are the same as in the previous test.

Regarding TestContainers, we should start with the following annotations:

@SpringBootTest
@ActiveProfiles("test")
@Testcontainers
public class AbstractIntegrationTest {

During startup, all configured TestContainers modules will be activated. It means that we will get access to the full operating environment of the selected source. As an example:

@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Container
public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

@Container
static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.4.2").withExposedPorts(27017);

As a result of booting the test, we can expect two Docker containers to start with the provided configuration.

What is really important for the mongo container: it gives us full access to the database using just a simple connection URI. With such a feature, we are able to take a look at the current state of our collections, even in debug mode with prepared breakpoints.
Take a look also at the Ryuk container: it works like a watchdog and cleans up our containers once the tests have finished.

And here is the last part of the configuration:

@DynamicPropertySource
static void dataSourceProperties(DynamicPropertyRegistry registry) {
   registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
   registry.add("spring.kafka.consumer.bootstrap-servers", kafkaContainer::getBootstrapServers);
   registry.add("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers);
   registry.add("spring.data.mongodb.uri", mongoDBContainer::getReplicaSetUrl);
}

@BeforeTestClass
public void beforeTest() {
   // Block until every listener container has been assigned its single partition
   kafkaListenerEndpointRegistry.getListenerContainers().forEach(
           messageListenerContainer -> ContainerTestUtils
                       .waitForAssignment(messageListenerContainer, 1));
}

@AfterAll
static void tearDown() {
   kafkaContainer.stop();
   mongoDBContainer.stop();
}

DynamicPropertySource gives us the option to set all needed environment variables during the test lifecycle. It is strongly required for any config purposes for TestContainers. Also, in beforeTestClass, kafkaListenerEndpointRegistry waits for each listener to get the expected partitions during container startup.

And the last part of the Kafka test containers journey, the main body of the test:

@Test
public void containerStartsAndPublicPortIsAvailable() throws Exception {
   writeToTopic("register-request", RegisterRequest.newBuilder().setId(123).setAddress("dummyAddress").build());

   //Wait for KafkaListener (the 5-second pause is an assumed value)
   TimeUnit.SECONDS.sleep(5);
   Assertions.assertEquals(1, taxiRepository.findAll().size());
}

private KafkaProducer<String, RegisterRequest> createProducer() {
   return new KafkaProducer<>(kafkaProperties.buildProducerProperties());
}

private void writeToTopic(String topicName, RegisterRequest... registerRequests) {
   // try-with-resources closes the producer and flushes pending records
   try (KafkaProducer<String, RegisterRequest> producer = createProducer()) {
       Arrays.stream(registerRequests)
               .forEach(registerRequest -> {
                           ProducerRecord<String, RegisterRequest> record = new ProducerRecord<>(topicName, registerRequest);
                           producer.send(record);
                       }
               );
   }
}

The custom producer is responsible for writing our message to the Kafka broker. Also, it is recommended to give consumers some time to handle messages properly. As we see, the message was not just consumed by the listener, but also stored in the MongoDB collection.
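Giving consumers "some time" does not have to mean a fixed pause. One option is to poll the expected condition until it holds or a timeout expires, so that the test continues as soon as the listener has done its work. Below is a minimal sketch in plain Java under stated assumptions: `AwaitSketch` and `awaitUntil` are hypothetical names, and the background thread only stands in for a Kafka listener that stores a record.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

final class AwaitSketch {

    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true if the condition became true in time, false otherwise.
     */
    public static boolean awaitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (deadline - System.nanoTime() > 0) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                return false;
            }
        }
        return condition.getAsBoolean(); // one last check at the deadline
    }

    public static void main(String[] args) {
        // Stand-in for the repository: a counter bumped by a simulated listener.
        AtomicInteger stored = new AtomicInteger();
        Thread listener = new Thread(() -> {
            try {
                Thread.sleep(200); // simulated consume-and-store delay
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            stored.incrementAndGet();
        });
        listener.start();

        boolean storedInTime = awaitUntil(() -> stored.get() == 1,
                Duration.ofSeconds(5), Duration.ofMillis(50));
        System.out.println("stored in time: " + storedInTime);
    }
}
```

In the test above, the condition could be something like `() -> taxiRepository.findAll().size() == 1`, followed by an assertion on the returned flag. Libraries such as Awaitility offer the same idea with a richer API.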


As we can see, current solutions for integration tests are quite easy to implement and maintain in projects. There is no point in keeping just unit tests and counting on all lines covered as a sign of code/logic quality. Now the question is, should we use an Embedded solution or TestContainers? I suggest first of all focusing on the word "Embedded". As a perfect integration test, we want to get an almost ideal copy of the production environment with all properties/features included. In-memory solutions are good, but mostly not enough for large business projects. Definitely, the advantage of Embedded services is how easy it is to implement such tests and maintain the configuration, since everything happens in memory.
TestContainers at first sight might look like overkill, but they give us the most important feature, which is a separate environment. We don't even have to rely on existing Docker images: if we want, we can use custom ones. This is a huge improvement for potential test scenarios.
What about Jenkins? There is no reason to be afraid of using TestContainers in Jenkins either. I firmly recommend checking the TestContainers documentation on how easily we can set up the configuration for Jenkins agents.
To sum up: if there is no blocker or any unwanted condition for using TestContainers, then don't hesitate. It is always good to keep all services managed and secured with integration test contracts.
