실시간 데이터 처리를 위한 Apache Kafka의 활용 방법

Apache Kafka 소개

Apache Kafka는 대용량의 실시간 데이터 스트리밍 플랫폼으로, 분산 시스템에서 신뢰성과 확장성을 제공합니다. 데이터를 신속하게 수집, 저장 및 처리할 수 있어서 실시간 데이터 스트리밍 애플리케이션에 이상적입니다.

Kafka 아키텍처

Kafka는 다음과 같은 구성 요소로 이루어져 있습니다.

– **Producer**: 데이터를 생성하고 Kafka에 보내는 역할을 합니다. 예를 들어, 로그 데이터 생성기나 웹 애플리케이션에서 발생한 이벤트를 Kafka로 전송할 수 있습니다.


import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers","localhost:9092");
        props.put("acks", "all");
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");

        Producer producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("topic_name", "key", "value"),
                    new Callback() {
                        public void onCompletion(RecordMetadata metadata, Exception e) {
                            if (e != null) {
                                e.printStackTrace();
                            }
                        }
                    });

        producer.close();
    }
}

– **Consumer**: Kafka로부터 데이터를 읽고 처리하는 역할을 합니다. Consumer는 Topic에 대한 구독(subscription)을 설정하고 데이터를 받아옵니다.


import org.apache.kafka.clients.consumer.*;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers","localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("topic_name"));

        while (true) {
            ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord record : records) {
                System.out.println("Key: " + record.key() + ", Value: " + record.value());
            }
        }
    }
}

Kafka 설치 및 설정

Kafka 설치

1. Apache Kafka의 공식 웹사이트에서 Kafka를 다운로드합니다. (https://kafka.apache.org/downloads)
2. 압축을 푼 후, 다음 명령어를 통해 Kafka 디렉토리로 이동합니다.


cd kafka_2.13-{version}

3. 필요한 설정을 수정하기 위해 `config` 디렉토리로 이동합니다.


cd config

Zookeeper 설정

1. `zookeeper.properties` 파일을 열고 다음 설정을 수정합니다.


dataDir=/tmp/zookeeper
server.1=localhost:2888:3888

2. Zookeeper 데이터 디렉토리를 생성합니다.


mkdir /tmp/zookeeper

Kafka Broker 설정

1. `server.properties` 파일을 열고 다음 설정을 수정합니다.


broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=/tmp/kafka-logs

2. Kafka 로그 디렉토리를 생성합니다.


mkdir /tmp/kafka-logs

3. Kafka와 Zookeeper를 실행합니다.


zookeeper-server-start.sh config/zookeeper.properties
kafka-server-start.sh config/server.properties

Producer 개발하기

KafkaProducer 클래스 생성

1. KafkaProducer 클래스를 생성하고 필요한 의존성을 추가합니다.


import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        // KafkaProducer 설정
        Properties props = new Properties();
        props.put("bootstrap.servers","localhost:9092");
        props.put("acks", "all");
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");

        // KafkaProducer 생성
        Producer producer = new KafkaProducer<>(props);

        // 메시지 전송
        producer.send(new ProducerRecord<>("topic_name", "key", "value"),
                    new Callback() {
                        public void onCompletion(RecordMetadata metadata, Exception e) {
                            if (e != null) {
                                e.printStackTrace();
                            }
                        }
                    });

        // KafkaProducer 종료
        producer.close();
    }
}

2. `bootstrap.servers` 프로퍼티에는 Kafka bootstrap 서버의 호스트 및 포트를 설정합니다.
3. `key.serializer`와 `value.serializer` 프로퍼티에서는 데이터 직렬화 방식을 설정합니다.
4. `ProducerRecord` 객체를 통해 전송할 토픽, 키, 값 데이터를 지정합니다.
5. 데이터 전송이 완료될 때마다 호출되는 `Callback` 함수를 정의합니다.


Consumer 개발하기

KafkaConsumer 클래스 생성

1. KafkaConsumer 클래스를 생성하고 필요한 의존성을 추가합니다.


