A Simple Producer
소스코드 다운로드
터미널에 다음 명령어를 입력하여 깃허브에서 소스코드를 다운받습니다.
cd ~/environment && git clone https://github.com/youngwjung/kinesis-producer-library-examples.git소스코드 리뷰
src/main/java 디렉토리를 열어 보겠습니다.

A, B, C, D로 시작하는 4개의 파일을 있고 각 파일별로 다른 방식으로 KPL을 사용하고 있습니다. A_SimpleProducer.java 파일을 열어보겠습니다.
해당 파일에서 KPL을 이용해서 Kinesis Data Stream에 데이터를 전송하는 코드를 AWS SDK 버전과 비교해 보면 Kinesis Producer Library가 Kinesis Data Stream에 대한 쓰기의 재시도, 일괄 처리 및 샤드 최적화를 단순화 해주는 것을 확인할수 있습니다.
해당 프로그램을 간략히 설명하자면 data/taxi-trips.csv 파일에서 택시 운행 레코드들를 읽어와서 Kinesis Producer Library를 활용하여 해당 레코드들을 통합해서 Kinesis Data Stream에 전송합니다.
Kinesis Producer 구성을 살펴보면
몇 가지 구성 속성을 설정합니다. 자세히 알아보려면 각 항목을 확장하세요.
Kinesis Producer에 구성 설정을 하고 해당 설정을 가진 Kinesis Producer 객체를 생성합니다.
addUserRecord 함수를 호출해서 데이터를 전송합니다.
KPL 사용자 레코드는 Kinesis Data Stream 레코드 구분됩니다.
KPL 사용자 레코드는 사용자에게 특정 의미를 갖는 데이터 BLOB입니다. 예를 들어, 웹 사이트에서 UI 이벤트를 나타내는 JSON BLOB이나 웹 서버의 로그 항목이 있습니다.
Kinesis Data Stream 레코드는 Kinesis Data Stream API에 의해 정의된 데이터 구조인 Record 객체의 인스턴스이며 파티션 키, 시퀀스 번호 및 데이터 BLOB이 여기에 포함됩니다.
프로그램 실행
Cloud9 터미널에서 애플리케이션 소스코드가 있는 디렉토리로 이동합니다.
빌드 실행하면 애플리케이션 소스코드가 jar 파일로 컴파일하고 패키징되어 target 디렉토리 안에 저장됩니다.
애플리케이션을 구동합니다.
생성이 시작되었다는 출력이 표시되면 전송 관련 지표가 CloudWatch가 새로 고쳐질 때까지 약 5분 정도 기다립니다.
Kinesis Data Streams 관리 콘솔로 이동해서 input-stream을 선택하고 모니터링을 클릭하여 Put record 및 Incoming data 지표를 확인하세요.

모니터링 페이지에 표시되는 지표를 통해서 레코드가 들어오는 것이 확인 된다면 축하합니다! Kinesis Producer 라이브러리를 사용하여 Kinesis Data Stream에 성공적으로 데이터를 전송했습니다.


Last updated