Message Queue
Message Queueは非同期処理、サービス間連携、負荷平準化を実現するミドルウェアです。 RabbitMQ、Kafka、SQSの特性を理解し、ユースケースに応じた選択と設計が重要です。
メッセージングパターン
┌─────────────────────────────────────────────────────────────────────────┐ │ Point-to-Point (Work Queue) │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ Producer ──→ [Queue] ──→ Consumer 1 │ │ ──→ Consumer 2 (1メッセージ=1Consumerのみ処理) │ │ ──→ Consumer 3 │ │ │ │ 用途: タスク分散、ワーカー処理、バッチジョブ │ └─────────────────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────────────────┐ │ Publish/Subscribe (Fan-out) │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌──→ [Queue A] ──→ Consumer 1 │ │ Producer ──→ Exchange ─┼──→ [Queue B] ──→ Consumer 2 │ │ └──→ [Queue C] ──→ Consumer 3 │ │ │ │ 用途: イベント配信、通知、ログ配信(全Consumerが同じメッセージを受信)│ └─────────────────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────────────────┐ │ Streaming (Kafka style) │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ Producer ──→ [Topic: Partition 0] ──┬──→ Consumer Group A │ │ ──→ [Topic: Partition 1] ──┤ (各Partitionを分担) │ │ ──→ [Topic: Partition 2] ──┘ │ │ └──→ Consumer Group B │ │ (全Partitionを読む) │ │ │ │ 用途: イベントソーシング、ログ集約、リアルタイム分析 │ └─────────────────────────────────────────────────────────────────────────┘
RabbitMQ vs Kafka vs SQS
| 特性 | RabbitMQ | Kafka | SQS |
|---|---|---|---|
| モデル | Message Broker | Distributed Log | Managed Queue |
| プロトコル | AMQP, MQTT, STOMP | 独自(TCP) | HTTP/HTTPS |
| スループット | ~10K msg/sec | ~1M msg/sec | ~3K msg/sec |
| レイテンシ | ~1ms | ~5ms | ~20-50ms |
| メッセージ保持 | 消費後削除 | 保持期間設定 | 消費後削除 |
| 順序保証 | キュー単位 | Partition単位 | FIFO Queue |
| リプレイ | 不可 | 可能 | 不可 |
| 運用負荷 | 中 | 高 | なし |
RabbitMQ 実践
アーキテクチャ
┌─────────────────────────────────────────────────────────────────────────┐ │ RabbitMQ Architecture │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ Publisher ──→ Exchange ──binding──→ Queue ──→ Consumer │ │ │ │ │ ┌─────────┼─────────┐ │ │ │ │ │ │ │ [direct] [fanout] [topic] [headers] │ │ │ │ │ │ │ routing_key 全Queue パターン ヘッダ条件 │ │ 完全一致 にコピー マッチ │ │ │ │ Exchange Types: │ │ - direct: routing_key完全一致でルーティング │ │ - fanout: バインドされた全Queueにコピー │ │ - topic: パターンマッチ(*.log, order.#) │ │ - headers: ヘッダ属性でルーティング │ └─────────────────────────────────────────────────────────────────────────┘
設定例
# /etc/rabbitmq/rabbitmq.conf
# リスナー設定
listeners.tcp.default = 5672
management.tcp.port = 15672
# メモリ制限(システムメモリの40%)
vm_memory_high_watermark.relative = 0.4
# ディスク空き容量閾値
disk_free_limit.absolute = 2GB
# 接続・チャネル上限
channel_max = 2048
heartbeat = 60
# クラスタ設定
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@node1
cluster_formation.classic_config.nodes.2 = rabbit@node2
cluster_formation.classic_config.nodes.3 = rabbit@node3
# ミラーリングポリシー(HA Queue)
# rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'Python実装例
import pika
from pika.exchange_type import ExchangeType
# 接続設定
credentials = pika.PlainCredentials('user', 'password')
parameters = pika.ConnectionParameters(
host='rabbitmq.example.com',
port=5672,
credentials=credentials,
heartbeat=600,
blocked_connection_timeout=300,
)
# Publisher
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# Exchangeとキュー宣言
channel.exchange_declare(exchange='orders', exchange_type=ExchangeType.topic, durable=True)
channel.queue_declare(queue='order_processing', durable=True)
channel.queue_bind(queue='order_processing', exchange='orders', routing_key='order.created')
# メッセージ送信(永続化設定)
channel.basic_publish(
exchange='orders',
routing_key='order.created',
body='{"order_id": 123}',
properties=pika.BasicProperties(
delivery_mode=2, # persistent
content_type='application/json',
)
)
# Consumer
def callback(ch, method, properties, body):
try:
process_order(body)
ch.basic_ack(delivery_tag=method.delivery_tag) # 手動ACK
except Exception as e:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) # DLQへ
channel.basic_qos(prefetch_count=10) # 同時処理数
channel.basic_consume(queue='order_processing', on_message_callback=callback)Kafka 実践
アーキテクチャ
┌─────────────────────────────────────────────────────────────────────────┐ │ Kafka Architecture │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ Kafka Cluster │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ Broker 1 Broker 2 Broker 3 │ │ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │ │ │Topic A │ │Topic A │ │Topic A │ │ │ │ │ │ P0(L) │ │ P0(R) │ │ P1(R) │ L=Leader │ │ │ │ │ P1(R) │ │ P1(L) │ │ P0(R) │ R=Replica │ │ │ │ └─────────┘ └─────────┘ └─────────┘ │ │ │ └─────────────────────────────────────────────────────────────────┘ │ │ │ │ │ ZooKeeper (メタデータ管理、Kafkaraftに移行中) │ │ │ │ Consumer Group A Consumer Group B │ │ ┌──────────────┐ ┌──────────────┐ │ │ │Consumer 1: P0│ │Consumer 1: P0,P1│ (独立してオフセット│ │ │Consumer 2: P1│ └──────────────┘ 管理) │ │ └──────────────┘ │ │ │ │ Partition: 並列度の単位。1 Partition = 1 Consumer(Group内) │ └─────────────────────────────────────────────────────────────────────────┘
設定例
# server.properties # ブローカー設定 broker.id=0 listeners=PLAINTEXT://0.0.0.0:9092 advertised.listeners=PLAINTEXT://kafka1.example.com:9092 # ログ保持設定 log.retention.hours=168 # 7日間保持 log.retention.bytes=1073741824 # 1GB上限 log.segment.bytes=1073741824 # 1セグメント=1GB # レプリケーション default.replication.factor=3 min.insync.replicas=2 # 最低同期レプリカ数 # パフォーマンス num.network.threads=8 num.io.threads=16 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 # ZooKeeper zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
Python実装例
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import json
# Producer
producer = KafkaProducer(
bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
acks='all', # 全レプリカ確認
retries=3, # リトライ回数
retry_backoff_ms=100,
linger_ms=5, # バッチング待機時間
batch_size=16384, # バッチサイズ
)
# 送信(非同期)
future = producer.send('orders', value={'order_id': 123}, key=b'user_456')
try:
record_metadata = future.get(timeout=10)
print(f"Sent to {record_metadata.topic}:{record_metadata.partition}@{record_metadata.offset}")
except KafkaError as e:
print(f"Failed: {e}")
# Consumer
consumer = KafkaConsumer(
'orders',
bootstrap_servers=['kafka1:9092', 'kafka2:9092'],
group_id='order-processor',
auto_offset_reset='earliest', # 初回はearliest
enable_auto_commit=False, # 手動コミット
max_poll_records=500,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)
for message in consumer:
try:
process(message.value)
consumer.commit() # 処理成功後にコミット
except Exception as e:
# エラー処理(DLQ送信など)
passAmazon SQS 実践
標準キュー vs FIFOキュー
| 特性 | 標準キュー | FIFOキュー |
|---|---|---|
| スループット | 無制限 | 3,000 msg/sec |
| 順序保証 | ベストエフォート | 厳密なFIFO |
| 重複 | 稀に重複 | 重複排除 |
| 用途 | 高スループット、順序不問 | 金融、順序重要 |
Python実装例 (boto3)
import boto3
import json
sqs = boto3.client('sqs', region_name='ap-northeast-1')
queue_url = 'https://sqs.ap-northeast-1.amazonaws.com/123456789/my-queue'
# 送信
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps({'order_id': 123}),
MessageAttributes={
'Type': {'DataType': 'String', 'StringValue': 'order'},
},
DelaySeconds=0, # 遅延配信(最大15分)
)
print(f"MessageId: {response['MessageId']}")
# 受信(Long Polling推奨)
response = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10, # 最大10件
WaitTimeSeconds=20, # Long Polling(最大20秒)
VisibilityTimeout=300, # 処理タイムアウト
MessageAttributeNames=['All'],
)
for message in response.get('Messages', []):
try:
body = json.loads(message['Body'])
process(body)
# 削除(処理完了後)
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=message['ReceiptHandle']
)
except Exception as e:
# VisibilityTimeout後に再配信される
pass
# DLQ設定(Terraform例)
# resource "aws_sqs_queue" "main" {
# name = "my-queue"
# redrive_policy = jsonencode({
# deadLetterTargetArn = aws_sqs_queue.dlq.arn
# maxReceiveCount = 3 # 3回失敗でDLQへ
# })
# }トラブルシューティング
メッセージ滞留(Queue深度増加)
原因: Consumer処理速度 < Producer送信速度
# RabbitMQ: キュー深度確認 rabbitmqctl list_queues name messages messages_ready # Kafka: Consumer Lag確認 kafka-consumer-groups.sh --bootstrap-server localhost:9092 \ --describe --group my-consumer-group 対策: - Consumer数を増やす(水平スケール) - prefetch_count / max.poll.records 調整 - 処理ロジック最適化
メッセージ重複処理
原因: ACK前にConsumerクラッシュ、ネットワーク断
対策: 冪等性(Idempotency)の実装
# メッセージIDによる重複チェック
def process_message(message):
message_id = message['id']
if redis.setnx(f"processed:{message_id}", 1):
redis.expire(f"processed:{message_id}", 86400)
do_actual_processing(message)
else:
log.info(f"Skip duplicate: {message_id}")Consumer Rebalance(Kafka)
原因: Consumerの追加/削除、ハートビートタイムアウト
# 設定調整 session.timeout.ms=30000 # セッションタイムアウト heartbeat.interval.ms=10000 # ハートビート間隔 max.poll.interval.ms=300000 # 処理タイムアウト # Cooperative Stickyアサインメント使用 partition.assignment.strategy= org.apache.kafka.clients.consumer.CooperativeStickyAssignor
監視すべきメトリクス
| メトリクス | RabbitMQ | Kafka | アラート条件 |
|---|---|---|---|
| メッセージ滞留 | queue.messages | consumer_lag | 増加傾向 |
| メモリ使用量 | mem_used | heap_used | > 80% |
| 接続数 | connections | active_connections | 上限の80% |
| DLQメッセージ | dlq.messages | - | > 0 |
選定ガイド
RabbitMQ
- ✓ 複雑なルーティングが必要
- ✓ 低レイテンシ要求
- ✓ 多様なプロトコル対応
- ✓ メッセージ確認(ACK)重視
- ✗ 超大量データストリーム
Kafka
- ✓ 高スループット要求
- ✓ イベントソーシング
- ✓ メッセージ再処理必要
- ✓ ログ集約・分析
- ✗ シンプルなキュー用途
SQS
- ✓ 運用負荷ゼロ
- ✓ AWSサービス連携
- ✓ 小〜中規模
- ✓ サーバーレス連携
- ✗ オンプレ/マルチクラウド