본문 바로가기
2023년 이전/ReativeX

RxJava, RxKotlin - merge 함수

by JeongUPark 2020. 1. 17.
반응형

[출처 - RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에]

본 글은 'RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에' 를 학습하면서 정리한 글입니다

 

마블다이어그램 참고 - http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html#merge-java.lang.Iterable-


merge 함수는 Observable의 순서와 데이터와 상관없이 먼저 입력되는 데이터를 그대로 발행하는 함수 입니다.

 

한마디로 모든 데이터를 싹다 발행하는데, 만일 어느 Observable이 종료 또는 error가 날 경우 멈춥니다.

 

간단한 test Code를 보겠습니다.

Java

import io.reactivex.Observable;

import java.util.concurrent.TimeUnit;

public class Merge_test {

    public static void main(String[] args){
        String[] data1 = {"1","3"};
        String[] data2 = {"2","4","6"};
        CommonUtils.exampleStart();
        Observable source1 = Observable.interval(0L,100L, TimeUnit.MILLISECONDS)
                .map(Long::intValue)
                .map(idx->data1[idx])
                .take(data1.length);

        Observable source2 = Observable.interval(50L,TimeUnit.MILLISECONDS)
                .map(Long::intValue)
                .map(idx->data2[idx])
                .take(data2.length);

        Observable source = Observable.merge(source1,source2);
        source.subscribe(it->{
            long time = System.currentTimeMillis() - CommonUtils.startTime;
            System.out.println(CommonUtils.getThreadName() + " | " + time + " | " + "value = " + it);
        });

        CommonUtils.sleep(1000);
    }
}

Kotlin

import io.reactivex.Observable
import io.reactivex.rxkotlin.toObservable
import java.util.concurrent.TimeUnit

fun main(){
    val data1 = arrayOf("1", "3")
    val data2 = arrayOf("2", "4", "6")
    CommonUtilsk.exampleStart()
    val source1 = Observable.interval(0L,100L, TimeUnit.MILLISECONDS)
        .map{ it->it.toInt() }
        .map{idx-> data1[idx]}
        .take(data1.size.toLong())

    val source2 = Observable.interval(50L, TimeUnit.MILLISECONDS)
        .map{ it->it.toInt() }
        .map{idx-> data2[idx]}
        .take(data2.size.toLong())

    Observable.merge(source1,source2).subscribe{
        it->
        val time = System.currentTimeMillis() - CommonUtilsk.startTime
        println(CommonUtilsk.getThreadName() + " | " + time + " | " + "value = " + it)
    }

    CommonUtilsk.sleep(1000)
}

결과

RxComputationThreadPool-1 | 142 | value = 1
RxComputationThreadPool-2 | 192 | value = 2
RxComputationThreadPool-1 | 241 | value = 3
RxComputationThreadPool-2 | 250 | value = 4
RxComputationThreadPool-2 | 290 | value = 6

 

결과를 보면

data1에 대한 발행은 ThreadPool-1에서 data2에 대한 발행은 ThreadPool-2에서 관여를 하고 있습니다.

그리고 시간을 보면 ThreadPool-1 들 사이에는 100 ms가 ThreadPool-1과 ThreadPool-2 사이는 50ms가 그리고 마지막으로 ThreadPool-2 는 50 ms가 나타나고 있습니다.( 즉, 위에서 coding 한 대로 나타나고 있습니다.) 

반응형

'2023년 이전 > ReativeX' 카테고리의 다른 글

RxJava, RxKotlin - amb 함수  (0) 2020.01.19
RxJava, RxKotlin - Concat 함수  (0) 2020.01.17
RxJava, RxKotlin - CombineLatest 함수  (0) 2020.01.17
RxJava, RxKotlin - zip 함수  (0) 2020.01.16
RxJava, RxKotlin - Scan 함수  (1) 2020.01.15