본문 바로가기
2023년 이전/ReativeX

RxJava,RxKotlin - Interval

by JeongUPark 2019. 11. 20.
반응형

[출처 - RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에]

본 글은 'RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에' 를 학습하면서 정리한 글입니다



마블다이어그램 참고 - http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html#interval-long-long-java.util.concurrent.TimeUnit-

 

interval() 함수는 일정 시간 간격으로 데이터 흐름을 생성합니다.

 

interval 함수는subscribe한 시간 부터 주어진 시간 간격으로 0부터 1씩 증가하는 Long 객체(기본형인 long 값이 아니라 래퍼 클래스인 Long 객체입니다) 객체를 발행합니다. 주로 사용하는 함수 원형은 두 가지가 있습니다.

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public static Observable<Long> interval(long period, TimeUnit unit) {
    return interval(period, period, unit, Schedulers.computation());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) {
    return interval(initialDelay, period, unit, Schedulers.computation());
}

첫번 째는 period 간격으로 테이터를 발행합니다.

두 번재는 첫번째와 같지만 최초 지연 시간을 지정할 수 있습니다.

 

@SchedulerSupport는 interval 함수의 동작이 계산 스케줄러에서 실행된다는 의미입니다.

 

interval() 함수는 기본적으로 영원히 지속 실행되기 때문에 폴링 용도로 많이 사용 합니다.

 

interval() 함수의 예제 code를 보면

 

JAVA

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Interval_test {

    public static void main(String[] args){

        CommonUtils.exampleStart();
        Observable<Long> source = Observable.interval(100L, TimeUnit.MILLISECONDS).map(data->(data+1)*100).take(5);

        source.subscribe(it-> {
            long time = System.currentTimeMillis() - CommonUtils.startTime;
            System.out.println(getThreadName() + " | " + time + " | " + "value = " + it);
        });
        CommonUtils.sleep(1000);

    }
    private static String getThreadName() {
        String threadName = Thread.currentThread().getName();
        if (threadName.length() > 30) {
            threadName = threadName.substring(0, 30) + "...";
        }
        return threadName;
    }
}
class CommonUtils{
    public static long startTime;
    public static void exampleStart(){
        startTime = System.currentTimeMillis();
    }
    public static void sleep(int mills){
        try{
            Thread.sleep(mills);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}

Kotlin

import io.reactivex.Observable
import java.util.concurrent.TimeUnit

fun main(){
    CommonUtilsk.exampleStart()
    val source = Observable.interval(100L, TimeUnit.MILLISECONDS).map { data -> (data + 1) * 100 }.take(5)

    source.subscribe { it ->
        val time = System.currentTimeMillis() - CommonUtilsk.startTime
        println(getThreadName() + " | " + time + " | " + "value = " + it)
    }
    CommonUtilsk.sleep(1000)
}

fun getThreadName() : String{
    var threadName = Thread.currentThread().name
    if (threadName.length > 30) {
        threadName = threadName.substring(0, 30) + "..."
    }
    return threadName
}
internal object CommonUtilsk {
    var startTime: Long = 0
    fun exampleStart() {
        startTime = System.currentTimeMillis()
    }

    fun sleep(mills: Int) {
        try {
            Thread.sleep(mills.toLong())
        } catch (e: InterruptedException) {
            e.printStackTrace()
        }

    }
}

위의 kotlin CommonUtilsk로 만든 이유는 JAVA에 있는 동일한 이름을 가진 class가 존재하여   java.lang.NoSuchFieldError: INSTANCE error가 나기 때문입니다.

 

위의 code들은 100ms 간견으로 0 부터 데이터를 발행 후 map을 통해 (data+1)*100 하여 데이터를 발행하고, take를 통하여 최초 5개만 취득합니다.

RxComputationThreadPool-1 | 194 | value = 100
RxComputationThreadPool-1 | 292 | value = 200
RxComputationThreadPool-1 | 392 | value = 300
RxComputationThreadPool-1 | 491 | value = 400
RxComputationThreadPool-1 | 592 | value = 500

(위에서 설명했다 시피 interval은 영원히 지속되기 때문입니다. 만일 take를 하지 않으면 위에서 sleep요청 시간인 10초 진행 후에 멈춥니다. 즉 value 가 100~1000까지 나옵니다.)

RxComputationThreadPool-1 | 260 | value = 100
RxComputationThreadPool-1 | 359 | value = 200
RxComputationThreadPool-1 | 459 | value = 300
RxComputationThreadPool-1 | 560 | value = 400
RxComputationThreadPool-1 | 660 | value = 500
RxComputationThreadPool-1 | 760 | value = 600
RxComputationThreadPool-1 | 860 | value = 700
RxComputationThreadPool-1 | 960 | value = 800
RxComputationThreadPool-1 | 1059 | value = 900
RxComputationThreadPool-1 | 1159 | value = 1000

그리고 간격들은 보면 약 100 millis 마다 결과가 나오는 것을 확인 할 수 있습니다.

 

만일 sleep Method 부분을 주석 처리하면 Main Thread가 할일 이 없기 때문에 바로 종료 됩니다.

 

그리고 위에서 설명한 interval() 함수 중 initialDelay를 가지는 함수로 위 내용들을 실행해보면

RxComputationThreadPool-1 | 103 | value = 100
RxComputationThreadPool-1 | 203 | value = 200
RxComputationThreadPool-1 | 303 | value = 300
RxComputationThreadPool-1 | 402 | value = 400
RxComputationThreadPool-1 | 503 | value = 500

위의 결과 들보다 처음 수행 시간이 훨씬 짧은 것을 볼 수 있는데 그 이유는 시작 시간인 initialDelay인 초기 지연값이 0이기 때문에 더 짧은 시간에 수행 됩니다.

 

 

2020년 1월 추가 사항

interval 함수의 원형을 보면

  @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.COMPUTATION)
    public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) {
        return interval(initialDelay, period, unit, Schedulers.computation());
    }

 이렇게 계산스케줄러에서 동작합니다. 하지만 꼭 계산 스케줄러에서 사용하지 않고 원하는 스케줄러로 변경도 가능합니다. 그 이유는 

    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
        ObjectHelper.requireNonNull(unit, "unit is null");
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");

        return RxJavaPlugins.onAssembly(new ObservableInterval(Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));
    }

위의 code를 보면 아까 COMPUTATION에 있던 내용이 CUSTOM으로 바뀌었습니다. 즉 개발자가 원하는 스케줄러를 지정할 수 있다는 의미입니다. 즉 위의 COMPUTATION(계산 스케줄러)에서 동작하는 interval 함수는 사실 아래 custom에서 동작하는 interval 함수에 마지막 인자로 Schdulers.computation을 넣어서 동작하는 code 입니다. 리엑티브의 대부분 함수는 마지막 인자로 스케줄러를 지정할 수 있습니다.( flatMap() 이나 scan()은 등은 대포적인 연산자지만 스케줄러를 인자로 받지 않는 경우도 있습니다.)

 

반응형

'2023년 이전 > ReativeX' 카테고리의 다른 글

RxJava,RxKotlin - range()  (0) 2019.12.04
RxJava, RxKotlin - timer  (0) 2019.11.20
RxJava,RxKotlin - reduce  (0) 2019.11.13
RxJava, RxKotlin - filter  (0) 2019.11.13
flatMap을 이용한 구구단 만들기  (0) 2019.11.13