본문 바로가기
2023년 이전/ReativeX

RxJava, RxKotlin - concatMap

by JeongUPark 2020. 1. 14.
반응형

[출처 - RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에]

본 글은 'RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에' 를 학습하면서 정리한 글입니다

 

마블다이어그램 참고 - http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html#concatMap-io.reactivex.functions.Function-

 

concatMap 함수는 flatMap과 비슷하지만 flatMap 처럼 중간에 다른 subscribe 요청이 올 경우 그 subscribe를 실행하는 것이 아닌 요청 된 순서되로 실행하는 함수 입니다.

 

마블 다이어 그램을 보면

 

초록 원에 대한 처리중에 파랑원 처리 요청이 들어와도 초록원에 대한 요청을 모두 처리 후 파랑원에 대한 요청을 처리합니다. (flatMap의 경우에는 초록원 처리중 파랑원 요청이 들어오면 같이 처리를 합니다. - flatMap에 대한 자세한 내용은 여기서 확인 하시면 됩니다.)

 

테스트 code는 다음과 같습니다.

 

JAVA

import io.reactivex.Observable;

import java.util.concurrent.TimeUnit;

public class ConcatMap_test {

    public static void main(String[] args){

        CommonUtils.exampleStart();
        String[] balls= {"1","3","5"};

        Observable<String> source = Observable.interval(100L, TimeUnit.MILLISECONDS)
                .map(Long::intValue)
                .map(idx->balls[idx])
                .take(balls.length)
                .concatMap(ball->Observable.interval(200L,TimeUnit.MILLISECONDS).map(notUsed->ball+"<>").take(3));
        source.subscribe( it->{
            long time = System.currentTimeMillis() - CommonUtils.startTime;
            System.out.println(CommonUtils.getThreadName() + " | " + time + " | " + "value = " + it); }
        );
        CommonUtils.sleep(2000);
    }
}
public class CommonUtils{
    public static long startTime;
    public static void exampleStart(){
        startTime = System.currentTimeMillis();
    }
    public static void sleep(int mills){
        try{
            Thread.sleep(mills);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
    public static String getThreadName() {
        String threadName = Thread.currentThread().getName();
        if (threadName.length() > 30) {
            threadName = threadName.substring(0, 30) + "...";
        }
        return threadName;
    }
}

Kotlin

import io.reactivex.Observable
import java.util.concurrent.TimeUnit

fun main(){
    CommonUtilsk.exampleStart()
    var balls = arrayOf("4","5","6")
    var source = Observable.interval(100L,TimeUnit.MILLISECONDS)
        .map { it->
            it.toInt() }
        .map{idx->
            balls[idx]}
        .take(balls.size.toLong())
        .concatMap { ball->Observable.interval(200L,TimeUnit.MILLISECONDS).map { notUsed->"$ball <>" }.take(2) }

    source.subscribe{
        val time = System.currentTimeMillis() - CommonUtilsk.startTime
        println(CommonUtilsk.getThreadName() + " | " + time + " | " + "value = " + it)
    }

    CommonUtilsk.sleep(2000)
}
internal object CommonUtilsk {
    var startTime: Long = 0
    fun exampleStart() {
        startTime = System.currentTimeMillis()
    }

    fun sleep(mills: Int) {
        try {
            Thread.sleep(mills.toLong())
        } catch (e: InterruptedException) {
            e.printStackTrace()
        }

    }
    fun getThreadName(): String {
        var threadName = Thread.currentThread().name
        if (threadName.length > 30) {
            threadName = threadName.substring(0, 30) + "..."
        }
        return threadName
    }
}

 

결과는

RxComputationThreadPool-2 | 406 | value = 4 <>
RxComputationThreadPool-2 | 606 | value = 4 <>
RxComputationThreadPool-3 | 809 | value = 5 <>
RxComputationThreadPool-3 | 1007 | value = 5 <>
RxComputationThreadPool-4 | 1209 | value = 6 <>
RxComputationThreadPool-4 | 1409 | value = 6 <>

과 같이 나타납니다.

 

이 결과를 볼떄 중간에 시간을 보면 406부터 시작하는데 그 원인은 다음과 같이 doOnNext를 추가해서 결과를 봐면

import io.reactivex.Observable
import java.util.concurrent.TimeUnit

fun main(){
    CommonUtilsk.exampleStart()
    var balls = arrayOf("4","5","6")
    var source = Observable.interval(100L,TimeUnit.MILLISECONDS)
        .map { it->
            it.toInt() }
        .map{idx->
            balls[idx]}
        .take(balls.size.toLong())
        .doOnNext {
            val time = System.currentTimeMillis() - CommonUtilsk.startTime
            println(CommonUtilsk.getThreadName() + " | " + time + " | " + "value = " + it) }
        .concatMap { ball->Observable.interval(200L,TimeUnit.MILLISECONDS).map { notUsed->"$ball <>" }.take(2) }

    source.subscribe{
        val time = System.currentTimeMillis() - CommonUtilsk.startTime
        println(CommonUtilsk.getThreadName() + " | " + time + " | " + "value = " + it)
    }

    CommonUtilsk.sleep(2000)
}

결과

RxComputationThreadPool-1 | 198 | value = 4
RxComputationThreadPool-1 | 296 | value = 5
RxComputationThreadPool-1 | 397 | value = 6
RxComputationThreadPool-2 | 401 | value = 4 <>
RxComputationThreadPool-2 | 600 | value = 4 <>
RxComputationThreadPool-3 | 800 | value = 5 <>
RxComputationThreadPool-3 | 1002 | value = 5 <>
RxComputationThreadPool-4 | 1202 | value = 6 <>
RxComputationThreadPool-4 | 1404 | value = 6 <>

interval에 의해 먼저 동작을 하고 그 후 concatMap을 동작하기 때문에 시간이 쫌 걸린 후 동작합니다. 

 

만일 위 code를 concatMap에서 flatMap으로 변경하면

import io.reactivex.Observable
import java.util.concurrent.TimeUnit

fun main(){
    CommonUtilsk.exampleStart()
    var balls = arrayOf("4","5","6")
    var source = Observable.interval(100L,TimeUnit.MILLISECONDS)
        .map { it->
            it.toInt() }
        .map{idx->
            balls[idx]}
        .take(balls.size.toLong())
        .doOnNext {
            val time = System.currentTimeMillis() - CommonUtilsk.startTime
            println(CommonUtilsk.getThreadName() + " | " + time + " | " + "value = " + it) }
        .flatMap { ball->Observable.interval(200L,TimeUnit.MILLISECONDS).map { notUsed->"$ball <>" }.take(2) }

    source.subscribe{
        val time = System.currentTimeMillis() - CommonUtilsk.startTime
        println(CommonUtilsk.getThreadName() + " | " + time + " | " + "value = " + it)
    }

    CommonUtilsk.sleep(2000)
}

결과는

RxComputationThreadPool-1 | 309 | value = 4
RxComputationThreadPool-1 | 407 | value = 5
RxComputationThreadPool-1 | 507 | value = 6
RxComputationThreadPool-2 | 511 | value = 4 <>
RxComputationThreadPool-3 | 607 | value = 5 <>
RxComputationThreadPool-4 | 708 | value = 6 <>
RxComputationThreadPool-2 | 711 | value = 4 <>
RxComputationThreadPool-3 | 808 | value = 5 <>
RxComputationThreadPool-4 | 909 | value = 6 <>

이렇게 나타납니다.

반응형

'2023년 이전 > ReativeX' 카테고리의 다른 글

RxJava,RxKotlin - groupBy 함수  (0) 2020.01.14
RxJava,RxKotlin - switchMap  (0) 2020.01.14
RxJava, RxKotlin - repeat()  (0) 2019.12.16
RxJava, Rxkotlin - defer()  (0) 2019.12.16
RxJava,RxKotlin - interavlRange()  (0) 2019.12.16