Flowable
Learn how Flowables can solve the problem of a fast producer and a slow consumer.
A scenario
Say that, given a list of image files, we need to load each image file into memory and apply a CPU-bound function, processBitmap(), which takes some time to compute.
File[] imageFiles = // image files to process
Observable<File> fileStream = Observable.fromArray(imageFiles)
    .subscribeOn(Schedulers.computation());
Observable<Bitmap> bitmapStream =
    fileStream.map(file -> BitmapFactory.decodeFile(file.getAbsolutePath()));
Observable<Pair<Bitmap, Integer>> bitmapZippedStream = Observable.zip(
    bitmapStream,
    Observable.range(1, imageFiles.length),
    Pair::new
).doOnNext(pair -> Log.d(TAG, "Produced bitmap: " + pair.second));
bitmapZippedStream
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(pair -> {
        processBitmap(pair.first);
        Log.d(TAG, "Processed bitmap: " + pair.second);
    });
private void processBitmap(Bitmap bitmap) {
    // Simulate a long operation by blocking the calling thread for 300 ms
    try {
        Thread.sleep(300);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
The above code should be fairly straightforward:
- First, a list of image files is converted into an Observable<File>.
- Using .map(), each emission from that stream is then decoded into a bitmap, which produces an Observable<Bitmap>.
- The bitmap stream is then zipped with an integer stream, Observable.range(1, imageFiles.length), and each pair of emissions is combined into a Pair.
- Each decoded bitmap is then logged to Logcat.
- Finally, in the .subscribe() function, each bitmap is processed via a long operation, processBitmap(). A thread sleep of 300 milliseconds is performed to simulate this.
Can you guess what might be produced when running the above code?
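One way to reason about this question is to model the timing in plain Java, without RxJava or Android. The sketch below is only an approximation: it assumes a producer that emits one item every 50 ms (a hypothetical decode time, not a figure from this lesson), a consumer that needs 300 ms per item (the sleep in processBitmap()), and an unbounded buffer between them. The class and method names are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class BacklogSketch {

    /**
     * Simulates a producer that emits one item every produceMs and a single
     * consumer that needs consumeMs per item, with an unbounded buffer in
     * between. Returns the largest number of items waiting in the buffer
     * at any emission instant.
     */
    static int maxBacklog(int items, long produceMs, long consumeMs) {
        Deque<Long> buffer = new ArrayDeque<>(); // arrival times of waiting items
        long consumerFreeAt = 0;                 // when the consumer finishes its current item
        int maxWaiting = 0;

        for (int i = 1; i <= items; i++) {
            long now = i * produceMs;            // item i is emitted at this instant
            buffer.add(now);

            // The consumer picks up every item it could have started by 'now'.
            while (!buffer.isEmpty()) {
                long start = Math.max(consumerFreeAt, buffer.peek());
                if (start > now) {
                    break;                       // still busy with an earlier item
                }
                buffer.poll();
                consumerFreeAt = start + consumeMs;
            }
            maxWaiting = Math.max(maxWaiting, buffer.size());
        }
        return maxWaiting;
    }

    public static void main(String[] args) {
        // Assumed timings: 50 ms to produce, 300 ms to consume.
        System.out.println("Backlog after 10 items:  " + maxBacklog(10, 50, 300));
        System.out.println("Backlog after 100 items: " + maxBacklog(100, 50, 300));
    }
}
```

Under these assumed timings, the backlog keeps growing with the number of items, because the consumer finishes one item in the time the producer emits six. In the scenario above, each waiting item would be a decoded bitmap held in memory, so the "Produced bitmap" log lines would be expected to run well ahead of the "Processed bitmap" lines.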
Backpressure
To solve the issue of a producer that emits items faster than the consumer can process them, we need a way to notify upstream that it should slow down its production until the downstream consumer can keep up.
The mechanism by which we can notify upstream that it should slow down its production is called backpressure. However, by using an Observable, we cannot apply backpressure because Observable
...
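As a rough illustration of the idea of backpressure itself, the sketch below uses the JDK's java.util.concurrent.Flow interfaces (Java 9 and later) rather than RxJava. It is a simplified, single-threaded model, not a complete Reactive Streams implementation: RangePublisher, OneAtATimeSubscriber, and run() are hypothetical names, and details such as validating the requested amount are left out. The point it shows is that the publisher emits only as many items as the subscriber has requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BackpressureSketch {

    /** Publishes 1..count, but only emits as many items as were requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;
        private final List<String> log;

        RangePublisher(int count, List<String> log) {
            this.count = count;
            this.log = log;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    demand += n;
                    if (emitting) {
                        return; // re-entrant call from onNext; the outer loop continues
                    }
                    emitting = true;
                    while (demand > 0 && next <= count && !done) {
                        demand--;
                        int value = next++;
                        log.add("Produced " + value);
                        subscriber.onNext(value);
                    }
                    emitting = false;
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Asks for one item, processes it, then asks for the next one. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        private final List<String> log;
        private Flow.Subscription subscription;

        OneAtATimeSubscriber(List<String> log) {
            this.log = log;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // signal that we can handle one item
        }

        @Override
        public void onNext(Integer item) {
            // A slow step such as processBitmap() would run here.
            log.add("Processed " + item);
            subscription.request(1); // only now ask upstream for more
        }

        @Override
        public void onError(Throwable throwable) {
            log.add("Error: " + throwable);
        }

        @Override
        public void onComplete() {
            log.add("Done");
        }
    }

    static List<String> run(int count) {
        List<String> log = new ArrayList<>();
        new RangePublisher(count, log).subscribe(new OneAtATimeSubscriber(log));
        return log;
    }

    public static void main(String[] args) {
        run(3).forEach(System.out::println);
    }
}
```

In this model, every "Produced" entry is followed by its matching "Processed" entry before the next item is emitted, because production is driven by the subscriber's request(1) calls instead of by the producer's own pace.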