Adding a FluxProcessor

Learn how to handle different types of exchanges in RSocket

In this lesson, we’ll learn about different exchange types in RSocket, such as request/response, fire-and-forget, and monitor channel exchanges. This will also lay the basis for our RSocket.

Handling a request/response RSocket exchange

First, we need to handle an RSocket request/response. The following methods, added to RSocketService, should illustrate how both the itemProcessor and itemSink can be put to good use:

@MessageMapping("newItems.request-response") //1
public Mono<Item> processNewItemsViaRSocketRequestResponse(Item item) { //2
return this.repository.save(item) //3
.doOnNext(savedItem -> this.itemSink.next(savedItem)); //4
}
Handling a request-response RSocket exchange

Here’s a breakdown of the code above:

  1. In line 1, Spring Messaging’s @MessageMapping annotation is used to route RSocket messages with a destination of newItems.request-response.

  2. In line 2, Spring Messaging listens reactively for these messages, then invokes the method with the payload. Notice that the return type is a Reactor type around the domain object, Item. This is the signature of the response that’s expected.

  3. In line 3, we save Item to MongoDB using a reactive repository.

  4. In line 4, .doOnNext() grabs the newly saved Item ...