본문 바로가기
2023년 이전/ReativeX

RxJava,RxKotlin - Observable (4) - Subject

by JeongUPark 2019. 10. 22.
반응형

[출처 - RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에]

본 글은 'RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에' 를 학습하면서 정리한 글입니다


앞선 글에서 Subject는 차가운 Observable은 뜨거운 Observable로 바꿔준다고 소개했습니다. Subject의 특징은 Observable의 속성과 구독자의 속성을 모두 가지고 있다는 점입니다. 즉 데이터를 발행할 수 도 있고, 데이터를 받아서 치러할 수 도 있다는 말이 됩니다.

이런 Subject에서 제공하는 각 클래스를 알아 보겠습니다.

 

AsyncSubject

AsyncSubject는 Observable에서 발행한 마지막 데이터를 얻어올 수 있는 Subject 입니다. 마지막 데이터만 받고 이전 데이터들은 무시합니다

AsyncSubject 마블 다이어 그램

위의 마블 다이어그램을 보면, 구독자가 구독을 하면 이전 데이터들은 다 무시하고 Observable이 발행하는 데이터중 완료 전 데이터만 받습니다.  완료 후에 구독을 하면 완료 전 데이터를 받습니다.

 

Code로 확인해 보면

Java

public static void main(String[] args){
  		AsyncSubject<String> a_Subject = AsyncSubject.create();
        a_Subject.subscribe(data->System.out.println("Subscriber #1 => "+data));
        a_Subject.onNext("AA");
        a_Subject.onNext("BB");
        a_Subject.onNext("CC");
        a_Subject.subscribe(data->System.out.println("Subscriber #2 => "+data));
        a_Subject.onComplete();
        a_Subject.onNext("DD");
        a_Subject.subscribe(data->System.out.println("Subscriber #3 => "+data));
}

 Kotlin

fun main(){
    val a_Subject = AsyncSubject.create<String>()
    a_Subject.subscribe{data -> println("Subscriber #1 => $data")}
    a_Subject.onNext("AA")
    a_Subject.onNext("BB")
    a_Subject.onNext("CC")
    a_Subject.subscribe{data -> println("Subscriber #2 => $data")}
    a_Subject.onComplete()
    a_Subject.onNext("DD")
    a_Subject.subscribe{data -> println("Subscriber #3 => $data")}
}

이렇게 되고 결과는 

이런 결과가 나옵니다. 결과로 알 수 있듯이 값을 계속 발행하지만 onComplete가 되지 않는 이상 구독자는 데이터를 받을 수 없습니다. 그리고 onComplete이 후에도 데이터를 발행하여도 subscribe를 하면 완료전 발행한 데이터를 출력합니다. 즉, 완료 후에는 새로운 데이터를 발행해도 완료전 데이터를 사용하는 것을 할 수 있습니다.

그리고 AsyncSubject 클ㅋ래스는 구독자로도 동작할 수 있습니다. AsyncSubject클래스를 Observable의 동작하는 Code를 보겠습니다.

Java

public void static main(String args[]){
	Float[] temperature = {10.1f,13.4f,12.5f};
    Observable<Float> source = Observable.fromArray(temperature);
    
    AsyncSubject<Float> subject = AsyncSubject.create();
    subject.subscribe(data-> System.out.println("subscriber #1=> "+data));
    
    source.subscribe(subject);
}

kotlin

fun main(){

	val temperature = arrayOf(10.1f, 13.4f, 12.5f)
    val source = temperature.toObservable()

    val subject = AsyncSubject.create<Float>()
    subject.subscribe { data -> println("subscriber #1=> " + data!!) }

    source.subscribe(subject)
}

이렇게 Observable 생성 후 subscribe에 AsyncSuject를 적용하면

array의 마지막 데이터 12.5가 출력되는 것을 확인 할 수 있습니다.

 

즉, AsyncSuject는 Observable가 Complete되기 전 데이터를 받는 것을 확인 할 수 있습니다.

BehaviorSubject

BehaviorSubject는 구독을 하면 가장 최근 값 혹은 기본값을 넘겨주는 클래스 입니다.

BehaviorSubject 마블다이어그램

