source

Spring Kafka 통합 테스트 하이 워터마크 파일에 쓰는 중 오류 발생

nicesource 2023. 7. 23. 14:21
반응형

Spring Kafka 통합 테스트 하이 워터마크 파일에 쓰는 중 오류 발생

스프링 부트 애플리케이션에서 spring-kaka-2.2.0을 사용하여 통합 테스트를 작성하고 있습니다. 테스트 케이스가 성공적으로 반환되었지만 그 이후에도 여러 오류가 발생합니다.

2019-02-21 11:12:35.434 ERROR 5717 --- [       Thread-7] kafka.server.ReplicaManager              : [ReplicaManager broker=0] Error while writing to highwatermark file in directory /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645

org.apache.kafka.common.errors.KafkaStorageException: Error while writing to checkpoint file /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint
Caused by: java.io.FileNotFoundException: /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint.tmp (No such file or directory)

구성 테스트

@EnableKafka
@TestConfiguration
public class KafkaProducerConfigTest {

@Bean
public EmbeddedKafkaBroker embeddedKafkaBroker() {
    return new EmbeddedKafkaBroker(1,false,2,"test-events");
}


@Bean
public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker().getBrokersAsString());
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(props);
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    return kafkaTemplate;
   }

@Bean("consumerFactory")
 public ConsumerFactory<String, Professor> createConsumerFactory() {
     Map<String, Object> props = new HashMap<>();
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker().getBrokersAsString());
     props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
     JsonDeserializer<Professor> jsonDeserializer = new JsonDeserializer<>(Professor.class,false);
     return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), jsonDeserializer);
 }

@Bean("kafkaListenerContainerFactory")
 public ConcurrentKafkaListenerContainerFactory<String, Professor> kafkaListenerContainerFactory() {
     ConcurrentKafkaListenerContainerFactory<String, Professor> factory = new ConcurrentKafkaListenerContainerFactory<>();
     factory.setConsumerFactory(createConsumerFactory());
     factory.setBatchListener(true);
     factory.getContainerProperties().setAckMode(AckMode.BATCH);
     return factory;
 }

@Bean
public StringJsonMessageConverter converter() {
    return new StringJsonMessageConverter();
}

@Bean
public Listener listener() {
    return new Listener();
}

public class Listener {
    public final CountDownLatch latch = new CountDownLatch(1);

    @Getter
    public List<Professor> list;

    @KafkaListener(topics = "test-events", containerFactory = "kafkaListenerContainerFactory")
    public void listen1(List<Professor> foo) {

        list=foo;
        this.latch.countDown();
       }
    }

}

시험수업

@EnableKafka
@SpringBootTest(classes = { KafkaProducerConfigTest.class })
@RunWith(SpringRunner.class)
public class KafkaProducerServiceTest {

@Autowired
private KafkaConsumerService kafkaConsumerService;

@Autowired
private Listener listener;

@Test
public void testReceive() throws Exception {
    Professor professor = new Professor("Ajay", new Department("social", 1234));
    List<Professor> pro = new ArrayList<>();
    pro.add(professor);
    System.out.println(pro);
    kafkaConsumerService.professor(pro);
    System.out.println("The professor object is sent to kafka -----------------------------------");
    listener.latch.await();
    List<Professor> result = listener.getList();
    Professor resultPro = result.get(0);
    System.out.println(result);
    System.out.println(resultPro);

    assertEquals(pro.get(0).getName(), result.get(0).getName());

     }

 }

테스트 케이스testReceive()통과 중이지만 여전히 여러 오류 메시지가 표시됨

스택 추적 오류 1

019-02-21 11:12:35.434 ERROR 5717 --- [       Thread-7] kafka.server.ReplicaManager              : [ReplicaManager broker=0] Error while writing to highwatermark file in directory /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645

org.apache.kafka.common.errors.KafkaStorageException: Error while writing to checkpoint file /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint
Caused by: java.io.FileNotFoundException: /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint.tmp (No such file or directory)

스택 추적 오류 2

2019-02-21 11:12:35.446  WARN 5717 --- [pool-8-thread-1] kafka.utils.CoreUtils$                   : /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/__consumer_offsets-4/00000000000000000000.index (No such file or directory)

java.io.FileNotFoundException: /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/__consumer_offsets-4/00000000000000000000.index (No such file or directory)

스택 추적 오류 3

2019-02-21 11:12:35.451  WARN 5717 --- [pool-8-thread-1] kafka.utils.CoreUtils$                   : /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/test-events-0/00000000000000000000.timeindex (No such file or directory)

java.io.FileNotFoundException: /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/test-events-0/00000000000000000000.timeindex (No such file or directory)
at java.io.RandomAccessFile.open0(Native Method) ~[na:1.8.0_191]

저는 비슷한 문제가 있었고 게리 러셀의 답변을 통해 log dir를 gradle build output dir로 가리키며 해결했습니다.log.dir=out/embedded-kafka또는 만일의 경우.log.dir=target/embedded-kafka.

다음 코드 조각은 다음을 사용하여 수행하는 방법을 보여줍니다.@EmbeddedKafka.

@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, classes = {Application.class})
@EmbeddedKafka(
        topics = "topic",
        partitions = 1,
        controlledShutdown = true,
        brokerProperties={
                "log.dir=out/embedded-kafka"
        })
@TestPropertySource(
        properties = {
                "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"
        })
public class OutboxEventsTest {
...
}

실제로 쓸 수 있는 권한이 있습니까?/var/folders/s3 ...?

로 위치를 재정의할 수 있습니다.

@Bean
public EmbeddedKafkaBroker embeddedKafkaBroker() {
    return new EmbeddedKafkaBroker(1,false,2,"test-events")
        .brokerProperties(Collections.singletonMap(KafkaConfig.LogDirProp(), "/tmp/foo"));
}

Embedded Kafka의 브로커 속성을 변경하기만 하면 됩니다.

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = {MyApplication.class})
@TestPropertySource(locations = "classpath:application-test.properties")
@EmbeddedKafka(
        topics = {"my_topic_name"},
        partitions = 1,
        brokerProperties = {"log.dir=target/kafka"}
        )

언급URL : https://stackoverflow.com/questions/54813073/spring-kafka-integration-test-error-while-writing-to-highwatermark-file

반응형