A Simple Producer

소스코드 다운로드

터미널에 다음 명령어를 입력하여 깃허브에서 소스코드를 다운받습니다.

cd ~/environment && git clone https://github.com/youngwjung/kinesis-producer-library-examples.git

소스코드 리뷰

src/main/java 디렉토리를 열어 보겠습니다.

A, B, C, D로 시작하는 4개의 파일을 있고 각 파일별로 다른 방식으로 KPL을 사용하고 있습니다. A_SimpleProducer.java 파일을 열어보겠습니다.

해당 파일에서 KPL을 이용해서 Kinesis Data Stream에 데이터를 전송하는 코드를 AWS SDK 버전과 비교해 보면 Kinesis Producer Library가 Kinesis Data Stream에 대한 쓰기의 재시도, 일괄 처리 및 샤드 최적화를 단순화 해주는 것을 확인할수 있습니다.

해당 프로그램을 간략히 설명하자면 data/taxi-trips.csv 파일에서 택시 운행 레코드들를 읽어와서 Kinesis Producer Library를 활용하여 해당 레코드들을 통합해서 Kinesis Data Stream에 전송합니다.

Kinesis Producer 구성을 살펴보면

KinesisProducerConfiguration config = new KinesisProducerConfiguration()
                                            .setRecordMaxBufferedTime(3000)
                                            .setMaxConnections(1)
                                            .setRequestTimeout(60000)
                                            .setRegion(region);

몇 가지 구성 속성을 설정합니다. 자세히 알아보려면 각 항목을 확장하세요.

RecordMaxBufferedTime

RecordMaxBufferedTime은 레코드가 하나 이상의 Kinesis Data Streams로 전송되기 전에 버퍼에 보관될수 있는 최대 시간(밀리초)을 의미합니다. 다른 설정값들에 따라서 레코드가 RecordMaxBufferedTime에 명시된 값보다 더 빨리 Data Stream으로 전송될수 있습니다. RecordMaxBufferedTime는 레코드 전송이 실패해서 재시도 걸리는 시간도 포함될수 있습니다.

RecordMaxBufferedTime을 너무 낮게 설정하면 처리량에 부정적인 영향을 미칠 수 있습니다.

RecordMaxBufferedTime의 기본값은 100ms입니다.

MaxConnections

백엔드 서버와 맺을수 있는 최대 커넥션 수입니다. Kinesis Data Streams에 대한 HTTP 요청은 다수의 커넥션을 통해 병렬로 전송됩니다.

이 값을 너무 높게 설정하면 네트워크 지연이 발생하거나 처리량을 늘리지 않으며 데이터를 전송하는 애플리케이션에서 불필요한 리소스를 낭비할 수 있습니다.

MaxConnections의 기본값은 24입니다.

RequestTimeout

HTTP 요청이 전송된 시점 기준으로 클라이언트에서 응답을 받기전까지 얼마나 기다릴수 있는지를 해당 파라미터를 통해서 설정합니다. 특정 요청에 대한 응답 대기 시간이 이 값을 초과하면 해당 요청에 대한 응답을 더이상 기다리지 않습니다.

특정 요청에 응답이 제한 시간안에 오지 않더라고 해당 요청을 통해서 전송된 레코드는 Kinesis Data Streams에 성공적으로 전송되었을 수도 있으며 재시도하면 중복될 수 있습니다. 따라서 이 값을 너무 낮게 설정하면 데이터 스트림에 중복 레코드(Duplicates)가 생성될 가능성이 높아집니다.

RequestTimeout의 기본값은 6000ms입니다.

Kinesis Producer에 구성 설정을 하고 해당 설정을 가진 Kinesis Producer 객체를 생성합니다.

final KinesisProducer kinesis = new KinesisProducer(config);

addUserRecord 함수를 호출해서 데이터를 전송합니다.

kinesis.addUserRecord(streamName, trip.getId(), data);

KPL 사용자 레코드는 Kinesis Data Stream 레코드 구분됩니다.

KPL 사용자 레코드는 사용자에게 특정 의미를 갖는 데이터 BLOB입니다. 예를 들어, 웹 사이트에서 UI 이벤트를 나타내는 JSON BLOB이나 웹 서버의 로그 항목이 있습니다.

Kinesis Data Stream 레코드는 Kinesis Data Stream API에 의해 정의된 데이터 구조인 Record 객체의 인스턴스이며 파티션 키, 시퀀스 번호 및 데이터 BLOB이 여기에 포함됩니다.

프로그램 실행

Cloud9 터미널에서 애플리케이션 소스코드가 있는 디렉토리로 이동합니다.

cd ~/environment/kinesis-producer-library-examples

빌드 실행하면 애플리케이션 소스코드가 jar 파일로 컴파일하고 패키징되어 target 디렉토리 안에 저장됩니다.

mvn clean compile package

애플리케이션을 구동합니다.

java -cp target/amazon-kinesis-replay-1.0-SNAPSHOT.jar A_SimpleProducer

생성이 시작되었다는 출력이 표시되면 전송 관련 지표가 CloudWatch가 새로 고쳐질 때까지 약 5분 정도 기다립니다.

lab-user:~/environment/kinesis-producer-library-examples (main) $ java -cp target/amazon-kinesis-replay-1.0-SNAPSHOT.jar A_SimpleProducer
log4j:WARN No appenders could be found for logger (com.amazonaws.services.kinesis.producer.KinesisProducer).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Starting to produce data to input-stream in the ap-northeast-2 region.

Kinesis Data Streams 관리 콘솔로 이동해서 input-stream을 선택하고 모니터링을 클릭하여 Put recordIncoming data 지표를 확인하세요.

모니터링 페이지에 표시되는 지표를 통해서 레코드가 들어오는 것이 확인 된다면 축하합니다! Kinesis Producer 라이브러리를 사용하여 Kinesis Data Stream에 성공적으로 데이터를 전송했습니다.

Last updated