Behavior마블다이어그램을 보면 첫번째 구독자는 기본값이 핑크색 원을 받고, 그다음 발행되는 빨간원 초록원 파랑원 순으로 데이터를 받습니다. 두번째 구독자는 구독전 가장 최신 값이 초록원을 받고 파랑원을 받습니다.

즉, 가장 최신값을 수신합니다. 위의 AsyncSubject랑 다른 점은 AsyncSubject는 완료 시 마지막 데이터를 받는데(위의 경우 구독자1 구독자2 둘다 파랑색을 받을 것입니다.) BehaviorSubject는 항상 최신값을 수신합니다.

구현 code를 보면

Java

public static void main(String[] args){
        BehaviorSubject<String> b_subject = BehaviorSubject.createDefault("AA");
        b_subject.subscribe(data->System.out.println("Subscribe #1=> "+data));
        b_subject.onNext("BB");
        b_subject.onNext("CC");
        b_subject.subscribe(data->System.out.println("Subscribe #2=> "+data));
        b_subject.onNext("DD");
        b_subject.onComplete();

    }

Kotlin

fun main(){
    val b_subject = BehaviorSubject.createDefault("AA")
    b_subject.subscribe { data -> println("Subscribe #1=> $data") }
    b_subject.onNext("BB")
    b_subject.onNext("CC")
    b_subject.subscribe { data -> println("Subscribe #2=> $data") }
    b_subject.onNext("DD")
    b_subject.onComplete()
}

으로 결과는

각 구독에서 데이터 변경에 따른 데이터를 계속 수신 하고 있는것을 확인 할 수 있습니다.

PublishSubject

가장 평범한 Subject 입니다. 해당시간에 발생한 데이터를 그대로 구독자에게 전달합니다.

그 구현을 보면

Java

public static void main(String[] args) {
        PublishSubject<String> p_Subject = PublishSubject.create();
        p_Subject.subscribe(data->System.out.println("Subscribe #1=> "+data));
        p_Subject.onNext("AA");
        p_Subject.onNext("BB");
        p_Subject.subscribe(data->System.out.println("Subscribe #2=> "+data));
        p_Subject.onNext("CC");
        p_Subject.onNext("DD");
        p_Subject.onComplete();
    }

Kotlin

fun main(){
    val p_Subject = PublishSubject.create<String>()
    p_Subject.subscribe { data -> println("Subscribe #1=> $data") }
    p_Subject.onNext("AA")
    p_Subject.onNext("BB")
    p_Subject.subscribe { data -> println("Subscribe #2=> $data") }
    p_Subject.onNext("CC")
    p_Subject.onNext("DD")
    p_Subject.onComplete()
}

결과는

어떻게 보면 BehaviorSubject와 비슷하다고 생각할 수 있지만, Behavior는 가장 최신 데이터를 받고, PublishSubject는 입력된 그 순간 데이터를 받는 점이 다릅니다.

ReplaySubject

앞서 Subject는 차가운 Observable을 뜨거운 Observable로 변경한다고 설명했습니다. 그런데 이 ReplaySubject는 뜨거운 Observable을 활요하는데 차가운 Observable처럼 동작합니다.

ReplaySubject는 새로운 구독자가 생기면 항상 데이터의 처음부터 끝까지 발행하는 것을 보장합니다.

더 다양한 마블다이어그램은 여기서 확인 하실 수 있습니다.

그 code를 보면

Java

    public static void main(String[] args) {
        ReplaySubject<String> r_subject = ReplaySubject.create();
        r_subject.subscribe(data->System.out.println("Subscribe #1=> "+data));
        r_subject.onNext("AA");
        r_subject.onNext("BB");
        r_subject.subscribe(data->System.out.println("Subscribe #2=> "+data));
        r_subject.onNext("CC");
        r_subject.onComplete();
    }

Kotlin

fun main(){
    val r_subject = ReplaySubject.create<String>()
    r_subject.subscribe { data -> println("Subscribe #1=> $data") }
    r_subject.onNext("AA")
    r_subject.onNext("BB")
    r_subject.subscribe { data -> println("Subscribe #2=> $data") }
    r_subject.onNext("CC")
    r_subject.onComplete()
}

결과는 

첫번째 subscribe는 AA와 BB를 받습니다. 2번째 subscribe는 구독 시 앞에 있던 AA,BB를 받습니다. 그리고 CC가 발행 시 둘다 CC를 받습니다.

 

