Error Analysis and Resiliency

해당 섹션에서는 Kinesis Producer Library의 RecordTTL을 사용하여 Kinesis Data Stream으로 데이터를 안정적이며 유연하게 전송하는 방법에 대해서 알아보겠습니다.

또한 데이터 전송이 성공하거나 실패했을 경우에 애플리케이션단에서 해당 요청에 대한 응답을 처리하는 방법 및 전략에 대해서 살펴보겠습니다.

이전 세션과 동일한 Cloud9 인스턴스에서...

B_Producer_withErrorAnalysis.java 파일을 열어보겠습니다.

이전 세션에서 사용한 A_Simple_Producer.java 파일과 거의 유사하지만 몇가지 중요한 차이점이 있습니다.

  1. KinesisProducerConfiguration 객체에 RecordTTL이라는 매개변수를 설정하고 있습니다.

  2. Future 객체를 LinkedList 타입으로 정의하고 각 addUserRecord 함수에서 실행하고 반환되는 값을(레코드 생성 요청에 대한 응답) 해당 LinkedList에 추가합니다. 그런 다음 각 응답의 상태값에 따라서 다른 액션을 취합니다.

각각의 동작 방식에 대해 더 자세히 살펴보겠습니다.

Record TTL (time-to-live)

.setRecordTtl(30000);

Kinesis Producer Library에서 각 사용자 레코드에는 TTL(time-to-live)이 있습니다. 기본적으로 이러한 레코드가 할당된 시간 내에 Kinesis Data Stream로 성공적으로 전달되지 않으면 넣기 실패(Failed puts)로 표시됩니다.

만약 레코드가 생성되고 일정 시간이 지난후에 쓸모가 없어진다거나 지연이 발생한 레코드가 Kinesis Data Stream에 유입되서 Consumer로 전달된 경우 문제가 발생할 요지가 있다면 RecordTTL 매개변수를 명시해서 처리할수 있습니다. RecordTTL 값에 레코드가 전송이 성공할때까지 재시도 할 최대 시간으로 설정합니다.

무기한 재시도를 통해서 레코드 손실을 방지하려면 RecordTTL 값에 Integer.MAX_VALUE 와 같은 큰 값을 지정합니다. 그러나 네트워크 지연 또는 Throttling 한글로 문제로 인해 메시지가 전송이 차단될 수 있도 있으니 Kinesis Producer Library 지표 기능을 사용하여 적절하게 이러한 이벤트에 대응할수 있습니다.

fail_if_throttled 매개변수의 값을 true로 지정해서 Throttling이 발생할때 재시도를 하지 않도록 설정할수도 있습니다.

RecordTtl은 기본적으로 30000ms(30초)로 설정됩니다.

Futures for Asynchronous Publishing

KinesisProducer 객체를 정의한 라인 바로 밑에 같은 코드가 있습니다.

List<Future<UserRecordResult>> putFutures = new LinkedList<Future<UserRecordResult>>();

여기에서 사용자 레코드 전송하는 함수의 결과 Future 객체로 LinkerdList에 담고 해당 LinkerdList는 Kinesis Data Stream에 레코드를 비동기식으로 푸시하는 데 사용됩니다.

putFutures.add(kinesis.addUserRecord(streamName, trip.getId(), data));

해당 List에 있는 모든 레코드를 비동기 방식으로 Kinesis Data Stream 푸시하고 Futures::get() 함수를 호출해서 KPL이 제공하는 각 개별 요청에 대한 응답을 동기적으로 처리할수 있습니다.

여기서 이점은 가능한 한 빨리 Kinesis Data Stream에 데이터를 제출할 수 있지만 여전히 각 결과에 따라 조치를 취하고 대응할 수 있다는 것입니다.

UserRecordResult result = f.get();
if (result.isSuccessful()) 
{
    System.out.println("Put record into shard " +
            result.getShardId());
} 
else 
{
    for (Attempt attempt : result.getAttempts()) 
    {
        System.out.println(attempt);
    }
}

레코드 전송이 실패할 경우 전송이 성공할때 까지 혹은 이전에 설명한 대로 RecordTtl 매개변수로 지정한 시간내에서 계속 재시도합니다. 전달하지 못한 레코드를 Dead Letter Queue에 넣거나 경보를 발생시키는 등의 사용자 지정 로직을 추가할수 있습니다. 애플리케이션 요구 사항에 따라 실패하거나 Throttling된 요청에 대한 적절한 대응방안을 구성하시는것을 추천드립니다.

Last updated