Support for multiple/stream-of values in Rx adapter #2120

Open
@edudar

Description

Looking through the source code of adapter-rxjava, it seems the only supported scenario is an Observable that emits a single value, even though the general-purpose Observable is allowed as a return type. Would it be beneficial to support the more general use case where an Observable emits multiple values?

I was able to work around this with the following code (based on the OkHttp streaming example):

// Service interface method
@GET("stream")
@Streaming
Observable<ResponseBody> stream();

// Call site: re-emit each chunk of the streamed body through a subject
PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(System.out::println, System.err::println, () -> System.out.println("Completed!"));
retrofit.stream().subscribe(
        body -> {
            // Closing the source also closes the response body
            try (BufferedSource source = body.source()) {
                Buffer buffer = new Buffer();
                while (!source.exhausted()) {
                    // Note: a fixed-size chunk may split a multi-byte UTF-8 character
                    source.read(buffer, 8192);
                    subject.onNext(buffer.readUtf8());
                }
            } catch (Exception e) { subject.onError(e); }
        },
        subject::onError,
        subject::onCompleted);

This code is certainly not the best, just an illustration. Still, I can now receive values emitted through an Observable from my service in a streaming fashion, similar to server-sent events. For example, I can request a set of documents from Couchbase, whose client uses Rx natively, and receive them as the DB emits them instead of waiting for all documents to be collected on the service and then sent to me. The code is obviously missing body converters, so it will become even more verbose in a real-world scenario.

It seems like this type of feature could live inside adapter-rxjava.
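
To make the converter point concrete, here is a rough, library-free sketch of what such an adapter might do with each streamed body. It assumes the server sends newline-delimited records, and it uses only the JDK: a plain InputStream stands in for the response body, and the onNext / onError / onCompleted callbacks stand in for an Rx subscriber. The class StreamingLinesSketch, the method streamLines, and the converter parameter are hypothetical names for illustration and are not part of Retrofit or adapter-rxjava. Reading line by line through a UTF-8 reader also avoids splitting a multi-byte character across fixed-size chunks.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical helper for illustration; not part of Retrofit or adapter-rxjava.
class StreamingLinesSketch {

    /**
     * Reads the stream line by line and pushes each converted line to onNext
     * as soon as it is available. Either onError or onCompleted is called,
     * never both.
     */
    static <T> void streamLines(InputStream in,
                                Function<String, T> converter,
                                Consumer<T> onNext,
                                Consumer<Throwable> onError,
                                Runnable onCompleted) {
        // try-with-resources closes the reader and the underlying stream on every path.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // The converter plays the role of a body converter, applied per record.
                onNext.accept(converter.apply(line));
            }
        } catch (IOException | RuntimeException e) {
            onError.accept(e);
            return;
        }
        onCompleted.run();
    }

    public static void main(String[] args) {
        // Stand-in for a streamed response body: three newline-delimited records.
        InputStream body = new ByteArrayInputStream(
                "1\n2\n3\n".getBytes(StandardCharsets.UTF_8));
        streamLines(body,
                Integer::parseInt,
                value -> System.out.println("Next: " + value),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Completed!"));
    }
}
```

In an actual adapter the three callbacks would presumably map onto the subscriber of the returned Observable, and the converter would be whichever body converter is configured for the element type. How records are delimited in the stream is the main open design question.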
