Guillermo Portas

Software Engineer

Writing integration tests for Spring Boot and RabbitMQ

May 27, 2020

Some time ago I worked on a project with a Spring Boot web service that connected to RabbitMQ to manage background tasks.

If you’ve never heard of these technologies, Spring Boot is one of the most widely used frameworks for creating web applications, and RabbitMQ is a widely used message broker, so it is very common to find projects that use this technology stack.

In this scenario, I needed to cover the interaction between the Spring Boot application and the RabbitMQ message broker with integration tests.

Until then, all application operations covered by integration tests were synchronous, such as database queries. But for this scenario, the sending and receiving of messages between the Spring Boot application and the RabbitMQ broker are asynchronous.

Even though it is a common scenario, I couldn’t find much information or many ideas on how to test this integration. That’s why I decided to write this post: to share with you a possible approach to testing the integration between Spring Boot and RabbitMQ.

Starting point

The following diagram makes the scenario easier to understand:

Diagram

On the one hand, we have the Spring Boot application, which connects to the RabbitMQ broker through the Publisher component. The Publisher is responsible for publishing messages to RabbitMQ with information about an operation to be executed in the background.

On the other hand, the Listener component is responsible for listening to the RabbitMQ broker and consuming the queued messages. It passes the information from each message to the Service component, which executes the logic needed to perform the operation in the background. The Listener is also commonly called a Consumer, so you may find it under that name in similar scenarios.

Components to test

For this post, I will focus on the Publisher and Listener components and how to test their integrations with RabbitMQ.

Before writing the tests, and to give some context, here is the code of the components that we are going to test.

Let’s suppose that our application manages content related to textbooks and that the background operation will update the edition of a specific book.

Publisher

@Component
open class UpdateBookEditionQueuePublisher(
        private val rabbitTemplate: RabbitTemplate
) : IUpdateBookEditionQueuePublisher {

    @Throws(EntityUpdateException::class)
    override fun publishMessage(bookTitle: String, newEdition: Int) {
        val message = UpdateBookEditionMessage(bookTitle, newEdition)
        try {
            rabbitTemplate.convertAndSend(
                    UPDATE_BOOK_EDITION_ROUTING_KEY,
                    message
            )
        } catch (e: AmqpException) {
            throw EntityUpdateException(e.message)
        }
    }
}
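
The message class itself is not shown in this post. A minimal sketch, inferred only from how the Publisher builds it and from the property names used with it elsewhere in the code, could look like the following. The exact shape is an assumption, and for JSON conversion a class without a no-argument constructor typically also needs Jackson's Kotlin module to be registered.

```kotlin
// Hypothetical payload class, inferred from its usage in this post:
// it is constructed as UpdateBookEditionMessage(bookTitle, newEdition)
// and read through the bookTitle and newEdition properties.
data class UpdateBookEditionMessage(
        val bookTitle: String,
        val newEdition: Int
)
```

Being a data class, two messages with the same title and edition compare as equal, which is convenient when asserting on received messages in tests.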

Listener

@Component
class UpdateBookEditionQueueListener(
        @Autowired private val booksService: IBooksService
) {

    @RabbitListener(
            id = "update-book-edition-queue-listener",
            queuesToDeclare = [
                Queue(
                        UPDATE_BOOK_EDITION_QUEUE_NAME
                )
            ]
    )
    @Throws(EntityUpdateException::class)
    fun onMessageReceived(@Payload message: UpdateBookEditionMessage) {
        booksService.updateBookEdition(
                message.bookTitle, 
                message.newEdition
        )
    }
}

Easy, isn’t it? The code speaks for itself!

Service

For this component, we only need to know the updateBookEdition method of its interface, which is the one that the Listener invokes.

interface IBooksService {
    
    . . .
    
    @Throws(EntityUpdateException::class)
    fun updateBookEdition(bookTitle: String, newEdition: Int)

    . . .

}

Configurations and properties

All that’s left is to add the configurations of the components and application properties for Spring Boot to work with RabbitMQ:

@Configuration
class RabbitMQConfiguration {

    @Autowired
    lateinit var rabbitTemplate: RabbitTemplate

    @Bean
    fun jackson2MessageConverter(
            objectMapper: ObjectMapper
    ) = Jackson2JsonMessageConverter(objectMapper)

    @Bean
    fun updateBookEditionQueuePublisher(): IUpdateBookEditionQueuePublisher 
            = UpdateBookEditionQueuePublisher(rabbitTemplate)

    @Bean
    fun updateBookEditionQueue() =
            Queue(UPDATE_BOOK_EDITION_QUEUE_NAME)
}

As we can see, the RabbitMQ configuration is simple. Because the scenario is so simple, we use the default exchange, which routes each message to the queue whose name matches the routing key. If you want to know more about the different RabbitMQ exchange types, I suggest you take a look at the documentation.
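
The two constants used in the code above are not defined in this post. Since the default exchange delivers a message to the queue named by its routing key, one plausible definition, shown here with a made-up queue name, is to let the routing key reuse the queue name:

```kotlin
// Hypothetical value: the real queue name is not shown in this post.
const val UPDATE_BOOK_EDITION_QUEUE_NAME = "update-book-edition-queue"

// With the default exchange, a message is routed to the queue whose name
// equals the routing key, so the routing key simply reuses the queue name.
const val UPDATE_BOOK_EDITION_ROUTING_KEY = UPDATE_BOOK_EDITION_QUEUE_NAME
```

Keeping a single source of truth for both values avoids the case where a renamed queue silently stops receiving messages.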

Finally, the configuration of the application (application.properties):

spring.rabbitmq.host=<your_rabbitmq_host>
spring.rabbitmq.port=<your_rabbitmq_port>
spring.rabbitmq.username=<your_rabbitmq_username>
spring.rabbitmq.password=<your_rabbitmq_password>
spring.rabbitmq.virtual-host=<your_rabbitmq_virtual_host>

Let’s go with the tests!

To run integration tests against RabbitMQ, we need a test environment with a RabbitMQ broker equivalent to the one in production. In my case, since the RabbitMQ broker was dockerized, it was easy to replicate it across environments.

Once the test RabbitMQ broker is available, we can create a configuration (application.properties) for the test environment:

spring.rabbitmq.host=<your_test_rabbitmq_host>
spring.rabbitmq.port=<your_test_rabbitmq_port>
spring.rabbitmq.username=<your_test_rabbitmq_username>
spring.rabbitmq.password=<your_test_rabbitmq_password>
spring.rabbitmq.virtual-host=<your_test_rabbitmq_virtual_host>

Publisher Integration Test

Everything is ready to write our first integration test. Let’s start with the Publisher:

@RunWith(SpringRunner::class)
@SpringBootTest
class UpdateBookEditionQueuePublisherIntegrationTest {

    @Autowired
    private lateinit var sut: IUpdateBookEditionQueuePublisher

    @Test
    fun testPublishMessageForHappyPathThenMessagePublishedInQueue() {
        
    }
}

As we can see, we want to test that the Publisher component successfully publishes a message to the RabbitMQ queue. To do this, the first thing we have to do is send a test message to the queue.

. . .

@Test
fun testPublishMessageForHappyPathThenMessagePublishedInQueue() {
    sut.publishMessage(
            TestConstants.FAKE_TITLE, 
            TestConstants.FAKE_EDITION
    )
    // Assert that the message is published in queue
}

. . .

Once the message is sent, how do we verify that it has been successfully queued in RabbitMQ? The first thing that came to my mind was something like this:

. . .

@Autowired
private lateinit var rabbitTemplate: RabbitTemplate

@Autowired
private lateinit var rabbitAdmin: RabbitAdmin

@Before
fun setUp() {
    rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
}

@After
fun tearDown() {
    rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
}

@Test
fun testPublishMessageForHappyPathThenMessagePublishedInQueue() {
    sut.publishMessage(
            TestConstants.FAKE_TITLE, 
            TestConstants.FAKE_EDITION
    )
    assertTrue(isMessagePublishedInQueue())
}

private fun isMessagePublishedInQueue(): Boolean {
    val queueMessageCount = rabbitTemplate.execute {
        it.queueDeclare(
                UPDATE_BOOK_EDITION_QUEUE_NAME,
                true,
                false, 
                false, 
                null
        )
    }.messageCount

    val queuedMessage = rabbitTemplate
            .receiveAndConvert(
                    UPDATE_BOOK_EDITION_QUEUE_NAME
            ) as UpdateBookEditionMessage

    return queueMessageCount == 1 
            && queuedMessage.bookTitle == TestConstants.FAKE_TITLE 
            && queuedMessage.newEdition == TestConstants.FAKE_EDITION
}

. . .

At first glance, this approach seems to make sense. First, to make sure that the message queue is empty when the test starts, we purge it before and after running the test. To perform this operation we use RabbitAdmin.

We use RabbitTemplate to query the message queue, getting the number of queued messages and the first message in the queue. This information is enough to determine that exactly one message has been received and that it is the one we sent in the test.

However, this test will pass only occasionally, as a matter of luck, because it ignores two points:

  • The Listener component is active from the moment the Spring Boot application starts, so it will try to consume the message as soon as it is queued.
  • Queuing is an asynchronous operation, so the message may not be in the queue yet at the moment we query it.
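
The second point can be reproduced without a broker. The following is a minimal sketch in which an in-memory queue and a background thread stand in for RabbitMQ (both are assumptions for illustration, not part of the project): the queue is still empty right after the "publish" step and only fills up later.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
import kotlin.concurrent.thread

// Broker-free simulation: an in-memory queue stands in for the RabbitMQ queue.
// Returns the queue size observed right after "publishing" and the size
// observed once the background enqueue has completed.
fun observeQueueSizes(): Pair<Int, Int> {
    val queue = LinkedBlockingQueue<String>()
    val mayEnqueue = CountDownLatch(1)

    // The message lands in the queue later, on another thread.
    // The latch makes that delay deterministic for this sketch.
    val broker = thread {
        mayEnqueue.await()
        queue.put("update-book-edition")
    }

    val sizeRightAfterPublish = queue.size   // still empty at this point
    mayEnqueue.countDown()                   // let the "broker" enqueue
    broker.join()
    val sizeAfterEnqueue = queue.size        // the message has arrived

    return sizeRightAfterPublish to sizeAfterEnqueue
}
```

Calling observeQueueSizes() returns the pair (0, 1): a check made immediately after publishing would wrongly conclude that nothing was queued.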

The first point, the listener being continuously active, is quite easy to solve: we can disable the automatic startup of listeners when the application starts by adding the following line to the test application.properties file:

spring.rabbitmq.listener.simple.auto-startup=false

To solve the problem of asynchronous queuing, I used Awaitility, a useful library for testing asynchronous operations. It provides a DSL (Domain Specific Language) that allows us to write expectations about an asynchronous system in a concise and easy-to-read way.
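
Conceptually, this kind of waiting boils down to re-checking a condition until it holds or a timeout expires. The sketch below hand-rolls that idea with the standard library only. The names pollUntil and flagEventuallySet are hypothetical, and this is not how Awaitility itself is implemented.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.thread

// Illustration only: a hand-rolled polling loop, not Awaitility's implementation.
// Re-evaluates `condition` every `pollIntervalMillis` until it holds or
// `timeoutMillis` elapses; returns whether the condition was met in time.
fun pollUntil(
        timeoutMillis: Long,
        pollIntervalMillis: Long = 50,
        condition: () -> Boolean
): Boolean {
    val deadline = System.nanoTime() + timeoutMillis * 1_000_000
    while (true) {
        if (condition()) return true
        if (System.nanoTime() >= deadline) return false
        Thread.sleep(pollIntervalMillis)
    }
}

// Usage sketch: the flag flips on another thread, much like a message
// that shows up in the queue some time after it was published.
fun flagEventuallySet(): Boolean {
    val flag = AtomicBoolean(false)
    val worker = thread {
        Thread.sleep(100)
        flag.set(true)
    }
    val met = pollUntil(timeoutMillis = 5_000) { flag.get() }
    worker.join()
    return met
}
```

The loop returns as soon as the condition holds, so a generous timeout only costs time when the test is going to fail anyway.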

Using Awaitility, the test finally ended up as follows:

// Project-specific imports (publisher interface, message class, constants) are omitted.
import org.awaitility.Awaitility.await
import org.hamcrest.CoreMatchers.`is`
import org.junit.After
import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith
import org.springframework.amqp.rabbit.core.RabbitAdmin
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.context.junit4.SpringRunner
import java.util.concurrent.Callable
import java.util.concurrent.TimeUnit

@RunWith(SpringRunner::class)
@SpringBootTest
class UpdateBookEditionQueuePublisherIntegrationTest {

    @Autowired
    private lateinit var sut: IUpdateBookEditionQueuePublisher

    @Autowired
    private lateinit var rabbitTemplate: RabbitTemplate

    @Autowired
    private lateinit var rabbitAdmin: RabbitAdmin

    @Before
    fun setUp() {
        rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
    }

    @After
    fun tearDown() {
        rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
    }

    @Test
    fun testPublishMessageForHappyPathThenMessagePublishedInQueue() {
        sut.publishMessage(
                TestConstants.FAKE_TITLE, 
                TestConstants.FAKE_EDITION
        )
        await().atMost(30, TimeUnit.SECONDS)
                .until(isMessagePublishedInQueue(), `is`(true))
    }

    private fun isMessagePublishedInQueue(): Callable<Boolean> {
        return Callable {
            val queueMessageCount = rabbitTemplate.execute {
                it.queueDeclare(
                        UPDATE_BOOK_EDITION_QUEUE_NAME,
                        true,
                        false,
                        false,
                        null)
            }.messageCount

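            // receiveAndConvert returns null while the queue is still empty,
            // so use a safe cast instead of failing with an exception.
            val queuedMessage = rabbitTemplate
                    .receiveAndConvert(
                            UPDATE_BOOK_EDITION_QUEUE_NAME
                    ) as? UpdateBookEditionMessage

            queueMessageCount == 1
                    && queuedMessage?.bookTitle == TestConstants.FAKE_TITLE
                    && queuedMessage?.newEdition == TestConstants.FAKE_EDITION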
        }
    }
}

As we can see, I finally created an Awaitility assertion that checks, for a maximum of 30 seconds, whether the message has been published successfully. If the condition is not met within that time, the test fails.

To do this, the isMessagePublishedInQueue method has to be modified a bit so that it returns an object of type Callable<Boolean>, which is what the Awaitility assertion requires.
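
The reason for returning a Callable<Boolean> rather than a plain Boolean is that the condition has to be re-evaluated on every poll. A small sketch of that difference, using only the standard library (CountingCheck and evaluationsUntilReady are hypothetical names for illustration):

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the queue check: counts how often it is evaluated
// and only reports success from the third evaluation onwards.
class CountingCheck {
    val evaluations = AtomicInteger(0)

    // Returning a Callable hands over the check itself, not its result,
    // so the caller can run it again on every poll.
    fun isReady(): Callable<Boolean> =
            Callable { evaluations.incrementAndGet() >= 3 }
}

// Calls the check repeatedly until it passes and returns
// how many evaluations that took.
fun evaluationsUntilReady(counting: CountingCheck): Int {
    val condition = counting.isReady()   // nothing is evaluated yet
    while (!condition.call()) {
        // poll again
    }
    return counting.evaluations.get()
}
```

Here evaluationsUntilReady(CountingCheck()) returns 3. A method returning Boolean would have been evaluated once, before the waiting even started.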

Listener Integration Test

Once I did the integration test for the Publisher component, writing an integration test for the Listener component was easy, since I had all the necessary tools.

Let’s see how I did it!

@RunWith(SpringRunner::class)
@SpringBootTest
class UpdateBookEditionQueueListenerIntegrationTest {

    @Test
    fun testOnMessageReceivedForHappyPathThenMessageConsumedAndServiceCalled() {
        
    }
}

We are going to test that the Listener component successfully consumes the queued messages from the RabbitMQ queue and that the message data is sent to the corresponding Service.

The first thing we are going to do is to send a test message:

. . .

@Autowired
private lateinit var rabbitTemplate: RabbitTemplate

@Autowired
private lateinit var rabbitAdmin: RabbitAdmin

@Before
fun setUp() {
    rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
}

@After
fun tearDown() {
    rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
}

@Test
fun testOnMessageReceivedForHappyPathThenMessageConsumedAndServiceCalled() {
    sendTestMessageToQueue()
    // TODO
}

private fun sendTestMessageToQueue() {
    rabbitTemplate.convertAndSend(
            UPDATE_BOOK_EDITION_ROUTING_KEY,
            UpdateBookEditionMessage(
                    TestConstants.FAKE_TITLE, 
                    TestConstants.FAKE_EDITION
            )
    )
}

. . .

As we can see, just like in the Publisher integration test, we purge the queue with RabbitAdmin before and after running the test to make sure that the message queue is empty before each test run.

Then, with RabbitTemplate, we send the test message.

The next step is to activate the Listener component. Remember that the automatic startup of the Listener was disabled through the property added to the test application.properties file.

. . .

@Autowired
private lateinit var rabbitTemplate: RabbitTemplate

@Autowired
private lateinit var rabbitAdmin: RabbitAdmin

@Autowired
private lateinit var rabbitListenerEndpointRegistry: RabbitListenerEndpointRegistry

@MockBean
private lateinit var booksServiceDouble: IBooksService

@Before
fun setUp() {
    rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
}

@After
fun tearDown() {
    rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
    rabbitListenerEndpointRegistry.stop()
}

@Test
fun testOnMessageReceivedForHappyPathThenMessageConsumedAndServiceCalled() {
    sendTestMessageToQueue()
    startRabbitListener()
    Mockito.verify(booksServiceDouble, Mockito.times(1))
            .updateBookEdition(
                    TestConstants.FAKE_TITLE,
                    TestConstants.FAKE_EDITION
            )
}

private fun sendTestMessageToQueue() {
    rabbitTemplate.convertAndSend(
            UPDATE_BOOK_EDITION_ROUTING_KEY,
            UpdateBookEditionMessage(
                    TestConstants.FAKE_TITLE, 
                    TestConstants.FAKE_EDITION
            )
    )
}
    
private fun startRabbitListener() {
    rabbitListenerEndpointRegistry.getListenerContainer(
            "update-book-edition-queue-listener"
    ).start()
}

. . .

We activate the Listener using RabbitListenerEndpointRegistry. Through the getListenerContainer method we obtain the specific Listener that we are testing, using the identifier that we pass as a parameter. This identifier must match the id in the @RabbitListener annotation of the Listener component class.

Once the Listener starts, it checks for queued messages and, if there are any, proceeds to consume them. To validate that the Listener works correctly, we check that it receives the test message and invokes the Service with the data of the consumed message.

Once the test has run, we stop the listener again through the RabbitListenerEndpointRegistry in the tearDown method, to maintain idempotency in test executions, just as we do when purging the queue.

It may seem that everything makes sense, but the Listener consumes the message on its own thread, so the verification could run before the Service has been called. Again, we use Awaitility to avoid the problems that the asynchronous events on the message queue can cause.

Finally, we check that the queue is empty after consuming the message.

// Project-specific imports (service interface, message class, constants) are omitted.
import org.awaitility.Awaitility.await
import org.hamcrest.CoreMatchers
import org.junit.After
import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith
import org.mockito.Mockito
import org.springframework.amqp.rabbit.core.RabbitAdmin
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.test.context.junit4.SpringRunner
import java.util.concurrent.Callable
import java.util.concurrent.TimeUnit

@RunWith(SpringRunner::class)
@SpringBootTest
class UpdateBookEditionQueueListenerIntegrationTest {

    @Autowired
    private lateinit var rabbitTemplate: RabbitTemplate

    @Autowired
    private lateinit var rabbitAdmin: RabbitAdmin

    @Autowired
    private lateinit var rabbitListenerEndpointRegistry: RabbitListenerEndpointRegistry

    @MockBean
    private lateinit var booksServiceDouble: IBooksService

    @Before
    fun setUp() {
        rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
    }

    @After
    fun tearDown() {
        rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
        rabbitListenerEndpointRegistry.stop()
    }

    @Test
    fun testOnMessageReceivedForHappyPathThenMessageConsumedAndServiceCalled() {
        sendTestMessageToQueue()
        awaitForFilledQueue()
        startRabbitListener()
        awaitForEmptyQueue()
        Mockito.verify(booksServiceDouble, Mockito.times(1))
                .updateBookEdition(
                        TestConstants.FAKE_TITLE,
                        TestConstants.FAKE_EDITION
                )
    }

    private fun sendTestMessageToQueue() {
        rabbitTemplate.convertAndSend(
                UPDATE_BOOK_EDITION_ROUTING_KEY,
                UpdateBookEditionMessage(
                        TestConstants.FAKE_TITLE,
                        TestConstants.FAKE_EDITION
                )
        )
    }

    private fun awaitForFilledQueue() {
        await().atMost(30, TimeUnit.SECONDS)
                .until(isQueueEmpty(), CoreMatchers.`is`(false))
    }

    private fun startRabbitListener() {
        rabbitListenerEndpointRegistry.getListenerContainer(
                "update-book-edition-queue-listener"
        ).start()
    }

    private fun awaitForEmptyQueue() {
        await().atMost(30, TimeUnit.SECONDS)
                .until(isQueueEmpty(), CoreMatchers.`is`(true))
    }

    private fun isQueueEmpty(): Callable<Boolean> {
        return Callable {
            val queueMessageCount = rabbitTemplate.execute {
                it.queueDeclare(
                        UPDATE_BOOK_EDITION_QUEUE_NAME,
                        true,
                        false,
                        false,
                        null)
            }.messageCount

            queueMessageCount == 0
        }
    }
}

That’s all! This is how the integration test for the Listener component ends up.

Conclusion

In this post, I have shared with you my experience writing integration tests for RabbitMQ and Spring Boot. I hope you found it interesting to see how the Awaitility library can be used to test asynchronous scenarios.

I’m sure there are other ways to do this or to improve my solution, so feel free to share your experiences with me.

I will also be happy to answer any questions you may have!
