Gewusst wie: RxJava2: Series Intro und PublishProcessor

Ich habe die letzten Tage in einer bescheidenen Menge von Decken auf einem Computerstuhl vor meinem PC verbracht und mich immer wieder durch den Javadoc von RxJava2 gebahnt, um eine Art Verständnis dafür zu erlangen. Vielleicht bin ich ein bisschen düster, oder (hoffentlich und anscheinend wahrscheinlicher) RxJava2 ist einfach eine verwirrende Bibliothek. Ich bin von der Reactive Streams-Dokumentation zu den 2010 Rx.NET-Designrichtlinien übergegangen und habe praktisch auf jeder Seite von reactionx.io mit einem mühsamen Versuch-und-Irrtum-Prozess begonnen, mit dem ich wirklich angefangen habe Machen Sie sich ein Bild davon, wie diese Bibliothek funktioniert. Dieser Artikel ist der erste von vielen, der mein umständliches Stolpern durch die Tiefen von RxJava2 dokumentiert.

Die Hauptidee dieser Serie ist es, diese Konzepte für andere, die RxJava 2 noch nicht kennen, deutlich zugänglicher zu machen. Vielleicht habe ich Pech, oder vielleicht ist dies einfach kein "Ding", aber keine der Informationen, die ich finden konnte, schienen direkt genug. Daher beabsichtige ich, (irgendwann) alle Konzepte in RxJava2 so darzustellen, wie ich sie auf die direkteste und "instruktivste" Art und Weise lerne, die ich kann.

PublishProcessor-Grundlagen

Ein PublishProcessor erweitert die FlowableProcessor-Klasse. In der Erweiterung bedeutet dies, dass ein PublishProcessor gleichzeitig als Verleger und Abonnent fungiert. Aufgrund dieses doppelten Zwecks ist Multicasting aktiviert. Ein PublishProcessor sendet Elemente per Multicast an seine Abonnenten. Dies bedeutet, dass alle Abonnenten dieses PublishProcessors alle ausgegebenen Elemente erhalten, sobald sie von der Quelle [PublishProcessor] ausgegeben werden.

Ein PublishProcessor ist ideal, wenn Sie eine Beziehung zwischen Quelle und Verbraucher benötigen, bei der es eine Quelle und viele Verbraucher derselben Quelle gibt.

Instanziierung

PublishProcessor  processor = PublishProcessor.create ();

So instanziieren Sie in aller Einfachheit einen PublishProcessor. Im obigen Beispiel wird durch die Syntax ein PublishProcessor erstellt, der Objekte des Typs "Object" an seine Abonnenten sendet. Sie können dies jedoch nach Bedarf vornehmen.

Wer kann PublishProcessors abonnieren?

Der Abonnent von Reactive Extensions , das ist wer! Hier ist die grundlegendste funktionierende Subscriber-Implementierung:

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class BaseSubscriber implementiert Subscriber  {

    privates Abonnement Abonnement;

    @Override
    public void onSubscribe (Abonnement Abonnement) {
        System.out.println ("Neuer Abonnent");
        this.subscription = Abonnement;
        Subskriptionsanfrage (1);
    }

    @Override
    public void onNext (Objekt o) {
        System.out.println ("Got:" + o);
        Subskriptionsanfrage (1);
    }

    @Override
    public void onError (Throwable throwable) {
        throwable.printStackTrace ();
    }

    @Override
    public void onComplete () {
        System.out.println ("Complete.");
    }
}