import org.apache.kafka.clients.consumer.*;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        // KafkaConsumer 설정
        Properties props = new Properties();
        props.put("bootstrap.servers","localhost:9092");
        props.put("group.id", "group_name");
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        // KafkaConsumer 생성
        KafkaConsumer consumer = new KafkaConsumer<>(props);

        // 구독할 토픽 설정
        consumer.subscribe(Arrays.asList("topic_name"));

        // 메시지 수신
        while (true) {
            ConsumerRecords records = consumer.poll(100);
            
            for (ConsumerRecord record : records) {
                System.out.println("Received message: " +
                                   "Topic = " + record.topic() + ", " +
                                   "Partition = " + record.partition() + ", " +
                                   "Offset = " + record.offset() + ", " +
                                   "Key = " + record.key() + ", " +
                                   "Value = " + record.value());
            }
        }
    }
}

2. `bootstrap.servers` 프로퍼티에는 Kafka bootstrap 서버의 호스트 및 포트를 설정합니다.
3. `group.id` 프로퍼티를 통해 컨슈머 그룹을 지정합니다.
4. `key.deserializer`와 `value.deserializer` 프로퍼티에서는 데이터 역직렬화 방식을 설정합니다.
5. `consumer.subscribe()` 메서드를 사용하여 구독할 토픽을 설정합니다.
6. `consumer.poll()` 메서드를 통해 메시지를 수신하고 반복문을 통해 처리합니다.


Topic과 Partition 이해하기

Topic

– Kafka에서 메시지를 주고받는 주제 (Topic)를 의미합니다.
– 하나의 Kafka 클러스터에 여러 개의 토픽이 존재할 수 있습니다.
– 토픽은 관련된 메시지의 그룹이며, 메시지를 보내고 받을 때에는 해당 토픽을 지정하여 처리합니다.

Partition

– Kafka Topic은 여러 개의 파티션으로 나눌 수 있습니다.
– 각 파티션은 메시지를 저장하는 단위로 독립적으로 관리되며, 병렬로 메시지를 처리할 수 있도록 합니다.
– 파티션은 순서가 있는 메시지 스트림이며, 각각의 메시지에는 고유한 오프셋(Offset) 값이 있습니다.

파티션의 예시 코드

1. Producer로 메시지를 보내고 Consumer로 메시지를 받을 때, 파티션을 이용하여 메시지를 처리합니다.
2. 아래의 코드는 파티션을 설정하여 메시지를 송수신하는 예시입니다.


import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;

public class KafkaPartitionExample {
    public static void main(String[] args) {
        // KafkaProducer 설정
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers","localhost:9092");
        producerProps.put("acks", "all");
        producerProps.put("key.serializer",
                          "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer",
                          "org.apache.kafka.common.serialization.StringSerializer");

        // KafkaConsumer 설정
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers","localhost:9092");
        consumerProps.put("group.id", "group_name");
        consumerProps.put("key.deserializer",
                          "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer",
                          "org.apache.kafka.common.serialization.StringDeserializer");

        // KafkaProducer 생성
        Producer producer = new KafkaProducer<>(producerProps);

        // KafkaConsumer 생성
        KafkaConsumer consumer = new KafkaConsumer<>(consumerProps);

        // 토픽과 파티션 설정
        String topic = "topic_name";
        int partition = 0;

        // Producer로 메시지 전송
        producer.send(new ProducerRecord<>(topic, partition, "key", "value"));

        // Consumer로 메시지 수신
        consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
        consumer.seek(new TopicPartition(topic, partition), 0);

        ConsumerRecords records = consumer.poll(100);
        for (ConsumerRecord record : records) {
            System.out.println("Received message: " +
                               "Topic = " + record.topic() + ", " +
                               "Partition = " + record.partition() + ", " +
                               "Offset = " + record.offset() + ", " +
                               "Key = " + record.key() + ", " +
                               "Value = " + record.value());
        }

        producer.close();
        consumer.close();
    }
}

3. Producer는 `send()` 메서드를 통해 메시지를 지정한 토픽과 파티션으로 전송합니다.
4. Consumer는 `assign()` 메서드를 통해 특정 토픽과 파티션에 할당되고, `seek()` 메서드를 통해 해당 파티션의 오프셋을 설정합니다.
5. 그 후 `poll()` 메서드를 호출하여 메시지를 수신하고, 반복문을 통해 각 메시지의 정보를 출력합니다.


