본문 바로가기
2023년 이전/ReativeX

RxJava, RxKotlin - amb 함수

by JeongUPark 2020. 1. 19.
반응형

[출처 - RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에]

본 글은 'RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에' 를 학습하면서 정리한 글입니다

 

마블다이어그램 참고 -http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html#amb-java.lang.Iterable-

 

amb는 ambiguous(모호한)의 줄임말 입니다. 여러 개의 Observable 중에 1개의 Observable을 선택하는데, 선택 기준은 가장 먼저 데이터를 발행한 Observable입니다. 이 후 나머지 데이터는 전부 무시합니다.

 

 

마블다이어 그램을 보면

 

이렇게 원이 먼저 들어왔기 때문에 원의 데이터만 발행하고 있습니다.

 

그럼 Test code를 확인 해보겠습니다.

Java

import io.reactivex.Observable;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class amb_test {

    public static void main(String[] args) {

        String[] data1 = {"1", "3", "5", "7", "9"};
        String[] data2 = {"2", "4", "6", "8"};

        List<Observable<String>> sources = Arrays.asList(
                Observable.fromArray(data1).delay(100L, TimeUnit.MILLISECONDS).doOnComplete(() -> System.out.println("Observable#1 : data1 Complete")),
                Observable.fromArray(data2).doOnComplete(() -> System.out.println("Observable#2 : data2 Complete"))
        );
        CommonUtils.exampleStart();
        Observable.amb(sources).doOnComplete(() -> System.out.println("Result: onComplte")).subscribe(it -> {
            long time = System.currentTimeMillis() - CommonUtils.startTime;
            System.out.println(CommonUtils.getThreadName() + " | " + time + " | " + "value = " + it);
        });

        CommonUtils.sleep(2000);

    }
}
public class CommonUtils{
    public static long startTime;
    public static void exampleStart(){
        startTime = System.currentTimeMillis();
    }
    public static void sleep(int mills){
        try{
            Thread.sleep(mills);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
    public static String getThreadName() {
        String threadName = Thread.currentThread().getName();
        if (threadName.length() > 30) {
            threadName = threadName.substring(0, 30) + "...";
        }
        return threadName;
    }
}

kotlin

import io.reactivex.Observable
import io.reactivex.rxkotlin.toObservable
import java.util.*
import java.util.concurrent.TimeUnit

fun main(){

    val data1 = arrayOf("1", "3", "5", "7", "9")
    val data2 = arrayOf("2", "4", "6", "8")

    val sources = Arrays.asList(
        data1.toObservable().delay(
            100L,
            TimeUnit.MILLISECONDS
        ).doOnComplete { println("Observable#1 : data1 Complete") },
        data2.toObservable().doOnComplete { println("Observable#2 : data2 Complete") }
    )
    CommonUtilsk.exampleStart()
    Observable.amb(sources).doOnComplete { println("Result: onComplte") }.subscribe { it ->
        val time = System.currentTimeMillis() - CommonUtilsk.startTime
        println(CommonUtilsk.getThreadName() + " | " + time + " | " + "value = " + it)
    }

    CommonUtilsk.sleep(2000)

}
internal object CommonUtilsk {
    var startTime: Long = 0
    fun exampleStart() {
        startTime = System.currentTimeMillis()
    }

    fun sleep(mills: Int) {
        try {
            Thread.sleep(mills.toLong())
        } catch (e: InterruptedException) {
            e.printStackTrace()
        }

    }
    fun getThreadName(): String {
        var threadName = Thread.currentThread().name
        if (threadName.length > 30) {
            threadName = threadName.substring(0, 30) + "..."
        }
        return threadName
    }
}

 결과

main | 7 | value = 2
main | 7 | value = 4
main | 7 | value = 6
main | 7 | value = 8
Observable#2 : data2 Complete
Result: onComplte

 

code를 보면 List에 Observable 2개가 들어가 있습니다. #1의 경우에는 홀수 String 데이터가 #2의 경우에는 짝수 String 데이터가 있습니다. 그리고 #1은 약 100ms 이후 발행합니다.

그래서 amb을 실행한 결과를 보면 data2에 대한 데이터만 나타나고 amb이 complete 되는 것을 확인 할 수 있습니다.

반응형