Im obigen Code sind einige wichtige Punkte zu beachten. Zunächst müssen Sie bei der onSubscribe-Methode sicherstellen, dass ein Verweis auf das Subscription-Objekt gespeichert ist. In diesem Fall können Sie subscription.request (1) in der onNext-Methode verwenden. WIRKLICH WICHTIG - Stellen Sie sicher, dass Sie einen Aufruf zum Anfordern des Abonnements innerhalb des onSubscribe-Methodentexts ausführen. Dadurch wird sichergestellt, dass die Daten ordnungsgemäß fließen, wenn onNext aufgerufen wird. Es ist wie eine Achterbahn - onSubscribe ist das Äquivalent zum Einsteigen in die Achterbahn und zum langsamen Aufstieg, aber zum Anhalten vor dem ersten Abstieg. In dieser Analogie würde der erste "Drop" erst dann auftreten, wenn der erste onNext-Aufruf erfolgt. Dann können die Daten frei fließen oder bis onComplete / onError auftritt.

Im onNext-Methodenkörper findet die "Magie" statt. Hier sollten Sie die Geschäftslogik für alle Ihre Anforderungen bereitstellen. Die einzige Voraussetzung ist, dass Sie subscription.request (Long) aufrufen - dies stellt sicher, dass die Daten weiterhin ordnungsgemäß fließen können.

Die Implementierung der obigen onError-Methode teilt Ihnen einfach mit, wo Sie den Code abgelegt haben, und onComplete sendet nur eine einfache Konsolenmeldung, um Sie darüber zu informieren, dass das Abonnement mit dem PublishProcessor in diesem Fall erfolgreich abgeschlossen wurde.

Etwas zusammensetzen

In diesem Beispiel wird die BaseSubscriber-Beispielklasse von oben verwendet.

public static void main (String [] args) {

    PublishProcessor  processor = PublishProcessor.create ();
    BaseSubscriber subscriberA = new BaseSubscriber ();
    processor.subscribeActual (subscriberA);
    processor.onNext ("Textzeile");
    processor.onNext ("Eine andere Textzeile");
    processor.onNext ("Noch eine Textzeile");
    
    BaseSubscriber subscriberB = neuer BaseSubscriber ();
    processor.subscribeActual (subscriberB);
    processor.onNext ("Eine andere Textzeile");
    processor.onNext ("Letzte Textzeile");
    processor.onComplete ();

}

Alle Anrufe an onNext werden technisch an "alle" Teilnehmer weitergeleitet, aber für die ersten drei Anrufe gibt es nur einen Teilnehmer, sodass die Ausgabe nur einmal erscheinen sollte. Beim vierten und fünften Aufruf von onNext haben wir Teilnehmer B hinzugefügt, dh, wir sollten diese beiden Aufrufe in der Ausgabe zweimal wiederholen. Schließlich wird der Anruf an onComplete auch an alle Abonnenten gesendet, sodass die Abschlussnachricht auch zweimal angezeigt werden sollte (einmal für jeden Abonnenten, dessen onCompleted angerufen wurde).

Ausgabe

Neuer Abonnent
Got: Eine Textzeile
Got: Noch eine Textzeile
Got: Noch eine Textzeile
Neuer Abonnent
Got: Eine andere Textzeile
Got: Eine andere Textzeile
Got: Letzte Textzeile
Got: Letzte Textzeile
Komplett.
Komplett.

Erwartungsgemäß wurde nur bei den ersten drei Aufrufen von onNext eine einzige Textzeile angezeigt, da nur ein Abonnent vorhanden ist. In der fünften Zeile der Ausgabe sehen wir die Meldung "New Subscriber" (Neuer Abonnent), die angibt, dass jetzt zwei Abonnenten vorhanden sind. Und wie erwartet werden die endgültigen Ausgabezeilen alle für die Anzahl der Abonnenten dupliziert, über die sie derzeit verfügen.

Fazit

Dies ist mein erster von vielen Artikeln zu RxJava2 - Ich weiß, es scheint seltsam, mit etwas so Spezifischem wie dem PublishProcessor zu beginnen, aber ich habe externe Projekte, an denen ich arbeite, mit denen diese bestimmte Klasse sehr gut umgehen kann. Die Entscheidung war für mich nur aus der Not heraus getroffen worden. In Zukunft werde ich mich auf grundlegendere Konzepte konzentrieren und von dort aus aufbauen. Hoffe, dass dies jemandem von Nutzen war!