반응형
[출처 - RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에]
본 글은 'RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에' 를 학습하면서 정리한 글입니다
마블다이어그램 참고 - http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html#takeUntil-org.reactivestreams.Publisher-
takeUntil 함수는 take 함수에 조건을 설정합니다. 마블다이어그램을 보면서 설명하겠습니다.
마블다이어 그램을 보면 원을 발행중에 6각형이 들어오면 발행을 멈추는 것을 볼 수 있습니다.
즉 take 처럼 일정 개수를 발행하되 다른 Observable이 특정 값을 발행하면 발행을 중단하고 onComplete를 발생시킵니다.
testCode로 알아보겠습니다. CommonUtils와 CommonUtilsk의 내용은 여기에서 확인 할 수 있습니다.
Java
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class takeUntil_test {
public static void main(String[] args){
String[] data = {"1","2","3","4","5","6"};
CommonUtils.exampleStart();
Observable source = Observable.fromArray(data).zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (val,notUsed)-> val)
.takeUntil(Observable.timer(500L,TimeUnit.MILLISECONDS)).doOnComplete(()->System.out.println("onComplete"));
source.subscribe(System.out::println);
CommonUtils.sleep(1000);
}
}
Kotlin
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
import io.reactivex.functions.BiFunction
import io.reactivex.rxkotlin.toObservable
fun main(){
val data = arrayOf("1", "2", "3", "4", "5", "6")
CommonUtils.exampleStart()
val source = data.toObservable()
.zipWith<Long, String>(Observable.interval(100L, TimeUnit.MILLISECONDS), BiFunction { t1, t2 -> t1 })
.takeUntil(Observable.timer(500L, TimeUnit.MILLISECONDS)).doOnComplete({ println("onComplete") })
source.subscribe{it->println(it)}
CommonUtils.sleep(1000)
}
결과
1
2
3
4
onComplete
String array있는 데이터를 100ms 마다 발행을 하는데, 이때 500ms에 완료가되도록 takeUntil을 지정합니다.
그렇기 떄문에 1,2,3,4까지만 데이터가 나오고 500ms에 onComplete가 발생합니다.
반응형
'2023년 이전 > ReativeX' 카테고리의 다른 글
RxJava,RxKotlin - all 함수 (0) | 2020.01.19 |
---|---|
RxJava, RxKotlin - skipUntil 함수 (0) | 2020.01.19 |
RxJava, Rxkotlin 관련 CommonUtils와 CommonUtilsk (0) | 2020.01.19 |
RxJava, RxKotlin - amb 함수 (0) | 2020.01.19 |
RxJava, RxKotlin - Concat 함수 (0) | 2020.01.17 |