Message Queue

Message Queueは非同期処理、サービス間連携、負荷平準化を実現するミドルウェアです。 RabbitMQ、Kafka、SQSの特性を理解し、ユースケースに応じた選択と設計が重要です。

メッセージングパターン


┌─────────────────────────────────────────────────────────────────────────┐
│                      Point-to-Point (Work Queue)                         │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│   Producer ──→ [Queue] ──→ Consumer 1                                   │
│                       ──→ Consumer 2   (1メッセージ=1Consumerのみ処理)  │
│                       ──→ Consumer 3                                    │
│                                                                          │
│   用途: タスク分散、ワーカー処理、バッチジョブ                           │
└─────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────┐
│                      Publish/Subscribe (Fan-out)                         │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│                          ┌──→ [Queue A] ──→ Consumer 1                  │
│   Producer ──→ Exchange ─┼──→ [Queue B] ──→ Consumer 2                  │
│                          └──→ [Queue C] ──→ Consumer 3                  │
│                                                                          │
│   用途: イベント配信、通知、ログ配信(全Consumerが同じメッセージを受信)│
└─────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────┐
│                      Streaming (Kafka style)                             │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│   Producer ──→ [Topic: Partition 0] ──┬──→ Consumer Group A             │
│            ──→ [Topic: Partition 1] ──┤   (各Partitionを分担)           │
│            ──→ [Topic: Partition 2] ──┘                                 │
│                                       └──→ Consumer Group B             │
│                                           (全Partitionを読む)           │
│                                                                          │
│   用途: イベントソーシング、ログ集約、リアルタイム分析                   │
└─────────────────────────────────────────────────────────────────────────┘

RabbitMQ vs Kafka vs SQS

特性RabbitMQKafkaSQS
モデルMessage BrokerDistributed LogManaged Queue
プロトコルAMQP, MQTT, STOMP独自(TCP)HTTP/HTTPS
スループット~10K msg/sec~1M msg/sec~3K msg/sec
レイテンシ~1ms~5ms~20-50ms
メッセージ保持消費後削除保持期間設定消費後削除
順序保証キュー単位Partition単位FIFO Queue
リプレイ不可可能不可
運用負荷なし

RabbitMQ 実践

アーキテクチャ


┌─────────────────────────────────────────────────────────────────────────┐
│                         RabbitMQ Architecture                            │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│   Publisher ──→ Exchange ──binding──→ Queue ──→ Consumer                │
│                    │                                                     │
│         ┌─────────┼─────────┐                                           │
│         │         │         │                                           │
│     [direct]  [fanout]  [topic]  [headers]                              │
│         │         │         │                                           │
│   routing_key  全Queue  パターン  ヘッダ条件                            │
│    完全一致    にコピー  マッチ                                         │
│                                                                          │
│   Exchange Types:                                                        │
│   - direct:  routing_key完全一致でルーティング                          │
│   - fanout:  バインドされた全Queueにコピー                              │
│   - topic:   パターンマッチ(*.log, order.#)                           │
│   - headers: ヘッダ属性でルーティング                                   │
└─────────────────────────────────────────────────────────────────────────┘

設定例

# /etc/rabbitmq/rabbitmq.conf

# リスナー設定
listeners.tcp.default = 5672
management.tcp.port = 15672

# メモリ制限(システムメモリの40%)
vm_memory_high_watermark.relative = 0.4

# ディスク空き容量閾値
disk_free_limit.absolute = 2GB

# 接続・チャネル上限
channel_max = 2048
heartbeat = 60

# クラスタ設定
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@node1
cluster_formation.classic_config.nodes.2 = rabbit@node2
cluster_formation.classic_config.nodes.3 = rabbit@node3

# ミラーリングポリシー(HA Queue)
# rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

Python実装例

import pika
from pika.exchange_type import ExchangeType

# 接続設定
credentials = pika.PlainCredentials('user', 'password')
parameters = pika.ConnectionParameters(
    host='rabbitmq.example.com',
    port=5672,
    credentials=credentials,
    heartbeat=600,
    blocked_connection_timeout=300,
)

# Publisher
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# Exchangeとキュー宣言
channel.exchange_declare(exchange='orders', exchange_type=ExchangeType.topic, durable=True)
channel.queue_declare(queue='order_processing', durable=True)
channel.queue_bind(queue='order_processing', exchange='orders', routing_key='order.created')

# メッセージ送信(永続化設定)
channel.basic_publish(
    exchange='orders',
    routing_key='order.created',
    body='{"order_id": 123}',
    properties=pika.BasicProperties(
        delivery_mode=2,  # persistent
        content_type='application/json',
    )
)

# Consumer
def callback(ch, method, properties, body):
    try:
        process_order(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)  # 手動ACK
    except Exception as e:
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)  # DLQへ

