Schedulers and Publishers

Learn about Schedulers and Publishers for heavy computations in Java.

We may want to run heavy computations in the background and render the result on a separate thread so that the UI or rendering thread is not blocked. In this case, we can use subscribeOn to choose the Scheduler that runs the computation and observeOn to choose a different Scheduler that receives the results.

public static void runComputation() throws Exception {
   Flowable<String> source = Flowable.fromCallable(() -> { //1
      Thread.sleep(1000); // imitate expensive computation
      return "Done";
   });
   // Operators return a new Flowable, so the result must be kept.
   source = source.doOnComplete(() -> System.out.println("Completed runComputation"));

   Flowable<String> background = source.subscribeOn(Schedulers.io()); //2

   Flowable<String> foreground = background.observeOn(Schedulers.single()); //3

   foreground.subscribe(System.out::println, Throwable::printStackTrace); //4

   // The Schedulers use daemon threads, so wait for the flow to finish
   // before the calling thread (and possibly the JVM) exits.
   Thread.sleep(2000);
}
  1. Create a new Flowable from a Callable, a functional interface (SAM) that simply returns a value.
  2. Run the Flowable using the “IO” Scheduler, which is good for blocking code like database calls or reading files.
  3. Observe the results of the Flowable using a single-threaded Scheduler.
  4. Finally, subscribe to the resulting foreground Flowable to initiate the flow and print the results to standard out.
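
The thread handoff in steps 2 and 3 can be made visible without RxJava on the classpath. The following is a rough analogy only, built on the JDK's CompletableFuture with two executors; it is not how RxJava implements subscribeOn or observeOn, and the class name, method name, and thread names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHandoffSketch {
    /**
     * Produces a value on a "background" thread and consumes it on a
     * "foreground" thread. Returns the names of both threads:
     * [producer thread, consumer thread].
     */
    static String[] runAndObserve() {
        // Stand-in for the Scheduler that runs the computation.
        ExecutorService background =
            Executors.newCachedThreadPool(r -> new Thread(r, "background"));
        // Stand-in for the single-threaded Scheduler that receives results.
        ExecutorService foreground =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "foreground"));
        String[] threads = new String[2];
        try {
            CompletableFuture
                .supplyAsync(() -> {
                    threads[0] = Thread.currentThread().getName();
                    return "Done";
                }, background)
                .thenAcceptAsync(result -> {
                    threads[1] = Thread.currentThread().getName();
                    System.out.println(result + " received on " + threads[1]);
                }, foreground)
                .join(); // block until the result has been delivered
        } finally {
            background.shutdown();
            foreground.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        runAndObserve();
    }
}
```

The value is produced on one thread and consumed on another, which is the same division of labor that subscribeOn and observeOn set up for a Flowable.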

Publishers

For non-trivial problems, we might need to create our own Publisher.

For the following example, imagine you want to write to a file or read from a file using a custom Publisher in RxJava.

First, we write a range of numbers to a file using the following method:

public static void writeFile(File file) {
     try (PrintWriter pw = new PrintWriter(file)) {
         Flowable.range(1, 100)
           .observeOn(Schedulers.newThread())
           .blockingSubscribe(pw::println);
     } catch (FileNotFoundException e) {
          e.printStackTrace();
     }
}

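Here, we use a try-with-resources block and blockingSubscribe to write the range to the file. Because blockingSubscribe blocks the calling thread until the flow completes, the PrintWriter is not closed before all the numbers have been written.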

Second, we want to read from a file. In this example, the contents of a file are printed to standard out using the “IO” Scheduler:

public static void readFile(File file) {
     try (final BufferedReader br = new BufferedReader(new FileReader(file))) {

        Flowable<String> flow = Flowable.fromPublisher(new FilePublisher(br));

        flow.observeOn(Schedulers.io())
            .blockingSubscribe(System.out::println);

     } catch (IOException e) {
         e.printStackTrace();
     }
}

A Publisher implements the subscribe method, which takes a Subscriber. The Subscriber interface has several methods; the first one the Publisher must call is onSubscribe(Subscription). To support backpressure, Reactive Streams defines the Subscription interface, which has only two methods: request(n), which requests the next n elements, and cancel, which cancels the subscription.

static class FilePublisher implements Publisher<String> {
    BufferedReader reader;
    public FilePublisher(BufferedReader reader) {
        this.reader = reader;
    }
    @Override
    public void subscribe(Subscriber<? super String> subscriber) {
        subscriber.onSubscribe(
            new FilePublisherSubscription(this, subscriber));
    }
    public String readLine() throws IOException {
        return reader.readLine();
    }
}

static class FilePublisherSubscription implements Subscription {
    volatile FilePublisher publisher;
    final Subscriber<? super String> subscriber;
    boolean done; // true once onComplete or onError has been signalled
    public FilePublisherSubscription(FilePublisher publisher,
        Subscriber<? super String> subscriber) {
        this.publisher = publisher;
        this.subscriber = subscriber;
    }
    @Override
    public void request(long n) {
        if (done) return;
        try {
            for (long i = 0; i < n; i++) {
                FilePublisher p = publisher; // null after cancel()
                if (p == null) return;
                String line = p.readLine();
                if (line == null) { // end of file: complete exactly once
                    done = true;
                    subscriber.onComplete();
                    return;
                }
                subscriber.onNext(line);
            }
        } catch (IOException ex) {
            done = true;
            subscriber.onError(ex); // terminal signal; no onComplete afterwards
        }
    }
    @Override
    public void cancel() {
        publisher = null;
    }
}

This example shows how we might implement a Publisher for reading files including backpressure support. A similar approach could be used for any Publisher or Subscription implementation.
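
To see the request(n) contract in action, it can help to drive a publisher by hand. The following is a minimal sketch that assumes the JDK's java.util.concurrent.Flow interfaces (Java 9+) in place of the org.reactivestreams ones used above; they declare the same methods. LinePublisher, CollectingSubscriber, and pullInBatches are hypothetical names introduced only for this illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BatchRequestSketch {
    /** Publishes the lines of a reader, emitting only as many as were requested. */
    static class LinePublisher implements Flow.Publisher<String> {
        private final BufferedReader reader;
        LinePublisher(BufferedReader reader) { this.reader = reader; }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done; // set after a terminal signal or cancel

                @Override
                public void request(long n) {
                    try {
                        for (long i = 0; i < n && !done; i++) {
                            String line = reader.readLine();
                            if (line == null) { // end of input
                                done = true;
                                subscriber.onComplete();
                            } else {
                                subscriber.onNext(line);
                            }
                        }
                    } catch (IOException ex) {
                        done = true;
                        subscriber.onError(ex);
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Collects lines and keeps the Subscription so a caller can pull on demand. */
    static class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> lines = new ArrayList<>();
        Flow.Subscription subscription;
        boolean finished;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(String line) { lines.add(line); }
        @Override public void onError(Throwable t) { finished = true; }
        @Override public void onComplete() { finished = true; }
    }

    /** Requests `batch` lines at a time; records how many lines had arrived after each request. */
    static List<Integer> pullInBatches(String text, int batch) {
        if (batch <= 0) throw new IllegalArgumentException("batch must be positive");
        CollectingSubscriber subscriber = new CollectingSubscriber();
        new LinePublisher(new BufferedReader(new StringReader(text))).subscribe(subscriber);
        List<Integer> sizes = new ArrayList<>();
        while (!subscriber.finished) {
            subscriber.subscription.request(batch); // the consumer sets the pace
            sizes.add(subscriber.lines.size());
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(pullInBatches("a\nb\nc\nd\ne", 2)); // prints [2, 4, 5]
    }
}
```

The publisher never emits more than the subscriber asked for: two lines arrive per request until the input runs out, at which point onComplete ends the loop.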
