티스토리 뷰
목차
지난 글에서, 도커로 카프카를 실행하고 자바 프로젝트의 yml파일을 아래와 같이 구성했다.
spring:
kafka:
topic:
myTopic: test-topic
consumer:
bootstrap-servers: localhost:9092
group-id: test
auto-offset-reset: earliest
producer:
bootstrap-servers: localhost:9092
이번 글에선 위 설정을 바탕으로 실제 프로듀서와 컨슈머를 구현한 뒤,
간단한 사칙연산을 구현해 보겠다.
들어가기 전에 먼저, 위 설정은 보안과 관련된 부분이 전부 제거된 상태이기 때문에
MSK 사용 시 해당 설정을 추가해야 한다.
KafkaProducerConfig
먼저 프로듀서에 대한 설정이다. 굉장히 간단하게 아래와 같이 구현할 수 있다.
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public SenderOptions<String, String> senderOptions() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
return SenderOptions.create(props);
}
@Bean
public ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate() {
return new ReactiveKafkaProducerTemplate<>(senderOptions());
}
}
@Value 에는 우리가 지난 글에서 생성한 값이 할당되기 때문에 경로를 잘 신경 써야 한다.
이후의 코드는 자세히 뜯어보면 아래와 같이 구성되어 있다.
public SenderOptions<String, String> senderOptions()
이 메서드는 'SenderOptions' 타입의 객체를 생성해서 반환한다.
여기서 'SenderOptions'는 리액티브 카프카 프로듀서의 설정을 담당한다.
Map<String, Object> props = new HashMap<>();
먼저 prop이라는 이름의 Map을 생성한다. 이 안에 프로듀서의 설정을 담게 된다.
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
'ProducerConfig.BOOTSTRAP_SERVERS_CONFIG'는 카프카 서버의 주소를 설정하는 키이다.
여기서는 위에서 설정한 'bootstrapServers'가 그 값이 된다.
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
프로듀서가 카프카로 메시지를 보낼 때 키의 직렬화 방식을 설정한다.
보편적으로 범용성이 좋은 문자열을 직렬화하기 때문에 나도 'StringSerializer'로 설정했다.
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
이번엔 값의 직렬화 방식이다. 마찬가지로 'StringSerializer'를 설정한다.
props.put(ProducerConfig.ACKS_CONFIG, "all");
이 설정은 프로듀서가 메시지를 성공적으로 전송했다는 것을 어떻게 확인하는지 설정한다.
그러니까 메시지를 전송한 뒤 어떤 종류의 확인 응답(acknowledgment)을 받을 것인지 지정한다.
해당 설정은 다음과 같은 종류가 있다.
- '0'
어떠한 확인도 받지 않는다. 즉 메시지 전송 확인을 기다리지 않아 전달 속도가 빠르다. 그러나 메시지 전송의 성공여부를 알 수 없기 때문에 데이터 손실의 위험이 있다. - '1'
리더 브로커로부터 성공 메시지를 받는다. 이 경우 다른 레플리카 브로커의 상태는 알 수 없다. - 'all' 또는 '-1'
리더 브로커 뿐 아니라 모든 레플리카들이 메시지를 확정할 때까지 기다린다. 높은 데이터 신뢰성을 보장하지만,
모든 레플리카의 확정을 기다려야 하기 때문에 지연시간이 생길 수 있다.
return SenderOptions.create(props);
방금 설정한 'props'를 사용해 'SenderOptions' 객체를 생성 및 반환한다.
public ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate()
이 메서드는 'ReactiveKafkaProducerTemplate'타입의 객체를 생성해서 반환한다.
여기서 'ReactiveKafkaProducerTemplate'란 이름 그대로 리액티브 하게 카프카에 메시지를 프로듀싱하는 데 사용된다.
이후의 로직에서 해당 타입의 템플릿을 호출해서 사용한다.
return new ReactiveKafkaProducerTemplate<>(senderOptions());
위에서 생성한 'SenderOptions' 객체를 사용해서 'ReactiveKafkaProducerTemplate' 객체를 생성해 반환한다.
이제 이 템플릿을 이용해 카프카로 리액티브 하게 메시지를 전달할 수 있다.
KafkaConsumerConfig
계속해서 컨슈터 설정이다. 달라진 부분이 거의 없기 때문에 몇 가지만 짚고 넘어간다.
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.topic.myTopic}")
private String topicName;
@Bean
public ReceiverOptions<String, String> receiverOptions() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return ReceiverOptions.<String, String>create(props)
.subscription(Collections.singleton(topicName));
}
@Bean
public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate() {
return new ReactiveKafkaConsumerTemplate<>(receiverOptions());
}
}
@Value("${spring.kafka.topic.myTopic}")
private String topicName;
기존에 설정한 토픽 이름을 바인딩해 준다.
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
이외에는 사실상 다른 것이 없기 때문에 그냥 넘어가도록 하겠다.
KafkaService
이어서 비즈니스 로직과 핸들러 메서드를 구현하도록 하겠다. 시나리오는 다음과 같다.
- 주어진 엔드포인트를 통해 프로듀서가 UUID로 이루어진 키와 두 개의 숫자로 이루어진 값을 카프카에 발행한다.
- 컨슈머는 이를 받아 정해진 연산(기본적으로는 더하기)을 실행하고 결과를 콘솔에 출력한다.
여기서 UUID는 비동기 연산에서 연산의 흐름을 파악하기 위한 것으로, 과정이 복잡해질수록 중요해진다.
어쨌거나 먼저 메시지를 발행하는 쪽의 비즈니스 로직을 구현하자.
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaService {
private final ReactiveKafkaProducerTemplate<String, String> kafkaProducerTemplate;
public Mono<Void> sendMessage(String topic, String key, String message) {
return kafkaProducerTemplate.send(Mono.just(SenderRecord.create(new ProducerRecord<>(topic, key, message), key)))
.doOnNext(stringSenderResult -> log.info(message))
.next()
.then()
.onErrorResume(e -> {
log.error("Failed to send message: " + e.getMessage());
return Mono.empty();
});
}
}
역시 한 줄씩 뜯어보면 다음과 같다.
private final ReactiveKafkaProducerTemplate<String, String> kafkaProducerTemplate;
위에서 정의한 'ReactiveKafkaProducerTemplate' 빈을 주입받는다.
public Mono<Void> sendMessage(String topic, String key, String message)
'sendMessage' 메서드를 정의한다. 이 메서드는 토픽, 키, 메시지를 인자로 받아 비동기로 처리하게 된다.
return kafkaProducerTemplate.send(Mono.just(SenderRecord.create(new ProducerRecord<>(topic, key, message), key)))
'ReactiveKafkaProducerTemplate'의 'send'메서드를 호출해 카프카로 메시지를 보낸다.
내부를 더 살펴보면
Mono.just(SenderRecord.create(new ProducerRecord<>(topic, key, message), key))
이 부분은 'SenderRecord' 객체를 생성하고 그 안에 'ProducerRecord' 객체를 생성하여
토픽, 키 메시지를 설정한다.
여기서 'ProducerRecord'란 카프카에서 사용하는 일종의 자료구조이며, 메시지를 카프카 브로커로 보낼 때 사용된다.
주된 필드는 다음과 같은 성질을 가진다.
- topic: 메시지를 보낼 토픽의 이름
- key: 메시지 키. 같은 키를 가진 메시지는 동일한 파티션에 저장된다.
- value(우리의 경우는 message): 실제로 보낼 메시지의 본문
계속해서 'SenderRecord'는 리액티브 카프카 라이브러리에서 제공하는 클래스로 'ProducerRecord'와 함께
추가적인 메타데이터나 콜백을 전달할 수 있게 된다. 추가로 전달되는 데이터를 'correlationMetadata'라 하며,
이 메타데이터는 메시지 전송이 성공하면 결과와 함께 반환된다.
즉, 메시지 전송 이후의 로깅이나 저장 등의 로직을 위해 사용되는 클래스라고 할 수 있다.
코드의 나머지 부분은 기초적인 웹플럭스 연산자이기 때문에 생략한다.
ResultService
이어서 결과를 구독해서 실제로 처리할 로직을 작성해 보자.
위 클래스와 함께 하나의 클래스로 작성해도 무방하지만, 역할을 좀 더 명확하게 하기 위해 굳이 나누어주었다.
public class ResultService {
private final KafkaService kafkaService;
private final ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate;
@Value("${spring.kafka.topic.myTopic}")
private String topicName;
@PostConstruct
public void consumeResult() {
consumeAndProcessMessage(topicName)
.subscribe(System.out::println);
}
public Flux<String> consumeAndProcessMessage(String outputTopic) {
return kafkaConsumerTemplate.receiveAutoAck()
.flatMap(acknowledgableConsumerRecord -> {
String message = acknowledgableConsumerRecord.value();
log.info("Received message: " + message);
if (message.startsWith("INPUT:")) {
if (!message.matches("^INPUT:\\d+,\\d+$")) {
return Flux.just("Invalid message format: " + message);
}
String[] numbers = message.replace("INPUT:", "").split(",");
Integer sum = Integer.parseInt(numbers[0]) + Integer.parseInt(numbers[1]);
String outputMessage = "OUTPUT:" + sum;
return kafkaService.sendMessage(outputTopic, acknowledgableConsumerRecord.key(), outputMessage)
.thenReturn("Process Key: " + acknowledgableConsumerRecord.key() + ", Output: " + sum);
} else if (message.startsWith("OUTPUT:")) {
return Flux.just("Received output: " + message);
} else {
return Flux.just("Invalid message format: " + message);
}
});
}
}
위와 마찬가지로 코드를 뜯어보자.
private final KafkaService kafkaService;
private final ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate;
@Value("${spring.kafka.topic.myTopic}")
private String topicName;
필요한 빈과 변수를 가져와 할당한다.
@PostConstruct
public void consumeResult() {
consumeAndProcessMessage(topicName)
.subscribe(System.out::println);
}
'@PostConstruct' 애너테이션은 표시된 메서드를 클래스 생성 후 초기화 작업을 할 수 있도록 실행하는 기능을 한다.
'consumeResult' 메서드는 아래에 구현될 'consumeAndProcessMessage'를 사용해
Kafka 메시지를 구독하고 처리, 이후 구독한 메시지를 콘솔에 출력한다.
public Flux<String> consumeAndProcessMessage(String outputTopic)
위 메서드는 한 마디로 말하면 메시지를 수신하고 처리하는 로직 그 자체를 담은 메서드이다.
return kafkaConsumerTemplate.receiveAutoAck()
'ReactiveKafkaConsumerTemplate'의 'receiveAutoAck'메서드를 사용하여
메시지를 자동으로 수신하고 확인(ack)한다. 이후의 로직은 수신한 메시지를 처리하는 로직이다.
.flatMap(acknowledgableConsumerRecord -> {
String message = acknowledgableConsumerRecord.value();
log.info("Received message: " + message);
if (message.startsWith("INPUT:")) {
if (!message.matches("^INPUT:\\d+,\\d+$")) {
return Flux.just("Invalid message format: " + message);
}
String[] numbers = message.replace("INPUT:", "").split(",");
Integer sum = Integer.parseInt(numbers[0]) + Integer.parseInt(numbers[1]);
String outputMessage = "OUTPUT:" + sum;
return kafkaService.sendMessage(outputTopic, acknowledgableConsumerRecord.key(), outputMessage)
.thenReturn("Process Key: " + acknowledgableConsumerRecord.key() + ", Output: " + sum);
} else if (message.startsWith("OUTPUT:")) {
return Flux.just("Received output: " + message);
} else {
return Flux.just("Invalid message format: " + message);
}
});
먼저 수신한 메시지의 값을 가져와 'INPUT:'으로 시작하는 메시지를 처리한다.
처리 후 메시지를 'OUTPUT:'으로 시작하는 값에 담아 다시 메시지를 발행하며,
이는 else if 다음 메시지에서 받아서 처리하게 된다.
KafkaController
마지막으로 이 코드를 실행시킬 엔드포인트를 담고 있는 핸들러메서드를 작성한다.
위에 적었듯 두 개의 숫자를 UUID와 함께 발행하도록 하겠다.
@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping("/api")
public class KafkaController {
private final KafkaService kafkaService;
@Value("${spring.kafka.topic.myTopic}")
private String topicName;
@PostMapping("/kafka")
public Mono<Void> addNumbers(@RequestBody Map<String, Integer> numbers) {
UUID uuid = UUID.randomUUID();
log.info("[KAFKA TEST START]");
Integer number1 = numbers.get("number1");
Integer number2 = numbers.get("number2");
String key = uuid.toString();
String message = "INPUT:" + number1 + "," + number2;
return kafkaService.sendMessage(topicName, key, message);
}
}
여기선 딱히 줄마다 설명할 건 없고, 서비스 클래스를 주입받아 기존의 목적대로 발행하도록 한다.
이와 같이 구현한 뒤에 카프카와 서버를 켜고, 요청을 보내면 다음과 같이 처리되는 것을 확인할 수 있다.
이것으로 자바와 웹플럭스, 카프카를 이용한 리액티브 프로그래밍의 기초 구현이 끝났다.
물론 내가 원하는 목적지까지는 아직 한참이지만..
일단 기초 구현, 끝!
'Development > Cloud' 카테고리의 다른 글
[Jenkins]쌓여있는 기존 빌드를 수동으로 삭제하고 싶을 때 (0) | 2024.08.08 |
---|---|
[Kafka]카프카 명령어 모아두기 (0) | 2023.10.10 |
[Kafka]Avro에 대해서 (0) | 2023.09.15 |
[Kafka]Java 17 + WebFlux + Docker 환경에서 카프카 실습 (1) (0) | 2023.08.31 |
[Cloud]Java 17 + WebFlux 환경에 AWS MSK 도입하기(1) - MSK? (0) | 2023.08.01 |
[Trouble Shooting]AWS 타임아웃 설정 (0) | 2023.07.11 |
- Total
- Today
- Yesterday
- spring
- 남미
- a6000
- 동적계획법
- 유럽여행
- Backjoon
- 파이썬
- 알고리즘
- 스프링
- Algorithm
- 기술면접
- 지지
- 면접 준비
- java
- 여행
- 야경
- 중남미
- 리스트
- 자바
- 백준
- 세계일주
- 맛집
- 유럽
- 세모
- Python
- BOJ
- RX100M5
- 세계여행
- 스트림
- 칼이사
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |