728x90
반응형
메시지 전달
Message 객체를 이용하여 보내고 header에 아래와 같은 값들을 포함하여 전송 할 수 있다.
KafkaHeaders.TOPIC
KafkaHeaders.PARTITION_ID
KafkaHeaders.MESSAGE_KEY
KafkaHeaders.TIMESTAMP
- TOPIC: kafka에서 발행할 데이터에 대한 각각의 명세들의 이름
- PARTITION_ID: 어떤 파티션 아이디에 보낼지 정할 수 있음
- MESSAGE_KEY: 데이터 메세지의 키를 설정
- TIMESTAMP: 데이터를 보낸 시간을 정함
폴더 구조
- 컨트롤러: RestAPI에서 요청을 받은 후 카프카에 토픽을 보냄
- 리스너: 카프카에서 데이터를 발행하면 리스너에서 구독하고 있다가 요청이 발생하면 정해진 로직을 처리함
- 메세지: 실제로 보낼 데이터(Json, String...)
- 프로듀서: 카프카에 데이터를 발행할 프로듀서 로직을 처리함
- 모델: 카프카에서 보낼 메세지의 데이터 타입을 정함
- 서비스: 카프카에 메세지를 보내기 전에 유효성 체크 등의 로직을 처리함
의존성 설치
build.gradle 혹은 pom.xml 의존성에 카프카를 추가해줍니다.
dependencies {
...
// kafka
implementation 'org.springframework.kafka:spring-kafka'
...
}
토픽 설정(KafkaTopicConfig.java)
kafka 주소 설정, 토픽의 이름 등을 설정
package com.deliverylab.inspection.kafka;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaTopicConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String baseURL;
@Bean
KafkaAdmin kafkaAdmin() {
System.out.println("kafkaAdmin " + baseURL);
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, baseURL);
return new KafkaAdmin(configs);
}
@Bean
NewTopic logTopic() {
return new NewTopic("log", 1, (short) 1);
}
}
카프카 프로듀서 설정 (KafkaProducerConfig.java)
카프카의 프로듀서에 대해서 설정함. 프로듀서가 메세지를 발행할 서버 주소, 키 직렬화, 값 직렬화 방법 등을 설정함.
package com.deliverylab.inspection.kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import lombok.RequiredArgsConstructor;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
@Configuration
@RequiredArgsConstructor
public class KafkaProducerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, Serializable> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
return new DefaultKafkaProducerFactory<>(configProps, new StringSerializer(), new JsonSerializer<>());
}
@Bean
KafkaTemplate<String, Serializable> jsonKafkaTemplate(ProducerFactory<String, Serializable> jsonProducerFactory) {
return new KafkaTemplate<>(jsonProducerFactory);
}
}
카프카 팩토리 (KafkaFactory.java)
카프카 컨슈머의 보일러 플레이트 코드를 줄이기 위해 팩토리 패턴으로 컨슈머를 설정함.
package com.deliverylab.inspection.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import com.deliverylab.inspection.kafka.messages.KafkaMessage;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
public class KafkaFactory<T extends KafkaMessage> {
@Value(value = "${kafka.bootstrapAddress}")
private String baseURL;
/**
* 컨슈머 설정
*
* BOOTSTRAP_SERVERS_CONFIG: kafka 첫번째 브로커 URL
* KEY_DESERIALIZER_CLASS_CONFIG: 키 역직렬화 방법
* VALUE_DESERIALIZER_CLASS_CONFIG: 값 역직렬화 방법
* GROUP_ID_CONFIG: 그룹 ID
*
* @return {Map<String, Object>}
*/
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, baseURL);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consuming");
return props;
}
// 기본 타입 매퍼 만들어줌
public DefaultJackson2JavaTypeMapper typeMapper() {
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
Map<String, Class<?>> classMap = new HashMap<>();
typeMapper.setIdClassMapping(classMap);
typeMapper.addTrustedPackages("*");
return typeMapper;
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, T> kafkaListenerContainerFactory(Class<T> type) {
// 컨슈머 역직렬화 타입 설정
JsonDeserializer<T> jsonDeserializer = new JsonDeserializer<>(type);
jsonDeserializer.setTypeMapper(typeMapper());
jsonDeserializer.setUseTypeMapperForKey(true);
ConsumerFactory<String, T> logConsumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(), jsonDeserializer);
ConcurrentKafkaListenerContainerFactory<String, T> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(logConsumerFactory);
log.info("Kafka log consumer configure " + type.toString());
return factory;
}
}
카프카 컨슈머 설정 (KafkaFactory.java)
카프카 팩토리를 이용해서 컨슈머를 초기화 해준다. 컨슈머의 메세지 타입을 정한다.
package com.deliverylab.inspection.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import com.deliverylab.inspection.kafka.messages.LogMessage;
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String baseURL;
@Autowired
private KafkaFactory<LogMessage> logKafkaFactory;
@Bean
ConcurrentKafkaListenerContainerFactory<String, LogMessage> logKafkaListenerContainerFactory() throws Exception {
return logKafkaFactory.kafkaListenerContainerFactory(LogMessage.class);
}
}
카프카 프로듀서 (KafkaProducer.java)
카프카로 정해진 토픽을 보냄
package com.deliverylab.inspection.kafka.producers;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import com.deliverylab.inspection.kafka.messages.LogMessage;
import java.io.Serializable;
@Slf4j
@NoArgsConstructor
@Component
public class LogProducer {
final String logTopic = "log";
@Autowired
private KafkaTemplate<String, Serializable> kafkaTemplate;
public void send(LogMessage msg) throws Exception {
kafkaTemplate.send(logTopic, msg).thenAcceptAsync((SendResult<String, Serializable> result) -> {
log.info("kafka/log/create successfully with offset = " + result.getRecordMetadata().offset());
}).exceptionally(throwable -> {
System.out.println("exception occurred!!");
return null;
});
}
}
카프카 메세지 (KafkaMessage.java)
카프카 메세지의 베이스 타입을 정의함
package com.deliverylab.inspection.kafka.messages;
import java.io.Serializable;
import com.deliverylab.inspection.models.enums.EAction;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class KafkaMessage implements Serializable {
EAction action;
}
카프카 로그메세지(예시) (KafkaLogMessage.java)
카프카 메세지를 상속받으며 보낼 메세지의 타입을 정의함
package com.deliverylab.inspection.kafka.messages;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import com.deliverylab.inspection.models.Log;
import com.deliverylab.inspection.models.enums.EAction;
@Getter
@Setter
@NoArgsConstructor
public class LogMessage extends KafkaMessage {
private Log log;
public LogMessage(Log log, EAction action) {
super(action);
this.log = log;
}
}
서비스 (LogServices.java)
메세지를 보내기 전에 로그 찍기, 유효성 체크 등의 사전 처리 로직을 수행함.
package com.deliverylab.inspection.services;
import com.deliverylab.inspection.kafka.messages.LogMessage;
import com.deliverylab.inspection.kafka.producers.LogProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class LogService {
@Autowired
private LogProducer logProducer;
public void sendMessage(LogMessage msg) throws Exception {
log.info("[LogService] send log to topic, message: " + msg.toString());
logProducer.send(msg);
}
}
설정 파일 (application.yml)
카프카의 url을 설정해줌
kafka:
bootstrapAddress: localhost:29092
컨트롤러 (LogController.java)
RestAPI 요청을 받고 카프카에 토픽을 던짐. 아래는 예시이며 엔드 포인트는 어떻게 구조화 하냐에 따라 달라짐. FileUtils는 내가 따로 코딩해놓은 유틸이므로 파일 읽기 등의 로직을 작성하거나 주석처리 하고 실행해야 함.
package com.deliverylab.inspection.controllers.kafka;
import java.util.Date;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.deliverylab.inspection.common.utils.FileUtils;
import com.deliverylab.inspection.kafka.messages.LogMessage;
import com.deliverylab.inspection.models.Log;
import com.deliverylab.inspection.models.enums.EAction;
import com.deliverylab.inspection.payload.request.kafka.log.CreateLogRequest;
import com.deliverylab.inspection.services.LogService;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.validation.Valid;
@CrossOrigin(origins = "*", maxAge = 3600)
@RestController
@RequestMapping("/kafka/log")
public class LogController {
@Autowired
LogService logService;
@GetMapping("/check")
public ResponseEntity<String> check() {
return ResponseEntity.ok("Please use methods.");
}
// 로그 생성
@PostMapping("/create")
public ResponseEntity<?> createLog(@Valid @RequestBody CreateLogRequest createLogRequest,
HttpServletRequest request) throws Exception {
Log logData = new Log(createLogRequest);
logData.setDate(new Date());
// 카프카로 데이터 전송
logService.sendMessage(new LogMessage(logData, EAction.CREATE));
return ResponseEntity.ok(logData);
}
// 로그 전부 읽어오기
@GetMapping("/read")
public ResponseEntity<String> readLog() throws Exception {
String logsData = FileUtils.fileReader("./logs/access.json");
return ResponseEntity.ok(logsData);
}
// 로그 뒤에서 부터 읽기
@GetMapping("/tail/{offset}")
public ResponseEntity<String> tailLog(@PathVariable int offset) throws Exception {
String logsData = FileUtils.fileReaderOnTail("./logs/access.json", offset);
return ResponseEntity.ok(logsData);
}
}
728x90
반응형
'MessageQueue > Kafka' 카테고리의 다른 글
Kafka 파티션 증가 없이 동시 처리량을 늘리는 방법 - Parallel Consumer (0) | 2024.03.19 |
---|---|
kafka 왜 좋을까? (1) | 2023.01.30 |