본문 바로가기
2023년 이전/ReativeX

RxJava,RxKotlin - switchMap

by JeongUPark 2020. 1. 14.
반응형

[출처 - RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에]

본 글은 'RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에' 를 학습하면서 정리한 글입니다

 

마블다이어그램 참고 - http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html#switchMap-io.reactivex.functions.Function-

 

이전에 봤던 concatMap이 동작의 순서를 보장해준다면, switchMap은 함수를 보장하기 위해 새로운 데이터가 들어올 경우 이전 동작을 멈추고 현재 들어온 작업을 수행합니다. 마블다이어 그램을 본다면 이 이야기가 바로 이해될 것 같습니다.

 

마블다이어그램을 보면 초록원이 들어와서 마름모를 만들고 네모를 만들려 하는데 파랑원이 들어와서 초록원에 대한 동작을 멈추고 파랑원에 대한 동작을 수행합니다.

 

즉, switchMap은 여러개의 값이 발행되었을 때 마지막에 들어온 값만 처리하고 싶을 때 유용합니다.

 

test Code를 보겠습니다.

 

Java

import io.reactivex.Observable;

import java.util.concurrent.TimeUnit;

public class SwitchMap_Test {

    public static void main(String[] args){
        CommonUtils.exampleStart();
        String[] balls= {"1","3","5"};

        Observable<String> source = Observable.interval(100L, TimeUnit.MILLISECONDS)
                .map(Long::intValue)
                .map(idx->balls[idx])
                .take(balls.length)
                .switchMap(ball->Observable.interval(200L,TimeUnit.MILLISECONDS).map(notUsed->ball+"<>").take(3));
        source.subscribe( it->{
            long time = System.currentTimeMillis() - CommonUtils.startTime;
            System.out.println(CommonUtils.getThreadName() + " | " + time + " | " + "value = " + it); }
        );
        CommonUtils.sleep(2000);
    }
}
public class CommonUtils{
    public static long startTime;
    public static void exampleStart(){
        startTime = System.currentTimeMillis();
    }
    public static void sleep(int mills){
        try{
            Thread.sleep(mills);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
    public static String getThreadName() {
        String threadName = Thread.currentThread().getName();
        if (threadName.length() > 30) {
            threadName = threadName.substring(0, 30) + "...";
        }
        return threadName;
    }
}

 

Kotlin

import io.reactivex.Observable
import java.util.concurrent.TimeUnit

fun main(){
    CommonUtilsk.exampleStart()
    var balls = arrayOf("4","5","6")
    var source = Observable.interval(100L,TimeUnit.MILLISECONDS)
        .map { it->
            it.toInt() }
        .map{idx->
            balls[idx]}
        .take(balls.size.toLong())
        .doOnNext {
            val time = System.currentTimeMillis() - CommonUtilsk.startTime
            println(CommonUtilsk.getThreadName() + " | " + time + " | " + "value = " + it) }
        .switchMap { ball->Observable.interval(200L,TimeUnit.MILLISECONDS).map { notUsed->"$ball <>" }.take(2) }

    source.subscribe{
        val time = System.currentTimeMillis() - CommonUtilsk.startTime
        println(CommonUtilsk.getThreadName() + " | " + time + " | " + "value = " + it)
    }

    CommonUtilsk.sleep(2000)
}
internal object CommonUtilsk {
    var startTime: Long = 0
    fun exampleStart() {
        startTime = System.currentTimeMillis()
    }

    fun sleep(mills: Int) {
        try {
            Thread.sleep(mills.toLong())
        } catch (e: InterruptedException) {
            e.printStackTrace()
        }

    }
    fun getThreadName(): String {
        var threadName = Thread.currentThread().name
        if (threadName.length > 30) {
            threadName = threadName.substring(0, 30) + "..."
        }
        return threadName
    }
}

결과

RxComputationThreadPool-4 | 642 | value = 6 <>
RxComputationThreadPool-4 | 842 | value = 6 <>

code를 실행해보면 위와 같은 결과가 나옵니다. String array의 마지막 값인 6에 대한 결과 잘 나오는 것을 볼 수 있습니다. 그런데 결과에 보면 ThreadPool-4에 대한 결과만 나온것을 확인 할 수있습니다.

 

왜 이런지 확인을 위해 doOnNext를 추가하여 보았습니다.

import io.reactivex.Observable
import java.util.concurrent.TimeUnit

fun main(){
    CommonUtilsk.exampleStart()
    var balls = arrayOf("4","5","6")
    var source = Observable.interval(100L,TimeUnit.MILLISECONDS)
        .map { it->
            it.toInt() }
        .map{idx->
            balls[idx]}
        .take(balls.size.toLong())
        .doOnNext {
            val time = System.currentTimeMillis() - CommonUtilsk.startTime
            println(CommonUtilsk.getThreadName() + " | " + time + " | " + "value = " + it) }
        .switchMap { ball->Observable.interval(200L,TimeUnit.MILLISECONDS).map { notUsed->"$ball <>" }.take(2) }

    source.subscribe{
        val time = System.currentTimeMillis() - CommonUtilsk.startTime
        println(CommonUtilsk.getThreadName() + " | " + time + " | " + "value = " + it)
    }

    CommonUtilsk.sleep(2000)
}
RxComputationThreadPool-1 | 244 | value = 4
RxComputationThreadPool-1 | 335 | value = 5
RxComputationThreadPool-1 | 435 | value = 6
RxComputationThreadPool-4 | 637 | value = 6 <>
RxComputationThreadPool-4 | 837 | value = 6 <>

결과를 보면 4,5,6 값이 나올떄는 ThreadPool-1번에서 동작하고 ThreadPool-4에 대한 결과가 나오고 있습니다.

즉 doOnNext로 interval에 대한 동작을 확인하고 (ThreadPool-1) switchMap에 대한 동작을 확인하는데 switchMap은 마지막 발행 데이터를 확인함으로 ThreadPool-2,ThreadPool-3을 지나치고 ThreadPool-4에 대한 값이 나오고 있는 것을 알 수 있습니다.(데이터 발행이 시간을 가지고 나타나지 않고 빠르게 연속해서 발행되기 때문에 위와 같은 결과가 나온것 같습니다.)

반응형

'2023년 이전 > ReativeX' 카테고리의 다른 글

RxJava, RxKotlin - Scan 함수  (1) 2020.01.15
RxJava,RxKotlin - groupBy 함수  (0) 2020.01.14
RxJava, RxKotlin - concatMap  (0) 2020.01.14
RxJava, RxKotlin - repeat()  (0) 2019.12.16
RxJava, Rxkotlin - defer()  (0) 2019.12.16