Schedulers and Publishers
Learn about Schedulers and Publishers for heavy computations in Java.
We'll cover the following
For some heavy computations, we may want to run them in the background, while rendering the result in a separate thread so as not to block the UI or rendering thread. For this case, we can use the observeOn
method with a different Scheduler.
public static void runComputation() throws Exception {
StringBuffer sb = new StringBuffer();
Flowable<String> source = Flowable.fromCallable(() -> { //1
Thread.sleep(1000); // imitate expensive computation
return "Done";
});
source.doOnComplete(() -> System.out.println("Completed runComputation"));
Flowable<String> background = source.subscribeOn(Schedulers.io()); //2
Flowable<String> foreground = background.observeOn(Schedulers.single());//3
foreground.subscribe(System.out::println, Throwable::printStackTrace);//4
}
- Create a new Flowable from a Callable, functional interface (SAM) that simply returns a value.
- Run the Flowable using the “IO” Scheduler, which is good for blocking code like database calls or reading files.
- Observe the results of the Flowable using a single threaded Scheduler.
- Finally, subscribe to the resulting
foreground
Flowable to initiate the flow and print the results to standard out.
Publishers
For non-trivial problems, we might need to create our own Publisher
.
For the following example, imagine you want to write to a file or read from a file using a custom Publisher in RxJava.
First, we write a range of numbers to a file using the following method:
public static void writeFile(File file) {
try (PrintWriter pw = new PrintWriter(file)) {
Flowable.range(1, 100)
.observeOn(Schedulers.newThread())
.blockingSubscribe(pw::println);
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
Here, we use a try-with-resources block and blockingSubscribe
to write the range to the file.
Second, we want to read from a file. In this example, the contents of a file are printed to standard out using the “IO” Scheduler:
public static void readFile(File file) {
try (final BufferedReader br = new BufferedReader(new FileReader(file))) {
Flowable<String> flow = Flowable.fromPublisher(new FilePublisher(br));
flow.observeOn(Schedulers.io())
.blockingSubscribe(System.out::println);
} catch (IOException e) {
e.printStackTrace();
}
}
A Publisher implements the subscribe
method that takes a Subscriber
. The Subscriber
interface
has several methods on it; the first of which to call is onSubscribe(Subscription)
. To implement
backpressure in reactive streams, the Subscription
interface was created, which has only two
methods: request(n)
for requesting the next n
elements and cancel
for cancelling the subscription.
static class FilePublisher implements Publisher < String > {
BufferedReader reader;
public FilePublisher(BufferedReader reader) {
this.reader = reader;
}
@Override
public void subscribe(Subscriber << ? super String > subscriber) {
subscriber.onSubscribe(
new FilePublisherSubscription(this, subscriber));
}
public String readLine() throws IOException {
return reader.readLine();
}
}
static class FilePublisherSubscription implements Subscription {
FilePublisher publisher;
Subscriber << ? super String > subscriber;
public FilePublisherSubscription(FilePublisher publisher,
Subscriber << ? super String > subscriber) {
this.publisher = publisher;
this.subscriber = subscriber;
}
@Override
public void request(long n) {
try {
String line;
for (int i = 0; i < n && publisher != null &&
(line = publisher.readLine()) != null; i++) {
if (subscriber != null) subscriber.onNext(line);
}
} catch (IOException ex) {
subscriber.onError(ex);
}
subscriber.onComplete();
}
@Override
public void cancel() {
publisher = null;
}
}
This example shows how we might implement a Publisher for reading files including backpressure support. A similar approach could be used for any Publisher or Subscription implementation.
Get hands-on with 1400+ tech skills courses.