본문 바로가기
2023년 이전/ReativeX

RxJava,RxKoltin - 스케줄러(2)

by JeongUPark 2020. 1. 27.
반응형

[출처 - RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에]

본 글은 'RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에' 를 학습하면서 정리한 글입니다

 

RxJava, RxKotlin에서는 계산 스케줄러, I/O 스케줄러, 트램펄린 스케줄러는 추천하며 뉴스레드 스케줄러는 특수한 상황에서 적용하길 권장합니다. 그럼 각각의 스케줄러들에 대해 알아 보겠습니다.

1. 뉴 스레드 스케줄러

뉴 스레드 스케줄러는 새로운 스레드를 생성합니다. Schedulers.newThread()를 인자로 넣어주면 새로운 스레드를 만들어 줍니다.

 

스케줄러는 subscribeOn() 함수와 observeOn()함수에 나눠서 적용할 수 있습니다. 이번에는 subscribeOn() 함수만 사용해서 설명하겠습니다. 

testCode를 보겠습니다. (CommonUtils와 CommonUtilsk는 여기서 확인 하실 수 있습니다.)

 

Java

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class Scheduler_NewThread_test {

    public static void main(String[] args) {
        String[] objs = {"1", "2", "3"};

        CommonUtils.exampleStart();
        Observable.fromArray(objs)
                .doOnNext(data -> {
                    long time = System.currentTimeMillis() - CommonUtils.startTime;
                    System.out.println(CommonUtils.getThreadName() + " | " + time + " | " + "Original data = " + data);
                })
                .map(data -> "<<" + data + ">>")
                .subscribeOn(Schedulers.newThread())
                .subscribe(
                        it -> {
                            long time = System.currentTimeMillis() - CommonUtils.startTime;
                            System.out.println(CommonUtils.getThreadName() + " | " + time + " | " + "Original data = " + it);
                        }
                );
        CommonUtils.sleep(500);

        Observable.fromArray(objs)
                .doOnNext(data -> {
                    long time = System.currentTimeMillis() - CommonUtils.startTime;
                    System.out.println(CommonUtils.getThreadName() + " | " + time + " | " + "Original data = " + data);
                })
                .map(data -> "##" + data + "##")
                .subscribeOn(Schedulers.newThread())
                .subscribe(
                        it -> {
                            long time = System.currentTimeMillis() - CommonUtils.startTime;
                            System.out.println(CommonUtils.getThreadName() + " | " + time + " | " + "Original data = " + it);
                        }
                );

        CommonUtils.sleep(500);
    }
}

kotlin

import io.reactivex.rxkotlin.toObservable
import io.reactivex.schedulers.Schedulers

fun main(){
    val objs = arrayOf("1", "2", "3")

    CommonUtilsk.exampleStart()
    objs.toObservable()
        .doOnNext { data ->
            val time = System.currentTimeMillis() - CommonUtilsk.startTime
            println(CommonUtils.getThreadName() + " | " + time + " | " + "Original data = " + data)
        }
        .map { data -> "<<$data>>" }
        .subscribeOn(Schedulers.newThread())
        .subscribe { it ->
            val time = System.currentTimeMillis() - CommonUtilsk.startTime
            println(CommonUtilsk.getThreadName() + " | " + time + " | " + "Original data = " + it)
        }
    CommonUtilsk.sleep(500)


    objs.toObservable()
        .doOnNext { data ->
            val time = System.currentTimeMillis() - CommonUtilsk.startTime
            println(CommonUtilsk.getThreadName() + " | " + time + " | " + "Original data = " + data)
        }
        .map { data -> "##$data##" }
        .subscribeOn(Schedulers.newThread())
        .subscribe { it ->
            val time = System.currentTimeMillis() - CommonUtilsk.startTime
            println(CommonUtilsk.getThreadName() + " | " + time + " | " + "Original data = " + it)
        }

    CommonUtilsk.sleep(500)
}

결과

RxNewThreadScheduler-1 | 65 | Original data = 1
RxNewThreadScheduler-1 | 66 | Original data = <<1>>
RxNewThreadScheduler-1 | 66 | Original data = 2
RxNewThreadScheduler-1 | 66 | Original data = <<2>>
RxNewThreadScheduler-1 | 66 | Original data = 3
RxNewThreadScheduler-1 | 66 | Original data = <<3>>
RxNewThreadScheduler-2 | 564 | Original data = 1
RxNewThreadScheduler-2 | 564 | Original data = ##1##
RxNewThreadScheduler-2 | 564 | Original data = 2
RxNewThreadScheduler-2 | 564 | Original data = ##2##
RxNewThreadScheduler-2 | 564 | Original data = 3
RxNewThreadScheduler-2 | 564 | Original data = ##3##

code와 결과를 보면 알수 있듯이  subscribeOn에 Schedulers.newThread()를 사용하여 두개의 Thread인 RxNewThreadScheduler-1과 RxNewThreadScheduler-2에 각각의 결과를 볼 수 있습니다. 즉  Schedulers.newThread()를 사용하여 2개의 Thread를 사용한 것을 확인 할 수 있습니다.

 

2.계산 스케줄러

 계산 스케줄러는 cpu에 대응하는 계산용 스케줄러 입니다.  테스트 코드를 보겠습니다. (CommonUtils와 CommonUtilsk는 여기서 확인 하실 수 있습니다.)

