[출처 - RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에]
본 글은 'RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에' 를 학습하면서 정리한 글입니다
1. 트램펄린 스케줄러
트래펌린 스케줄러는 새로운 스레드를 생성하지 않고 현재 스레드에 무한한 크기의 대기행렬(Queue로 동작)을 생성하는 스케줄러입니다.
예제 code로 확인해 보겠습니다. (CommonUtils와 CommonUtilsk는 여기서 확인 하실 수 있습니다.)
Java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class Scheduler_trampoline_test {
public static void main(String[] args){
String[] orgs = {"1","3","5"};
Observable source = Observable.fromArray(orgs);
source.subscribeOn(Schedulers.trampoline())
.map(data-> "<<"+data+">>")
.subscribe(it-> {
System.out.println(CommonUtils.getThreadName() + " | " + "value = " + it);
});
source.subscribeOn(Schedulers.trampoline())
.map(data-> "##"+data+"##")
.subscribe(it-> {
System.out.println(CommonUtils.getThreadName() + " | " + "value = " + it);
});
CommonUtils.sleep(500);
}
}
Kotlin
import io.reactivex.rxkotlin.toObservable
import io.reactivex.schedulers.Schedulers
fun main(){
val orgs = arrayOf("1", "3", "5")
val source = orgs.toObservable()
source.subscribeOn(Schedulers.trampoline())
.map { data -> "<<$data>>" }
.subscribe { it ->
println(CommonUtilsk.getThreadName() + " | " + "value = " + it)
}
source.subscribeOn(Schedulers.trampoline())
.map { data -> "##$data##" }
.subscribe { it ->
println(CommonUtilsk.getThreadName() + " | " + "value = " + it)
}
CommonUtilsk.sleep(500)
}
결과
main | value = <<1>>
main | value = <<3>>
main | value = <<5>>
main | value = ##1##
main | value = ##3##
main | value = ##5##
결과를 보면 알수있듯이 새로운 Thread를 생성하지 않고 메인 Thread에서 모두 동작하고 있습니다. 그리고 queue에 작업을 넣고 하나씩 꺼내어 동작하기 때문에 첫 번째 구독과 두 번째 구독의 실행 순서가 바뀌는 경우는 발생하지 않습니다.
2.싱글 스레드 스케줄러
단일 스레드를 별도로 생성하여 구독작업을 처리하는 스레드 입니다. 그래서 생성된 스레드는 여러번 구독요청이 와도 공통으로 사용합니다. (CommonUtils와 CommonUtilsk는 여기서 확인 하실 수 있습니다.)
Java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class Scheduler_single_test {
public static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
public static void main(String[] args){
Observable numbers = Observable.range(100,5);
Observable chars = Observable.range(0,5).map(it->numberToAlphabet(it));
numbers.subscribeOn(Schedulers.single())
.subscribe(it->{
System.out.println(CommonUtils.getThreadName() + " | " + "value = " + it);
});
chars.subscribeOn(Schedulers.single())
.subscribe(it->{
System.out.println(CommonUtils.getThreadName() + " | " + "value = " + it);
});
CommonUtils.sleep(500);
}
public static String numberToAlphabet (long x){
return Character.toString(ALPHABET.charAt((int)x));
}
}
Kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
val ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
fun main(){
val numbers = Observable.range(100, 5)
val chars = Observable.range(0, 5).map { it -> numberToAlphabet(it.toLong()) }
numbers.subscribeOn(Schedulers.single())
.subscribe { it -> println(CommonUtilsk.getThreadName() + " | " + "value = " + it) }
chars.subscribeOn(Schedulers.single())
.subscribe { it -> println(CommonUtilsk.getThreadName() + " | " + "value = " + it) }
CommonUtilsk.sleep(500)
}
fun numberToAlphabet(x: Long): String {
return ALPHABET[x.toInt()].toString()
}
결과
RxSingleScheduler-1 | value = 100
RxSingleScheduler-1 | value = 101
RxSingleScheduler-1 | value = 102
RxSingleScheduler-1 | value = 103
RxSingleScheduler-1 | value = 104
RxSingleScheduler-1 | value = A
RxSingleScheduler-1 | value = B
RxSingleScheduler-1 | value = C
RxSingleScheduler-1 | value = D
RxSingleScheduler-1 | value = E
결과를 보면 2개의 구독을 하였지만 모두 RxSingleScheduler-1에서 동작하는 것을 확인 할 수 있습니다. 즉 Schedulers.single()을 통하여 하나의 스레드에서만 동작하게 됩것입니다.
3.Executor 변환 스케줄러
자바에서 java.util.current 패키지에서 제공하는 Executor를 변환하여 스케줄러를 생성할 수 있습니다. (Executor는 간단하게 스레드풀을 만들어서 스레드 동작을 편리하게 만드는 방법중 하나입니다.)
방법은 다음과 같습니다.(CommonUtils와 CommonUtilsk는 여기서 확인 하실 수 있습니다.)
Java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class Scheduler_Executor_test {
public static void main(String[] args){
final int THREAD_NUM = 10;
String[] data = {"1","3","5"};
Observable source = Observable.fromArray(data);
Executor executor = Executors.newFixedThreadPool(THREAD_NUM);
source.subscribeOn(Schedulers.from(executor))
.subscribe(it->{
System.out.println(CommonUtils.getThreadName() + " | " + "value = " + it);
});
source.subscribeOn(Schedulers.from(executor))
.subscribe(it->{
System.out.println(CommonUtils.getThreadName() + " | " + "value = " + it);
});
CommonUtils.sleep(500);
}
}
Kotlin
import io.reactivex.rxkotlin.toObservable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.Executors
fun main(){
val THREAD_NUM = 10
val data = arrayOf("1", "3", "5")
val source = data.toObservable()
val executor = Executors.newFixedThreadPool(THREAD_NUM)
source.subscribeOn(Schedulers.from(executor))
.subscribe { it -> println(CommonUtilsk.getThreadName() + " | " + "value = " + it) }
source.subscribeOn(Schedulers.from(executor))
.subscribe { it -> println(CommonUtilsk.getThreadName() + " | " + "value = " + it) }
CommonUtilsk.sleep(500)
}
결과
pool-1-thread-1 | value = 1
pool-1-thread-1 | value = 3
pool-1-thread-1 | value = 5
pool-1-thread-2 | value = 1
pool-1-thread-2 | value = 3
pool-1-thread-2 | value = 5
결과를 보면 pool-1에 thread-1과 thread-2에서 동작하는 것을 확인 할 수 있습니다. 그리고 동작을 시켜보면 결과가 다 나와도 동작이 꺼지지 않는데 이는 Executor를 종료하지 않아서 발생하는 문제 입니다.
만일 종료를 원하신다면 executor.shutdownNow() 같은 executor 종료 명령어를 추가하시면 됩니다.
'2023년 이전 > ReativeX' 카테고리의 다른 글
RxJava, RxKotlin - RxAndroid 란? (0) | 2020.02.10 |
---|---|
RxJava, RxKotlin - 스케줄러를 활용하여 콜백 지옥 벗어나기 (0) | 2020.02.10 |
RxJava,RxKoltin - 스케줄러(2) (0) | 2020.01.27 |
RxJava, RxKotlin - 스케쥴러(1) (0) | 2020.01.21 |
RxJava, RxKotlin - timeinterval 함수 (0) | 2020.01.20 |