[출처 - RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에]
본 글은 'RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에' 를 학습하면서 정리한 글입니다
마블다이어그램 참고 - http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html#concatMap-io.reactivex.functions.Function-
concatMap 함수는 flatMap과 비슷하지만 flatMap 처럼 중간에 다른 subscribe 요청이 올 경우 그 subscribe를 실행하는 것이 아닌 요청 된 순서되로 실행하는 함수 입니다.
마블 다이어 그램을 보면
초록 원에 대한 처리중에 파랑원 처리 요청이 들어와도 초록원에 대한 요청을 모두 처리 후 파랑원에 대한 요청을 처리합니다. (flatMap의 경우에는 초록원 처리중 파랑원 요청이 들어오면 같이 처리를 합니다. - flatMap에 대한 자세한 내용은 여기서 확인 하시면 됩니다.)
테스트 code는 다음과 같습니다.
JAVA
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class ConcatMap_test {
public static void main(String[] args){
CommonUtils.exampleStart();
String[] balls= {"1","3","5"};
Observable<String> source = Observable.interval(100L, TimeUnit.MILLISECONDS)
.map(Long::intValue)
.map(idx->balls[idx])
.take(balls.length)
.concatMap(ball->Observable.interval(200L,TimeUnit.MILLISECONDS).map(notUsed->ball+"<>").take(3));
source.subscribe( it->{
long time = System.currentTimeMillis() - CommonUtils.startTime;
System.out.println(CommonUtils.getThreadName() + " | " + time + " | " + "value = " + it); }
);
CommonUtils.sleep(2000);
}
}
public class CommonUtils{
public static long startTime;
public static void exampleStart(){
startTime = System.currentTimeMillis();
}
public static void sleep(int mills){
try{
Thread.sleep(mills);
}catch (InterruptedException e){
e.printStackTrace();
}
}
public static String getThreadName() {
String threadName = Thread.currentThread().getName();
if (threadName.length() > 30) {
threadName = threadName.substring(0, 30) + "...";
}
return threadName;
}
}
Kotlin
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
fun main(){
CommonUtilsk.exampleStart()
var balls = arrayOf("4","5","6")
var source = Observable.interval(100L,TimeUnit.MILLISECONDS)
.map { it->
it.toInt() }
.map{idx->
balls[idx]}
.take(balls.size.toLong())
.concatMap { ball->Observable.interval(200L,TimeUnit.MILLISECONDS).map { notUsed->"$ball <>" }.take(2) }
source.subscribe{
val time = System.currentTimeMillis() - CommonUtilsk.startTime
println(CommonUtilsk.getThreadName() + " | " + time + " | " + "value = " + it)
}
CommonUtilsk.sleep(2000)
}
internal object CommonUtilsk {
var startTime: Long = 0
fun exampleStart() {
startTime = System.currentTimeMillis()
}
fun sleep(mills: Int) {
try {
Thread.sleep(mills.toLong())
} catch (e: InterruptedException) {
e.printStackTrace()
}
}
fun getThreadName(): String {
var threadName = Thread.currentThread().name
if (threadName.length > 30) {
threadName = threadName.substring(0, 30) + "..."
}
return threadName
}
}
결과는
RxComputationThreadPool-2 | 406 | value = 4 <>
RxComputationThreadPool-2 | 606 | value = 4 <>
RxComputationThreadPool-3 | 809 | value = 5 <>
RxComputationThreadPool-3 | 1007 | value = 5 <>
RxComputationThreadPool-4 | 1209 | value = 6 <>
RxComputationThreadPool-4 | 1409 | value = 6 <>
과 같이 나타납니다.
이 결과를 볼떄 중간에 시간을 보면 406부터 시작하는데 그 원인은 다음과 같이 doOnNext를 추가해서 결과를 봐면
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
fun main(){
CommonUtilsk.exampleStart()
var balls = arrayOf("4","5","6")
var source = Observable.interval(100L,TimeUnit.MILLISECONDS)
.map { it->
it.toInt() }
.map{idx->
balls[idx]}
.take(balls.size.toLong())
.doOnNext {
val time = System.currentTimeMillis() - CommonUtilsk.startTime
println(CommonUtilsk.getThreadName() + " | " + time + " | " + "value = " + it) }
.concatMap { ball->Observable.interval(200L,TimeUnit.MILLISECONDS).map { notUsed->"$ball <>" }.take(2) }
source.subscribe{
val time = System.currentTimeMillis() - CommonUtilsk.startTime
println(CommonUtilsk.getThreadName() + " | " + time + " | " + "value = " + it)
}
CommonUtilsk.sleep(2000)
}
결과
RxComputationThreadPool-1 | 198 | value = 4
RxComputationThreadPool-1 | 296 | value = 5
RxComputationThreadPool-1 | 397 | value = 6
RxComputationThreadPool-2 | 401 | value = 4 <>
RxComputationThreadPool-2 | 600 | value = 4 <>
RxComputationThreadPool-3 | 800 | value = 5 <>
RxComputationThreadPool-3 | 1002 | value = 5 <>
RxComputationThreadPool-4 | 1202 | value = 6 <>
RxComputationThreadPool-4 | 1404 | value = 6 <>
interval에 의해 먼저 동작을 하고 그 후 concatMap을 동작하기 때문에 시간이 쫌 걸린 후 동작합니다.
만일 위 code를 concatMap에서 flatMap으로 변경하면
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
fun main(){
CommonUtilsk.exampleStart()
var balls = arrayOf("4","5","6")
var source = Observable.interval(100L,TimeUnit.MILLISECONDS)
.map { it->
it.toInt() }
.map{idx->
balls[idx]}
.take(balls.size.toLong())
.doOnNext {
val time = System.currentTimeMillis() - CommonUtilsk.startTime
println(CommonUtilsk.getThreadName() + " | " + time + " | " + "value = " + it) }
.flatMap { ball->Observable.interval(200L,TimeUnit.MILLISECONDS).map { notUsed->"$ball <>" }.take(2) }
source.subscribe{
val time = System.currentTimeMillis() - CommonUtilsk.startTime
println(CommonUtilsk.getThreadName() + " | " + time + " | " + "value = " + it)
}
CommonUtilsk.sleep(2000)
}
결과는
RxComputationThreadPool-1 | 309 | value = 4
RxComputationThreadPool-1 | 407 | value = 5
RxComputationThreadPool-1 | 507 | value = 6
RxComputationThreadPool-2 | 511 | value = 4 <>
RxComputationThreadPool-3 | 607 | value = 5 <>
RxComputationThreadPool-4 | 708 | value = 6 <>
RxComputationThreadPool-2 | 711 | value = 4 <>
RxComputationThreadPool-3 | 808 | value = 5 <>
RxComputationThreadPool-4 | 909 | value = 6 <>
이렇게 나타납니다.
'2023년 이전 > ReativeX' 카테고리의 다른 글
RxJava,RxKotlin - groupBy 함수 (0) | 2020.01.14 |
---|---|
RxJava,RxKotlin - switchMap (0) | 2020.01.14 |
RxJava, RxKotlin - repeat() (0) | 2019.12.16 |
RxJava, Rxkotlin - defer() (0) | 2019.12.16 |
RxJava,RxKotlin - interavlRange() (0) | 2019.12.16 |