티스토리 뷰

728x90
반응형

목차

     

    지난 글에서, 도커로 카프카를 실행하고 자바 프로젝트의 yml파일을 아래와 같이 구성했다.

    spring:
      kafka:
        topic:
          myTopic: test-topic  
        consumer:
          bootstrap-servers: localhost:9092
          group-id: test
          auto-offset-reset: earliest
        producer:
          bootstrap-servers: localhost:9092

    이번 글에선 위 설정을 바탕으로 실제 프로듀서와 컨슈머를 구현한 뒤,

     

    간단한 사칙연산을 구현해 보겠다.

     

    들어가기 전에 먼저, 위 설정은 보안과 관련된 부분이 전부 제거된 상태이기 때문에

     

    MSK 사용 시 해당 설정을 추가해야 한다.

     

    KafkaProducerConfig

     

    먼저 프로듀서에 대한 설정이다. 굉장히 간단하게 아래와 같이 구현할 수 있다.

    @Configuration
    public class KafkaProducerConfig {
    
    	@Value("${spring.kafka.bootstrap-servers}")
    	private String bootstrapServers;
    	@Value("${spring.kafka.consumer.group-id}")
    	private String groupId;
    
    	@Bean
    	public SenderOptions<String, String> senderOptions() {
    
    		Map<String, Object> props = new HashMap<>();
    
    		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    
    		props.put(ProducerConfig.ACKS_CONFIG, "all");
    
    		return SenderOptions.create(props);
    	}
    
    	@Bean
    	public ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate() {
    		return new ReactiveKafkaProducerTemplate<>(senderOptions());
    	}
    }

    @Value 에는 우리가 지난 글에서 생성한 값이 할당되기 때문에 경로를 잘 신경 써야 한다.

     

    이후의 코드는 자세히 뜯어보면 아래와 같이 구성되어 있다.

    	public SenderOptions<String, String> senderOptions()

    이 메서드는 'SenderOptions' 타입의 객체를 생성해서 반환한다.

     

    여기서 'SenderOptions'는 리액티브 카프카 프로듀서의 설정을 담당한다.

    		Map<String, Object> props = new HashMap<>();

    먼저 prop이라는 이름의 Map을 생성한다. 이 안에 프로듀서의 설정을 담게 된다.

    		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    'ProducerConfig.BOOTSTRAP_SERVERS_CONFIG'는 카프카 서버의 주소를 설정하는 키이다.

     

    여기서는 위에서 설정한 'bootstrapServers'가 그 값이 된다.

    		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    프로듀서가 카프카로 메시지를 보낼 때 키의 직렬화 방식을 설정한다.

     

    보편적으로 범용성이 좋은 문자열을 직렬화하기 때문에 나도 'StringSerializer'로 설정했다.

    		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    이번엔 값의 직렬화 방식이다. 마찬가지로 'StringSerializer'를 설정한다.

    		props.put(ProducerConfig.ACKS_CONFIG, "all");

    이 설정은 프로듀서가 메시지를 성공적으로 전송했다는 것을 어떻게 확인하는지 설정한다.

     

    그러니까 메시지를 전송한 뒤 어떤 종류의 확인 응답(acknowledgment)을 받을 것인지 지정한다.

     

    해당 설정은 다음과 같은 종류가 있다.

     

    1. '0'
      어떠한 확인도 받지 않는다. 즉 메시지 전송 확인을 기다리지 않아 전달 속도가 빠르다. 그러나 메시지 전송의 성공여부를 알 수 없기 때문에 데이터 손실의 위험이 있다.
    2. '1'
      리더 브로커로부터 성공 메시지를 받는다. 이 경우 다른 레플리카 브로커의 상태는 알 수 없다.
    3. 'all' 또는 '-1'
      리더 브로커 뿐 아니라 모든 레플리카들이 메시지를 확정할 때까지 기다린다. 높은 데이터 신뢰성을 보장하지만,
      모든 레플리카의 확정을 기다려야 하기 때문에 지연시간이 생길 수 있다.
    		return SenderOptions.create(props);

    방금 설정한 'props'를 사용해 'SenderOptions' 객체를 생성 및 반환한다.

    	public ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate()

    이 메서드는 'ReactiveKafkaProducerTemplate'타입의 객체를 생성해서 반환한다.

     

    여기서 'ReactiveKafkaProducerTemplate'란 이름 그대로 리액티브 하게 카프카에 메시지를 프로듀싱하는 데 사용된다.

     

    이후의 로직에서 해당 타입의 템플릿을 호출해서 사용한다.

    		return new ReactiveKafkaProducerTemplate<>(senderOptions());

    위에서 생성한 'SenderOptions' 객체를 사용해서 'ReactiveKafkaProducerTemplate' 객체를 생성해 반환한다.

     

    이제 이 템플릿을 이용해 카프카로 리액티브 하게 메시지를 전달할 수 있다.

     

    KafkaConsumerConfig

     

    계속해서 컨슈터 설정이다. 달라진 부분이 거의 없기 때문에 몇 가지만 짚고 넘어간다.

    @Configuration
    public class KafkaConsumerConfig {
    
    	@Value("${spring.kafka.bootstrap-servers}")
    	private String bootstrapServers;
    	@Value("${spring.kafka.consumer.group-id}")
    	private String groupId;
    	@Value("${spring.kafka.topic.myTopic}")
    	private String topicName;
    
    	@Bean
    	public ReceiverOptions<String, String> receiverOptions() {
    
    		Map<String, Object> props = new HashMap<>();
    
    		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    		props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    
    		return ReceiverOptions.<String, String>create(props)
    			.subscription(Collections.singleton(topicName));
    	}
    
    	@Bean
    	public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate() {
    		return new ReactiveKafkaConsumerTemplate<>(receiverOptions());
    	}
    }
    	@Value("${spring.kafka.topic.myTopic}")
    	private String topicName;

    기존에 설정한 토픽 이름을 바인딩해 준다.

    		props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

    이외에는 사실상 다른 것이 없기 때문에 그냥 넘어가도록 하겠다.

     

    KafkaService

     

    이어서 비즈니스 로직과 핸들러 메서드를 구현하도록 하겠다. 시나리오는 다음과 같다.

     

    1. 주어진 엔드포인트를 통해 프로듀서가 UUID로 이루어진 키와 두 개의 숫자로 이루어진 값을 카프카에 발행한다.
    2. 컨슈머는 이를 받아 정해진 연산(기본적으로는 더하기)을 실행하고 결과를 콘솔에 출력한다.

    여기서 UUID는 비동기 연산에서 연산의 흐름을 파악하기 위한 것으로, 과정이 복잡해질수록 중요해진다.

     

    어쨌거나 먼저 메시지를 발행하는 쪽의 비즈니스 로직을 구현하자.

    @Slf4j
    @Service
    @RequiredArgsConstructor
    public class KafkaService {
    
    	private final ReactiveKafkaProducerTemplate<String, String> kafkaProducerTemplate;
    
    	public Mono<Void> sendMessage(String topic, String key, String message) {
    		return kafkaProducerTemplate.send(Mono.just(SenderRecord.create(new ProducerRecord<>(topic, key, message), key)))
    			.doOnNext(stringSenderResult -> log.info(message))
    			.next()
    			.then()
    			.onErrorResume(e -> {
    				log.error("Failed to send message: " + e.getMessage());
    				return Mono.empty();
    			});
    	}
    }

    역시 한 줄씩 뜯어보면 다음과 같다.

    	private final ReactiveKafkaProducerTemplate<String, String> kafkaProducerTemplate;

    위에서 정의한 'ReactiveKafkaProducerTemplate' 빈을 주입받는다.

    	public Mono<Void> sendMessage(String topic, String key, String message)

    'sendMessage' 메서드를 정의한다. 이 메서드는 토픽, 키, 메시지를 인자로 받아 비동기로 처리하게 된다.

    		return kafkaProducerTemplate.send(Mono.just(SenderRecord.create(new ProducerRecord<>(topic, key, message), key)))

    'ReactiveKafkaProducerTemplate'의 'send'메서드를 호출해 카프카로 메시지를 보낸다.

     

    내부를 더 살펴보면

    Mono.just(SenderRecord.create(new ProducerRecord<>(topic, key, message), key))

    이 부분은 'SenderRecord' 객체를 생성하고 그 안에 'ProducerRecord' 객체를 생성하여

     

    토픽, 키 메시지를 설정한다.

     

    여기서 'ProducerRecord'란 카프카에서 사용하는 일종의 자료구조이며, 메시지를 카프카 브로커로 보낼 때 사용된다.

     

    주된 필드는 다음과 같은 성질을 가진다.

     

    • topic: 메시지를 보낼 토픽의 이름
    • key: 메시지 키. 같은 키를 가진 메시지는 동일한 파티션에 저장된다.
    • value(우리의 경우는 message): 실제로 보낼 메시지의 본문

    계속해서 'SenderRecord'는 리액티브 카프카 라이브러리에서 제공하는 클래스로 'ProducerRecord'와 함께

     

    추가적인 메타데이터나 콜백을 전달할 수 있게 된다. 추가로 전달되는 데이터를 'correlationMetadata'라 하며,

     

    이 메타데이터는 메시지 전송이 성공하면 결과와 함께 반환된다.

     

    즉, 메시지 전송 이후의 로깅이나 저장 등의 로직을 위해 사용되는 클래스라고 할 수 있다.

     

    코드의 나머지 부분은 기초적인 웹플럭스 연산자이기 때문에 생략한다.

     

    ResultService

     

    이어서 결과를 구독해서 실제로 처리할 로직을 작성해 보자.

     

    위 클래스와 함께 하나의 클래스로 작성해도 무방하지만, 역할을 좀 더 명확하게 하기 위해 굳이 나누어주었다.

    public class ResultService {
    
    	private final KafkaService kafkaService;
    	private final ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate;
    	@Value("${spring.kafka.topic.myTopic}")
    	private String topicName;
    
    	@PostConstruct
    	public void consumeResult() {
    		consumeAndProcessMessage(topicName)
    			.subscribe(System.out::println);
    	}
    
    	public Flux<String> consumeAndProcessMessage(String outputTopic) {
    		return kafkaConsumerTemplate.receiveAutoAck()
    			.flatMap(acknowledgableConsumerRecord -> {
    				String message = acknowledgableConsumerRecord.value();
    				log.info("Received message: " + message);
    				if (message.startsWith("INPUT:")) {
    					if (!message.matches("^INPUT:\\d+,\\d+$")) {
    						return Flux.just("Invalid message format: " + message);
    					}
    					String[] numbers = message.replace("INPUT:", "").split(",");
    					Integer sum = Integer.parseInt(numbers[0]) + Integer.parseInt(numbers[1]);
    					String outputMessage = "OUTPUT:" + sum;
    					return kafkaService.sendMessage(outputTopic, acknowledgableConsumerRecord.key(), outputMessage)
    						.thenReturn("Process Key: " + acknowledgableConsumerRecord.key() + ", Output: " + sum);
    				} else if (message.startsWith("OUTPUT:")) {
    					return Flux.just("Received output: " + message);
    				} else {
    					return Flux.just("Invalid message format: " + message);
    				}
    			});
    	}
    }

    위와 마찬가지로 코드를 뜯어보자.

    	private final KafkaService kafkaService;
    	private final ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate;
    	@Value("${spring.kafka.topic.myTopic}")
    	private String topicName;

    필요한 빈과 변수를 가져와 할당한다.

    	@PostConstruct
    	public void consumeResult() {
    		consumeAndProcessMessage(topicName)
    			.subscribe(System.out::println);
    	}

    '@PostConstruct' 애너테이션은 표시된 메서드를 클래스 생성 후 초기화 작업을 할 수 있도록 실행하는 기능을 한다.

     

    'consumeResult' 메서드는 아래에 구현될 'consumeAndProcessMessage'를 사용해

     

    Kafka 메시지를 구독하고 처리, 이후 구독한 메시지를 콘솔에 출력한다.

    	public Flux<String> consumeAndProcessMessage(String outputTopic)

    위 메서드는 한 마디로 말하면 메시지를 수신하고 처리하는 로직 그 자체를 담은 메서드이다.

    		return kafkaConsumerTemplate.receiveAutoAck()

    'ReactiveKafkaConsumerTemplate'의 'receiveAutoAck'메서드를 사용하여

     

    메시지를 자동으로 수신하고 확인(ack)한다. 이후의 로직은 수신한 메시지를 처리하는 로직이다.

    			.flatMap(acknowledgableConsumerRecord -> {
    				String message = acknowledgableConsumerRecord.value();
    				log.info("Received message: " + message);
    				if (message.startsWith("INPUT:")) {
    					if (!message.matches("^INPUT:\\d+,\\d+$")) {
    						return Flux.just("Invalid message format: " + message);
    					}
    					String[] numbers = message.replace("INPUT:", "").split(",");
    					Integer sum = Integer.parseInt(numbers[0]) + Integer.parseInt(numbers[1]);
    					String outputMessage = "OUTPUT:" + sum;
    					return kafkaService.sendMessage(outputTopic, acknowledgableConsumerRecord.key(), outputMessage)
    						.thenReturn("Process Key: " + acknowledgableConsumerRecord.key() + ", Output: " + sum);
    				} else if (message.startsWith("OUTPUT:")) {
    					return Flux.just("Received output: " + message);
    				} else {
    					return Flux.just("Invalid message format: " + message);
    				}
    			});

    먼저 수신한 메시지의 값을 가져와 'INPUT:'으로 시작하는 메시지를 처리한다.

     

    처리 후 메시지를 'OUTPUT:'으로 시작하는 값에 담아 다시 메시지를 발행하며,

     

    이는 else if 다음 메시지에서 받아서 처리하게 된다.

     

    KafkaController

     

    마지막으로 이 코드를 실행시킬 엔드포인트를 담고 있는 핸들러메서드를 작성한다.

     

    위에 적었듯 두 개의 숫자를 UUID와 함께 발행하도록 하겠다.

    @Slf4j
    @RestController
    @RequiredArgsConstructor
    @RequestMapping("/api")
    public class KafkaController {
    
    	private final KafkaService kafkaService;
    
    	@Value("${spring.kafka.topic.myTopic}")
    	private String topicName;
    
    	@PostMapping("/kafka")
    	public Mono<Void> addNumbers(@RequestBody Map<String, Integer> numbers) {
    
    		UUID uuid = UUID.randomUUID();
    
    		log.info("[KAFKA TEST START]");
    		Integer number1 = numbers.get("number1");
    		Integer number2 = numbers.get("number2");
    		String key = uuid.toString();
    		String message = "INPUT:" + number1 + "," + number2;
    
    		return kafkaService.sendMessage(topicName, key, message);
    	}
    }

    여기선 딱히 줄마다 설명할 건 없고, 서비스 클래스를 주입받아 기존의 목적대로 발행하도록 한다.

     

    이와 같이 구현한 뒤에 카프카와 서버를 켜고, 요청을 보내면 다음과 같이 처리되는 것을 확인할 수 있다.

     

    이것으로 자바와 웹플럭스, 카프카를 이용한 리액티브 프로그래밍의 기초 구현이 끝났다.

     

    물론 내가 원하는 목적지까지는 아직 한참이지만..

     

    일단 기초 구현, 끝!

    반응형
    댓글
    공지사항
    최근에 올라온 글
    최근에 달린 댓글
    Total
    Today
    Yesterday
    링크
    «   2025/01   »
    1 2 3 4
    5 6 7 8 9 10 11
    12 13 14 15 16 17 18
    19 20 21 22 23 24 25
    26 27 28 29 30 31
    글 보관함