[출처 - RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에]
본 글은 'RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에' 를 학습하면서 정리한 글입니다
간단한 code를 보겠습니다. (지금까지 많이 사용한 code 입니다. CommonUtils와 CommonUtilsk의 내용은 여기에서 확인 할 수 있습니다.
Java
import io.reactivex.Observable;
public class scheduler_test {
public static void main(String[] args) {
Observable.just("Hello", "RxJava")
.subscribe(it -> {
System.out.println(CommonUtils.getThreadName() + " | value = " + it);
});
}
}
Kotlin
import io.reactivex.Observable
fun main() {
Observable.just("Hello", "RxKotlin")
.subscribe {
println(CommonUtilsk.getThreadName() + " | value = " + it)
}
}
결과
main | value = Hello
main | value = RxKotlin
보면 동작의 Thread가 main인 것을 볼 수 있습니다. 즉 main 스레드에서 동작한다는 의미입니다. 하지만 개발을 하다 보면 모든 동작을 main에서 동작할 수없습니다. 비동기 작업도 필요하고, 멀티 Thread도 필요 합니다. 이럴 때 사용하는 것이 스케쥴러 입니다.
자바 프로그램에서 자바 스레드를 만들면서 경재 조건이나 synchronized 등의 비동기 프로그래밍을 만드는것에 비해서 Rx 프로그램의 스케쥴러는 쓰면 비동기 프로그래밍을 훨씬 간결하게 작성할 수 있습니다.
그럼 간단한 스케쥴러 code를 보겠습니다.
Java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class scheduler_test {
public static void main(String[] args) {
String[] objs = {"1-S","2-T","3-P"};
CommonUtils.exampleStart();
Observable source = Observable.fromArray(objs)
.doOnNext(data ->{
long time = System.currentTimeMillis() - CommonUtils.startTime;
System.out.println(CommonUtils.getThreadName() + " | " + time + " | " +"Original data = "+data );
})
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.map(scheduler_test::shape);
source.subscribe(it->
{ long time = System.currentTimeMillis() - CommonUtils.startTime;
System.out.println(CommonUtils.getThreadName() + " | " + time + " | " + "value = " + it);}
);
CommonUtils.sleep(1000);
}
public static String shape(String value){
return "<flip>-"+value;
}
}
Kotlin
import io.reactivex.rxkotlin.toObservable
import io.reactivex.schedulers.Schedulers
fun main() {
CommonUtils.exampleStart()
val objs = arrayOf("1-S", "2-T", "3-P")
val source = objs.toObservable()
.doOnNext { data -> val time = System.currentTimeMillis() - CommonUtils.startTime
println(CommonUtils.getThreadName() + " | " + time + " | " + "Original data = $data") }
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.map<String>{it->shape(it)}
source.subscribe { it ->
val time = System.currentTimeMillis() - CommonUtils.startTime
println(CommonUtils.getThreadName() + " | " + time + " | " + "value = " + it)
}
CommonUtils.sleep(1000)
}
fun shape(value: String): String {
return "<flip>-$value"
}
결과
RxNewThreadScheduler-1 | 123 | Original data = 1-S
RxNewThreadScheduler-1 | 123 | Original data = 2-T
RxNewThreadScheduler-1 | 124 | Original data = 3-P
RxNewThreadScheduler-2 | 124 | value = <flip>-1-S
RxNewThreadScheduler-2 | 124 | value = <flip>-2-T
RxNewThreadScheduler-2 | 124 | value = <flip>-3-P
code를 보면
1. doOnNext함수는 onNext 이벤트가 발생하면 실행되고 여기서 원래 테이터 값을 확인합니다.
2. subscribeOn은 구독할 떄 실행되는 스레드를 지정합니다.
3. observeOn은 Observable에서 생성한 데이터 흐름이 여기저기 함수를 거치며 처리될 때 동작이 어느 쓰레드에서 일어나는지를 지정합니다.
쫌 이해가 안되시는 분들을 위해 약간 테스트를 보여드리면
.subscribeOn(Schedulers.newThread())을 지우면 결과가
main | 115 | Original data = 1-S
main | 116 | Original data = 2-T
main | 116 | Original data = 3-P
RxNewThreadScheduler-1 | 116 | value = <flip>-1-S
RxNewThreadScheduler-1 | 116 | value = <flip>-2-T
RxNewThreadScheduler-1 | 116 | value = <flip>-3-P
.observeOn(Schedulers.newThread())를 지우면 결과가
RxNewThreadScheduler-1 | 79 | Original data = 1-S
RxNewThreadScheduler-1 | 79 | value = <flip>-1-S
RxNewThreadScheduler-1 | 79 | Original data = 2-T
RxNewThreadScheduler-1 | 79 | value = <flip>-2-T
RxNewThreadScheduler-1 | 79 | Original data = 3-P
RxNewThreadScheduler-1 | 79 | value = <flip>-3-P
이렇게 나타납니다.
만일 .subscribeOn(Schedulers.newThread())과 .observeOn(Schedulers.newThread())를 지우면
main | 66 | Original data = 1-S
main | 66 | value = <flip>-1-S
main | 66 | Original data = 2-T
main | 66 | value = <flip>-2-T
main | 67 | Original data = 3-P
main | 67 | value = <flip>-3-P
다 메인에서 동작합니다.
즉 subscribeOn을 통하여 구독 할때 실행되는 스레드를 observeOn을 사용하면 함수 동작과 관련된 스레드를 정합니다. 만일 subscribeOn을 지정하고 observeOn을 지정하지 않으면 subscribeOn에서 지정한 스레드로 Observable에 대한 동작을 수행합니다. 별도로 지정하지 않으면 main에서 동작합니다.
'2023년 이전 > ReativeX' 카테고리의 다른 글
RxJava, RxKotlin - 스케줄러(3) (0) | 2020.01.28 |
---|---|
RxJava,RxKoltin - 스케줄러(2) (0) | 2020.01.27 |
RxJava, RxKotlin - timeinterval 함수 (0) | 2020.01.20 |
RxJava,RxKotlin - delay() 함수 (0) | 2020.01.20 |
RxJava - 수학관련 함수 (0) | 2020.01.20 |