channel.basic_qos(prefetch_count=10)  # 同時処理数
channel.basic_consume(queue='order_processing', on_message_callback=callback)

Kafka 実践

アーキテクチャ


┌─────────────────────────────────────────────────────────────────────────┐
│                          Kafka Architecture                              │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│   Kafka Cluster                                                          │
│   ┌─────────────────────────────────────────────────────────────────┐   │
│   │ Broker 1          Broker 2          Broker 3                    │   │
│   │ ┌─────────┐       ┌─────────┐       ┌─────────┐                │   │
│   │ │Topic A  │       │Topic A  │       │Topic A  │                │   │
│   │ │ P0(L)   │       │ P0(R)   │       │ P1(R)   │  L=Leader     │   │
│   │ │ P1(R)   │       │ P1(L)   │       │ P0(R)   │  R=Replica    │   │
│   │ └─────────┘       └─────────┘       └─────────┘                │   │
│   └─────────────────────────────────────────────────────────────────┘   │
│                              │                                           │
│                         ZooKeeper (メタデータ管理、Kafkaraftに移行中)    │
│                                                                          │
│   Consumer Group A                Consumer Group B                       │
│   ┌──────────────┐                ┌──────────────┐                      │
│   │Consumer 1: P0│                │Consumer 1: P0,P1│ (独立してオフセット│
│   │Consumer 2: P1│                └──────────────┘   管理)              │
│   └──────────────┘                                                       │
│                                                                          │
│   Partition: 並列度の単位。1 Partition = 1 Consumer(Group内)           │
└─────────────────────────────────────────────────────────────────────────┘

設定例

# server.properties

# ブローカー設定
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka1.example.com:9092

# ログ保持設定
log.retention.hours=168           # 7日間保持
log.retention.bytes=1073741824    # 1GB上限
log.segment.bytes=1073741824      # 1セグメント=1GB

# レプリケーション
default.replication.factor=3
min.insync.replicas=2             # 最低同期レプリカ数

# パフォーマンス
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# ZooKeeper
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181

Python実装例

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import json

# Producer
producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',                    # 全レプリカ確認
    retries=3,                     # リトライ回数
    retry_backoff_ms=100,
    linger_ms=5,                   # バッチング待機時間
    batch_size=16384,              # バッチサイズ
)

# 送信(非同期)
future = producer.send('orders', value={'order_id': 123}, key=b'user_456')
try:
    record_metadata = future.get(timeout=10)
    print(f"Sent to {record_metadata.topic}:{record_metadata.partition}@{record_metadata.offset}")
except KafkaError as e:
    print(f"Failed: {e}")

# Consumer
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['kafka1:9092', 'kafka2:9092'],
    group_id='order-processor',
    auto_offset_reset='earliest',  # 初回はearliest
    enable_auto_commit=False,      # 手動コミット
    max_poll_records=500,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)

for message in consumer:
    try:
        process(message.value)
        consumer.commit()          # 処理成功後にコミット
    except Exception as e:
        # エラー処理(DLQ送信など)
        pass

Amazon SQS 実践

