cd ~/environment && git clone https://github.com/youngwjung/kinesis-producer-library-examples.git
소스코드 리뷰
src/main/java 디렉토리를 열어 보겠습니다.
A, B, C, D로 시작하는 4개의 파일을 있고 각 파일별로 다른 방식으로 KPL을 사용하고 있습니다. A_SimpleProducer.java 파일을 열어보겠습니다.
해당 파일에서 KPL을 이용해서 Kinesis Data Stream에 데이터를 전송하는 코드를 AWS SDK 버전과 비교해 보면 Kinesis Producer Library가 Kinesis Data Stream에 대한 쓰기의 재시도, 일괄 처리 및 샤드 최적화를 단순화 해주는 것을 확인할수 있습니다.
해당 프로그램을 간략히 설명하자면 data/taxi-trips.csv 파일에서 택시 운행 레코드들를 읽어와서 Kinesis Producer Library를 활용하여 해당 레코드들을 통합해서 Kinesis Data Stream에 전송합니다.
Kinesis Producer 구성을 살펴보면
KinesisProducerConfiguration config = new KinesisProducerConfiguration()
.setRecordMaxBufferedTime(3000)
.setMaxConnections(1)
.setRequestTimeout(60000)
.setRegion(region);
몇 가지 구성 속성을 설정합니다. 자세히 알아보려면 각 항목을 확장하세요.
RecordMaxBufferedTime
RecordMaxBufferedTime은 레코드가 하나 이상의 Kinesis Data Streams로 전송되기 전에 버퍼에 보관될수 있는 최대 시간(밀리초)을 의미합니다. 다른 설정값들에 따라서 레코드가 RecordMaxBufferedTime에 명시된 값보다 더 빨리 Data Stream으로 전송될수 있습니다. RecordMaxBufferedTime는 레코드 전송이 실패해서 재시도 걸리는 시간도 포함될수 있습니다.
RecordMaxBufferedTime을 너무 낮게 설정하면 처리량에 부정적인 영향을 미칠 수 있습니다.
RecordMaxBufferedTime의 기본값은 100ms입니다.
MaxConnections
백엔드 서버와 맺을수 있는 최대 커넥션 수입니다. Kinesis Data Streams에 대한 HTTP 요청은 다수의 커넥션을 통해 병렬로 전송됩니다.
이 값을 너무 높게 설정하면 네트워크 지연이 발생하거나 처리량을 늘리지 않으며 데이터를 전송하는 애플리케이션에서 불필요한 리소스를 낭비할 수 있습니다.
MaxConnections의 기본값은 24입니다.
RequestTimeout
HTTP 요청이 전송된 시점 기준으로 클라이언트에서 응답을 받기전까지 얼마나 기다릴수 있는지를 해당 파라미터를 통해서 설정합니다. 특정 요청에 대한 응답 대기 시간이 이 값을 초과하면 해당 요청에 대한 응답을 더이상 기다리지 않습니다.
특정 요청에 응답이 제한 시간안에 오지 않더라고 해당 요청을 통해서 전송된 레코드는 Kinesis Data Streams에 성공적으로 전송되었을 수도 있으며 재시도하면 중복될 수 있습니다. 따라서 이 값을 너무 낮게 설정하면 데이터 스트림에 중복 레코드(Duplicates)가 생성될 가능성이 높아집니다.
RequestTimeout의 기본값은 6000ms입니다.
Kinesis Producer에 구성 설정을 하고 해당 설정을 가진 Kinesis Producer 객체를 생성합니다.
final KinesisProducer kinesis = new KinesisProducer(config);
생성이 시작되었다는 출력이 표시되면 전송 관련 지표가 CloudWatch가 새로 고쳐질 때까지 약 5분 정도 기다립니다.
lab-user:~/environment/kinesis-producer-library-examples (main) $ java -cp target/amazon-kinesis-replay-1.0-SNAPSHOT.jar A_SimpleProducer
log4j:WARN No appenders could be found for logger (com.amazonaws.services.kinesis.producer.KinesisProducer).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Starting to produce data to input-stream in the ap-northeast-2 region.
Kinesis Data Streams 관리 콘솔로 이동해서 input-stream을 선택하고 모니터링을 클릭하여 Put record 및 Incoming data 지표를 확인하세요.
모니터링 페이지에 표시되는 지표를 통해서 레코드가 들어오는 것이 확인 된다면 축하합니다! Kinesis Producer 라이브러리를 사용하여 Kinesis Data Stream에 성공적으로 데이터를 전송했습니다.