티스토리 뷰

728x90
반응형

 

목차

     

     

    글의 순서는 연산자의 중요도나 로직의 흐름과는 아무 관련이 없다.

     

    그저 내가 코드를 읽다가 마주친 순서를 마구잡이로 적어놓은 메모장에 근거할 뿐.

     

    글 제목에 '+'가 붙어있는 이유는 연산자가 아니더라도 처음 봤거나 알아야 할 것 같은 경우라 판단이 들면

     

    이 글에 같이 추가해 버렸기 때문이다. 굳이 연산자와 그 외 클래스를 나누어야 할 필요를 아직 느끼지 못해서.

     

    말이 나온 김에 말하자면, 자바 17 + WebFlux로 개발을 하다 보면 '이런 게 다 있어..?' 할 정도로

     

    잘 구현된 연산자와 클래스가 많이 있다.

     

    수학 공식 외워서 적재적소에 쓰듯이 연산자의 종류과 맥락을 잘 파악하는 것이 리액티브한 개발자의 소양인가 싶을 정도.

     

    어쨌건, 시작!

     

    AtomicInteger(AtomicLong)

     

    AtomicInteger는 자바에서 원자적(Atomic) 연산을 위해 기본적으로 지원하는 클래스이다.

     

    여기서 원자적이라는 의미는 스레드 세이프, 즉 여러 스레드가 동시에 접근해도 안전하게 수행되는 연산을 의미한다.

     

    내부적으로는 Compare-And-Swap(CAS)라는 알고리즘이 사용되고 있다고 하는데,

     

    해당 알고리즘에 대해서는 기회가 되면 정리하기로 하자.

     

    계속해서 AtomicInteger에서 사용되는 메서드에 대해 몇 개만 정리하자.

    AtomicInteger atomicInteger = new AtomicInteger(0); // 초기화
    
    System.out.println(atomicInteger.incrementAndGet()); // += 1
    
    System.out.println(atomicInteger.decrementAndGet()); // -= 1
    
    System.out.println(atomicInteger.addAndGet(5)); // + 5
    
    int expected = 5;
    int newValue = 10;
    
    atomicInteger.compareAndSet(expected, newValue); // atomicIngeter가 5라면 10으로 교체
    
    System.out.println(atomicInteger);
    
    atomicInteger.getAndUpdate(operand -> operand * 2); // 현재 값을 가져와 * 2
    
    System.out.println(atomicInteger);
    1
    0
    5
    10
    20

     

    Flux.fromIterable()

     

    Flux.fromIterable()은 Iterable 객체를 순회하며 데이터를 비동기적으로 배출해 Flux를 생성하는 연산자이다.

     

    기본적으로 객체의 요소를 처음부터 끝까지 반복하며 각 요소를 Flux의 데이터로 발행하며,

     

    모든 요소를 발행한 이후에는 완료신호를 발생시킨다.

     

    코드로 예를 들면 아래와 같다.

    List<String> list = Arrays.asList("건희", "딩거", "도단");
    
    Flux.fromIterable(list)
    	.subscribe(System.out::println);
    건희
    딩거
    도단

     

    repeatWhen()

     

    repeatWhen()은 리액티브 데이터스트림(Mono, Flux)에 대한 반복 실행을 제어하는 연산자이다.

     

    이름대로 특정 조건에 따라 반복 실행을 조절하고 재구독을 결정한다.

     

    이외에도 특정 조건에 따라 데이터 스트림을 반복 처리하는 재시도(retry) 전략을 구현할 수 있다.

     

    간단하게 코드로 예를 들면 아래와 같다.

    Flux.range(1, 3)
    	.repeatWhen(repeat -> repeat.take(2)) // 2번 반복
    	.subscribe(System.out::println);
    1
    2
    3
    1
    2
    3
    1
    2
    3

    Flux.range()를 통해 발행한 데이터를 두 번 반복해서 구독하는 것을 확인할 수 있다.

     

    DatabaseClient

     

    DatabaseClient 클래스는 Spring WebFlux 프레임워크에서 제공하는 R2DBC 기반의 데이터 접근 API이다.

     

    SQL 쿼리를 사용하기 위한 여러 가지 연산자를 제공하며, 너무나 당연하게도 모든 요청은 비동기로 처리된다.

     

    제공하는 연산자 중 몇 가지를 살펴보고 넘어가자.

     

    sql(QUERY)

     

    예를 들어 아래와 같은 SQL 쿼리를 문자열로 입력받아 처리한 뒤에 GenericExecuteSpec 객체를 반환한다.

    SELECT u.USER_SEQ as seq,
    	   u.USER_PASSWORD as password,
    	   u.USER_ID as id,
    	   u.USER_NAME as name,
    	   r.USER_ROLE as role
      FROM AU_USER_MASTER u
      LEFT OUTER JOIN AU_USER_ROLE r
    	ON u.USER_ID = r.USER_ID
         WHERE u.USER_ID = :id

    여기서 GenericExecuteSpec란 쿼리를 실행하고 파라미터를 바인딩하는 데 쓰이는 객체이다.

     

    bind()

     

    bind() 연산자는 쿼리의 파라미터에 값을 바인딩하는 데 사용된다.

     

    예를 들면 위에 적어둔 쿼리의 <id> 값에 특정 문자열을 바인딩하려면 아래와 같이 진행하면 된다.

    DatabaseClient.sql(query)
                  .bind("id", "stella")
                  // ...

    위 플로우는 id 값에 "stella"라는 문자열을 대입해 주는 효과를 낸다.

     

    만약 바인딩해야 하는 값이 두 개 이상이라면, 위와 같은 방식이 아닌 아래와 같이 인덱스를 사용할 수도 있다.

    DatabaseClient.sql(query)
                  .bind(0, "stella")
                  // ...

     

    fetch()

     

    fetch()는 주어진 조건에 맞춰 쿼리를 실행하고 결과를 가져오는 데 사용된다.

     

    실행 뒤 FetchSpec 인터페이스를 반환받는데, 해당 인터페이스는 결과를 가져오는 여러 연산자를 아래와 같이 제공한다.

     

    one()

     

    하나의 데이터를 가져오는 Mono를 반환한다. 결과가 없거나 두 개 이상인 경우 예외를 발생시킨다.

     

    first()

     

    첫 번째 데이터를 가져오는 Mono를 반환한다. 결과가 없어도 예외를 발생시키지 않는다.

     

    all()

     

    모든 데이터를 가져오는 Flux를 반환한다.

     

    위의 예시에서 이어가자면 아래와 같은 흐름이 된다.

    DatabaseClient.sql(query)
                  .bind(0, "stella")
                  .all()
                  // ...

     

    isRetryExhausted()

     

    isRetryExhausted()는 반복 실행 로직이 있는 경우, 현재 요청이 재시도를 할 수 있는지,

     

    그러니까 재시도 횟수나 대기시간 초과와 같은 조건에 걸리지 않는지 확인해서 Boolean 값을 반환한다.

     

    당연하게도 'true'일 경우 더 이상 재시도를 하지 않게 되며, 적절한 조치를 수행할 것이 요구된다.

     

    delayElements()

     

    delayElements()는 스트림의 요소를 일정한 시간 간격으로 방출되도록 제어하는 데 사용된다.

     

    이를 이용해 비동기처리를 포함한 모든 작업에 대해 지연시간을 설정할 수 있게 된다.

     

    코드로 예를 들면 아래와 같다.

    Flux.range(1, 5)
    	.delayElements(Duration.ofSeconds(1))
    	.subscribe(System.out::println);
    
    Thread.sleep(6000);
    1
    2
    3
    4
    5

    subscribe()를 호출한 뒤에 프로그램이 즉시 종료되는 것을 방지하기 위해 메인 스레드를 6초 지연시켰다.

     

    limitRate()

     

    limitRate()는 백프레셔를 제어하는 데 사용되는 연산자이다. 백프레셔에 대한 설명은 아래 글에 적여있다.

     

    [Spring]리액티브 스트림즈(Reactive Streams)

     

    [Spring]리액티브 스트림즈(Reactive Streams)

    지난 글에서 리액티브 시스템이란 쉽게 말해 반응 속도가 빠른 프로그램을 설계하는 원칙이며 리액티브 프로그래밍은 그 원칙을 구현하는 논 블로킹, 비동기 방식의 선언형 개발 패러다임이라

    gnidinger.tistory.com

     

    연산자에 대해 조금 더 구체적으로 말하자면 구독자가 한 번에 처리할 수 있는 요소의 개수를 제한하는 역할을 한다.

     

    코드로 바로 예를 들면 아래와 같다.

    Flux.range(1, 10)
    	.limitRate(3)
    	.delayElements(Duration.ofMillis(500))
    	.subscribe(System.out::println);
    
    Thread.sleep(6000);

    위 코드를 실행하면 처음에 3개의 숫자를 처리한 뒤 순서에 따라 처리되는 것을 확인할 수 있다.

    반응형
    댓글
    공지사항
    최근에 올라온 글
    최근에 달린 댓글
    Total
    Today
    Yesterday
    링크
    «   2025/01   »
    1 2 3 4
    5 6 7 8 9 10 11
    12 13 14 15 16 17 18
    19 20 21 22 23 24 25
    26 27 28 29 30 31
    글 보관함