標準キュー vs FIFOキュー

特性標準キューFIFOキュー
スループット無制限3,000 msg/sec
順序保証ベストエフォート厳密なFIFO
重複稀に重複重複排除
用途高スループット、順序不問金融、順序重要

Python実装例 (boto3)

import boto3
import json

sqs = boto3.client('sqs', region_name='ap-northeast-1')
queue_url = 'https://sqs.ap-northeast-1.amazonaws.com/123456789/my-queue'

# 送信
response = sqs.send_message(
    QueueUrl=queue_url,
    MessageBody=json.dumps({'order_id': 123}),
    MessageAttributes={
        'Type': {'DataType': 'String', 'StringValue': 'order'},
    },
    DelaySeconds=0,  # 遅延配信(最大15分)
)
print(f"MessageId: {response['MessageId']}")

# 受信(Long Polling推奨)
response = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=10,      # 最大10件
    WaitTimeSeconds=20,          # Long Polling(最大20秒)
    VisibilityTimeout=300,       # 処理タイムアウト
    MessageAttributeNames=['All'],
)

for message in response.get('Messages', []):
    try:
        body = json.loads(message['Body'])
        process(body)

        # 削除(処理完了後)
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=message['ReceiptHandle']
        )
    except Exception as e:
        # VisibilityTimeout後に再配信される
        pass

# DLQ設定(Terraform例)
# resource "aws_sqs_queue" "main" {
#   name = "my-queue"
#   redrive_policy = jsonencode({
#     deadLetterTargetArn = aws_sqs_queue.dlq.arn
#     maxReceiveCount     = 3  # 3回失敗でDLQへ
#   })
# }

トラブルシューティング

メッセージ滞留(Queue深度増加)

原因: Consumer処理速度 < Producer送信速度

# RabbitMQ: キュー深度確認
rabbitmqctl list_queues name messages messages_ready

# Kafka: Consumer Lag確認
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group my-consumer-group

対策:
- Consumer数を増やす(水平スケール)
- prefetch_count / max.poll.records 調整
- 処理ロジック最適化

メッセージ重複処理

原因: ACK前にConsumerクラッシュ、ネットワーク断

対策: 冪等性(Idempotency)の実装

# メッセージIDによる重複チェック
def process_message(message):
    message_id = message['id']
    if redis.setnx(f"processed:{message_id}", 1):
        redis.expire(f"processed:{message_id}", 86400)
        do_actual_processing(message)
    else:
        log.info(f"Skip duplicate: {message_id}")

Consumer Rebalance(Kafka)

原因: Consumerの追加/削除、ハートビートタイムアウト

# 設定調整
session.timeout.ms=30000        # セッションタイムアウト
heartbeat.interval.ms=10000     # ハートビート間隔
max.poll.interval.ms=300000     # 処理タイムアウト

# Cooperative Stickyアサインメント使用
partition.assignment.strategy=
  org.apache.kafka.clients.consumer.CooperativeStickyAssignor

監視すべきメトリクス

メトリクスRabbitMQKafkaアラート条件
メッセージ滞留queue.messagesconsumer_lag増加傾向
メモリ使用量mem_usedheap_used> 80%
接続数connectionsactive_connections上限の80%
DLQメッセージdlq.messages-> 0

選定ガイド

RabbitMQ

  • ✓ 複雑なルーティングが必要
  • ✓ 低レイテンシ要求
  • ✓ 多様なプロトコル対応
  • ✓ メッセージ確認(ACK)重視
  • ✗ 超大量データストリーム

Kafka

  • ✓ 高スループット要求
  • ✓ イベントソーシング
  • ✓ メッセージ再処理必要
  • ✓ ログ集約・分析
  • ✗ シンプルなキュー用途

SQS

  • ✓ 運用負荷ゼロ
  • ✓ AWSサービス連携
  • ✓ 小〜中規模
  • ✓ サーバーレス連携
  • ✗ オンプレ/マルチクラウド