티스토리 뷰
목차
글의 순서는 연산자의 중요도나 로직의 흐름과는 아무 관련이 없다.
그저 내가 코드를 읽다가 마주친 순서를 마구잡이로 적어놓은 메모장에 근거할 뿐.
글 제목에 '+'가 붙어있는 이유는 연산자가 아니더라도 처음 봤거나 알아야 할 것 같은 경우라 판단이 들면
이 글에 같이 추가해 버렸기 때문이다. 굳이 연산자와 그 외 클래스를 나누어야 할 필요를 아직 느끼지 못해서.
말이 나온 김에 말하자면, 자바 17 + WebFlux로 개발을 하다 보면 '이런 게 다 있어..?' 할 정도로
잘 구현된 연산자와 클래스가 많이 있다.
수학 공식 외워서 적재적소에 쓰듯이 연산자의 종류과 맥락을 잘 파악하는 것이 리액티브한 개발자의 소양인가 싶을 정도.
어쨌건, 시작!
AtomicInteger(AtomicLong)
AtomicInteger는 자바에서 원자적(Atomic) 연산을 위해 기본적으로 지원하는 클래스이다.
여기서 원자적이라는 의미는 스레드 세이프, 즉 여러 스레드가 동시에 접근해도 안전하게 수행되는 연산을 의미한다.
내부적으로는 Compare-And-Swap(CAS)라는 알고리즘이 사용되고 있다고 하는데,
해당 알고리즘에 대해서는 기회가 되면 정리하기로 하자.
계속해서 AtomicInteger에서 사용되는 메서드에 대해 몇 개만 정리하자.
AtomicInteger atomicInteger = new AtomicInteger(0); // 초기화
System.out.println(atomicInteger.incrementAndGet()); // += 1
System.out.println(atomicInteger.decrementAndGet()); // -= 1
System.out.println(atomicInteger.addAndGet(5)); // + 5
int expected = 5;
int newValue = 10;
atomicInteger.compareAndSet(expected, newValue); // atomicIngeter가 5라면 10으로 교체
System.out.println(atomicInteger);
atomicInteger.getAndUpdate(operand -> operand * 2); // 현재 값을 가져와 * 2
System.out.println(atomicInteger);
1
0
5
10
20
Flux.fromIterable()
Flux.fromIterable()은 Iterable 객체를 순회하며 데이터를 비동기적으로 배출해 Flux를 생성하는 연산자이다.
기본적으로 객체의 요소를 처음부터 끝까지 반복하며 각 요소를 Flux의 데이터로 발행하며,
모든 요소를 발행한 이후에는 완료신호를 발생시킨다.
코드로 예를 들면 아래와 같다.
List<String> list = Arrays.asList("건희", "딩거", "도단");
Flux.fromIterable(list)
.subscribe(System.out::println);
건희
딩거
도단
repeatWhen()
repeatWhen()은 리액티브 데이터스트림(Mono, Flux)에 대한 반복 실행을 제어하는 연산자이다.
이름대로 특정 조건에 따라 반복 실행을 조절하고 재구독을 결정한다.
이외에도 특정 조건에 따라 데이터 스트림을 반복 처리하는 재시도(retry) 전략을 구현할 수 있다.
간단하게 코드로 예를 들면 아래와 같다.
Flux.range(1, 3)
.repeatWhen(repeat -> repeat.take(2)) // 2번 반복
.subscribe(System.out::println);
1
2
3
1
2
3
1
2
3
Flux.range()를 통해 발행한 데이터를 두 번 반복해서 구독하는 것을 확인할 수 있다.
DatabaseClient
DatabaseClient 클래스는 Spring WebFlux 프레임워크에서 제공하는 R2DBC 기반의 데이터 접근 API이다.
SQL 쿼리를 사용하기 위한 여러 가지 연산자를 제공하며, 너무나 당연하게도 모든 요청은 비동기로 처리된다.
제공하는 연산자 중 몇 가지를 살펴보고 넘어가자.
sql(QUERY)
예를 들어 아래와 같은 SQL 쿼리를 문자열로 입력받아 처리한 뒤에 GenericExecuteSpec 객체를 반환한다.
SELECT u.USER_SEQ as seq,
u.USER_PASSWORD as password,
u.USER_ID as id,
u.USER_NAME as name,
r.USER_ROLE as role
FROM AU_USER_MASTER u
LEFT OUTER JOIN AU_USER_ROLE r
ON u.USER_ID = r.USER_ID
WHERE u.USER_ID = :id
여기서 GenericExecuteSpec란 쿼리를 실행하고 파라미터를 바인딩하는 데 쓰이는 객체이다.
bind()
bind() 연산자는 쿼리의 파라미터에 값을 바인딩하는 데 사용된다.
예를 들면 위에 적어둔 쿼리의 <id> 값에 특정 문자열을 바인딩하려면 아래와 같이 진행하면 된다.
DatabaseClient.sql(query)
.bind("id", "stella")
// ...
위 플로우는 id 값에 "stella"라는 문자열을 대입해 주는 효과를 낸다.
만약 바인딩해야 하는 값이 두 개 이상이라면, 위와 같은 방식이 아닌 아래와 같이 인덱스를 사용할 수도 있다.
DatabaseClient.sql(query)
.bind(0, "stella")
// ...
fetch()
fetch()는 주어진 조건에 맞춰 쿼리를 실행하고 결과를 가져오는 데 사용된다.
실행 뒤 FetchSpec 인터페이스를 반환받는데, 해당 인터페이스는 결과를 가져오는 여러 연산자를 아래와 같이 제공한다.
one()
하나의 데이터를 가져오는 Mono를 반환한다. 결과가 없거나 두 개 이상인 경우 예외를 발생시킨다.
first()
첫 번째 데이터를 가져오는 Mono를 반환한다. 결과가 없어도 예외를 발생시키지 않는다.
all()
모든 데이터를 가져오는 Flux를 반환한다.
위의 예시에서 이어가자면 아래와 같은 흐름이 된다.
DatabaseClient.sql(query)
.bind(0, "stella")
.all()
// ...
isRetryExhausted()
isRetryExhausted()는 반복 실행 로직이 있는 경우, 현재 요청이 재시도를 할 수 있는지,
그러니까 재시도 횟수나 대기시간 초과와 같은 조건에 걸리지 않는지 확인해서 Boolean 값을 반환한다.
당연하게도 'true'일 경우 더 이상 재시도를 하지 않게 되며, 적절한 조치를 수행할 것이 요구된다.
delayElements()
delayElements()는 스트림의 요소를 일정한 시간 간격으로 방출되도록 제어하는 데 사용된다.
이를 이용해 비동기처리를 포함한 모든 작업에 대해 지연시간을 설정할 수 있게 된다.
코드로 예를 들면 아래와 같다.
Flux.range(1, 5)
.delayElements(Duration.ofSeconds(1))
.subscribe(System.out::println);
Thread.sleep(6000);
1
2
3
4
5
subscribe()를 호출한 뒤에 프로그램이 즉시 종료되는 것을 방지하기 위해 메인 스레드를 6초 지연시켰다.
limitRate()
limitRate()는 백프레셔를 제어하는 데 사용되는 연산자이다. 백프레셔에 대한 설명은 아래 글에 적여있다.
[Spring]리액티브 스트림즈(Reactive Streams)
연산자에 대해 조금 더 구체적으로 말하자면 구독자가 한 번에 처리할 수 있는 요소의 개수를 제한하는 역할을 한다.
코드로 바로 예를 들면 아래와 같다.
Flux.range(1, 10)
.limitRate(3)
.delayElements(Duration.ofMillis(500))
.subscribe(System.out::println);
Thread.sleep(6000);
위 코드를 실행하면 처음에 3개의 숫자를 처리한 뒤 순서에 따라 처리되는 것을 확인할 수 있다.
'Java+Spring > WebFlux' 카테고리의 다른 글
[WebFlux]자주 사용하는 연산자 정리(5) (0) | 2023.08.27 |
---|---|
[WebFlux]자주 사용하는 연산자 정리(3), 그리고 (0) | 2023.05.21 |
[WebFlux]자주 사용하는 연산자 정리(2) - buffer (0) | 2023.05.21 |
[WebFlux]함수형 엔드포인트에서의 유효성 검증 (4) | 2023.04.07 |
[WebFlux]라우터, 함수형 엔드포인트 (2) | 2023.04.07 |
[WebFlux]현재 요청을 보낸, 로그인 사용자 정보 불러오기 (4) | 2023.04.06 |
- Total
- Today
- Yesterday
- 리스트
- RX100M5
- 남미
- 여행
- Python
- 맛집
- 세모
- 자바
- 지지
- 동적계획법
- 알고리즘
- 중남미
- 스프링
- Algorithm
- 백준
- 칼이사
- 야경
- spring
- 유럽
- Backjoon
- 세계여행
- 기술면접
- 유럽여행
- a6000
- 세계일주
- 면접 준비
- 스트림
- BOJ
- java
- 파이썬
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |