본문 바로가기
2023년 이전/ReativeX

RxJava, RxKotlin - CombineLatest 함수

by JeongUPark 2020. 1. 17.
반응형

[출처 - RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에]

본 글은 'RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에' 를 학습하면서 정리한 글입니다

 

마블다이어그램 참고 -http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html#combineLatest-org.reactivestreams.Publisher-org.reactivestreams.Publisher-io.reactivex.functions.BiFunction-

 

combineLatest 함수는 zip함수와 비슷하게 2개 이상의 Observable 함수를 기반으로 각각의 Observable의 값이 변경되었을 때 결과를 갱신해주는 함수 입니다. 마블다이어 그램을 보면서 설명드리겠습니다.

 

마블 다이어 그램을 보면 첫번째 Observable에서  핑그색이 들어 왔지만 아직 2번 Observable에서 데이터가 들어오지 않아 값이 없습니다. 하지만 마름모가 들어오면서 핑크 마름모가 나타나고 주황색이 들어오면서 주황색 마름모를 볼 수 있습니다.

 

이 처럼 combineLatest는 Observable의 데이터가 변경이 되면 그에 따라 Observable들의 값을 합쳐서 갱신하여 구독자에 발행하는 함수 입니다.

 

test code를 보겠습니다.

 

Java

import io.reactivex.Observable;

import java.util.concurrent.TimeUnit;

public class combineLatest_test {

    public static void main(String[] args) {

        CommonUtils.exampleStart();
        String[] data1 = {"6", "7", "4", "2"};
        String[] data2 = {"DIAMOND", "STAR", "PENTAGON"};

        Observable source = Observable.combineLatest(
                Observable.fromArray(data1).zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (shape, notUsed)-> getColor(shape)),
                Observable.fromArray(data2).zipWith(Observable.interval(150L,200L, TimeUnit.MILLISECONDS), (shape, notUsed)-> getSuffix(shape)),
                (v1,v2)->v1+v2
        );
        source.subscribe(it->{long time = System.currentTimeMillis() - CommonUtils.startTime;
        System.out.println(CommonUtils.getThreadName() + " | " + time + " | " + "value = " + it);});

