MQTT リアルタイムフィードトピック

概要

UbiBot MQTT リアルタイムフィードサービスは、MQTT プロトコルを介して UbiBot プラットフォームからリアルタイムでデータフィードを配信する機能です。
クライアントは自分自身または共有チャンネルのフィードトピックを購読することで、元のセンサーデータを即座に受信できます。
購読状態を維持するために、ハートビート機構が必要です。

注意:MQTT チャンネルフィードサービスは、UbiBot Plus メンバーシップのブロンズレベル以上のユーザーのみ利用可能です。
FREE ティアのアカウントでは利用できません。

 

接続情報

Host:mqtt-api.ubibot.com

Port:1883 または 8883(SSL 暗号化接続用)

WebSocket Port:8083 または 8084(SSL 暗号化接続用)

WebSocket Path:/mqtt

Username:形式:user_id=USER_ID

Password:形式:account_key=ACCOUNT_KEY

USER_IDACCOUNT_KEY は、UbiBot コンソールから取得した実際の認証情報に置き換えてください。

 

MQTT サブスクリプション ハートビート

リアルタイムフィードデータを受信するには、クライアントは以下の URL に対してハートビートの HTTP GET リクエストを送信する必要があります;

https://webapi.ubibot.com/mqtt-user-feeds/subcribe-ping

ハートビートは、購読を維持するために少なくとも 300 秒ごと に送信する必要があります。
もしハートビートが期限内に受信されない場合、サーバーは当該アカウントへの MQTT フィードデータの配信を停止します。
推奨設定は 240 秒ごと の送信です。
ハートビートを過度に頻繁に送信することは避けてください(例:10 秒ごとなど)。

 

ハートビートリクエストパラメータ

  • account_key(文字列、必須)
  • user_id(オプション):他ユーザーのデバイスを購読する場合、カンマ区切りで最大 20 件まで指定可能

 

MQTT トピック

  • 自分のアカウント下の全フィードデータを購読する場合:

user/USER_ID/channel_feeds/#

  • 特定のチャンネルを購読する場合:

/user/USER_ID/channel_feeds/CHANNEL_ID

USER_ID および CHANNEL_ID は適宜置き換えてください。

他のアカウントのフィードを購読する場合は、そのユーザーの user_id を指定できます。

注意:購読対象ユーザーは UbiBot コンソールであなたのアカウントへの データ共有(Share Permission) を有効にしている必要があります。そうでない場合、データは受信できません。

 

Python – MQTT フィード購読の例

# -*- coding: utf-8 -*-
# UbiBot MQTT Feed Subscription with Heartbeat (Python)

import paho.mqtt.client as mqtt
import threading
import requests
import time

# Replace with your actual credentials
USER_ID = "your_user_id"
ACCOUNT_KEY = "your_account_key"
OTHER_USER_IDS = ""  # Optional, e.g., "user1,user2"

# MQTT connection settings
MQTT_HOST = "mqtt-api.ubibot.com"
MQTT_PORT = 1883
MQTT_USERNAME = f"user_id={USER_ID}"
MQTT_PASSWORD = f"account_key={ACCOUNT_KEY}"
MQTT_TOPIC = f"/user/{USER_ID}/channel_feeds/#"

# Heartbeat settings
HEARTBEAT_URL = "https://webapi.ubibot.com/mqtt-user-feeds/subcribe-ping"
HEARTBEAT_INTERVAL = 240  # seconds

# Heartbeat function
def send_heartbeat():
    params = {
        "account_key": ACCOUNT_KEY
    }
    if OTHER_USER_IDS:
        params["user_id"] = OTHER_USER_IDS

    try:
        response = requests.get(HEARTBEAT_URL, params=params, timeout=5)
        print(f"[HEARTBEAT] Sent. Status: {response.status_code}, Response: {response.text}")
    except Exception as e:
        print(f"[HEARTBEAT] Failed: {e}")

    # Schedule next heartbeat
    threading.Timer(HEARTBEAT_INTERVAL, send_heartbeat).start()

# MQTT Callbacks
def on_message(client, userdata, msg):
    print(f"[RECV] Topic: {msg.topic}")
    print(f"[RECV] Payload: {msg.payload.decode()}")

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("[INFO] Connected successfully.")
        client.subscribe(MQTT_TOPIC)
        print(f"[INFO] Subscribed to: {MQTT_TOPIC}")
    else:
        print(f"[ERROR] Connection failed with code {rc}")

# Start MQTT client
client = mqtt.Client()
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
client.on_connect = on_connect
client.on_message = on_message

print("[INFO] Connecting to MQTT broker...")
client.connect(MQTT_HOST, MQTT_PORT, 60)

# Start heartbeat thread
send_heartbeat()

# Start MQTT loop
client.loop_forever()

 

NodeJS – MQTT フィード購読の例

