Adding a FluxProcessor
Learn how to handle different types of exchanges in RSocket
In this lesson, we’ll learn about the different exchange types in RSocket, such as request/response, fire-and-forget, and monitor channel exchanges. This also lays the groundwork for our RSocket service.
Handling a request/response RSocket exchange
First, we need to handle an RSocket request/response.
The following methods, added to RSocketService, should illustrate how both the itemProcessor and itemSink can be put to good use:
@MessageMapping("newItems.request-response") //1
public Mono<Item> processNewItemsViaRSocketRequestResponse(Item item) { //2
    return this.repository.save(item) //3
        .doOnNext(savedItem -> this.itemSink.next(savedItem)); //4
}
Here’s a breakdown of the code above:
- In line 1, Spring Messaging’s @MessageMapping annotation is used to route RSocket messages with a destination of newItems.request-response.
- In line 2, Spring Messaging listens reactively for these messages, then invokes the method with the payload. Notice that the return type is a Reactor type around the domain object, Item. This is the signature of the response that’s expected.
- In line 3, we save Item to MongoDB using a reactive repository.
- In line 4, .doOnNext() grabs the newly saved Item
...
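To see the save-then-publish flow of the handler in isolation, without a Spring or MongoDB setup, here is a minimal plain-JDK sketch of the same shape. It is an analogy and not the lesson's Reactor code: CompletableFuture stands in for Mono, thenApply imitates the side effect performed in .doOnNext(), and RequestResponseSketch, InMemoryRepository, and ItemSink (along with the simplified Item) are hypothetical names invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class RequestResponseSketch {

    // Hypothetical, simplified stand-in for the lesson's Item domain object.
    public static final class Item {
        public final String id;
        public final String name;

        public Item(String id, String name) {
            this.id = id;
            this.name = name;
        }

        @Override
        public String toString() {
            return "Item{id=" + id + ", name=" + name + "}";
        }
    }

    // Hypothetical in-memory stand-in for the reactive repository.
    public static final class InMemoryRepository {
        private final Map<String, Item> store = new ConcurrentHashMap<>();

        public CompletableFuture<Item> save(Item item) {
            store.put(item.id, item);
            return CompletableFuture.completedFuture(item);
        }
    }

    // Hypothetical stand-in for itemSink: forwards each item to every listener.
    public static final class ItemSink {
        private final List<Consumer<Item>> listeners = new ArrayList<>();

        public void subscribe(Consumer<Item> listener) {
            listeners.add(listener);
        }

        public void next(Item item) {
            listeners.forEach(listener -> listener.accept(item));
        }
    }

    public final InMemoryRepository repository = new InMemoryRepository();
    public final ItemSink itemSink = new ItemSink();

    // Same shape as the handler: save, publish as a side effect, return the saved item.
    public CompletableFuture<Item> processNewItem(Item item) {
        return repository.save(item)
            .thenApply(savedItem -> {
                itemSink.next(savedItem); // side effect, in the spirit of doOnNext
                return savedItem;         // the response itself is left unchanged
            });
    }

    public static void main(String[] args) {
        RequestResponseSketch service = new RequestResponseSketch();
        List<Item> seen = new ArrayList<>();
        service.itemSink.subscribe(seen::add);

        Item response = service.processNewItem(new Item("1", "demo item")).join();

        System.out.println("response = " + response);
        System.out.println("seen by subscriber = " + seen);
    }
}
```

The point of the sketch is that the caller still receives the saved item as its response, while anything subscribed to the sink sees the same item as a side effect. In the real handler, Reactor defers all of this until the Mono is subscribed to, which this eager CompletableFuture version does not model.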