        CommonUtils.sleep(2000);
    }

    private static String getColor(String shape){
        if (shape.endsWith("<>")) //diamond
            return shape.replace("<>", "").trim();

        int hyphen = shape.indexOf("-");
        if (hyphen > 0) {
            return shape.substring(0, hyphen);
        }

        return shape; //for ball
    }
    private static String getSuffix(String shape) {
        if ("DIAMOND".equals(shape)) return "<>";
        if ("PENTAGON".equals(shape)) return "-P";
        if ("STAR".equals(shape)) return "-S";
        return ""; //for BALL
    }
}
public class CommonUtils{
    public static long startTime;
    public static void exampleStart(){
        startTime = System.currentTimeMillis();
    }
    public static void sleep(int mills){
        try{
            Thread.sleep(mills);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
    public static String getThreadName() {
        String threadName = Thread.currentThread().getName();
        if (threadName.length() > 30) {
            threadName = threadName.substring(0, 30) + "...";
        }
        return threadName;
    }
}

kotlin

import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.rxkotlin.toObservable
import java.util.concurrent.TimeUnit

fun main(){

    CommonUtilsk.exampleStart()
    val data1 = arrayOf("6", "7", "4", "2")
    val data2 = arrayOf("DIAMOND", "STAR", "PENTAGON")

    val source = Observable.combineLatest(
        data1.toObservable().zipWith(Observable.interval(100L,TimeUnit.MILLISECONDS), BiFunction<String,Long,String> { t1, t2 -> getcolor(t1)}),
        data2.toObservable().zipWith(Observable.interval(150L,200L,TimeUnit.MILLISECONDS), BiFunction<String,Long,String>{t1,t2->getSuffix(t1)}),
        BiFunction<String,String,String>{ v1, v2 -> v1 + v2 }
    )

    source.subscribe { it->
        val time = System.currentTimeMillis() - CommonUtilsk.startTime
        println(CommonUtilsk.getThreadName() + " | " + time + " | " + "value = " + it)
    }
    CommonUtilsk.sleep(2000)


}

fun getcolor(shape : String) : String{
    if (shape.endsWith("<>"))
    //diamond
        return shape.replace("<>", "").trim { it <= ' ' }

    val hyphen = shape.indexOf("-")
    return if (hyphen > 0) {
        shape.substring(0, hyphen)
    } else shape  //for ball
}
private fun getSuffix(shape: String): String {
    if ("DIAMOND" == shape) return "<>"
    if ("PENTAGON" == shape) return "-P"
    return if ("STAR" == shape) "-S" else ""
//for BALL
}
internal object CommonUtilsk {
    var startTime: Long = 0
    fun exampleStart() {
        startTime = System.currentTimeMillis()
    }

    fun sleep(mills: Int) {
        try {
            Thread.sleep(mills.toLong())
        } catch (e: InterruptedException) {
            e.printStackTrace()
        }

    }
    fun getThreadName(): String {
        var threadName = Thread.currentThread().name
        if (threadName.length > 30) {
            threadName = threadName.substring(0, 30) + "..."
        }
        return threadName
    }
}

 결과는 

RxComputationThreadPool-2 | 293 | value = 6<>
RxComputationThreadPool-1 | 343 | value = 7<>
RxComputationThreadPool-1 | 442 | value = 4<>
RxComputationThreadPool-2 | 494 | value = 4-S
RxComputationThreadPool-1 | 544 | value = 2-S
RxComputationThreadPool-2 | 692 | value = 2-P

이렇게 나옵니다.

 

그럼 실습 예제로 리액티브 연산자로 합계 구하기를 해보겠습니다.

a와 b를 입력 받아 a+b를 해보겠습니다.

Java

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.observables.ConnectableObservable;

import java.util.Scanner;

public class combineLatest_reativesum {

    public static void main(String[] args){
        new combineLatest_reativesum().run();
    }

    private void run(){
        ConnectableObservable<String> source = userInput();
        Observable<Integer> a = source.filter(str-> str.startsWith("a:"))
                .map(str-> str.replace("a:",""))
                .map(Integer::parseInt);
        Observable<Integer> b = source.filter(str-> str.startsWith("b:"))
                .map(str-> str.replace("b:",""))
                .map(Integer::parseInt);

        Observable.combineLatest(a.startWith(0),b.startWith(0),
                (x,y)-> x+y).subscribe(res -> System.out.println("Result : "+res));
        source.connect();
    }

    private ConnectableObservable<String> userInput(){
        return Observable.create((ObservableEmitter<String> emitter) -> {
            Scanner in = new Scanner(System.in);
            while (true){
                System.out.println("Input: ");
                String line = in.nextLine();
                emitter.onNext(line);
                if(line.indexOf("exit")>=8){
                    in.close();
                    break;
                }
            }
        }).publish();
    }
}

Kotlin

import io.reactivex.Observable
import io.reactivex.ObservableEmitter
import io.reactivex.observables.ConnectableObservable
import io.reactivex.functions.Function
import io.reactivex.functions.BiFunction
import java.util.*

fun main() {
    run()
}

private fun run() {
    val source = userInput()
    val a = source.filter { str -> str.startsWith("a:") }
        .map { str -> str.replace("a:", "") }
        .map<Int>(Function<String, Int> { Integer.parseInt(it) })
    val b = source.filter { str -> str.startsWith("b:") }
        .map { str -> str.replace("b:", "") }
        .map<Int>(Function<String, Int> { Integer.parseInt(it) })

    Observable.combineLatest<Int, Int, Int>(a.startWith(0), b.startWith(0),
        BiFunction(){ x, y -> x!! + y!! }).subscribe({ res -> println("Result : " + res!!) })
    source.connect()
}

private fun userInput(): ConnectableObservable<String> {
    return Observable.create { emitter: ObservableEmitter<String> ->
        val `in` = Scanner(System.`in`)
        while (true) {
            println("Input: ")
            val line = `in`.nextLine()
            emitter.onNext(line)
            if (line.indexOf("exit") >= 8) {
                `in`.close()
                break
            }
        }
    }.publish()
}

결과

Result : 0
Input: 
a:100
Result : 100
Input: 
b:200
Result : 300
Input: 
a:300
Result : 500
Input: 
b:400
Result : 700
Input: 

위의 code는 a: 숫자 b:숫자 를 입력받아 그 값들을 더하는 code 입니다. 위에서 보면 처음 result는 0입니다. 이는 a 와 b에 입력받은 값이 없기 때문에 0이기 때문입니다.

그리고 a:100을 통해 100을 입력하면 100+0이 되어 Result는 100

b:200을 하면 100+200이 되어 Result 는 300

다시 a:300을 하면 300+200이 되어 Result는 500이 됩니다.

그리고 입력할 때마다 이렇게 되는 이유는 combineLatest 함수를 사용했기 때문에 입력할때마다 데이터가 변경되어 그에 따른 결과를 발행하기 때문입니다.

 

지금까지 공부하면서 coding한 프로그램중 가장 reative coding느낌을 강하게 받을 수 있는 code가 아닌가 싶습니다.

 

반응형

'2023년 이전 > ReativeX' 카테고리의 다른 글

RxJava, RxKotlin - Concat 함수  (0) 2020.01.17
RxJava, RxKotlin - merge 함수  (0) 2020.01.17
RxJava, RxKotlin - zip 함수  (0) 2020.01.16
RxJava, RxKotlin - Scan 함수  (1) 2020.01.15
RxJava,RxKotlin - groupBy 함수  (0) 2020.01.14