본문 바로가기
2023년 이전/ReativeX

RxJava, RxKotlin - takeUntil 함수

by JeongUPark 2020. 1. 19.
반응형

[출처 - RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에]

본 글은 'RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에' 를 학습하면서 정리한 글입니다

 

마블다이어그램 참고 - http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html#takeUntil-org.reactivestreams.Publisher-

 

takeUntil 함수는 take 함수에 조건을 설정합니다. 마블다이어그램을 보면서 설명하겠습니다.

 

마블다이어 그램을 보면 원을 발행중에 6각형이 들어오면 발행을 멈추는 것을 볼 수 있습니다.

 

즉 take 처럼 일정 개수를 발행하되 다른 Observable이 특정 값을 발행하면 발행을 중단하고 onComplete를 발생시킵니다.

 

testCode로 알아보겠습니다.  CommonUtils와 CommonUtilsk의 내용은 여기에서 확인 할 수 있습니다.

Java

import io.reactivex.Observable;

import java.util.concurrent.TimeUnit;

public class takeUntil_test {

    public static void main(String[] args){

        String[] data = {"1","2","3","4","5","6"};
        CommonUtils.exampleStart();
        Observable source = Observable.fromArray(data).zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (val,notUsed)-> val)
                .takeUntil(Observable.timer(500L,TimeUnit.MILLISECONDS)).doOnComplete(()->System.out.println("onComplete"));

        source.subscribe(System.out::println);

        CommonUtils.sleep(1000);

    }
}

 

Kotlin

 

import io.reactivex.Observable
import java.util.concurrent.TimeUnit
import io.reactivex.functions.BiFunction
import io.reactivex.rxkotlin.toObservable

fun main(){

    val data = arrayOf("1", "2", "3", "4", "5", "6")
    CommonUtils.exampleStart()
    val source = data.toObservable()
        .zipWith<Long, String>(Observable.interval(100L, TimeUnit.MILLISECONDS), BiFunction { t1, t2 -> t1 })
        .takeUntil(Observable.timer(500L, TimeUnit.MILLISECONDS)).doOnComplete({ println("onComplete") })

    source.subscribe{it->println(it)}

    CommonUtils.sleep(1000)

}

 

결과

1
2
3
4
onComplete

 

String array있는 데이터를 100ms 마다 발행을 하는데, 이때 500ms에 완료가되도록 takeUntil을 지정합니다.

그렇기 떄문에 1,2,3,4까지만 데이터가 나오고 500ms에 onComplete가 발생합니다.

반응형