티스토리 뷰

728x90
반응형

프로젝트 리액터(Project Reactor), 혹은 리액터리액티브 스트림즈의 구현체다.

 

로고와 이름부터 원자로, 아이언맨을 떠오르게 만드는 리액터는 2014년에 태어난 현대적인 기술로

 

2022년 현재 3.4.24 버전까지 릴리즈 되어있다.

 

스프링 MVC와 비교했을 때 가장 큰 차이점은 논 블로킹 통신을 지원한다는 점이며,

 

명령형이 아닌 선언형 프로그래밍으로 이루어졌다는 것 역시 큰 차이점이다.

 

지난 글에서 리액터를 사용한 리액티브 프로그래밍의 기초와 용어에 대해 보았으니,

 

2022.10.13 - [Development/Spring] - [Spring]리액티브 프로그래밍 시작하기, 용어 정리

 

[Spring]리액티브 프로그래밍 시작하기, 용어 정리

지난 글에서 리액티브 프로그래밍과 리액티브 스트림즈, 그 구현체인 프로젝트 리액터에 대해 알아보았다. 2022.10.12 - [Development/Spring] - [Spring]리액티브 프로그래밍(Reactive Programming) 2022.10.12 -..

gnidinger.tistory.com

이번 글에선 추가적인 구성요소와 개념에 대해서 얕게 알아본다.

 

subscribe()

 

지난 글에서 subscribe()연산을 시작하는 트리거이자 일종의 최종연산이라고 했었다.

 

Subscriber인 System.out::println과 함께 사용해서 연산 결과를 출력하는 예를 보았는데,

 

아래 코드로 subscriber()의 추가적인 매개변수와 기능에 대해 살펴보자.

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class HelloReactorExample {
    public static void main(String[] args) throws InterruptedException {
        Flux    
            .just("Hello", "Reactor")               
            .map(message -> message.toUpperCase())  
            .publishOn(Schedulers.parallel())       
            .subscribe(System.out::println,         
                    error -> System.out.println(error.getMessage()), // 추가
                    () -> System.out.println("# onComplete")); // 추가        

        Thread.sleep(100L);
    }
}

미리 언급한 대로 subscribe()에 두 개의 매개변수가 추가로 붙은 것을 확인할 수 있다.

 

두 번째 매개변수는 시퀀스에서 발생하는 에러에 대한 핸들링을,

 

세 번째 매개변수는 시퀀스 종료 뒤의 후처리를 나타내고 있다.

 

현재 코드상엔 에러가 없으므로 코드를 실행하면 아래와 같은 결과를 얻는다.

HELLO
REACTOR
# onComplete

나머지 컴포넌트에 대한 추가 설명은 아래와 같다.

 

  • Flux - 해당 시퀀스가 한 번에 여러 개의 데이터를 다룬다는 의미.
  • map() - 조건에 맞게 매핑
  • publishOn() - 스레드 관리자(Scheduler) 지정. Downstream의 스레드가 변경됨.

즉, 위의 코드는 총 두 개의 스레드에서 실행되는 것이다.

 

Marble Diagram

 

마블 다이어그램은 구슬(Marble)을 이용해 시간에 따른 데이터의 흐름을 나타내는 다이어그램을 말한다.

 

사용하는 오퍼레이터에 따라 굉장히 다양한 다이어그램이 그려질 수 있으며,

 

복잡한 코드에서 데이터의 흐름을 가능한 쉽게 파악할 수 있도록 하는 것이 목적이라 할 수 있다.

 

마블 다이어그램은 예를 들면 아래와 같이 생겼다.

 

시간의 흐름은 왼쪽에서 오른쪽으로, 오퍼레이터의 작용은 위에서 아래로 진행된다.

 

퍼블리셔에게서 Emit 된 세 개의 원은 map()을 통해 사각형으로 변환되고 Downstream으로 전달된다.

 

이어서 인풋 플럭스의 시퀀스는 정상 종료되고 아웃풋 플럭스의 시퀀스는 에러를 만나게 되는 다이어그램이다.

 

플럭스가 아닌 모노의 다이어그램은 데이터가 0개 또는 하나만 Emit 된다는 차이점이 있다.

 

Scheduler

 

스케줄러는 위에도 한 번 언급했지만 한 마디로 말하면 스레드 관리자이다.

 

더 쉽게 말하면 메인 스레드에서 처리되는 연산을 담당하는 새로운 스레드를 생성한다고 볼 수 있다.

 

논 블로킹, 비동기 방식의 리액터에서 스레드와 그 관리는 무엇보다 중요하며

 

멀티스레딩 프로세스를 단순하게 처리해주는 스케줄러는 그 역할이 크다고 할 수 있다.

 

또한 스케줄러는 전용 오퍼레이터라고 할 수 있는 subscribeOn(), publishOn()과 함께 사용되며,

 

그 예는 아래와 같다.

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

