Queues and streams are fundamental patterns for decoupling systems and processing work asynchronously. Although they look similar, they have very different performance characteristics, and choosing the wrong one can severely limit scalability.
A queue is for tasks; a stream is for events. Confusing the two is a recipe for problems.
Queues vs Streams
Queues (Message Queues)
Producer → [Queue] → Consumer
Characteristics:
- Message consumed = removed
- One consumer per message
- Ordering per consumer
- Focus on task processing
Examples: RabbitMQ, SQS, ActiveMQ
Streams (Event Streams)
Producer → [Stream] → Consumer A
                    → Consumer B
                    → Consumer C
Characteristics:
- Message stays in stream
- Multiple consumers read same message
- Guaranteed ordering per partition
- Focus on events and history
Examples: Kafka, Kinesis, Pulsar
Performance Comparison
| Aspect | Queues | Streams |
|---|---|---|
| Throughput | Medium (~10K msg/s) | High (~100K+ msg/s) |
| Latency | Low (ms) | Variable |
| Retention | Until consumed | Configurable (hours/days) |
| Replay | No | Yes |
| Ordering | Per queue | Per partition |
| Scale | Vertical | Horizontal |
Optimizing Queues
RabbitMQ
1. Appropriate prefetch
# ❌ Prefetch 1: too slow
channel.basic_qos(prefetch_count=1)
# ✅ Balanced prefetch
channel.basic_qos(prefetch_count=50)
Trade-off:
High prefetch: better throughput, but more in-flight messages to redeliver (or lose, with auto-ack) if a consumer fails
Low prefetch: less at risk on failure, lower throughput
2. Batching acks
# ❌ Ack per message
def callback(channel, method, properties, body):
    process(body)
    channel.basic_ack(method.delivery_tag)

# ✅ Batch ack
messages_processed = 0

def callback(channel, method, properties, body):
    global messages_processed
    process(body)
    messages_processed += 1
    if messages_processed >= 100:
        # multiple=True acknowledges everything up to this delivery tag
        channel.basic_ack(method.delivery_tag, multiple=True)
        messages_processed = 0
3. Selective persistence
# ❌ Everything persistent (slow)
channel.basic_publish(
    exchange='',
    routing_key='tasks',  # illustrative routing key
    body=body,
    properties=pika.BasicProperties(delivery_mode=2)  # 2 = persistent
)
# ✅ Persistence only when needed
if message.is_critical:
    properties = pika.BasicProperties(delivery_mode=2)  # written to disk
else:
    properties = pika.BasicProperties(delivery_mode=1)  # transient, memory only
4. Multiple queues for parallelism
❌ One queue, multiple consumers
→ The single queue (one process/core in RabbitMQ) becomes the bottleneck
✅ Multiple queues with sharding (sketched below)
→ Real parallelism
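A minimal sketch of the sharding idea with pika; the shard count, queue names, and entity key are illustrative:
import hashlib
import pika

NUM_SHARDS = 4  # assumed shard count

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# One queue per shard; each shard gets its own dedicated consumer(s)
for i in range(NUM_SHARDS):
    channel.queue_declare(queue=f'orders.shard.{i}', durable=True)

def publish(order_id, body):
    # A stable hash of the entity id picks the shard, so one entity's
    # messages always land on (and stay ordered within) the same queue
    shard = int(hashlib.md5(order_id.encode()).hexdigest(), 16) % NUM_SHARDS
    channel.basic_publish(exchange='', routing_key=f'orders.shard.{shard}', body=body)
RabbitMQ also ships a consistent-hash exchange plugin that can do this routing on the broker side instead of in application code.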
SQS
1. Long polling
# ❌ Short polling (waste)
response = sqs.receive_message(
    QueueUrl=queue_url,
    WaitTimeSeconds=0  # returns immediately, even if the queue is empty
)
# ✅ Long polling
response = sqs.receive_message(
    QueueUrl=queue_url,
    WaitTimeSeconds=20  # waits up to 20s for messages
)
2. Batch operations
# ❌ Individual send
for message in messages:
    sqs.send_message(QueueUrl=queue_url, MessageBody=message)
# ✅ Batch send (up to 10 entries per request)
sqs.send_message_batch(
    QueueUrl=queue_url,
    Entries=[
        {'Id': str(i), 'MessageBody': msg}
        for i, msg in enumerate(messages[:10])
    ]
)
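Because send_message_batch accepts at most 10 entries per call, longer lists have to be chunked; a sketch assuming boto3, a queue_url variable, and a hypothetical handle_failures helper:
def send_all(sqs, queue_url, messages, chunk_size=10):
    # SQS caps each batch request at 10 entries
    for start in range(0, len(messages), chunk_size):
        chunk = messages[start:start + chunk_size]
        response = sqs.send_message_batch(
            QueueUrl=queue_url,
            Entries=[
                {'Id': str(i), 'MessageBody': msg}
                for i, msg in enumerate(chunk)
            ],
        )
        # Individual entries can fail even when the request as a whole succeeds
        if response.get('Failed'):
            handle_failures(response['Failed'])  # hypothetical: retry or log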
3. Visibility timeout
# Timeout based on expected processing time (an integer, in seconds)
sqs.receive_message(
    QueueUrl=queue_url,
    VisibilityTimeout=int(processing_time * 2)
)
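When a job occasionally runs longer than expected, the deadline can also be extended for that one message instead of raising the timeout for the whole queue; a boto3 sketch where queue_url, looks_like_a_long_job, and process are placeholders:
import boto3

sqs = boto3.client('sqs')

def handle(queue_url, message):
    receipt = message['ReceiptHandle']
    if looks_like_a_long_job(message):  # hypothetical check
        # Push the visibility deadline out before starting, so the message
        # is not redelivered to another worker while this one is still busy
        sqs.change_message_visibility(
            QueueUrl=queue_url,
            ReceiptHandle=receipt,
            VisibilityTimeout=600,  # assumed worst-case processing time (s)
        )
    process(message['Body'])            # hypothetical handler
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt)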
Optimizing Streams
Kafka
1. Appropriate partitioning
# Partitions = maximum parallelism
# More partitions = more parallel consumers
# ❌ Few partitions
kafka-topics --create --topic orders --partitions 3 --bootstrap-server localhost:9092
# ✅ Partitions sized for the expected throughput
# If you need 100K msg/s and each consumer handles 10K msg/s,
# you need at least 10 partitions (plus headroom)
kafka-topics --create --topic orders --partitions 12 --bootstrap-server localhost:9092
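The same arithmetic as a quick sketch; the throughput figures are just the assumed numbers from the comment above:
from math import ceil

target_msgs_per_s = 100_000        # expected peak produce rate (assumed)
per_consumer_msgs_per_s = 10_000   # what one consumer instance handles (assumed)

# Minimum partitions to keep consumers busy, with ~20% headroom for growth
partitions = ceil(1.2 * target_msgs_per_s / per_consumer_msgs_per_s)  # -> 12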
2. Smart partition key
# ❌ No key (round-robin distribution, no ordering guarantee)
producer.send('orders', value=order)
# ❌ Key with a hot spot
producer.send('orders', key=tenant_id, value=order)
# If one tenant generates 90% of the traffic, one partition is overloaded
# ✅ Balanced key (still preserves ordering per order)
producer.send('orders', key=order_id, value=order)
# (keys and values must be bytes, or configure key/value serializers)
3. Producer batching
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    batch_size=16384,        # 16KB per batch
    linger_ms=5,             # wait up to 5ms to fill a batch
    compression_type='lz4',  # compress the whole batch
)
4. Consumer configuration
consumer = KafkaConsumer(
    'topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    # Fetch optimization
    fetch_min_bytes=1024,       # wait until at least 1KB is available...
    fetch_max_wait_ms=500,      # ...or until 500ms have passed
    max_poll_records=500,       # up to 500 records per poll
    # Commit strategy
    enable_auto_commit=False,   # manual commit for control
)
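With enable_auto_commit=False, offsets must be committed explicitly after the records are processed; a minimal kafka-python sketch (process is a placeholder) that gives at-least-once delivery:
while True:
    records = consumer.poll(timeout_ms=1000)
    for tp, messages in records.items():
        for message in messages:
            process(message.value)  # hypothetical handler
    # Commit only after the whole poll batch has been processed:
    # nothing is lost on a crash, but duplicates are possible
    consumer.commit()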
5. Consumer group rebalancing
# ❌ Eager (default) rebalancing: every consumer in the group stops
#    processing while all partitions are revoked and reassigned
# ✅ Cooperative (incremental) rebalancing: only the partitions that
#    actually move are paused. Requires a client that supports it
#    (the Java client, or librdkafka-based clients such as confluent-kafka):
consumer_config = {
    'group.id': 'my-group',
    'partition.assignment.strategy': 'cooperative-sticky',
}
Kinesis
1. Appropriately sized shards
# 1 shard = 1 MB/s (or 1,000 records/s) write, 2 MB/s read
# Calculate from the expected throughput
from math import ceil

throughput_mb_s = messages_per_second * avg_message_size_kb / 1024
shards_needed = ceil(throughput_mb_s)
2. Enhanced fan-out
# ❌ Standard polling: all consumers share the 2 MB/s read limit per shard
# ✅ Enhanced fan-out: a dedicated 2 MB/s per shard for each registered consumer
kinesis.register_stream_consumer(
    StreamARN=stream_arn,
    ConsumerName='my-consumer'
)
Performance Patterns
1. Dead Letter Queue (DLQ)
def process_with_dlq(message, max_retries=3):
    retries = message.get_header('retry_count', 0)
    try:
        process(message)
        ack(message)
    except Exception as e:
        if retries >= max_retries:
            send_to_dlq(message, error=str(e))
            ack(message)  # Remove from main queue
        else:
            message.set_header('retry_count', retries + 1)
            requeue_with_delay(message, delay=2**retries)
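On SQS, the same "give up after N attempts" behavior can be delegated to the broker with a redrive policy; a boto3 sketch where main_queue_url and dlq_arn are assumed to already exist:
import json
import boto3

sqs = boto3.client('sqs')

# After 3 failed receives, SQS moves the message to the DLQ automatically
sqs.set_queue_attributes(
    QueueUrl=main_queue_url,
    Attributes={
        'RedrivePolicy': json.dumps({
            'deadLetterTargetArn': dlq_arn,
            'maxReceiveCount': '3',
        })
    },
)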
2. Competing Consumers
                   → Consumer 1
Queue → Dispatcher → Consumer 2
                   → Consumer 3
Each message is processed by exactly one consumer
Scales horizontally by adding consumers
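A minimal competing-consumers sketch with pika: every worker runs the same code against the same queue, and the broker delivers each message to exactly one of them (queue name and process handler are illustrative):
import pika

def worker():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='tasks', durable=True)
    channel.basic_qos(prefetch_count=50)

    def handle(ch, method, properties, body):
        process(body)  # hypothetical handler
        ch.basic_ack(method.delivery_tag)

    channel.basic_consume(queue='tasks', on_message_callback=handle)
    channel.start_consuming()

# Scaling out = starting more copies of worker() (processes or containers)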
3. Fan-out
            → Queue A → Consumer A
Event → SNS → Queue B → Consumer B
            → Queue C → Consumer C
Same event processed by multiple systems
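A sketch of wiring this fan-out with boto3: each queue subscribes to the topic, so every published event is copied to all of them. The ARNs are assumed to exist, and the SQS queue policy that allows SNS to deliver is omitted:
import boto3

sns = boto3.client('sns')

for queue_arn in [queue_a_arn, queue_b_arn, queue_c_arn]:  # assumed ARNs
    sns.subscribe(
        TopicArn=topic_arn,
        Protocol='sqs',
        Endpoint=queue_arn,
        # Deliver just the message body, without the SNS envelope
        Attributes={'RawMessageDelivery': 'true'},
    )

# One publish, three deliveries
sns.publish(TopicArn=topic_arn, Message='{"event": "order_created"}')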
4. Aggregation
Events → Stream → Aggregator → Batch → Destination
Groups small events into larger batches
Reduces I/O overhead
import time

def aggregate(events, window_seconds=10, max_batch=1000):
    batch = []
    window_start = time.time()
    for event in events:
        batch.append(event)
        # Flush when the batch is full or the time window has elapsed
        if len(batch) >= max_batch or time.time() - window_start > window_seconds:
            yield batch
            batch = []
            window_start = time.time()
    if batch:
        # Flush whatever is left when the input ends
        yield batch
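A usage sketch, with a hypothetical event source and sink:
for batch in aggregate(consume_events(), window_seconds=10, max_batch=1000):
    write_batch(batch)  # one write per batch instead of one per event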
Essential Metrics
For Queues
# Queue size
queue_messages_ready
queue_messages_unacked
# Processing rate
messages_published_per_second
messages_consumed_per_second
# Latency
message_age_seconds
processing_time_seconds
For Streams
# Consumer lag (most important!)
kafka_consumer_group_lag
# Throughput
records_consumed_per_second
records_produced_per_second
# Partition health
under_replicated_partitions
offline_partitions
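Consumer lag can also be sampled from the client itself by comparing the current position with the end offsets; a kafka-python sketch where the topic, group, and broker address are illustrative (in production this usually comes from the broker or a metrics exporter):
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'], group_id='my-group')
partitions = [TopicPartition('orders', p)
              for p in consumer.partitions_for_topic('orders')]
consumer.assign(partitions)

end_offsets = consumer.end_offsets(partitions)
for tp in partitions:
    lag = end_offsets[tp] - consumer.position(tp)  # records not yet consumed
    print(f'{tp.topic}[{tp.partition}] lag={lag}')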
Alerts
# Queue growing
- alert: QueueBacklog
  expr: queue_messages_ready > 10000
  for: 5m

# High consumer lag
- alert: ConsumerLag
  expr: kafka_consumer_group_lag > 100000
  for: 5m

# Old messages
- alert: OldMessages
  expr: message_age_seconds > 300
  for: 2m
When to Use Each
Use Queues when:
✓ Task processing (jobs)
✓ Work that needs to be done "exactly once"
✓ Don't need replay
✓ Order not critical (or per consumer)
✓ Moderate volume
Use Streams when:
✓ Events that multiple systems consume
✓ Need replay (reprocess history)
✓ High throughput
✓ Order is critical per entity
✓ Event sourcing
✓ Real-time analytics
Use Both:
Events → Kafka → Consumer → SQS → Workers
                     ↓
             (transformation)
Stream for raw events
Queue for derived tasks
Conclusion
Queues and streams are complementary tools:
| Queues | Streams |
|---|---|
| Tasks | Events |
| Consumed = deleted | Retained |
| One consumer | Multiple consumers |
| Simple | Complex |
For performance:
- Size correctly: partitions/shards based on throughput
- Batch operations: reduce I/O overhead
- Monitor lag: most important metric in both
- Always DLQ: don't let poison messages block the system
- Test under load: behavior changes drastically at scale
The choice between a queue and a stream is not technical but conceptual: what are you modeling, a task or a fact?