AWS Kafka는 분산 데이터 스트리밍 플랫폼으로, 대용량 실시간 데이터 처리에 적합합니다. 기본 주체로는 프로듀서, 컨슈머, 브로커로 구성되며, 프로듀서는 데이터를 생성하고 브로커로 전송합니다. 컨슈머는 브로커에서 데이터를 읽어옵니다. 이러한 구성으로 데이터를 안정적으로 처리하고, 병렬적인 처리도 가능합니다. AWS Kafka는 확장성이 우수하여 요구 사항에 따라 자동으로 정확한 용량을 확보합니다. 또한, 관리가 용이하며, 높은 내구성과 성능을 제공합니다. AWS Kafka를 사용하면 기업은 대량의 데이터를 신속하게 처리하고 비즈니스 인텔리전스를 개발할 수 있습니다.
1. AWS Kafka 설치
AWS Kafka를 설치하기 위해서는 몇 가지 단계를 따라야 합니다.
1. AWS Management Console에 로그인합니다.
2. "Kafka"를 검색하여 "Managed Streaming for Apache Kafka (MSK)"를 선택합니다.
3. "Create cluster" 버튼을 클릭하여 새로운 Kafka 클러스터를 생성합니다.
4. 클러스터 설정을 구성합니다.
이 단계에서는 클러스터에 할당할 리소스, 보안 설정 등을 선택할 수 있습니다.
예시 소스코드
from kafka import KafkaProducer
from kafka import KafkaConsumer
# Kafka Producer 생성 예시 코드
def send_message(topic, message):
producer = KafkaProducer(bootstrap_servers='your-bootstrap-servers')
producer.send(topic, message)
producer.close()
# Kafka Consumer 생성 예시 코드
def consume_messages(topic):
consumer = KafkaConsumer(
topic,
bootstrap_servers='your-bootstrap-servers',
group_id='your-group-id'
)
for message in consumer:
print(message.value)
consumer.close()
위 코드에서 "your-bootstrap-servers"는 Kafka 클러스터의 부트스트랩 서버 주소를 입력해야 합니다. 이 주소는 AWS MSK 클러스터 설정 페이지에서 확인할 수 있습니다. 또한, "your-group-id"는 Consumer Group ID를 나타냅니다. 위 예시 코드를 사용하면 Kafka Producer로 메시지를 보낼 수 있고, Kafka Consumer로 메시지를 소비할 수 있습니다. 이렇게 생성된 Kafka 클러스터를 사용하여 대용량 데이터를 신속하게 처리할 수 있습니다.
2. AWS Kafka 토픽 생성
아래는 AWS에서 Kafka 토픽을 생성하는 실제 소스 코드 예시입니다. 이 코드 예시는 초보자도 이해하기 쉽게 설명되었습니다.
import boto3
def create_kafka_topic(topic_name, partition_count, replication_factor):
# AWS Kafka 클라이언트 생성
kafka_client = boto3.client('kafka')
# 토픽 생성 요청
response = kafka_client.create_topic(
Name=topic_name,
PartitionCount=partition_count,
ReplicationFactor=replication_factor
)
# 토픽 생성 여부 확인
if 'TopicArn' in response:
print("Kafka 토픽 생성 성공")
else:
print("Kafka 토픽 생성 실패")
# 토픽 이름, 파티션 수, 복제 팩터 지정하여 함수 호출
create_kafka_topic("my-topic", 3, 2)
위의 코드는 AWS SDK인 boto3를 사용하여 AWS Kafka 클라이언트를 생성하고, `create_topic` 함수를 호출하여 토픽을 생성하는 예시입니다.
`create_kafka_topic` 함수는 아래와 같은 파라미터를 받습니다
- `topic_name`: 생성할 토픽의 이름입니다.
- `partition_count`: 토픽에 포함할 파티션의 수입니다.
- `replication_factor`: 파티션의 데이터를 복제할 복제 팩터입니다.
함수는 `boto3`를 사용하여 `create_topic` 메소드를 호출하여 토픽을 생성합니다. 그리고 `response` 변수를 통해 토픽 생성 여부를 확인합니다. 위 예시를 실행하면 'my-topic'라는 이름의 토픽이 3개의 파티션과 2개의 복제 팩터로 생성됩니다.
3. AWS Kafka 메시지 생성 및 전송
아래는 초보자가 이해할 수 있도록 간단한 Python 코드 예시를 제공합니다.
이 예시는 AWS SDK for Python인 boto3를 사용하여 AWS Kafka에 메시지를 생성하고 전송하는 방법을 보여줍니다.
우선, 필요한 패키지를 가져옵니다.
import boto3
import json
client = boto3.client('kafka', region_name='ap-southeast-2')
topic = 'your-topic-name'
message = {'key': 'value'}
response = client.put_record(
ClusterArn='your-cluster-arn',
TopicArn='your-topic-arn',
Records=[
{
'Value': json.dumps(message),
'PartitionKey': '1'
}
]
)
print(f"Success: {response['SequenceNumber']}")
위의 예시에서 'your-topic-name', 'your-cluster-arn', 'your-topic-arn'은 실제 환경에 맞게 변경해야 합니다. 'your-topic-name'은 메시지를 전송할 토픽의 이름, 'your-cluster-arn'은 사용할 Kafka 클러스터의 ARN (Amazon Resource Name), 'your-topic-arn'은 토픽의 ARN입니다.
메시지는 JSON 형식으로 생성하고, 'PartitionKey'는 메시지의 파티션을 지정하는 것입니다.
결과를 확인합니다.
위의 예시에서는 메시지 전송에 성공한 후에 메시지의 시퀀스 번호를 출력합니다.
메시지 전송에 실패한 경우 예외가 발생하게 됩니다.
와 같은 방법으로 AWS Kafka에 메시지를 생성하고 전송할 수 있습니다. 메시지 생성 및 전송에 필요한 인증 및 권한 설정이 완료되어야하며, AWS SDK for Python 및 필요한 패키지가 설치되어 있어야 합니다.
4. AWS Kafka 메시지 수신 및 처리
이제 AWS Kafka에서 메시지를 수신하고 처리하는 예제 코드를 살펴보겠습니다.
1. AWS SDK 설치 AWS SDK를 사용하여 Kafka 메시지를 수신하려면 먼저 AWS SDK를 설치해야 합니다.
AWS SDK를 설치하려면 다음 명령어를 실행합니다
pip install boto3
2. Kafka 클라이언트 초기화 AWS Kafka에 연결하기 위해 Kafka 클라이언트를 초기화해야 합니다.
다음은 클라이언트 초기화하는 예제 코드입니다
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'topic_name',
bootstrap_servers='kafka_broker_endpoint',
security_protocol='SSL',
ssl_cafile='path_to_ca_cert',
ssl_certfile='path_to_client_cert',
ssl_keyfile='path_to_client_key'
)
여기서 'topic_name'은 메시지를 소비할 Kafka 토픽의 이름을 의미합니다.
'kafka_broker_endpoint'는 AWS Kafka 브로커의 엔드포인트 주소입니다.
'ssl_cafile', 'ssl_certfile' 및 'ssl_keyfile'은 AWS Kafka에 대한 SSL 인증서 및 키 파일의 경로입니다.
3. 메시지 수신 및 처리 KafkaConsumer 객체에 대한 `poll()` 메서드를 사용하여 메시지를 수신하고 처리할 수 있습니다.
다음은 메시지 수신 및 처리를 담당하는 예제 코드입니다
for message in consumer.poll().items():
value = message.value.decode('utf-8')
print('Received message:', value)
# 여기에 메시지 처리 로직을 추가하세요
poll()` 메서드는 Kafka 토픽에서 메시지를 가져와서 메시지들의 이터레이터를 반환합니다.
각 메시지는 바이트로 인코딩되어 있으므로, `decode('utf-8')` 메서드를 사용하여 문자열로 디코딩해야 합니다.
이후 메시지 처리 로직을 추가하면 됩니다.
위의 예제 코드에서는 메시지를 수신하면서 간단히 메시지를 출력하는 로직을 추가했습니다. 실제로는 메시지를 원하는 방식대로 처리하면 됩니다.
실제 프로젝트에서는 이 예제 코드를 확장하여 원하는 기능을 구현하면 됩니다.
'DevOps > AWS' 카테고리의 다른 글
[해결]Error Code: 1227. Access denied; you need (at least one of) the SUPER or SYSTEM_VARIABLES_ADMIN privilege(s) for this operation (0) | 2023.11.05 |
---|---|
AWS에서 도메인 구입 후 EC2에 HTTPS 적용 방법 (0) | 2021.11.01 |