@Slf4j
public class SchedulersExample {
    public static void main(String[] args) throws InterruptedException{
        Flux.range(1, 10)
                .subscribeOn(Schedulers.boundedElastic())
//                .publishOn(Schedulers.parallel())
                .doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
                .filter(n -> n % 2 == 0)
                .map(n -> n * 2)
                .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(100L);
    }
}
20:41:15.291 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
20:41:15.307 [main] INFO com.codestates.example.SchedulersExample - # doOnSubscribe
20:41:15.309 [boundedElastic-1] INFO com.codestates.example.SchedulersExample - # onNext: 4
20:41:15.309 [boundedElastic-1] INFO com.codestates.example.SchedulersExample - # onNext: 8
20:41:15.309 [boundedElastic-1] INFO com.codestates.example.SchedulersExample - # onNext: 12
20:41:15.309 [boundedElastic-1] INFO com.codestates.example.SchedulersExample - # onNext: 16
20:41:15.309 [boundedElastic-1] INFO com.codestates.example.SchedulersExample - # onNext: 20

subscribeOn() 호출과 함께 스레드가 메인에서 boundedElastic-1으로 변경된 것을 확인할 수 있다.

 

또한 위 코드에서 publishOn()을 사용한다면 스레드가 parallel-1로 변경되는 것도 예측할 수 있겠다.

 

그렇다면 subscribeOn()과 publishOn()의 차이는 무엇일까?

 

간단하게 표현하면 아래와 같다.

 

  • subscribeOn()

    • 호출 시점과 상관없이 앞,뒤의 체인을 묶어 별도의 스레드로 분리
    • 주로 원본 Publisher의 실행 스레드(구독 직후)를 지정하는 역할을 함
    • 한 시퀀스에서 여러번 호출되더라도 하나의 스레드로 처리 → 동시성이 낮음
  • publishOn()

    • 호출 시점 뒷쪽의 체인을 별도의 스레드로 분리
    • 주로 Operator 앞에서 호출해서 실행 스레드를 추가하는 역할을 함
    • 여러 번 호출될 경우 별개의 스레드 생성, 생성 순서는 체인을 거슬러 올라가며 부여

이상을 코드로 확인하면 다음과 같이 된다.

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

@Slf4j
public class SchedulersExample01 {
    public static void main(String[] args) throws InterruptedException{
        Flux
                .range(1, 10)
                .subscribeOn(Schedulers.boundedElastic())
                .doOnSubscribe(subscription -> log.info("# doOnSubscribe"))

                .publishOn(Schedulers.parallel())
                .filter(n -> n % 2 == 0)
                .doOnNext(data -> log.info("# filter doOnNext"))

                .publishOn(Schedulers.parallel())
                .map(n -> n * 2)
                .doOnNext(data -> log.info("# map doOnNext"))

                .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(100L);
    }
}
21:32:52.246 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
21:32:52.265 [main] INFO com.codestates.example.SchedulersExample03 - # doOnSubscribe
21:32:52.269 [parallel-2] INFO com.codestates.example.SchedulersExample03 - # filter doOnNext
21:32:52.269 [parallel-2] INFO com.codestates.example.SchedulersExample03 - # filter doOnNext
21:32:52.269 [parallel-2] INFO com.codestates.example.SchedulersExample03 - # filter doOnNext
21:32:52.269 [parallel-2] INFO com.codestates.example.SchedulersExample03 - # filter doOnNext
21:32:52.269 [parallel-2] INFO com.codestates.example.SchedulersExample03 - # filter doOnNext
21:32:52.269 [parallel-1] INFO com.codestates.example.SchedulersExample03 - # map doOnNext
21:32:52.269 [parallel-1] INFO com.codestates.example.SchedulersExample03 - # onNext: 4
21:32:52.270 [parallel-1] INFO com.codestates.example.SchedulersExample03 - # map doOnNext
21:32:52.270 [parallel-1] INFO com.codestates.example.SchedulersExample03 - # onNext: 8
21:32:52.270 [parallel-1] INFO com.codestates.example.SchedulersExample03 - # map doOnNext
21:32:52.270 [parallel-1] INFO com.codestates.example.SchedulersExample03 - # onNext: 12
21:32:52.270 [parallel-1] INFO com.codestates.example.SchedulersExample03 - # map doOnNext
21:32:52.270 [parallel-1] INFO com.codestates.example.SchedulersExample03 - # onNext: 16
21:32:52.270 [parallel-1] INFO com.codestates.example.SchedulersExample03 - # map doOnNext
21:32:52.270 [parallel-1] INFO com.codestates.example.SchedulersExample03 - # onNext: 20

위에서 설명한 대로 별개의 스레드로 분리되며 체인을 거슬러 올라가며 번호가 매겨진 것을 확인할 수 있다.

 

마지막으로 리액터와 마블 다이어그램, 각종 오퍼레이터에 대한 설명은 공식문서에 아주 자세히 설명되어있다.

 

https://projectreactor.io/docs/core/release/reference/

 

Reactor 3 Reference Guide

10:45:20.200 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) (1) 10:45:20.205 [main] INFO reactor.Flux.Range.1 - | request(unbounded) (2) 10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(1) (3) 1

projectreactor.io

 

반응형
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함