본문 바로가기
2023년 이전/ReativeX

RxJava,RxKotlin - Observable (1)

by JeongUPark 2019. 10. 21.
반응형

[출처 - RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에]

본 글은 'RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에' 를 학습하면서 정리한 글입니다

(Kotlin code 들은 실행 보았지만, Java code는 따로 실행 해보진 않았습니다.)

 

RxJava 프로그래밍은 Observable에서 시작해서 Obervable로 끝난다 해도 과언이 아닐정도로 중요한 개념입니다.

 

RxJava 1.x 에서는 데이터소스를 Observable과 Single 클래스로 구성되어있고, 2.x에서도 여전히 두 클래스가 존재하지만 Observable은 Observable,Maybe, Flowable 클래스로 구분해 사용합니다.

 

 

Observable은 Observer 패턴을 구현합니다. Observer 패턴은 객체의 상태 변화를 관찰하는 관찰자 목록을 객체에 등록합니다. 그리고 상태 변화가 있을 때마다 메서드를 호출하여 객체가 직접 목록의 각 옵서버에게 변화를 알려줍니다.

 

예를 들어 Android에서 Button에 OnClickListenr를 등록 후 Button을 누르면 onClick() 을 호출하는 것이 Observer 패턴의 예입니다.

 

RxJava 의 Observable은 세가지 알림을 구독자에게 전달합니다.

onNext : 데이터 발행을 알림

onComplete : 데이터 발행을 완료했음을 알림. onComplete 이후 onNext 이벤트가 발생하면 안됨

onError : 에러 발생을 알림

 

Observable클래스에는 Observable을 생성하는 함수부터, 결과처리 하는 함수등 다양한 함수가 존재합니다. 그 함수들을 지금부터 알아보도록하겠습니다.

 

just() 함수

just 함수는 Observable을 생성하는 가장간단한 방법입니다. 한 개의 값을 넣을 수도 있고, 여러개를 넣을 수 도 있습니다.(단 같은 타입이어야 합니다.)

 

사용법을 보겠습니다.

java

public static void main(String[] args){
    Observable.just(1,2,3,4,5,6).subscribe(System.out::println);
}

kotlin

fun main() {
    Observable.just(1,2,4,5,6).subscribe(System.out::println)
}

위의 code를 보면 just를 통하여 Observable을 생성하고 데이터 1,2,3,4,5,6을 넣습니다. 그리고 subscribe를 넣은 데이터들을 발행합니다.

 

subscribe() 함수

RxJava는 실제도 실행되기 위해서 사용하는 함수는 subscribe() 함수 입니다. just등으로 데이터 흐름을 정한 후 subscribe 함수를 호출해야 실제로 데이터를 발행합니다.

 

subscribe() 함수를 보면 모두 Disposable 인터페이스 객체를 리턴하고 있습니다. Disposable 인터페이스는 dispose() 함수와 isDispoed() 두가지가 있는데, disposed() 함수는 Obserbable에게 더 이상 데이터를 발행하지 않도록 구독을 해지하는 함수 입니다.

 

위에서 Observable이 onComplete()을 호출할 경우 자동으로 dispose()를 호출해 Observable과 구독자의 관계를 끊습니다. 그러므로 Observable이 정상적으로 onComplete로 종료가 되면 dispose를 호출할 필요가 없습니다.

 

onCreate()

 

just는 데이터를 인자로 넣으면 자동으로 알림 이벤트가 발생하지만 create는 onNext, onComplete, onError 같은 알림을 개발자가 직접 호출해야 합니다.

java

public static void main(String[] args){
    Observable<Integer> source = Observable.create(
            (ObservableEmitter<Integer> emitter)->{
        emitter.onNext(100);
        emitter.onNext(200);
        emitter.onNext(300);
    });
    source.subscribe(System.out::println);
}

kotlin

    Observable.create<Integer> {
        it.onNext(100 as Integer)
        it.onNext(200 as Integer)
        it.onNext(300 as Integer)
    }.subscribe { data-> println(data) }