Java

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class Scheduler_computation_test {
    public static void main(String[] args) {
        String[] objs = {"1", "2", "3"};

        CommonUtils.exampleStart();
        Observable source = Observable.fromArray(objs)
                .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a,b)-> a);
        source.map(item-> "<<"+item+">>")
                .subscribeOn(Schedulers.computation())
                .subscribe(it -> {
                    long time = System.currentTimeMillis() - CommonUtils.startTime;
                    System.out.println(CommonUtils.getThreadName() + " | " + time + " | " + "Original data = " + it);
                });
        source.map(item-> "++"+item+"++")
                .subscribeOn(Schedulers.computation())
                .subscribe(it -> {
                    long time = System.currentTimeMillis() - CommonUtils.startTime;
                    System.out.println(CommonUtils.getThreadName() + " | " + time + " | " + "Original data = " + it);
                });
        CommonUtils.sleep(500);
    }

}

Kotlin

import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.rxkotlin.toObservable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit

fun main(){
    val objs = arrayOf("1", "2", "3")

    CommonUtils.exampleStart()
    val source = objs.toObservable()
        .zipWith<Long, String>(Observable.interval(100L, TimeUnit.MILLISECONDS), BiFunction<String,Long,String>{ a, c-> a})
    source.map({ item -> "<<$item>>" })
        .subscribeOn(Schedulers.computation())
        .subscribe({ it ->
            val time = System.currentTimeMillis() - CommonUtils.startTime
            println(CommonUtils.getThreadName() + " | " + time + " | " + "Original data = " + it)
        })
    source.map({ item -> "++$item++" })
        .subscribeOn(Schedulers.computation())
        .subscribe({ it ->
            val time = System.currentTimeMillis() - CommonUtils.startTime
            println(CommonUtils.getThreadName() + " | " + time + " | " + "Original data = " + it)
        })
    CommonUtils.sleep(500)
}

결과

RxComputationThreadPool-4 | 194 | Original data = ++1++
RxComputationThreadPool-3 | 194 | Original data = <<1>>
RxComputationThreadPool-4 | 291 | Original data = ++2++
RxComputationThreadPool-3 | 291 | Original data = <<2>>
RxComputationThreadPool-3 | 391 | Original data = <<3>>
RxComputationThreadPool-4 | 391 | Original data = ++3++
RxComputationThreadPool-3 | 189 | Original data = <<1>>
RxComputationThreadPool-4 | 189 | Original data = ++1++
RxComputationThreadPool-4 | 289 | Original data = ++2++
RxComputationThreadPool-3 | 290 | Original data = <<2>>
RxComputationThreadPool-4 | 390 | Original data = ++3++
RxComputationThreadPool-3 | 390 | Original data = <<3>>

결과가 여러가지로 나오는 이유는 첫 번째 구독과 두 번째 구독이 거의 동시에 이루어지기 떄문에 RxJava, Rxkotlin 내부에서 동일한 스레드에 작업을 할당하기 떄문입니다.

 

3. IO 스케줄러

IO 스케줄러는 계산 스케줄러와는 다르게 네트워크사으이 요청을 처리하거나 각종 입출력 작업을 실행하기 위한 스케줄러 입니다. 계산 스케줄러와 다른 점은 기본적으로 생성되는 스레드 개수가 다르다는 것입니다.

계산 스케줄러는 cpu의 갯수만큼 스레드를 생성하지만 IO스케줄러는 필요할 떄마다 스레드를 계속 생성합니다.

 

test code를 보겠습니다. (CommonUtils와 CommonUtilsk는 여기서 확인 하실 수 있습니다.)

 

Java

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.io.File;

public class Scheduler_IO_test {

    public static void main(String[] args){

        String root = "C:\\";
        File[] files = new File(root).listFiles();

        Observable source = Observable.fromArray(files)
                .filter(f -> f.isDirectory())
                .map(f-> f.getAbsolutePath())
                .subscribeOn(Schedulers.io());
        source.subscribe(
                it -> {
                    long time = System.currentTimeMillis() - CommonUtils.startTime;
                    System.out.println(CommonUtils.getThreadName() + " | " + time + " | " + "Original data = " + it);}
        );
        CommonUtils.sleep(500);
    }
}

Kotlin

import io.reactivex.rxkotlin.toObservable
import io.reactivex.schedulers.Schedulers
import java.io.File

fun main(){
    val root = "C:\\"
    val files = File(root).listFiles()

    val source = files.toObservable()
        .filter { f -> f.isDirectory }
        .map { f -> f.absolutePath }
        .subscribeOn(Schedulers.io())
    source.subscribe { it ->
        val time = System.currentTimeMillis() - CommonUtilsk.startTime
        println(CommonUtilsk.getThreadName() + " | " + time + " | " + "Original data = " + it)
    }
    CommonUtilsk.sleep(500)
}

결과

....
RxCachedThreadScheduler-1 | 1580110627337 | Original data = C:\Program Files
RxCachedThreadScheduler-1 | 1580110627337 | Original data = C:\Program Files (x86)
....
RxCachedThreadScheduler-1 | 1580110627337 | Original data = C:\Windows

결과는 c드라이버에 있는 폴더들이 나타압니다. (개인정보라 모두 있을 만한 데이터만 보여드립니다.)

반응형