Working with the SDK
Last updated
Last updated
해당 실습에서는 AWS SDK를 사용하여 Kinesis Data Streams에 데이터를 전송합니다.
해당 섹션에서는 PutRecord API 사용해서 Kinesis Data Streams에 데이터를 전송하는 실습을 진행합니다.
1. Cloud9 인스턴스에서 Python 스크립트 생성
2. 이전 단계(Create a Kinesis Data Stream)에서 Data stream 이름을 input-stream이 아닌 다른 이름을 지정한 경우 아래 코드를 입력하고 kdsname 의 값을 바꿉니다. 이 코드는 일부 위치 정보가 포함된 임의의 택시 운행 데이터를 생성하고 Kinesis Data Stream 으로 데이터를 지속적으로 전송합니다.
3. Cloud9 인스턴스의 루트경로에 위에서 생성한 파일을 lab1.py
으로 저장합니다.
위의 코드에서는 Python용 AWS SDK인 boto3를 사용해서 Amazon Kinesis Data Streams에 데이터를 전송합니다.
해당 웹페이지를 통해서 boto3에서 지원되는 AWS 서비스, 각 서비스별 API 및 문법에 대한 정보를 확인할수 있습니다. PutRecord API는 단일 데이터 레코드를 Amazon Kinesis Data Stream에 생성합니다. PutRecord를 호출하여 실시간 수집 및 데이터 프로세싱에 필요한 데이터를 Kinesis Data Stream으로 한 번에 한 레코드씩 보낼수 있습니다. PutRecord API를 사용하는 경우에는 데이터가 저장될 Data Stream 이름, 파티션키, Payload(전송할 데이터)를 명시해야 합니다. 파티션 키는 Kinesis Data Streams에서 샤드 간에 데이터를 분산하는데 사용됩니다. Kinesis Data Streams는 각 데이터 레코드에 명시된 파티션 키를 통해서 Data Stream이 다수의 샤드로 구성된 경우 해당 레코드가 어떤 샤드에 속할지를 결정합니다. 순서가 중요한 경우 파티션키를 무작위값이 아닌 특정 값으로 명시해야 합니다.
Payload는 JSON 형식이어야하고 데이터 BLOB은 Base64로 인코딩되어야 합니다. 데이터 BLOB(Base64로 인코딩 되기 전 Payload)과 파티션키 크기의 합이 1MiB를 초과하지 않아야 합니다.
Data Stream에 레코드를 작성한 후에는 Data Stream 내에서 해당 레코드 또는 샤드 안에서 해당 레코드의 순서를 수정할 수 없습니다.
SDK를 사용하여 Kinesis Data Stream으로 데이터를 수집하려면 PutRecord 또는 PutRecords API를 사용할 수 있습니다. PutRecord는 한번의 API 호출 내에서 하나의 데이터 레코드를 보낼수 있고, PutRecords는 한번의 API 호출 내에서 다수의 데이터 레코드를 보낼수 있습니다.. 각 PutRecords 요청은 최대 500개의 레코드를 포함할 수 있고, 각 레코드는 파티션 키를 포함하여 1MiB까지 허용이 가능하지만 전체 요청의 크기는 5MiB 보다 작아야합니다. PutRecords 작업은 HTTP 요청당 여러 레코드를 Data Stream에 보내고 단일 PutRecord 작업은 Data Stream에 레코드를 한 번에 하나씩 보냅니다(각 레코드에 대해 별도의 HTTP 요청이 필요함). PutRecords를 사용하면 데이터 생산자당 더 많은 처리량을 가져갈 수 있으므로 대부분의 애플리케이션에 사용하는 것이 좋습니다. PutRecord 및 PutRecords 작업에 대한 자세한 내용은 PutRecord 및 PutRecords를 참조하십시오.
PutRecord 또는 PutRecords API는 동기식으로 처리되므로 클라이언트(Producer)에서 발생한 데이터를 지연없이 바로 Data Stream에 전송해야 하는 경우에는 AWS SDK를 통해서 PutRecord 또는 PutRecords API 사용하십시오. KPL(Kinesis Producer Library)를 사용할 경우에는 RecordMaxBufferedTime 파라미터에 명시한 값만큼의 지연이 발생할 수 있습니다. RecordMaxBufferedTime 값이 클수록 패킹 효율성이 높아지고 성능이 향상됩니다. 이러한 추가 지연을 허용할 수 없는 애플리케이션은 AWS SDK를 직접 사용해야 할 수 있습니다. 사용자 지정 Kinesis Producer를 구성해야 할 경우에는 해당 문서(Developing Producers Using the Amazon Kinesis Data Streams API with the AWS SDK for Java)를 참고하세요.
1. 코드 실행을 위해서 Cloud9 환경에 boto3 라이브러리를 설치해야 합니다. 터미널에 다음을 입력하여 boto3를 설치합니다.
해당 라이브러리는 Python 스크립트 내에서 사용되며 코드 실행에 필요합니다.
2. 터미널에 다음 명령어를 입력하여 프로그램을 실행합니다. 해당 스크립트는 Kinesis Data Stream에 레코드 전송을 시작하고 요청 ID(Request ID)와 HTTP 응답 코드(정상일 경우 200)를 출력합니다.
만약 아래와 같은 메시지가 출력될 경우에는 Python 스크립트 안에 명시된 Kinesis Data Stream 이름과 AWS 리전 정보가 제대로 지정되었는지 확인하세요
An error occurred (ResourceNotFoundException) when calling the PutRecord operation: Stream input-stream under account 00000000000 not found
3. Kinesis Data Streams 관리 콘솔로 이동해서 input-stream을 선택하고 모니터링을 클릭하여 Put record 지표를 확인하세요.
Kinesis Data Streams에 데이터가 들어온 후 CloudWatch 지표가 표시되는데 최대 5~7분이 소요될 수 있습니다.
해당 실습에서는 PutRecord API를 사용하여 Kinesis Data Streams에 데이터를 보냈습니다. PutRecords API를 사용하여 단일 요청으로 Kinesis Data Streams에 디수의 레코드를 보낼 수 있습니다. Producer들은 PutRecords를 사용하여 Kinesis Data Stream으로 데이터를 보낼 때 더 높은 처리량을 달성할 수 있습니다. 해당 문서를 통해서 더 자세한 내용을 확인하세요
CloudWatch 지표를 통해서 Kinesis Data Stream에 데이터가 정상적으로 전달된 것을 확인했으면 Ctrl+C를 입력해서 실행중인 Python 스크립트를 종료하세요.