본문 바로가기

Flutter

[Flutter]MQTT 프로토콜을 사용한 Pub/Sub 예제

728x90

 

 

MQTT는 경량화된 프로토콜로 IoT 환경에서도 많이 사용되며, Pub/Sub 구조에 적합하다고 한다.

 

해당 예제에서는 발행자(Publisher)가 특정 주제를 발행(Publish)하면, 해당 주제에 관심 있는 구독자(Subscriber)가 이를 자동으로 수신하는 방식으로 데이터가 전달된다.

 

Pub/Sub 시스템의 기본 구조

 

 

1. 발행자(Publisher)

- 데이터를 생성하고 이를 특정 주제(topic)에 게시하는 역할

- 데이터를 직접 구독자에게 전달하지 않고, 메시지 전달을 담당하는 중앙 브로커에게 데이터를 보낸다.

 

2. 주제(Topic)

- 발행자와 구독자가 서로 소통하는 공통의 채널이라고 할 수 있다.

- 예를 들어, 뉴스 앱에서는 '스포츠', '정치'와 같은 주제가 될 수 있으며, 발행자는 특정 주제에 해당하는 뉴스 기사를 발행한다.

 

3. 구독자(Subscriber)

- 특정 주제를 구독하여 해당 주제에 게시된 데이터를 자동으로 수신한다.

- 구독자는 필요한 주제만 선택하여 데이터를 받기 때문에 원하는 데이터만 효율적으로 수신할 수 있다.

 

4. 브로커(Broker)

- 메시지 전달을 중재하는 중앙 시스템이다. 발행자로부터 받은 메시지를 관리하고, 이를 해당 주제를 구독한 구독자에게 전달한다.

- 브로커를 통해 효율적인 데이터 전달이 가능하며, 발행자와 구독자가 서로의 존재를 알 필요 없이 메시지를 주고받을 수 있다.

 

 

MQTT 프로토콜을 사용한 Pub/Sub의 예제 코드

1. mqtt_client 패키지 추가

 

dart pub add mqtt_client

 

dependencies:
  flutter:
    sdk: flutter
  mqtt_client: ^10.5.1

 

 

2. MQTT 브로커에 연결

 

  Future<void> connect() async {
    client.logging(on: true); // 로깅 활성화
    client.keepAlivePeriod = 20; // 연결 유지 시간 설정
    client.onDisconnected = onDisconnected;

    // MQTT 연결 옵션 설정
    final connMessage = MqttConnectMessage()
        .withClientIdentifier('Flutter_Client') // 클라이언트 식별자
        .withWillTopic('willTopic') // 비정상 연결 종료 시 발행할 주제
        .withWillMessage('Client disconnected unexpectedly') // 종료 시 발행할 메시지
        .startClean() // 클린 세션으로 연결
        .withWillQos(MqttQos.atMostOnce); // QoS 설정

    client.connectionMessage = connMessage;

    try {
      log('MQTT 브로커에 연결 중...');

      await client.connect();
    } on Exception catch (e) {
      log('MQTT 연결 실패 :: $e');
      disconnect();
    }

    if (client.connectionStatus!.state == MqttConnectionState.connected) {
      log('MQTT 브로커에 연결됨!');
    } else {
      log('MQTT 연결 실패 :: ${client.connectionStatus}');
      disconnect();
    }
  }

 

MqttQos(Quality of Service)

MQTT 프로토콜에서 메시지 전달 보장 수준을 정의하는 설정이다.

발행자가 보낸 메시지가 구독자에게 얼마나 확실히 도달해야 하는지에 대한 정책을 설정한다.

 

- QoS 0: At most once (최소 1회 전달)는 메시지 전달이 보장되지 않고 재시도가 이루어지지 않기 때문에 네트워크에 문제가 있거나 연결이 끊길 경우, 메시지가 수신되지 않을 수 있다.

- 데이터의 신뢰성이 최우선일 경우, QoS 2: Exactly once (정확히 1회 전달)을 사용한다.

 

 

3. 연결 해제 함수

 

void disconnect() {
  log('MQTT 연결 해제');
  client.disconnect();
}

 

 

4. 연결 끊김 핸들러

 

void onDisconnected() {
  log('MQTT 연결이 끊겼습니다.');
}

 

 

5. 주제 구독 함수

 

  void subscribeToTopic(String topic) {
    log('주제 구독 :: $topic');
    client.subscribe(topic, MqttQos.atMostOnce);

    // 토픽 수신 리스너
    client.updates!.listen((List<MqttReceivedMessage<MqttMessage?>>? message) {
      final MqttPublishMessage recMessage =
          message![0].payload as MqttPublishMessage;

      final String payload =
          MqttPublishPayload.bytesToString(recMessage.payload.message);

      log('메시지 수신 [$topic] :: $payload');
    });
  }

 

 

6. 메시지 발행 함수

 

  void publishMessage(String topic, String message) {
    final builder = MqttClientPayloadBuilder();
    builder.addString(message);
    client.publishMessage(topic, MqttQos.atMostOnce, builder.payload!);

    log('메시지 발행 [$topic] :: $message');
  }

 

 

7. 사용

 

void main() async {
  // MQTT 클라이언트 생성
  final client = MqttServerClient('broker.hivemq.com', '');
  final mqttService = MqttService(client);

  // 브로커 연결
  await mqttService.connect();

  // 특정 주제 구독
  mqttService.subscribeToTopic('test/topic');

  // 메시지 발행
  mqttService.publishMessage('test/topic', 'Hello, MQTT!');

  // 주제에 대한 다른 메시지 발행
  mqttService.publishMessage('test/topic', 'Second message from Flutter!');
}

 

broker.hivemq.com은 무료 MQTT 브로커로, 테스트 용도로 사용할 수 있다.

 

코드를 실행하면, MQTT 클라이언트가 브로커와 성공적으로 연결되는 과정의 로그가 쭉 찍힌다.

각 로그 항목은 연결 상태가 어떻게 변경되고, 연결 프로세스가 어떤 단계에서 종료되었는지 나타낸다.

 

 

HiveMQ의 공용 브로커 서버와 기본 포트 1883에 연결되었다는 로그를 확인할 수 있다.

 

 

 

MQTT 브로커에 성공적으로 연결되고 특정 주제(topic)에 대한 구독 요청 메시지를 보내고 있는 과정을 확인할 수 있다.

 

 

 

 

 

발행된 메시지와 수신된 메시지도 잘 출력되었다.

 

 

 

 

728x90