Fundamentals8 min

Queues and Streams: performance in asynchronous processing

Queues and streams are fundamental for scalable systems. Understand the differences, when to use each, and how to optimize performance.

Queues and streams are fundamental patterns for decoupling systems and processing work asynchronously. Although they seem similar, they have very different performance characteristics. Choosing wrong can severely limit scalability.

Queue is for tasks. Stream is for events. Confusing the two is a recipe for problems.

Queues vs Streams

Queues (Message Queues)

Producer → [Queue] → Consumer

Characteristics:
- Message consumed = removed
- One consumer per message
- Ordering per consumer
- Focus on task processing

Examples: RabbitMQ, SQS, ActiveMQ

Streams (Event Streams)

Producer → [Stream] → Consumer A
                   → Consumer B
                   → Consumer C

Characteristics:
- Message stays in stream
- Multiple consumers read same message
- Guaranteed ordering per partition
- Focus on events and history

Examples: Kafka, Kinesis, Pulsar

Performance Comparison

Aspect Queues Streams
Throughput Medium (~10K msg/s) High (~100K+ msg/s)
Latency Low (ms) Variable
Retention Until consumed Configurable (hours/days)
Replay No Yes
Ordering Per queue Per partition
Scale Vertical Horizontal

Optimizing Queues

RabbitMQ

1. Adequate prefetch

# ❌ Prefetch 1: too slow
channel.basic_qos(prefetch_count=1)

# ✅ Balanced prefetch
channel.basic_qos(prefetch_count=50)

Trade-off:

High prefetch: better throughput, risk of loss if consumer fails
Low prefetch: lower risk, worse throughput

2. Batching acks

# ❌ Ack per message
def callback(message):
    process(message)
    channel.basic_ack(delivery_tag)

# ✅ Batch ack
messages_processed = 0
def callback(message):
    process(message)
    messages_processed += 1
    if messages_processed >= 100:
        channel.basic_ack(delivery_tag, multiple=True)
        messages_processed = 0

3. Conscious persistence

# ❌ Everything persistent (slow)
channel.basic_publish(
    properties=pika.BasicProperties(delivery_mode=2)
)

# ✅ Persistence only when needed
if message.is_critical:
    properties = pika.BasicProperties(delivery_mode=2)
else:
    properties = pika.BasicProperties(delivery_mode=1)

4. Multiple queues for parallelism

❌ One queue, multiple consumers
   → Contention on queue lock

✅ Multiple queues with sharding
   → Real parallelism

SQS

1. Long polling

# ❌ Short polling (waste)
response = sqs.receive_message(
    WaitTimeSeconds=0  # Returns immediately
)

# ✅ Long polling
response = sqs.receive_message(
    WaitTimeSeconds=20  # Waits up to 20s for messages
)

2. Batch operations

# ❌ Individual send
for message in messages:
    sqs.send_message(MessageBody=message)

# ✅ Batch send (up to 10)
sqs.send_message_batch(
    Entries=[
        {'Id': str(i), 'MessageBody': msg}
        for i, msg in enumerate(messages[:10])
    ]
)

3. Visibility timeout

# Timeout based on processing time
sqs.receive_message(
    VisibilityTimeout=processing_time * 2
)

Optimizing Streams

Kafka

1. Adequate partitioning

# Partitions = maximum parallelism
# More partitions = more parallel consumers

# ❌ Few partitions
kafka-topics --create --topic orders --partitions 3

# ✅ Partitions based on expected throughput
# If need 100K msg/s and each consumer does 10K msg/s
# Need at least 10 partitions
kafka-topics --create --topic orders --partitions 12

2. Smart partition key

# ❌ No key (round-robin, no order guarantee)
producer.send('orders', value=order)

# ❌ Key with hot spot
producer.send('orders', key=tenant_id, value=order)
# If one tenant has 90% of traffic, one partition overloaded

# ✅ Balanced key
producer.send('orders', key=order_id, value=order)

3. Producer batching

producer = KafkaProducer(
    batch_size=16384,       # 16KB per batch
    linger_ms=5,            # Wait 5ms to accumulate
    compression_type='lz4', # Compress batch
)

4. Consumer configuration

