반응형
[출처 - RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에]
본 글은 'RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에' 를 학습하면서 정리한 글입니다
마블다이어그램 참고 -http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html#amb-java.lang.Iterable-
amb는 ambiguous(모호한)의 줄임말 입니다. 여러 개의 Observable 중에 1개의 Observable을 선택하는데, 선택 기준은 가장 먼저 데이터를 발행한 Observable입니다. 이 후 나머지 데이터는 전부 무시합니다.
마블다이어 그램을 보면
이렇게 원이 먼저 들어왔기 때문에 원의 데이터만 발행하고 있습니다.
그럼 Test code를 확인 해보겠습니다.
Java
import io.reactivex.Observable;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class amb_test {
public static void main(String[] args) {
String[] data1 = {"1", "3", "5", "7", "9"};
String[] data2 = {"2", "4", "6", "8"};
List<Observable<String>> sources = Arrays.asList(
Observable.fromArray(data1).delay(100L, TimeUnit.MILLISECONDS).doOnComplete(() -> System.out.println("Observable#1 : data1 Complete")),
Observable.fromArray(data2).doOnComplete(() -> System.out.println("Observable#2 : data2 Complete"))
);
CommonUtils.exampleStart();
Observable.amb(sources).doOnComplete(() -> System.out.println("Result: onComplte")).subscribe(it -> {
long time = System.currentTimeMillis() - CommonUtils.startTime;
System.out.println(CommonUtils.getThreadName() + " | " + time + " | " + "value = " + it);
});
CommonUtils.sleep(2000);
}
}
public class CommonUtils{
public static long startTime;
public static void exampleStart(){
startTime = System.currentTimeMillis();
}
public static void sleep(int mills){
try{
Thread.sleep(mills);
}catch (InterruptedException e){
e.printStackTrace();
}
}
public static String getThreadName() {
String threadName = Thread.currentThread().getName();
if (threadName.length() > 30) {
threadName = threadName.substring(0, 30) + "...";
}
return threadName;
}
}
kotlin
import io.reactivex.Observable
import io.reactivex.rxkotlin.toObservable
import java.util.*
import java.util.concurrent.TimeUnit
fun main(){
val data1 = arrayOf("1", "3", "5", "7", "9")
val data2 = arrayOf("2", "4", "6", "8")
val sources = Arrays.asList(
data1.toObservable().delay(
100L,
TimeUnit.MILLISECONDS
).doOnComplete { println("Observable#1 : data1 Complete") },
data2.toObservable().doOnComplete { println("Observable#2 : data2 Complete") }
)
CommonUtilsk.exampleStart()
Observable.amb(sources).doOnComplete { println("Result: onComplte") }.subscribe { it ->
val time = System.currentTimeMillis() - CommonUtilsk.startTime
println(CommonUtilsk.getThreadName() + " | " + time + " | " + "value = " + it)
}
CommonUtilsk.sleep(2000)
}
internal object CommonUtilsk {
var startTime: Long = 0
fun exampleStart() {
startTime = System.currentTimeMillis()
}
fun sleep(mills: Int) {
try {
Thread.sleep(mills.toLong())
} catch (e: InterruptedException) {
e.printStackTrace()
}
}
fun getThreadName(): String {
var threadName = Thread.currentThread().name
if (threadName.length > 30) {
threadName = threadName.substring(0, 30) + "..."
}
return threadName
}
}
결과
main | 7 | value = 2
main | 7 | value = 4
main | 7 | value = 6
main | 7 | value = 8
Observable#2 : data2 Complete
Result: onComplte
code를 보면 List에 Observable 2개가 들어가 있습니다. #1의 경우에는 홀수 String 데이터가 #2의 경우에는 짝수 String 데이터가 있습니다. 그리고 #1은 약 100ms 이후 발행합니다.
그래서 amb을 실행한 결과를 보면 data2에 대한 데이터만 나타나고 amb이 complete 되는 것을 확인 할 수 있습니다.
반응형
'2023년 이전 > ReativeX' 카테고리의 다른 글
RxJava, RxKotlin - takeUntil 함수 (0) | 2020.01.19 |
---|---|
RxJava, Rxkotlin 관련 CommonUtils와 CommonUtilsk (0) | 2020.01.19 |
RxJava, RxKotlin - Concat 함수 (0) | 2020.01.17 |
RxJava, RxKotlin - merge 함수 (0) | 2020.01.17 |
RxJava, RxKotlin - CombineLatest 함수 (0) | 2020.01.17 |