본문 바로가기
2023년 이전/ReativeX

RxJava, RxKotlin - zip 함수

by JeongUPark 2020. 1. 16.
반응형

[출처 - RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에]

본 글은 'RxJava 프로그래밍 : 리액티브 프로그래밍 기초부터 안드로이드 까지 한번에' 를 학습하면서 정리한 글입니다

 

마블다이어그램 참고 - http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html#scan-io.reactivex.functions.BiFunction-



출처: https://jeongupark-study-house.tistory.com/109 [코더가 아닌 개발자!! Why를 가지자!]

 

zip 함수는 Observable 2개 혹은 그 이상을 결합하는 합수입니다. 

마블 다이어 그램은 다음과 같습니다.

test code를 보면

Java

import io.reactivex.Observable;

public class zip_test {

    public static void main(String[] args){

        String[] shapes={"BALL","PENTAGON","START"};
        String[] colorTriangles = {"2-T","4-T","6-T"};

        Observable<String> source = Observable.zip(Observable.fromArray(shapes).map(zip_test::getSuffix),
                Observable.fromArray(colorTriangles).map(zip_test::getNum),(suffix, num)-> num+suffix);

        source.subscribe(System.out::println);

    }
    static  String getSuffix(String shape){
        if(shape.equalsIgnoreCase("PENTAGON")){
            return "-P";
        }
        if(shape.equalsIgnoreCase("START")){
            return "-S";
        }
        return "-B";
    }
    static  String getNum(String num){
        if(num.indexOf("-")>0){
            return num.substring(0,num.indexOf("-"));
        }
        return num;
    }
}

 

kotlin

import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.rxkotlin.toObservable

fun main(){
    val shapes = arrayOf("BALL", "PENTAGON", "START")
    val colorTriangles = arrayOf("2-T", "4-T", "6-T")

    val source = Observable.zip(
        shapes.toObservable().map{sh-> getSuffix(sh) },
        colorTriangles.toObservable().map{num-> getNum(num) }, BiFunction<String,String,String>{ s , n -> "$n$s"})

    source.subscribe{it->println(it)}
}
internal fun getSuffix(shape: String): String {
    if (shape.equals("PENTAGON", ignoreCase = true)) {
        return "-P"
    }
    return if (shape.equals("START", ignoreCase = true)) {
        "-S"
    } else "-B"
}

internal fun getNum(num: String): String {
    return if (num.indexOf("-") > 0) {
        num.substring(0, num.indexOf("-"))
    } else num
}

결과는

2-B
4-P
6-S

그리고 3개의 Observable을 사용할 경우는 다음과 같습니다.

Java

import io.reactivex.Observable;

public class zip_sum_test {

    public static void main(String[] args){

        Observable source = Observable.zip(Observable.just(100,200,300),Observable.just(10,20,30),Observable.just(1,2,3)
        ,(a,b,c)->a+b+c);

        source.subscribe(System.out::println);
    }
}

Kotlin

import io.reactivex.Observable
import io.reactivex.functions.Function3

fun main(){

    val source = Observable.zip<Int,Int,Int,Int>(
        Observable.just(100,200,300),
        Observable.just(10,20,30),
        Observable.just(1,2,3),
        Function3(){ a, b, c -> a!! + b!! + c!! }
    )
    source.subscribe{it->println(it)}
}

결과는

111
222
333

처럼 나타납니다.

 

작성시 kotlin에서 BiFunction은 2개의 항목을 받을 때 3개는 Function을 그리고 zip함수 사용시 인자에 대한 정의를 해줘어야 합니다. (위의 code에서는 zip<Int,Int,Int,Int> 이것을 말합니다.)

 

 

그럼 책에서 나온 몇몇 예제를 더 살펴보겠습니다.

 

interval과 함께 사용.

 

interval 함수와 zip을 함께 사용하면 interval 함수의 동작에 맞춰서 zip이 동작을 하는 것을 확인하였습니다.

 

code를 보면

 

Java

import io.reactivex.Observable;

import java.util.concurrent.TimeUnit;

public class zip_interval_test {

    public static void main(String[] args){

        Observable source = Observable.zip(Observable.interval(200L, TimeUnit.MILLISECONDS),
                Observable.just("RED","GREEN","BLUE"),
                (i,value)-> i+"-"+value);
        CommonUtils.exampleStart();
        source.subscribe( it->{
            long time = System.currentTimeMillis() - CommonUtils.startTime;
        System.out.println(CommonUtils.getThreadName() + " | " + time + " | " + "value = " + it);});
        CommonUtils.sleep(1000);

    }
}
public class CommonUtils{
    public static long startTime;
    public static void exampleStart(){
        startTime = System.currentTimeMillis();
    }
    public static void sleep(int mills){
        try{
            Thread.sleep(mills);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
    public static String getThreadName() {
        String threadName = Thread.currentThread().getName();
        if (threadName.length() > 30) {
            threadName = threadName.substring(0, 30) + "...";
        }
        return threadName;
    }
}

kotlin

import io.reactivex.Observable
import  io.reactivex.functions.BiFunction
import java.util.concurrent.TimeUnit

fun main(){

    val source = Observable.zip(
        Observable.interval(200L, TimeUnit.MILLISECONDS),
        Observable.just("RED","GREEN","BLUE"),
        BiFunction<Long,String,String>{a,b-> "$a-$b"})

    CommonUtilsk.exampleStart()
    source.subscribe { it->
        val time = System.currentTimeMillis() - CommonUtilsk.startTime
        println(CommonUtilsk.getThreadName() + " | " + time + " | " + "value = " + it)
    }
    CommonUtilsk.sleep(1000)

}
internal object CommonUtilsk {
    var startTime: Long = 0
    fun exampleStart() {
        startTime = System.currentTimeMillis()
    }

    fun sleep(mills: Int) {
        try {
            Thread.sleep(mills.toLong())
        } catch (e: InterruptedException) {
            e.printStackTrace()
        }

    }
    fun getThreadName(): String {
        var threadName = Thread.currentThread().name
        if (threadName.length > 30) {
            threadName = threadName.substring(0, 30) + "..."
        }
        return threadName
    }
}

 그리고 결과는 다음과 같습니다.

RxComputationThreadPool-1 | 211 | value = 0-RED
RxComputationThreadPool-1 | 406 | value = 1-GREEN
RxComputationThreadPool-1 | 608 | value = 2-BLUE

 

interval에서 200 ms 를 설정했으므로 그에 맞춰서 결과가 200 ms 마다 나타납니다.

 

다음은 전기세 계산하는 예제 code 입니다.

Java

import io.reactivex.Observable;
import java.text.DecimalFormat;

import static java.lang.Long.max;
import static java.lang.Math.min;

public class zip_ElectricBills_1 {
    private  static String[] data = {
      "100", // 910+93.3*100 = 10,240원
      "300" // 1600+93.3*200+187.9*100 = 39,050원
    };
    //기본요금
    // 200kWh 이하 사용 910원
    // 201~400kWh 사용 1600원
    // 400kWh 초과 사용 7300원
    private static Observable<Integer> basePrice = Observable.fromArray(data)
            .map(Integer::parseInt)
            .map(val->{
                if(val <= 200) return 910;
                if(val <= 400) return 1600;
                return 7300;
            });
    // 전력량 요금
    // 처음 200kWh 까지 93.3원
    // 다음 200kWh 까지 187.9원
    // 400kWh 초과 280.6원
    private static Observable<Integer> usagePrice = Observable.fromArray(data)
            .map(Integer::parseInt)
            .map(val->{
                double series1  = min(200,val) *93.3;
                double series2  = min(200,max(val-200,0)) *187.9;
                double series3  = min(0,max(val-400,0)) *280.65;
                return (int)(series1+series2+series3);
            });
    private static int index = 0;
    public static void main(String[] args){

        Observable source = Observable.zip(basePrice,usagePrice,(v1,v2)->v1+v2);
        source.map(val->new DecimalFormat("#,###").format(val))
                .subscribe(val->{
                    StringBuilder sb = new StringBuilder();
                    sb.append("Usage: "+data[index]+"kWh => ");
                    sb.append("Price: "+val+"Won");
                    System.out.println(sb);
                    index++;
                });
    }
}

Kotlin

import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.rxkotlin.toObservable

import java.lang.Long.max
import java.lang.Math.min
import java.text.DecimalFormat

fun main(){

    val data = arrayOf(
        "100", // 910+93.3*100 = 10,240원
        "300" // 1600+93.3*200+187.9*100 = 39,050원
    )
    //기본요금
    // 200kWh 이하 사용 910원
    // 201~400kWh 사용 1600원
    // 400kWh 초과 사용 7300원
    val basePrice = data.toObservable()
        .map{it-> Integer.parseInt(it)}
        .map { value ->
            if (value <= 200) return@map 910
            if (value <= 400) return@map 1600
            return@map 7300
        }
    // 전력량 요금
    // 처음 200kWh 까지 93.3원
    // 다음 200kWh 까지 187.9원
    // 400kWh 초과 280.6원
    val usagePrice = Observable.fromArray(*data)
        .map{it-> Integer.parseInt(it)}
        .map{ `val` ->
            val series1 = min(200, `val`) * 93.3
            val series2 = min(200, max((`val` - 200).toLong(), 0)) * 187.9
            val series3 = min(0, max((`val` - 400).toLong(), 0)) * 280.65
            return@map (series1 + series2 + series3).toInt()
        }
    var index = 0
    val source = Observable.zip<Int, Int, Int>(basePrice, usagePrice, BiFunction<Int,Int,Int>{ v1, v2 -> v1!! + v2!! })
    source.map { `val` -> DecimalFormat("#,###").format(`val`) }
        .subscribe { `val` ->
            val sb = StringBuilder()
            sb.append("Usage: " + data[index] + "kWh => ")
            sb.append("Price: " + `val` + "Won")
            println(sb)
            index++
        }
}

결과는 

Usage: 100kWh => Price: 10,240Won
Usage: 300kWh => Price: 39,050Won

 가 나타납니다.

하지만 index를 사용하는 경우에는 함수형 프로그램의 원칙에서 벗어난다고 합니다.( 좋지 못한 coding 방식이긴한데 은근 많이 사용합니다.. ㅎㅎ)

 

그래서 그와 관련된 수정 code는 다음과 같습니다.

Java

import com.sun.tools.javac.util.Pair;
import io.reactivex.Observable;

import java.text.DecimalFormat;

import static java.lang.Long.max;
import static java.lang.Math.min;

public class zip_ElectricBills_2 {
    private  static String[] data = {
      "100", // 910+93.3*100 = 10,240원
      "300" // 1600+93.3*200+187.9*100 = 39,050원
    };
    //기본요금
    // 200kWh 이하 사용 910원
    // 201~400kWh 사용 1600원
    // 400kWh 초과 사용 7300원
    private static Observable<Integer> basePrice = Observable.fromArray(data)
            .map(Integer::parseInt)
            .map(val->{
                if(val <= 200) return 910;
                if(val <= 400) return 1600;
                return 7300;
            });
    // 전력량 요금
    // 처음 200kWh 까지 93.3원
    // 다음 200kWh 까지 187.9원
    // 400kWh 초과 280.6원
    private static Observable<Integer> usagePrice = Observable.fromArray(data)
            .map(Integer::parseInt)
            .map(val->{
                double series1  = min(200,val) *93.3;
                double series2  = min(200,max(val-200,0)) *187.9;
                double series3  = min(0,max(val-400,0)) *280.65;
                return (int)(series1+series2+series3);
            });
    public static void main(String[] args){

        Observable<Pair<String,Integer>> source = Observable.zip(basePrice,usagePrice,
                Observable.fromArray(data),
                (v1,v2,i)-> Pair.of(i,v1+v2));
        source.map(val->Pair.of(val.fst,new DecimalFormat("#,###").format(val.snd)))
                .subscribe(val->{
                    StringBuilder sb = new StringBuilder();
                    sb.append("Usage: "+val.fst+"kWh => ");
                    sb.append("Price: "+val.snd+"Won");
                    System.out.println(sb);
                });
    }
}

Kotlin

import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.functions.Function3
import io.reactivex.rxkotlin.toObservable

import java.lang.Long.max
import java.lang.Math.min
import java.text.DecimalFormat

fun main(){

    val data = arrayOf(
        "100", // 910+93.3*100 = 10,240원
        "300" // 1600+93.3*200+187.9*100 = 39,050원
    )
    //기본요금
    // 200kWh 이하 사용 910원
    // 201~400kWh 사용 1600원
    // 400kWh 초과 사용 7300원
    val basePrice = data.toObservable()
        .map{it-> Integer.parseInt(it)}
        .map { value ->
            if (value <= 200) return@map 910
            if (value <= 400) return@map 1600
            return@map 7300
        }
    // 전력량 요금
    // 처음 200kWh 까지 93.3원
    // 다음 200kWh 까지 187.9원
    // 400kWh 초과 280.6원
    val usagePrice = Observable.fromArray(*data)
        .map{it-> Integer.parseInt(it)}
        .map{ `val` ->
            val series1 = min(200, `val`) * 93.3
            val series2 = min(200, max((`val` - 200).toLong(), 0)) * 187.9
            val series3 = min(0, max((`val` - 400).toLong(), 0)) * 280.65
            return@map (series1 + series2 + series3).toInt()
        }
    val source = Observable.zip<Int, Int, String,Pair<String,Int>>(basePrice, usagePrice,data.toObservable(),
        Function3<Int,Int,String,Pair<String,Int>>{ v1, v2,i -> Pair(i,v1!! + v2!! )})
    source.map { value ->Pair(value.first, DecimalFormat("#,###").format(value.second)) }
        .subscribe { value ->
            val sb = StringBuilder()
            sb.append("Usage: " + value.first + "kWh => ")
            sb.append("Price: " + value.second + "Won")
            println(sb)
        }
}

index를 사용하는 데신에 data array 관련 Observable을 만들어서 zip함수를 통하여 결합한 code 입니다.

 

결과는 동일 합니다.

Usage: 100kWh => Price: 10,240Won
Usage: 300kWh => Price: 39,050Won

 

그리고 마지막으로 zipwith라는 함수가 있습니다. zipwith는 zip함수와 동일하지만, Observable을 다양한 함수와 조합하면서 틈틈이 호출할 수 있는 장점이 있습니다.

 

Java

import io.reactivex.Observable;

public class zipwith_test {

    public static void main(String[] args){

        Observable source = Observable.zip(Observable.just(100,200,300),
                Observable.just(10,20,30),
                (a,b)->a+b).zipWith(Observable.just(1,2,3), (ab,c)->ab+c);

        source.subscribe(System.out::println);

    }
}

kotlin

import io.reactivex.Observable
import io.reactivex.functions.BiFunction

fun main(){

    val source = Observable.zip<Int,Int,Int>(Observable.just(100,200,300),
        Observable.just(10,20,30), BiFunction<Int,Int,Int>{a,b->a+b})
        .zipWith(Observable.just(1,2,3), BiFunction<Int,Int,Int>{ab,c-> ab+c})
    source.subscribe(System.out::println)

}

결과

111
222
333
반응형

'2023년 이전 > ReativeX' 카테고리의 다른 글

RxJava, RxKotlin - merge 함수  (0) 2020.01.17
RxJava, RxKotlin - CombineLatest 함수  (0) 2020.01.17
RxJava, RxKotlin - Scan 함수  (1) 2020.01.15
RxJava,RxKotlin - groupBy 함수  (0) 2020.01.14
RxJava,RxKotlin - switchMap  (0) 2020.01.14