consumer = KafkaConsumer(
    'topic',
    group_id='my-group',

    # Fetch optimization
    fetch_min_bytes=1024,      # Wait to accumulate 1KB
    fetch_max_wait_ms=500,     # Or 500ms
    max_poll_records=500,      # Up to 500 records per poll

    # Commit strategy
    enable_auto_commit=False,  # Manual commit for control
)

5. Consumer group rebalancing

# ❌ Slow rebalance stops processing
# ✅ Cooperative rebalancing
consumer = KafkaConsumer(
    partition_assignment_strategy=[
        CooperativeStickyAssignor
    ]
)

Kinesis

1. Adequate shards

# 1 shard = 1MB/s write, 2MB/s read
# Calculate based on throughput

throughput_mb_s = messages_per_second * avg_message_size_kb / 1024
shards_needed = ceil(throughput_mb_s)

2. Enhanced fan-out

# ❌ Standard polling (shares throughput)
# ✅ Enhanced fan-out (2MB/s dedicated per consumer)
kinesis.register_stream_consumer(
    StreamARN=stream_arn,
    ConsumerName='my-consumer'
)

Performance Patterns

1. Dead Letter Queue (DLQ)

def process_with_dlq(message, max_retries=3):
    retries = message.get_header('retry_count', 0)

    try:
        process(message)
        ack(message)
    except Exception as e:
        if retries >= max_retries:
            send_to_dlq(message, error=str(e))
            ack(message)  # Remove from main queue
        else:
            message.set_header('retry_count', retries + 1)
            requeue_with_delay(message, delay=2**retries)

2. Competing Consumers

                    → Consumer 1
Queue → Dispatcher → Consumer 2
                    → Consumer 3

Each message processed by only one consumer
Natural horizontal scale

3. Fan-out

              → Queue A → Consumer A
Event → SNS → Queue B → Consumer B
              → Queue C → Consumer C

Same event processed by multiple systems

4. Aggregation

Events → Stream → Aggregator → Batch → Destination

Groups small events into larger batches
Reduces I/O overhead
def aggregate(events, window_seconds=10, max_batch=1000):
    batch = []
    window_start = time.time()

    for event in events:
        batch.append(event)

        if len(batch) >= max_batch or time.time() - window_start > window_seconds:
            yield batch
            batch = []
            window_start = time.time()

Essential Metrics

For Queues

# Queue size
queue_messages_ready
queue_messages_unacked

# Processing rate
messages_published_per_second
messages_consumed_per_second

# Latency
message_age_seconds
processing_time_seconds

For Streams

# Consumer lag (most important!)
kafka_consumer_group_lag

# Throughput
records_consumed_per_second
records_produced_per_second

# Partition health
under_replicated_partitions
offline_partitions

Alerts

# Queue growing
- alert: QueueBacklog
  expr: queue_messages_ready > 10000
  for: 5m

# High consumer lag
- alert: ConsumerLag
  expr: kafka_consumer_group_lag > 100000
  for: 5m

# Old messages
- alert: OldMessages
  expr: message_age_seconds > 300
  for: 2m

When to Use Each

Use Queues when:

✓ Task processing (jobs)
✓ Work that needs to be done "exactly once"
✓ Don't need replay
✓ Order not critical (or per consumer)
✓ Moderate volume

Use Streams when:

✓ Events that multiple systems consume
✓ Need replay (reprocess history)
✓ High throughput
✓ Order is critical per entity
✓ Event sourcing
✓ Real-time analytics

Use Both:

Events → Kafka → Consumer → SQS → Workers
                     ↓
              (transformation)

Stream for raw events
Queue for derived tasks

Conclusion

Queues and streams are complementary tools:

Queues Streams
Tasks Events
Consumed = deleted Retained
One consumer Multiple consumers
Simple Complex

For performance:

  1. Size correctly: partitions/shards based on throughput
  2. Batch operations: reduce I/O overhead
  3. Monitor lag: most important metric in both
  4. Always DLQ: don't let poisoned messages block system
  5. Test under load: behavior changes drastically at scale

The choice between queue and stream is not technical, it's conceptual. What are you modeling: a task or a fact?

queuesstreamskafkarabbitmqmessaging

Want to understand your platform's limits?

Contact us for a performance assessment.

Contact Us