위 code를 보면 it은 ObservableEmitter로 람다 형식으로 간소하게 작성하였습니다. 그리고 as Integer를 하지 않을 경우 the integer literal does not conform to the expected type integer로 error가 발생하기 때문에 사용 하였습니다. 그리고 data는 변수이르모 원하는 이름으로 작성하시면 됩니다. 마지막으로 kotlin이 더 간소하다는 적을 볼 수 있습니다.

 

onCreate 사용시 주의할 점은 

1. Observable이 구독 해지 되었을 때 등록된 콜백을 모두 해제해야 합니다. 그렇지 않으면 잠재적으로 메모리 누수가 발생합니다.

2. 구독자가 구도갛느 ㄴ동안에만 onNext와 onComplete 이벤트를 호출해야합니다.

3. 에러 발생시 오직 onerror 이벤트로만 에러를 전달해야합니다.

4. 배압을 직접 처리해야 합니다. (배압은 나중에 알아보도록 하겠습니다.)

 

fromArray()

just와 onCreate는 단일 데이터를 다룹니다. 그럼 단일 데이터가 아닐때는 어떻게 해야할까요?  fromXXX()함수를 사용하면됩니다. 그 중에서 fromArray() 함수를 알아보겠습니다.

java code는 다음과 같습니다. 

public static void main(String[] args){
	Integer[] arr = {100,200,300};
    Observable<Integer> source = Observable.fromArray(arr);
    source.subscribe(System.out::println);
}

그리고 kotlin의 경우

fun main(){
    val arr = arrayOf(100, 200, 300)
    val source = arr.toObservable()
    source.subscribe{
        println(it)
    }
}

하면 결과는 

와 같이 나옵니다. 그런데 kotlin의 경우 java와 다르게 fromArray를 안쓰고 toObservable을 사용했는데, 이는 toObservable의 code를 보면

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
fun <T : Any> Array<T>.toObservable(): Observable<T> = Observable.fromArray(*this)

 이렇게 fromArray를 사용하고 있습니다.

그리고 만약에 toObservable을 사용하지 않고 나는 fromArray를 쓰고 싶다 하시는 분은

    val arr = arrayOf(100, 200, 300)
    val source = Observable.fromArray(*arr)
    source.subscribe{
        println(it)
    }

이렇게 *arr로 C의 포인터 처럼 위치의 데이터를 읽어 올 수 있도록 지정해주어야 합니다. 그냥 arr로만 쓸 경우 그 위치 주소만 나타납니다.

 

fromIterable() 함수

Interable 인터페이스는 반복자를 반환 합니다. 즉, iterator 패턴을 구현한 것으로, 어떤 데이터가 있는지와 그 값을 얻어오는 것만 관여 합니다. iterator는 다음에 자세히 설명하도록 하겠습니다.

java

public static void main(String[] args){
    List<String> list = new ArrayList<String>();
    list.add("Alpha");
    list.add("Beta");
    list.add("Gamma");

    Observable<String> source = Observable.fromIterable(list);
    source.subscribe(System.out::println);
}

Kotlin

fun main(){
 	val list = mutableListOf("Alpha", "Beta", "Gamma")
    val source =  list.toObservable()
    source.subscribe(System.out::println)
}

fromArray 때와 마찬가지로 fromIterable에서도 kotlin code는 list에 toObservable을 사용했습니다.

하지만 이때 toObservable은 

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
fun <T : Any> Iterable<T>.toObservable(): Observable<T> = Observable.fromIterable(this)

이렇게 fromIterable을 사용하고 있습니다.  즉, kotlin에서 fromArray와 fromIterable의 경우 toObservable을 사용하면 타입에 따라 선택이 되는 편리함을 확인 할 수 있습니다.

그리고 List 뿐만 아니라 다른 타입에도 적용이 가능합니다. 예를 들어 HashSet이나

fun main(){
    val sublist = HashSet<String>()
    sublist.add("AAA")
    sublist.add("BBB")
    sublist.add("CCC")
    val source =  sublist.toObservable()
    source.subscribe(System.out::println)
}