Offset 관리하기

Offset이란?

– Kafka에서는 각 메시지에 대해 유니크한 식별자인 Offset을 지정합니다.
– Offset은 파티션 내에서 메시지의 위치를 가리키는 값으로, 메시지의 순서와 위치를 추적하는 데 사용됩니다.
– Consumer는 오프셋을 사용하여 특정 위치부터 메시지를 읽거나 다시 읽을 수 있습니다.

Consumer Offset의 관리

– Consumer는 메시지를 소비한 후 읽은 Offset 값을 저장하여 어디까지 메시지를 처리했는지 추적합니다.
– Kafka는 Consumer Offset을 저장하기 위한 내부 저장소를 제공하지만, 외부 데이터베이스에 저장하거나 커스텀 로직을 사용할 수도 있습니다.

예시 코드

1. Consumer Offset을 커밋(commit)하고, 다음 번 소비 시 저장된 Offset에서부터 메시지를 가져오는 예시입니다.


import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Properties;
import java.util.Collections;

public class KafkaOffsetExample {
    public static void main(String[] args) {
        // KafkaConsumer 설정
        Properties props = new Properties();
        props.put("bootstrap.servers","localhost:9092");
        props.put("group.id", "group_name");
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        // KafkaConsumer 생성
        KafkaConsumer consumer = new KafkaConsumer<>(props);

        // 토픽과 파티션 설정
        String topic = "topic_name";
        int partition = 0;
        TopicPartition topicPartition = new TopicPartition(topic, partition);

        // Consumer로부터 저장된 Offset 검색
        consumer.assign(Collections.singletonList(topicPartition));
        long committedOffset = consumer.committed(topicPartition);

        // 저장된 Offset이 없으면 가장 오래된 Offset으로 설정
        if (committedOffset == null) {
            consumer.seekToBeginning(Collections.singletonList(topicPartition));
        } else {
            consumer.seek(topicPartition, committedOffset);
        }

        // 메시지 수신
        while (true) {
            ConsumerRecords records = consumer.poll(100);
            
            for (ConsumerRecord record : records) {
                // 메시지 처리 로직
                
                // Offset 커밋
                consumer.commitSync();
            }
        }
    }
}

2. `consumer.committed()` 메서드를 사용하여 특정 토픽과 파티션에서 커밋된 Offset을 검색합니다.
3. 커밋된 Offset이 없으면 `seekToBeginning()` 메서드를 통해 가장 오래된 Offset으로 설정합니다.
4. 커밋된 Offset이 있으면 `seek()` 메서드를 통해 해당 Offset으로 설정합니다.
5. 메시지를 수신하고 처리한 후에는 `commitSync()` 또는 `commitAsync()` 메서드를 호출하여 Offset을 커밋합니다.


Kafka 클러스터 구성하기

개요

Kafka는 고가용성 및 확장성을 위해 여러 대의 브로커(Broker)로 구성된 클러스터를 가질 수 있습니다. 클러스터는 분산된 환경에서 데이터를 안전하게 저장 및 처리하며, 높은 처리량을 지원합니다.

Kafka 클러스터 구성

1. 여러 대의 Kafka 브로커를 실행하고, 각 브로커에 고유한 ID를 부여합니다.
2. 클러스터의 각 브로커는 ZooKeeper를 사용하여 상태 정보를 공유합니다.
3. Producer가 메시지를 보낼 때 Topic 및 Partition에 따라 브로커에게 분배되어 저장되며, Consumer는 해당 브로커에서 메시지를 가져옵니다.

예시 코드

1. 아래의 코드는 3개의 브로커로 구성된 Kafka 클러스터를 생성하는 예시입니다.


# 서버 1
bin/kafka-server-start.sh config/server.properties

# 서버 2
bin/kafka-server-start.sh config/server-1.properties

# 서버 3
bin/kafka-server-start.sh config/server-2.properties

2. `config/server.properties` 파일에는 브로커 ID, 포트 번호, 로그 디렉토리 등의 설정 정보가 포함됩니다.
3. 각 브로커는 공통된 ZooKeeper 인스턴스를 사용하여 클러스터 상태를 공유합니다.

클러스터 관리