UnicastSubject

UnicastSubject는 Observable가 데이터를 Queue처럼 쌓아 두었다가, 구독 시점부터 차례대로 전달합니다. 그리고 구독자는 single observer만 허용한다. 즉 subscribe를 1번한 호출 할 수 있습니다다.

 

code를 보면

Java

    public static void main(String[] args) {
        UnicastSubject<String> u_subject =UnicastSubject.create();
        System.out.println("start data");
        u_subject.onNext("AA");
        u_subject.onNext("BB");
        u_subject.onNext("CC");
        System.out.println("subscribe start");
        u_subject.subscribe(data->System.out.println("Subscribe #1=> "+data));
        u_subject.onNext("DD");
        u_subject.onNext("EE");
        u_subject.onNext("FF");
        u_subject.onComplete();
    }

Kotlin

fun main(){
    val u_subject = UnicastSubject.create<String>()
    println("start data")
    u_subject.onNext("AA")
    u_subject.onNext("BB")
    u_subject.onNext("CC")
    println("subscribe start")
    u_subject.subscribe { data -> println("Subscribe #1=> $data") }
    u_subject.onNext("DD")
    u_subject.onNext("EE")
    u_subject.onNext("FF")
    u_subject.onComplete()
}

결과는

이렇게 나옵니다. 보면 AA,BB,CC 데이터가 subscribe 전에 발행했지만 subscribe 하니 다 전달 받은 것을 확인 할 수 있습니다.

그리고 subscribe를 2번하면 

fun main(){
    val u_subject = UnicastSubject.create<String>()
    println("start data")
    u_subject.onNext("AA")
    u_subject.onNext("BB")
    u_subject.onNext("CC")
    println("subscribe start")
    u_subject.subscribe { data -> println("Subscribe #1=> $data") }
    u_subject.onNext("DD")
    u_subject.onNext("EE")
    u_subject.subscribe { data -> println("Subscribe #1=> $data") }
    u_subject.onNext("FF")
    u_subject.onComplete()
}

아래와 같은 결과를 받을 수 있습니다.

start data
subscribe start
Subscribe #1=> AA
Subscribe #1=> BB
Subscribe #1=> CC
Subscribe #1=> DD
Subscribe #1=> EE
io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.lang.IllegalStateException: Only a single observer allowed.
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
	at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:77)
	at io.reactivex.internal.disposables.EmptyDisposable.error(EmptyDisposable.java:63)
	at io.reactivex.subjects.UnicastSubject.subscribeActual(UnicastSubject.java:311)
	at io.reactivex.Observable.subscribe(Observable.java:12267)
	at io.reactivex.Observable.subscribe(Observable.java:12253)
	at io.reactivex.Observable.subscribe(Observable.java:12155)
	at UnicastSuject_testKt.main(UnicastSuject_test.kt:13)
	at UnicastSuject_testKt.main(UnicastSuject_test.kt)
Caused by: java.lang.IllegalStateException: Only a single observer allowed.
	... 6 more
Exception in thread "main" io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.lang.IllegalStateException: Only a single observer allowed.
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
	at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:77)
	at io.reactivex.internal.disposables.EmptyDisposable.error(EmptyDisposable.java:63)
	at io.reactivex.subjects.UnicastSubject.subscribeActual(UnicastSubject.java:311)
	at io.reactivex.Observable.subscribe(Observable.java:12267)
	at io.reactivex.Observable.subscribe(Observable.java:12253)
Subscribe #1=> FF
	at io.reactivex.Observable.subscribe(Observable.java:12155)
	at UnicastSuject_testKt.main(UnicastSuject_test.kt:13)
	at UnicastSuject_testKt.main(UnicastSuject_test.kt)
Caused by: java.lang.IllegalStateException: Only a single observer allowed.
	... 6 more

error를 보면 중간에 Only a single observer allowed라는 문구를 볼 수 있는데, 이를 통해 아까 이야기한 subscribe를 한번만 쓸 수 있는것을 확인 할 수 있습니다.

 

마블다이어그램참고 - http://reactivex.io/RxJava/javadoc/io/reactivex/subjects/Subject.html

반응형