반응형
[출처 - RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에]
본 글은 'RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에' 를 학습하면서 정리한 글입니다
마블다이어그램 참고 - http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html#concat-java.lang.Iterable-
concat 함수는 두개 이상의 Observable을 이어주는 역할을 하는 함수 입니다. 첫 번쨰 Observable이 onComplete가 발생해야 두번째 observable을 구독합니다.
마블다이어그램을 보면
간단한 testCode를 보면
Java
import io.reactivex.Observable;
import io.reactivex.functions.Action;
import java.util.concurrent.TimeUnit;
public class Concat_test {
public static void main(String[] args) {
Action onCompleteAction = () -> {
long time = System.currentTimeMillis() - CommonUtils.startTime;
System.out.println(CommonUtils.getThreadName() + " | " + time + " | " + "onComplete");
};
CommonUtils.exampleStart();
String[] data1 = {"1", "2", "3"};
String[] data2 = {"4", "5", "6"};
Observable source1 = Observable.fromArray(data1).doOnComplete(onCompleteAction);
Observable source2 = Observable.interval(100L, TimeUnit.MILLISECONDS)
.map(Long::intValue)
.map(idx -> data2[idx])
.take(data2.length)
.doOnComplete(onCompleteAction);
Observable source = Observable.concat(source1, source2).doOnComplete(onCompleteAction);
source.subscribe(
it -> {
long time = System.currentTimeMillis() - CommonUtils.startTime;
System.out.println(CommonUtils.getThreadName() + " | " + time + " | " + "value = " + it);
}
);
CommonUtils.sleep(1000);
}
}
public class CommonUtils{
public static long startTime;
public static void exampleStart(){
startTime = System.currentTimeMillis();
}
public static void sleep(int mills){
try{
Thread.sleep(mills);
}catch (InterruptedException e){
e.printStackTrace();
}
}
public static String getThreadName() {
String threadName = Thread.currentThread().getName();
if (threadName.length() > 30) {
threadName = threadName.substring(0, 30) + "...";
}
return threadName;
}
}
kotlin
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
import io.reactivex.functions.Function
fun main(){
val onCompleteAction = {
val time = System.currentTimeMillis() - CommonUtilsk.startTime
println(CommonUtilsk.getThreadName() + " | " + time + " | " + "onComplete")
}
CommonUtilsk.exampleStart()
val data1 = arrayOf("1", "2", "3")
val data2 = arrayOf("4", "5", "6")
val source1 = Observable.fromArray(*data1).doOnComplete(onCompleteAction)
val source2 = Observable.interval(100L, TimeUnit.MILLISECONDS)
.map<Int>(Function<Long, Int> { it.toInt() })
.map { idx -> data2[idx] }
.take(data2.size.toLong())
.doOnComplete(onCompleteAction)
val source = Observable.concat<Any>(source1, source2).doOnComplete(onCompleteAction)
source.subscribe { it ->
val time = System.currentTimeMillis() - CommonUtilsk.startTime
println(CommonUtilsk.getThreadName() + " | " + time + " | " + "value = " + it)
}
CommonUtilsk.sleep(1000)
}
internal object CommonUtilsk {
var startTime: Long = 0
fun exampleStart() {
startTime = System.currentTimeMillis()
}
fun sleep(mills: Int) {
try {
Thread.sleep(mills.toLong())
} catch (e: InterruptedException) {
e.printStackTrace()
}
}
fun getThreadName(): String {
var threadName = Thread.currentThread().name
if (threadName.length > 30) {
threadName = threadName.substring(0, 30) + "..."
}
return threadName
}
}
결과
main | 124 | value = 1
main | 126 | value = 2
main | 126 | value = 3
main | 126 | onComplete
RxComputationThreadPool-1 | 230 | value = 4
RxComputationThreadPool-1 | 330 | value = 5
RxComputationThreadPool-1 | 430 | value = 6
RxComputationThreadPool-1 | 430 | onComplete
RxComputationThreadPool-1 | 432 | onComplete
를 나타냅니다.
결과는 보면 fromArray를 사용한 Observable의 데이터 1 2 3을 발생하고 complete가 되고 ThreadPool-1을 통하여 4 5 6을 발생합니다. (interval 함수를 사용했기 때문입니다.)
그리고 마지막 onComplete는 interval과 concat의 onComplete를 나타냅니다.
반응형
'2023년 이전 > ReativeX' 카테고리의 다른 글
RxJava, Rxkotlin 관련 CommonUtils와 CommonUtilsk (0) | 2020.01.19 |
---|---|
RxJava, RxKotlin - amb 함수 (0) | 2020.01.19 |
RxJava, RxKotlin - merge 함수 (0) | 2020.01.17 |
RxJava, RxKotlin - CombineLatest 함수 (0) | 2020.01.17 |
RxJava, RxKotlin - zip 함수 (0) | 2020.01.16 |