BlockQueue 객체를 구현하는 ArrayBlockingQueue도 가능합니다.

fun mian(){
    val queue = ArrayBlockingQueue<Order>(100)
    queue.add(Order("A1"))
    queue.add(Order("B1"))
    queue.add(Order("C1"))
    val source = queue.toObservable()
    source.subscribe(System.out::println)
}
data class Order(val id : String){
    override fun toString(): String {
        return "Order id - $id"
    }
}

즉, 배열 또는 iterable<E> 인터페이스로 구현하는 객체들은 kotlin에서는 toObservable로 손쉽게 Observable을 생성할 수 있습니다. java의 경우에는 배열은 fromArray로 , Iterable<E> 인터페이스로 구현하는 객체들은 fromIterable로 Observable을 생성 할 수 있습니다.

 

fromCallable() 함수

Callable 함수는 자바5에 추가된 함수로, Runable 인터페이스 처럼 메서드가 하나고, 인자가 없다는 점에서 비슷하지만 실행결과를 리턴한다는 점에서 차이가 있습니다. 그리고 Executor 인터페이스에서 인자로 활용되기 때문에 잠제적으로 다른 스레드에서 실행되는 것을 의미합니다. 

아무튼 이런 Callable을 이용한 Observable 생성은 다음과 같습니다. (차후 Callable에 대해 자세히 다뤄 보겠습니다.)

 

Java

public static main(String[] ars){
    Callable<String> callable = ()->{
        return "Hello Callabl";
    };
	Observalbe<String> source = Observable.fromCallable(callable);
    source.subscribe(System.out::println);
}

Kotlin

val callable = Callable<String>(){
    return@Callable "Hellow Callable"
}
fun main(){
	Observable.fromCallable(callable).subscribe(System.out::println)
    	// OR
    Observable.fromCallable{
        "Hellow callable"
    }.subscribe(System.out::println)
}

이렇게 사용할 수 있습니다. 그리고 결과를 보면

확인하 실 수 있습니다.

fromFuture() 함수

Future 인터페이스 역시 자바5에 추가된 API로 계산의 결과를 구할 떄 사용합니다.

일반적으로 Executor 인터페이스 구현한 클래스에 Callable 객체를 인자로 넣어 Future객체를 반환합니다(차후 Future에 대해 자세히 다뤄 보겠습니다.)

그럼 fromFuture 사용 법을 보겠습니다.

Java

pulbic static void main(String[] args){
    Future<String> future = Executors.newSingleThreadExecutor().submit(()->{
        return "Hellow Future";
    });
    Observable<String> source = Observable.fromFuture(future);
    source.subscribe(System.out::println);
}

Kotlin

fun main(){
    val future = Executors.newSingleThreadExecutor().submit<String> { "Hellow Future" }
    val source = Observable.fromFuture(future)
    source.subscribe(System.out::println)
}

처럼 구현 할 수 있고 그 결과는 

입니다. 참고로 Executors 클래스는 단일 스레드뿐 아니라 다양한 스레드풀을 지원하는데, ReactiveX에서는 ReativeX에서 제공하는 스케줄러는 활용하도록 권장합니다.

fromPubilsher() 함수

Publisher는 자바9의 표준 Flow API 입니다. (사실 Publisher는 저도 잘 몰라서 현재 공부 중입니다. 나중에 자세히 공부 후 글을 올리도록 하겠습니다.)

Java

pulic static void main(String[] args){
	
    Publisher<String> publisher = new Publisher<String>(){
    	@override
        public void subscribe(Subscriber<? super String> s){
        	s.onNext("Hellow Observable.fromPublisher");
            s.onComplete();
        }
    };
    
    Observable<String> source = Observable.fromPublisher(publisher);
    source.subscribe(System.out::println);
}

Kotlin

    val publisher = Publisher<String>{
            it.onNext("Hellow Observable.fromPublisher")
            it.onComplete()
    }

    val source = Observable.fromPublisher<String>(publisher)
    source.subscribe(System.out::println)

그리고 결과는 

입니다.

반응형