티스토리 뷰

728x90
반응형

 

목차

     

    마지막으로 연산자를 정리한 게 벌써 거의 세 달 전이다.

     

    회사에 들어와 웹플럭스로 개발을 하며 정리하고 싶은 건 산더미 같았지만

     

    다른 공부할 것이 너무 많고 바쁜 데다가 프로젝트까지 겹쳐 전혀 정리를 하지 못했다.

     

    그래서 오랜만에 돌아온 연산자 정리 시간.

     

    웹플럭스 연산자 사전이라도 있으면 좋겠다.

     

    defer()

     

    defer 연산자는 Mono와 Flux 둘 모두에서 사용가능한 연산자이다.

     

    이 연산자는 리액티브 프로그래밍에서 지연 평가(Lazy Evaluation)를 수행하기 위한 연산자로,

     

    구독이 이루어지는 시점마다 새로운 Mono 혹은 Flux 인스턴스를 생성해 데이터 스트림을 제공한다.

     

    여기서 지연 평가란 다른 의미가 아니라 실제로 데이터가 필요한 시점까지 데이터 생성을 하지 않는다는 의미이다.

     

    예를 들면 아래와 같은 코드를 작성할 수 있다.

    import reactor.core.publisher.Mono;
    
    public class Example {
    	public static void main(String[] args) {
    
    		Mono<String> deferredMono = Mono.defer(() -> {
    			System.out.println("Mono Instance 생성");
    			return Mono.just("Hello");
    		});
    
    		System.out.println("Before subscription");
    
    		deferredMono.subscribe(value -> System.out.println("Subscriber 1 received: " + value));
    		deferredMono.subscribe(value -> System.out.println("Subscriber 2 received: " + value));
    
    		System.out.println("After subscription");
    	}
    }

    이 코드에서 "Mono Instance 생성"부분은 구독이 이루어질 때마다 새로 출력된다.

    Before subscription
    Mono Instance 생성
    Subscriber 1 received: Hello
    Mono Instance 생성
    Subscriber 2 received: Hello
    After subscription

    조금 생각해보면 이는 단순히 데이터가 필요한 시점까지 기다리는 것뿐만 아니라, 다음과 같은 상황에서 유효하게 쓰인다.

     

    1. 상태 제공
      Mono, 혹은 Flux가 특정 상태에 의존적이라면 각 구독자에게 독립적인 상태를 제공할 수 있다.
    2. 동적 데이터 제공
      데이터 소스가 시간, 환경에 따라 변하는 경우 구독 시점의 데이터를 가져올 수 있다.
    3. 에러 핸들링
      구독이 이루어질 때마다 새로 데이터 스트림을 발생시키기 때문에 예외를 더 쉽게 잡을 수 있다.
    4. 리소스 관리
      특히 디비 연결이나 네트워크 연결 같은 리소스를 효율적으로 관리할 수 있다.
      이는 실제 필요한 경우에만 초기화하거나 할당할 수 있기 때문이다.

     

    Flux.range()

     

    이 연산자는 for문을 대체하는 역할을 한다.

     

    정해진 정수 범위 안의 값과 개수를 발행하는 Flux를 생성하기 때문인데,

     

    바로 예제를 보자.

    import reactor.core.publisher.Flux;
    
    public class Example {
    	public static void main(String[] args) {
    
    		Flux.range(5, 3)
    			.subscribe(
    				item -> System.out.println("Next: " + item),
    				err -> System.out.println("Error: " + err),
    				() -> System.out.println("Done")
    			);
    	}
    }
    Next: 5
    Next: 6
    Next: 7
    Done

    5부터 시작해서 세 개의 정수가 차례로 발행된 것을 확인할 수 있다.

     

    Collection.singletonList()

     

    이 연산자는 요소 하나로 구성된 불변 리스트를 생성한다.

     

    엄밀하게 말하면 웹플럭스와는 관련이 없지만, 요즘 종종 사용해서 적어둔다.

     

    이는 가끔 데이터를 스트림으로 받아 하나씩 처리하고 싶을 때 사용하는데,

     

    불변인 데다 단일요소를 가지기 때문에 Thread-Safe 해서 사용하기 편하다.

    import java.util.Collections;
    import java.util.List;
    
    public class Example {
    	public static void main(String[] args) {
    
    		List<String> singleElementList = Collections.singletonList("Hello, World!");
    
    		System.out.println("List: " + singleElementList);
    
    		// 불변이므로 아래와 같은 작업은 UnsupportedOperationException을 발생
    		singleElementList.add("Another element");
    		singleElementList.remove(0);
    	}
    }
    List: [Hello, World!]

     

    concatMap()

     

    concatMap()은 flatMap()과 비슷하지만 데이터의 순서가 중요할 때 사용된다.

     

    데이터를 단일 스트림으로 연결해 순서를 유지해주기 때문이다.

     

    비동기 처리에서 데이터의 순서가 필요한 경우엔 진짜 자주 사용하는 것 같다.

     

    추가로 해당 연산자를 사용하면 다음 작업이 이전 작업의 종료를 기다리기 때문에 작업의 중첩을 제거할 수 있다.

    import reactor.core.publisher.Flux;
    
    public class Example {
    	public static void main(String[] args) {
    
    		Flux<Integer> flux = Flux.range(1, 5);
    
    		Flux<String> transformedFlux = flux.concatMap(item -> {
    			return Flux.just(Integer.toString(item), Integer.toString(item * 2));
    		});
    
    		transformedFlux.subscribe(System.out::println);
    	}
    }
    1
    2
    2
    4
    3
    6
    4
    8
    5
    10

    물론 이러한 특성 덕분에 처리속도는 flatMap()에 비해 느리다.

     

    next()

     

    이 연산자는 Mono, Flux에서 공통으로 사용할 수 있는데, 해당 스트림에서 처음 발생하는 항목을 반환하는 Mono를 생성한다.

     

    주로 테스트를 하거나, 비어있는 값을 반환하고 싶을 때 사용된다.

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    public class Example {
    	public static void main(String[] args) {
    
    		Flux<Integer> numberFlux = Flux.just(1, 2, 3, 4, 5);
    
    		Mono<Integer> firstNumberMono = numberFlux.next();
    
    		firstNumberMono.subscribe(
    			item -> System.out.println("Next item: " + item),
    			err -> System.err.println("Error: " + err),
    			() -> System.out.println("Done")
    		);
    	}
    }
    Next item: 1
    Done

     

    then()

     

    이 연산자는 Mono, Flux가 완료되면 실행되는 연산이다. 그간 방출된 모든 항목을 무시하고

     

    스트림이 완료되면 이어서 작업하는 데 쓰인다.

     

    즉, 스트림이 완료되길 기다려 완료 신호만을 방출하는 Mono<Void>를 반환하고 종료된다.

    import reactor.core.publisher.Mono;
    
    public class Example {
    	public static void main(String[] args) {
    
    		Mono<String> mono1 = Mono.just("Hello");
    
    		Mono<Void> result = mono1
    			.doOnNext(item -> System.out.println("Received: " + item))
    			.then();
    
    		result.subscribe(
    			null,
    			err -> System.err.println("Error: " + err),
    			() -> System.out.println("Done")
    		);
    	}
    }
    Received: Hello
    Done

    추가로 then(Mono<T>), then(Flux<T>)를 추가해주면 원 스트림 완료시

     

    무시하고 다음 연산을 진행해 결과를 반환한다.

    Received: World
    Done

    "Hello"가 무시된 것을 확인할 수 있다.

     

    take()

     

    이 연산자는 이름대로 정해진 개수의 항목을 가져올 때 사용된다.

     

    데이터 스트림을 받아 특정 수의 항복을 방출하고 스트림을 끝내기 때문에 이후의 데이터는 발행 자체가 되지 않는다.

    import reactor.core.publisher.Flux;
    
    public class Example {
    	public static void main(String[] args) {
    
    		Flux<Integer> numbers = Flux.range(0, 10)
    			.take(3);  // 앞의 3개의 숫자만 가져옴
    
    		numbers.subscribe(
    			item -> System.out.println("Received: " + item),
    			err -> System.err.println("Error: " + err),
    			() -> System.out.println("Done")
    		);
    	}
    }
    Received: 0
    Received: 1
    Received: 2
    Done

     

    반응형
    댓글
    공지사항
    최근에 올라온 글
    최근에 달린 댓글
    Total
    Today
    Yesterday
    링크
    «   2025/01   »
    1 2 3 4
    5 6 7 8 9 10 11
    12 13 14 15 16 17 18
    19 20 21 22 23 24 25
    26 27 28 29 30 31
    글 보관함