Apache Kafka는 LinkedIn에서 2011년 오픈 소스로 공개된 메시지 큐 시스템이다.
MQTT처럼 Broker(중개자) 서버를 두고, Producer/Consumer(생산자/소비자) 방식으로 동작한다. 이는 MQTT의 Broker 서버와 Publish/Subscribe 방식과 유사함을 알 수 있다.


 

 

Apache Kafka는 다음과 같이 Zookeeper 연동을 지원하며, Broker 서버를 클러스터로 구성 가능하다. 이를 통하여 메시징 큐의 안정화 및 분산 처리 작업이 가능하며 그 성능 또한 우수하다고 한다.

출처 : http://notes.stephenholiday.com/Kafka.pdf

 

 

 

1. Apache Kafka 설치
​  - Kafka 다운로드 (https://kafka.apache.org/downloads)


 

 

  - Kafka 압축 풀기

$ tar xvf kafka_2.11-0.10.2.1.tgz

 

Apache Kafka 테스트를 위한 네트워크 환경 구성은 아래와 그림과 같은 구조이며, 이를 바탕으로 설정 방법을 설명한다.

 

 

 

2. Zookeeper 설정
  - config/zookeeper.properties 수정

다음과 같이 각 Broker 서버에 설치된 Kafka의 config/zookeeper.properties에 다음과 같이 각 Broker 서버에 설정 정보를 입력 후 저장한다.

 

 

 

또한, 각 서버의 /tmp/zookeeper에 myid 파일을 생성하여, 각각 서버의 고유 ID값을 부여한다. 만약 /tmp/zookeeper 디렉토리가 없다면 생성한다.

 

  - Broker Server 1 (IP : 192.168.0.101)

$ mkdir /tmp/zookeeper
$ echo 1 > /tmp/zookeeper/myid


  - Broker Server 2 (IP : 192.168.0.102)

$ mkdir /tmp/zookeeper
$ echo 2 > /tmp/zookeeper/myid


  - Broker Server 3 (IP : 192.168.0.103)

$ mkdir /tmp/zookeeper
$ echo 3 > /tmp/zookeeper/myid

 

3. Kafka 설정
  - config/server.properties 수정

각 서버의 config/server.properties에 해당 설정 활성화 및 Broker Server 정보를 입력한다.

 

  - Broker Server 1 (IP : 192.168.0.101)

broker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.0.101:9092
advertised.host.name=192.168.0.101
advertised.host.name=9092
zookeeper.connect=192.168.0.101:2181, 192.168.0.102:2181, 192.168.0.103:2181

 

  - Broker Server 2 (IP : 192.168.0.102)

broker.id=2
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.0.102:9092
advertised.host.name=192.168.0.102
advertised.host.name=9092
zookeeper.connect=192.168.0.101:2181, 192.168.0.102:2181, 192.168.0.103:2181

 

  - Broker Server 3 (IP : 192.168.0.103)

broker.id=3
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.0.103:9092
advertised.host.name=192.168.0.103
advertised.host.name=9092
zookeeper.connect=192.168.0.101:2181, 192.168.0.102:2181, 192.168.0.103:2181

 

 

4. 서버 구동

$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties


서버 구동이 완료되면 다음과 이 Kafka 서버가 정상적으로 시작했음을 알 수 있다.


 

 

 

 

5. Topic 생성 및 삭제

  - Topic 생성

$ bin/kafka-topics.sh --create --zookeeper 192.168.0.101:2181, 192.168.0.102:2181, 192.168.0.103:2181 --replication-factor 3 --partitions 1 --topic A

 

 

 

replication-factor : 인스턴스 개수
partitions : 병렬 처리를 위한 개수

 

  - Topic 리스트 확인

$ bin/kafka-topics.sh --list --zookeeper 192.168.0.101:2181, 192.168.0.102:2181, 192.168.0.103:2181


 

 

  - Topic 삭제

$ bin/kafka-topics.sh --delete --zookeeper 192.168.0.101:2181, 192.168.0.102:2181, 192.168.0.103:2181 --topic A

 

 

 

  -  Topic의 partition과 replication-factor에 대한 상세 정보 확인

$ bin/kafka-topics.sh --describe --zookeeper 192.168.0.101:2181, 192.168.0.102:2181, 192.168.0.103:2181

 

 

 

6. Producer/Consumer 테스트
  - Producer 테스트

$ bin/kafka-console-producer.sh --broker-list 192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092 --topic A


 

 

  - Consumer 테스트

$ bin/kafka-console-consumer.sh --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181 --topic A --from-beginning


 

 

테스트 환경 : Ubuntu 16.04

MQTT (Message Queuing Telemetry Transport)는 1999년에 발표한 오픈 프로토콜로 낮은 대역폭, 높은 지연이나 신뢰 할 수 없는 네트워크를 위하여 설계된 경량적인(라즈베리파이에서도 사용가능) 메시지 프로토콜이다.

Google에서는 푸시 알림 서비스로 GCM(Google Cloud Messaging)을 제공하며, Apple에서는 APNs(Apple Push Notification service)을 서비스로 제공한다.

이와 마찬가지로 푸시 알림 서비스를 활용하는데 있어서, 자체적인 서비스를 만들고 싶다면 MQTT로 활용이 가능하다.


대표적으로, Facebook Messenger에서는 푸시 알림 서비스 방식으로 MQTT를 활용하고 있다고 한다.
출처 : https://www.facebook.com/notes/facebook-engineering/building-facebook-messenger/10150259350998920

 

MQTT는 중개자(Broker) 서버를 두고, 발행/구독(Publish/Subscribe) 방식으로 동작하는 환경에서 이용하는 프로토콜이다.

 

 

발행자(Publisher)는 Topic을 발행하고, 중개자(Broker)를 통하여, 구독자(Subscriber)가 요구하는 Topic 정보를 얻는 구조로 되어 있다. Publisher(나)가 해당 Topic과 보낼 메세지를 Broker 서버에 전송하면, 해당 Topic을 구독하는 Subscriber(친구들)에게 Broker 서버에 설정된 일정 Keep Alive Timer를 통해 푸시 알림을 보내게 된다.

 

또한, MQTT 는 메시징에 대한 신뢰성 보장을 위하여 3단계의 QoS(Quality of Service) 지원한다. 물론 QoS 레벨이 높을수록 패킷 손실율은 줄어들지만, 종단 간 통신 지연시간은 늘어나게 된다.

 

 

 

QoS 0 은 메시지를 한번만 전달하고 전달 여부는 확인하지 않기 때문에 큰 페이로드를 가지는 메시지 일 경우, 메시지 손실이 발생하면 메시지가 전달 되지 않고 유실될 가능성이 높다.
QoS 1은 메시지를 전달하고, 1번의 Ack로 전달 여부를 확인한다. 하지만 PUBACK 패킷이 유실 된다면 메시지가 불필요하게 중복 전달 될 가능성이 있다.
QoS 2 는 4-Way Handshake을 통해 정확하게 한번만 전달한다.


또한, Topic은 계층적으로 구성이 가능하다.

 

 

 

다음과 같이 계층적으로 Topic을 생성하여 전달할 수 있다.

 

 

 

1. Broker Server 설치 (Mosquitto Broker Server)

  - 저장소 업데이트

$ sudo apt-get install python-software-properties
$ sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa
$ sudo apt-get update


  - 저장소 업데이트 확인

 $ sudo apt-cache search mosquito


 

 

  - Mosquitto 서버 설치

$ sudo apt-get install mosquitto

 

 

 

2. Test용 MQTT Client 설치 및 테스트

  - MQTT Client 설치

$ sudo apt-get install mosquitto-clients

 

 

 

 - MQTT Subscribe Client를 이용한 메시지 구독

$ mosquitto_sub - h Server Address -t /Topic

 

 

 - MQTT Publish Client를 이용한 발행

$ mosquitto_pub - h Server Address  -t /Topic​ -m "Message"

 

 

 

 - MQTT Subscribe Client에서 해당 Topic에 대한 푸시 알림 메시지 구독 완료

 

 

 

MQTT Client는 다양한 언어의 형태와 라이브러리로 제공되고 있으며, 해당 정보에 대한 링크는 아래와 같다.
https://github.com/mqtt/mqtt.github.io/wiki/libraries



테스트 환경 : Ubuntu 16.04


+ Recent posts