본문 바로가기
2023년 이전/ReativeX

RxJava, RxKotlin - Concat 함수

by JeongUPark 2020. 1. 17.
반응형

[출처 - RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에]

본 글은 'RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에' 를 학습하면서 정리한 글입니다

 

마블다이어그램 참고 - http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html#concat-java.lang.Iterable-


concat 함수는 두개 이상의 Observable을 이어주는 역할을 하는 함수 입니다. 첫 번쨰 Observable이 onComplete가 발생해야 두번째 observable을 구독합니다.

 

마블다이어그램을 보면

 

간단한 testCode를 보면

Java

import io.reactivex.Observable;
import io.reactivex.functions.Action;

import java.util.concurrent.TimeUnit;

public class Concat_test {
    public static void main(String[] args) {
        Action onCompleteAction = () -> {
            long time = System.currentTimeMillis() - CommonUtils.startTime;
            System.out.println(CommonUtils.getThreadName() + " | " + time + " | " + "onComplete");
        };
        CommonUtils.exampleStart();
        String[] data1 = {"1", "2", "3"};
        String[] data2 = {"4", "5", "6"};
        Observable source1 = Observable.fromArray(data1).doOnComplete(onCompleteAction);
        Observable source2 = Observable.interval(100L, TimeUnit.MILLISECONDS)
                .map(Long::intValue)
                .map(idx -> data2[idx])
                .take(data2.length)
                .doOnComplete(onCompleteAction);
        Observable source = Observable.concat(source1, source2).doOnComplete(onCompleteAction);
        source.subscribe(
                it -> {
                    long time = System.currentTimeMillis() - CommonUtils.startTime;
                    System.out.println(CommonUtils.getThreadName() + " | " + time + " | " + "value = " + it);
                }
        );
        CommonUtils.sleep(1000);
    }
}
public class CommonUtils{
    public static long startTime;
    public static void exampleStart(){
        startTime = System.currentTimeMillis();
    }
    public static void sleep(int mills){
        try{
            Thread.sleep(mills);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
    public static String getThreadName() {
        String threadName = Thread.currentThread().getName();
        if (threadName.length() > 30) {
            threadName = threadName.substring(0, 30) + "...";
        }
        return threadName;
    }
}

kotlin

import io.reactivex.Observable
import java.util.concurrent.TimeUnit
import io.reactivex.functions.Function

fun main(){
    val onCompleteAction = {
        val time = System.currentTimeMillis() - CommonUtilsk.startTime
        println(CommonUtilsk.getThreadName() + " | " + time + " | " + "onComplete")
    }
    CommonUtilsk.exampleStart()
    val data1 = arrayOf("1", "2", "3")
    val data2 = arrayOf("4", "5", "6")
    val source1 = Observable.fromArray(*data1).doOnComplete(onCompleteAction)
    val source2 = Observable.interval(100L, TimeUnit.MILLISECONDS)
        .map<Int>(Function<Long, Int> { it.toInt() })
        .map { idx -> data2[idx] }
        .take(data2.size.toLong())
        .doOnComplete(onCompleteAction)
    val source = Observable.concat<Any>(source1, source2).doOnComplete(onCompleteAction)
    source.subscribe { it ->
        val time = System.currentTimeMillis() - CommonUtilsk.startTime
        println(CommonUtilsk.getThreadName() + " | " + time + " | " + "value = " + it)
    }
    CommonUtilsk.sleep(1000)

}
internal object CommonUtilsk {
    var startTime: Long = 0
    fun exampleStart() {
        startTime = System.currentTimeMillis()
    }

    fun sleep(mills: Int) {
        try {
            Thread.sleep(mills.toLong())
        } catch (e: InterruptedException) {
            e.printStackTrace()
        }

    }
    fun getThreadName(): String {
        var threadName = Thread.currentThread().name
        if (threadName.length > 30) {
            threadName = threadName.substring(0, 30) + "..."
        }
        return threadName
    }
}

결과

main | 124 | value = 1
main | 126 | value = 2
main | 126 | value = 3
main | 126 | onComplete
RxComputationThreadPool-1 | 230 | value = 4
RxComputationThreadPool-1 | 330 | value = 5
RxComputationThreadPool-1 | 430 | value = 6
RxComputationThreadPool-1 | 430 | onComplete
RxComputationThreadPool-1 | 432 | onComplete

를 나타냅니다.

결과는 보면 fromArray를 사용한 Observable의 데이터 1 2 3을 발생하고 complete가 되고 ThreadPool-1을 통하여 4 5 6을 발생합니다. (interval 함수를 사용했기 때문입니다.)

그리고 마지막 onComplete는 interval과 concat의 onComplete를 나타냅니다.

 

반응형