– Kafka 클러스터는 브로커의 추가 또는 제거로 확장 및 축소할 수 있습니다.
– ZooKeeper를 통해 클러스터 상태 및 리더 브로커의 변경을 추적합니다.
– 클러스터의 안정성을 위해 복제 팩터(Replication Factor)를 설정하여 데이터의 복제본을 제공할 수 있습니다.

복제 팩터 예시 코드

1. 아래의 코드는 토픽을 생성할 때 복제 팩터를 설정하는 예시입니다.


# 토픽 생성
bin/kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 \
--partitions 3 --replication-factor 3

2. `–replication-factor` 옵션에 복제 팩터를 지정하여 각 파티션의 복제본 수를 설정할 수 있습니다.


데이터 전송 보장을 위한 Replication

개요

Kafka는 Replication을 통해 데이터의 안정성과 가용성을 보장합니다. 데이터의 손실을 방지하고 장애 복구 기능을 제공하기 위해 데이터를 여러 개의 브로커에 복제합니다.

Replica와 복제 팩터

– Kafka는 각 토픽의 파티션에 대해 하나 이상의 Replica를 생성합니다.
– Replica는 동일한 파티션의 복사본으로, Leader Replica와 Follower Replica로 구분됩니다.
– 복제 팩터(Replication Factor)는 토픽을 생성할 때 지정되며, 토픽의 각 파티션에 할당되는 Replica의 수를 결정합니다.

데이터 복제 과정

1. Producer가 메시지를 보내면 해당 메시지는 Leader Replica에 저장됩니다.
2. Leader Replica는 메시지를 Followers에게 복제하여 안정성을 보장합니다.
3. Follower Replica는 Leader로부터 메시지를 동기적 또는 비동기적으로 복제합니다.
4. 만약 Leader Replica가 고장났을 경우, Followers 중 하나가 새로운 Leader로 선출되어 데이터의 일관성을 유지합니다.

예시 코드

1. 아래의 코드는 토픽을 생성할 때 복제 팩터를 설정하여 복제본을 생성하는 예시입니다.


# 토픽 생성
bin/kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 \
--partitions 3 --replication-factor 2

2. `–replication-factor` 옵션에 복제 팩터를 지정하여 각 파티션의 복제본 수를 설정합니다.
3. 위의 예시에서는 각 파티션마다 2개의 복제본을 유지하므로, 총 6개의 Replica가 생성됩니다.

복제 동기화

– Leader Replica의 메시지가 Followers에 복제될 때까지 동기화가 이루어집니다.
– 동기식 복제(Replica Sync)를 사용하면 Leader가 모든 복제자의 확인 응답을 받을 때까지 기다립니다.
– 비동기식 복제(Replica Async)를 사용하면 Leader는 복제자로부터의 확인 응답을 기다리지 않고 바로 메시지를 송신합니다.


데이터 처리 성능 최적화 기법

개요

Kafka는 대량의 데이터를 신속하게 처리하기 위해 다양한 방법을 제공합니다. 데이터 처리 성능을 최적화하기 위해 아래의 기법을 적용할 수 있습니다.

1. 파티션 개수 조정

– 파티션은 데이터의 분산 처리를 위한 단위입니다.
– 파티션 개수를 늘리면 데이터가 병렬로 처리되므로 처리량이 증가할 수 있습니다.
– 파티션 개수는 토픽을 생성할 때 설정할 수 있습니다.

2. 배치 처리

– Producer와 Consumer는 메시지를 배치로 전송하거나 가져올 수 있습니다.
– 작은 메시지를 여러 번 전송하는 대신, 큰 메시지를 한 번에 전송하는 것이 효율적입니다.
– 큰 메시지를 배치로 묶어 전송하면 네트워크 비용과 I/O 비용을 줄일 수 있습니다.

예시 코드

1. 아래의 코드는 Kafka Producer에서 배치 처리를 사용하는 예시입니다.


# 프로듀서 설정
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("ack", "all");
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);

Producer<String, String> producer = new KafkaProducer<>(props);

// 메시지를 배치로 전송
for (int i = 0; i < 100000; i++) {
    producer.send(new ProducerRecord<>("my_topic", Integer.toString(i), "message" + i));
}

