티스토리 뷰

728x90
반응형

 

목차

     

    지난 글에선 작은 프로젝트를 마이그레이션 하며 사용한 연산자에 대해 정리했다.

     

    [WebFlux]자주 사용하는 연산자 정리(1)

     

    [WebFlux]자주 사용하는 연산자 정리(1)

    목차 Method vs. Operator 별생각 없이 메서드나 연산자라는 단어를 마구 뒤섞어 쓰다가 의문이 들었다. 같은 뜻의 단어를 두 개나 사용할 필요가 있을까? 하고. 해서 연산자 정리도 할 겸 아주 얕게

    gnidinger.tistory.com

    회사에 들어와 그보다 훨씬 규모가 있고 잘 짜인 코드를 공부하는 중인데,

     

    처음 보는 데다 모르는 연산자가 마구 쏟아져 나와서 정리가 필요할 것 같아 글쓰기를 눌렀다.

     

    예제까지 세세하게 볼 수는 없겠지만 일단 정리해 놓는 것으로!

     

    buffer

     

    시작하기 전에, 웹플럭스에서의 버퍼에 대해 잠시 살피고 넘어가자.

     

    Buffer?

     

    웹플럭스에서 버퍼란 데이터에게 일시적으로 할당되는 메모리 영역을 가리킨다.

     

    조금 더 구체적으로 웹플럭스의 비동기 이벤트 스트림에서 발생하는 데이터를 임시로 저장하고

     

    일정량이 모이면 한 번에 처리하는 역할을 한다고 볼 수 있다.

     

    이는 데이터를 원하는 크기의, 상대적으로 작은 단위로 조각내어 처리하는 대신 일정량의 데이터를 모아

     

    한 번에 처리(방출)함으로써 입출력 횟수를 줄여 처리 속도를 향상시킬 수 있다.

     

    buffer()

     

    계속해서 buffer()란 원본 Flux의 항목을 특정 크기나 시간 단위로 버퍼링 해서 새로운 Flux로 생성하는 연산자이다.

     

    버퍼링 된 항목을 비동기로 처리하거나 그룹화해야 하는 경우 유용하게 쓰이며,

     

    버퍼링에 대해 바로 예를 들자면 아래와 같은 코드가 가능하다.

    Flux.range(1, 10)
        .buffer(3)
        .subscribe(System.out::println);
    [1, 2, 3]
    [4, 5, 6]
    [7, 8, 9]
    [10]

    위처럼 buffer() 안에 숫자 3을 넣어주면 최대 3개의 항목을 포함하는 버퍼를 생성하는 것을 볼 수 있다.

     

    혹은 시간 간격을 포함해 아래와 같은 코드를 작성할 수도 있는데,

    Flux.interval(Duration.ofMillis(500)) // 0.5초마다 요소 생성
        .buffer(Duration.ofSeconds(2)) // 2초마다 요소를 묶어 버퍼로 반환
        .take(2) // 총 2개의 버퍼만 받도록 설정
        .subscribe(System.out::println);
    
    try {
        Thread.sleep(5000); // 5초간 실행 대기
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    [0, 1, 2, 3]
    [4, 5, 6, 7]

    결과는 실행 환경과 기타 조건에 의해 달라질 수 있다.

     

    이어서 이를 포함해 아래의 예와 같이 다양한 매개변수를 넣어 사용할 수 있으며,

    Flux<T> buffer()
    Flux<T> buffer(int maxSize)
    Flux<T> buffer(Duration timespan)
    Flux<T> buffer(Duration timespan, Duration timeshift)
    Flux<T> buffer(Publisher<?> boundary)
    Flux<T> buffer(Publisher<?> boundary, int maxSize)

    각 오버로딩 메서드에 대한 설명은 아래와 같다.

     

    • buffer(): 스트림의 항목들을 List로 모은 뒤, 항목들이 모이면 완료된 List를 방출하고, 새로운 빈 List를 시작.
    • buffer(int maxSize): maxSize만큼의 항목들을 모아 List로 방출. 항목 수가 maxSize에 도달하면 새로운 List를 시작.
    • buffer(Duration timespan): 지정된 시간 동안 도착한 항목들을 모아 List로 방출. 시간이 경과하면 새로운 List를 시작.
    • buffer(Duration timespan, Duration timeshift)
      timespan 동안 도착한 항목들을 모아 List로 방출.
      그리고 timeshift 시간만큼 이동하여 새로운 List를 시작.
      이렇게 하면 겹치는 구간이 생김.
    • buffer(Publisher<?> boundary)
      다른 Publisher를 사용하여 버퍼링의 경계를 설정. boundary가 항목을 방출할 때마다 버퍼를 시작.
    • buffer(Publisher<?> boundary, int maxSize)
      boundary 항목과 maxSize에 따라 버퍼를 생성.
      maxSize 크기에 도달하거나 boundary가 항목을 방출할 때마다 새로운 List를 시작.

    또한 buffer() 연산자를 더욱 세밀하게 조정하기 위한 bufferUntil, bufferWhile, bufferTimeout, bufferWhen과 같은

     

    변형 연산자도 제공된다.

     

    bufferUntil(predicate: Predicate<? super T>)

     

    bufferUntil()은 말 그대로 주어진 조건 Predicate가 true를 반환할 때까지의 버퍼를 생성한다.

     

    구체적으로는 조건이 충족되기 전까지 버퍼에 요소를 추가하며, 조건이 충족되면 버퍼를 방출한 뒤

     

    새로운 버퍼를 시작한다. 예를 들면 아래와 같다.

    Flux.range(1, 10)
        .bufferUntil(num -> num % 4 == 0)
        .subscribe(System.out::println);
    [1, 2, 3, 4]
    [5, 6, 7, 8]
    [9, 10]

    1부터 10까지의 값을 가지는 Flux에서 4의 배수가 될 때마다 버퍼를 방출하고 새로 시작하는 것을 확인할 수 있다.

     

    bufferUntilChanged()

     

    bufferUntilChanged()는 연속으로 발생하는 동일한 요소를 하나의 버퍼로 묶는 연산자이다.

     

    주의할 것은 중복 제거나 무시가 아닌 하나의 버퍼로 묶는다는 사실이다.

     

    예를 들면 아래와 같다.

    Flux.just(1, 1, 2, 2, 3, 3, 3, 4, 4, 4, 4)
        .bufferUntilChanged()
        .subscribe(System.out::println);
    [1, 1]
    [2, 2]
    [3, 3, 3]
    [4, 4, 4, 4]

     

    이어서 bufferUntilChanged()에 매개변수로 전달하는 키에 따라 데이터 스트림을 분류하는 역할을 한다.

     

    연속된 값의 키가 동일하면 버퍼에 추가하고 키가 변경되면 버퍼를 방출한 뒤 새로 시작한다.

     

    아래에 보듯이 Function.identity를 사용하면 기존의 bufferUntilChanged와 동일한 역할을 한다.

    Flux.just(1, 1, 2, 3, 3, 3, 2, 4, 4, 4, 4)
        .bufferUntilChanged(Function.identity())
        .subscribe(System.out::println);
    [1, 1]
    [2]
    [3, 3, 3]
    [2]
    [4, 4, 4, 4]

     

    bufferWhile(predicate: Predicate<? super T>)

     

    이번에는 주어진 Predicate가 true를 반환하는 동안에만 버퍼를 생성한다.

     

    바로 예를 들자면 아래와 같다.

    Flux.range(1, 10)
        .bufferWhile(num -> num > 6)
        .subscribe(System.out::println);
    [7, 8, 9, 10]

     

    bufferTimeout(int maxSize, Duration maxTime)

     

    주어진 시간 간격(Duration)마다 버퍼에 요소를 추가한다.

     

    이때 maxSize 만큼의 항목을 수집하거나 일정 시간이 경과할 때까지 데이터를 모은 뒤 조건이 충족되면 버퍼를 방출한다.

     

    코드로 예를 들면 아래와 같다.

    Flux.interval(Duration.ofSeconds(1))
        .bufferTimeout(2, Duration.ofSeconds(3))
        .subscribe(System.out::println);
    
    try {
        Thread.sleep(5000); // 5초간 실행 대기
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    [0, 1]
    [2, 3]

     

    bufferWhen(Publisher<U> bucketOpening, Function<> closeSelector)

     

    첫 번째 매개변수는 버퍼링을 시작하는 조건을 정의하며, 이 조건이 충족되면 버퍼링이 시작된다.

     

    두 번째 매개변수는 각 버퍼의 종료조건을 정의하는 함수인데, 이 함수는 버퍼를 닫을 때 사용할 Publisher를 반환해야 한다.

     

    정리하자면 bufferWhen() 연산자는

     

    • bucketOpening에서 신호를 받아 새로운 버퍼를 생성하고
    • 해당 버퍼에서 closeSelector를 호출해 버퍼를 닫는 신호를 기다린 뒤 버퍼를 방출하고 새로운 버퍼를 시작한다.

    잘 이해가 안 가지만 일단 코드로 예를 들면 아래와 같다.

    Flux.interval(Duration.ofSeconds(1)) // 1초마다 항목을 방출하는 Flux
        .bufferWhen(
            Mono.delay(Duration.ofSeconds(5)), // 5초 후에 버퍼 생성 조건을 정의하는 Mono
            signal -> Flux.interval(Duration.ofSeconds(2)) // 2초 뒤 버퍼를 닫는 신호를 방출하는 Flux
        )
        .subscribe(System.out::println);
    
    // 잠시 대기
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    [5, 6]

     

    반응형
    댓글
    공지사항
    최근에 올라온 글
    최근에 달린 댓글
    Total
    Today
    Yesterday
    링크
    «   2024/05   »
    1 2 3 4
    5 6 7 8 9 10 11
    12 13 14 15 16 17 18
    19 20 21 22 23 24 25
    26 27 28 29 30 31
    글 보관함