// Node.js – MQTT Feed Subscription Example with Heartbeat

const mqtt = require('mqtt');
const https = require('https');
const querystring = require('querystring');

// Replace with your actual credentials
const USER_ID = 'your_user_id';
const ACCOUNT_KEY = 'your_account_key';
const OTHER_USER_IDS = ''; // Optional, e.g., 'user1,user2'

const options = {
  username: `user_id=${USER_ID}`,
  password: `account_key=${ACCOUNT_KEY}`
};

const topic = `/user/${USER_ID}/channel_feeds/#`;

const client = mqtt.connect('mqtt://mqtt-api.ubibot.com:1883', options);

client.on('connect', () => {
  console.log('[INFO] Connected to MQTT broker.');
  client.subscribe(topic, (err) => {
    if (!err) {
      console.log('[INFO] Subscribed to:', topic);
    } else {
      console.error('[ERROR] Subscribe failed:', err.message);
    }
  });

  // Start sending heartbeat
  sendHeartbeat();
  setInterval(sendHeartbeat, 240000); // every 240 seconds
});

client.on('message', (topic, message) => {
  console.log(`[RECV] Topic: ${topic}`);
  console.log(`[RECV] Payload: ${message.toString()}`);
});

function sendHeartbeat() {
  const params = {
    account_key: ACCOUNT_KEY
  };

  if (OTHER_USER_IDS) {
    params.user_id = OTHER_USER_IDS;
  }

  const query = querystring.stringify(params);
  const url = `https://webapi.ubibot.com/mqtt-user-feeds/subcribe-ping?${query}`;

  https.get(url, (res) => {
    let data = '';
    res.on('data', (chunk) => { data += chunk; });
    res.on('end', () => {
      console.log(`[HEARTBEAT] Status: ${res.statusCode}, Response: ${data}`);
    });
  }).on('error', (err) => {
    console.error(`[HEARTBEAT] Error: ${err.message}`);
  });
}

 

C# – MQTT フィード購読の例

// C# – MQTT Feed Subscription Example with Heartbeat
// Requires MQTTnet (via NuGet) and System.Net.Http

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using System;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    private static readonly string USER_ID = "your_user_id";
    private static readonly string ACCOUNT_KEY = "your_account_key";
    private static readonly string OTHER_USER_IDS = ""; // Optional: "user1,user2"
    private static readonly string TOPIC = $"/user/{USER_ID}/channel_feeds/#";
    private static readonly string HEARTBEAT_URL = "https://webapi.ubibot.com/mqtt-user-feeds/subcribe-ping";
    private static readonly int HEARTBEAT_INTERVAL = 240; // seconds

    private static readonly HttpClient httpClient = new HttpClient();

    static async Task Main(string[] args)
    {
        var factory = new MqttFactory();
        var mqttClient = factory.CreateMqttClient();

        var options = new MqttClientOptionsBuilder()
            .WithTcpServer("mqtt-api.ubibot.com", 1883)
            .WithCredentials($"user_id={USER_ID}", $"account_key={ACCOUNT_KEY}")
            .WithCleanSession()
            .Build();

        mqttClient.UseConnectedHandler(async e =>
        {
            Console.WriteLine("[INFO] Connected to MQTT broker.");
            await mqttClient.SubscribeAsync(TOPIC);
            Console.WriteLine($"[INFO] Subscribed to: {TOPIC}");

            // Start heartbeat loop
            _ = Task.Run(() => StartHeartbeatLoop());
        });

        mqttClient.UseApplicationMessageReceivedHandler(e =>
        {
            Console.WriteLine($"[RECV] Topic: {e.ApplicationMessage.Topic}");
            Console.WriteLine($"[RECV] Payload: {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
        });

        mqttClient.UseDisconnectedHandler(e =>
        {
            Console.WriteLine("[WARN] Disconnected from MQTT broker.");
        });

        Console.WriteLine("[INFO] Connecting...");
        await mqttClient.ConnectAsync(options);

        Console.WriteLine("[INFO] Press any key to exit.");
        Console.ReadLine();
    }

    static async Task StartHeartbeatLoop()
    {
        while (true)
        {
            try
            {
                var uriBuilder = new UriBuilder(HEARTBEAT_URL);
                var query = $"account_key={ACCOUNT_KEY}";
                if (!string.IsNullOrEmpty(OTHER_USER_IDS))
                {
                    query += $"&user_id={OTHER_USER_IDS}";
                }
                uriBuilder.Query = query;

                var response = await httpClient.GetAsync(uriBuilder.Uri);
                var result = await response.Content.ReadAsStringAsync();
                Console.WriteLine($"[HEARTBEAT] Status: {response.StatusCode}, Response: {result}");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"[HEARTBEAT] Error: {ex.Message}");
            }

            await Task.Delay(HEARTBEAT_INTERVAL * 1000);
        }
    }
}

ブログに戻る