2. `props.put(“batch.size”, 16384)`는 배치의 크기를 설정하는 코드입니다.
3. 이 예시에서는 16KB로 설정했으며, 배치 사이즈가 이 값을 넘어서면 배치가 전송됩니다.

3. 데이터 압축

– 데이터 압축을 사용하면 전송할 데이터의 크기를 줄일 수 있습니다.
– 데이터를 압축하여 전송하면 네트워크 대역폭을 절약하고 전송 시간을 단축할 수 있습니다.

예시 코드

1. 아래의 코드는 Kafka Producer에서 데이터 압축을 사용하는 예시입니다.


# 프로듀서 설정
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("compression.type", "gzip");

Producer<String, String> producer = new KafkaProducer<>(props);

// 압축된 메시지를 전송
producer.send(new ProducerRecord<>("my_topic", "key", "compressed message"));

2. `props.put(“compression.type”, “gzip”)`는 Gzip 압축을 사용하는 설정입니다.
3. Producer가 메시지를 전송할 때 해당 메시지는 Gzip으로 압축된 상태로 전송됩니다.


Kafka 모니터링과 관리

개요

Kafka의 모니터링과 관리는 클러스터 상태를 실시간으로 확인하고 성능, 안정성, 가용성 등을 높이기 위해 중요합니다. 아래의 기법과 도구를 활용하여 Kafka 클러스터를 모니터링하고 관리할 수 있습니다.

1. JMX (Java Management Extensions)

– Kafka는 JMX를 지원하여 JVM과 연동하여 모니터링 정보를 제공합니다.
– JConsole, JVisualVM 등의 도구를 사용하여 Kafka 클러스터의 JMX 속성을 모니터링할 수 있습니다.
– JMX를 통해 토픽, 파티션, 브로커 등의 메트릭 정보를 확인할 수 있습니다.

2. Kafka Manager

– Kafka Manager는 Yahoo에서 개발한 Kafka 클러스터 관리 도구입니다.
– Kafka 클러스터의 상태 모니터링, 파티션 리밸런싱, 토픽 생성 및 삭제 등을 제공합니다.
– 웹 인터페이스를 통해 쉽게 Kafka 클러스터를 관리할 수 있습니다.

예시 코드

1. 아래의 코드는 Kafka Manager를 사용하여 Kafka 클러스터를 관리하는 예시입니다.


// Kafka Manager 생성
ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 10000, 10000, JaasUtils.isZkSecurityEnabled());
KafkaManager kafkaManager = new KafkaManager(zkUtils);

// 클러스터 상태 확인
String clusterName = "my_cluster";
boolean isClusterActive = kafkaManager.getCluster(clusterName).isDefined();
System.out.println("Cluster is active: " + isClusterActive);

// 파티션 리밸런싱
String topicName = "my_topic";
int numPartitions = 3;
kafkaManager.addTopic(clusterName, topicName + "_new", numPartitions, 1);
kafkaManager.deleteTopic(clusterName, topicName);

// ZooKeeper 연결 종료
zkUtils.close();

2. 위의 예시는 Kafka Manager를 사용하여 클러스터 상태를 확인하고 토픽을 리밸런싱하는 내용을 포함하고 있습니다.
3. `ZkUtils.apply(“localhost:2181”, 10000, 10000, JaasUtils.isZkSecurityEnabled())` 코드는 ZooKeeper 연결을 설정하는 부분입니다.

3. Prometheus와 Grafana

– Prometheus와 Grafana는 오픈 소스인 메트릭스 수집 및 시각화 도구입니다.
– Kafka Exporter의 메트릭을 Prometheus로 수집하고, Grafana를 통해 시각화할 수 있습니다.
– Kafka 클러스터의 실시간 상태, 지연 시간, 처리량 등을 모니터링할 수 있습니다.

예시 코드

1. 아래의 코드는 Kafka 클러스터의 메트릭을 Prometheus로 수집하는 Kafka Exporter의 예시입니다.


# Kafka Exporter 실행
./kafka-exporter --kafka.server=kafka:9092 --telemetry.address=:9999

2. 위의 예시에서는 Kafka Exporter를 실행하고, `kafka.server`를 통해 Kafka 클러스터의 주소를 지정하고, 수집된 메트릭 정보를 `telemetry.address`의 주소로 노출합니다.


Leave a Comment