#4 Distributed message queues
Nice — let’s design a distributed message queue (think Kafka / Pulsar / SQS-style system) end-to-end. I’ll cover scope, functional + non-functional requirements, APIs, the high-level architecture, core algorithms (partitioning, replication, delivery semantics), failure handling, scaling, operational concerns, trade-offs and a short evolution path. You can use this as an interview answer or an implementation blueprint.
Problem statement
Build a fault-tolerant, highly-available distributed message queue that lets producers publish messages and consumers read them reliably. Key capabilities:
High throughput for many producers/consumers
Horizontal scalability (by partitioning)
Durable storage and configurable retention
Consumer groups & load-balanced delivery
Exactly-once or at-least-once semantics where required
Low latency for publish/subscribe patterns
Functional requirements
Produce messages to a topic (appended to end of topic/partition).
Consume messages by offset (pull model) and/or via server push.
Topic/partition model: topics can be partitioned for concurrency.
Consumer groups: distribute partitions among group members (each message to one consumer in group).
Durability: persisted messages survive node crashes until retention/compaction.
Retention & compaction: time/size-based retention; optional log compaction by key.
Ordering: ordering guaranteed within a partition.
Acks & delivery semantics: support at-most-once, at-least-once, and exactly-once (with idempotence/transactions).
Administrative APIs: create/delete topics, set retention, adjust partitions, view offsets.
Monitoring & metrics: throughput, lag, broker health, storage usage.
Non-functional requirements
Throughput: millions of msgs/sec aggregate (configurable).
Latency: low publish-to-deliver latency (tens to hundreds of milliseconds).
Availability: 99.9%+ for reads/writes; graceful degradation when components fail.
Scalability: linear scale by adding broker nodes and partitions.
Durability: configurable replication factor (e.g., 3).
Security: TLS, authN/authZ, per-topic ACLs.
Operational simplicity: easy rebalancing, safe node replacement.
External APIs (surface area)
REST/gRPC for convenience, or a custom binary protocol for performance.
Producer:
Produce(topic, partition?, messages[], acks=leader|quorum|all) -> ack_offsets[]
CreateProducer(client_id, config)
Consumer:
Fetch(topic, partition, offset, max_bytes) -> messages[]
CommitOffset(consumer_group, topic, partition, offset) (or auto-commit)
JoinGroup(consumer_group, client_id) -> assigned_partitions[] (for group coordination)
Optional push: Subscribe(topic, consumer_group, callback_url)
Admin:
CreateTopic(name, partitions, retention_ms, replication_factor)
DescribeTopic(name), AlterTopic(...), GetLag(consumer_group)
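To make this surface concrete, here is a minimal sketch of the request shapes in Python. All names and fields are illustrative assumptions, not a real wire format or client library:

```python
# Hypothetical request shapes for the API surface above; illustrative only.
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Message:
    payload: bytes
    key: Optional[bytes] = None          # present => per-key ordering applies
    headers: dict = field(default_factory=dict)

@dataclass
class ProduceRequest:
    topic: str
    messages: list                       # list of Message
    partition: Optional[int] = None      # None => client/broker picks one
    acks: str = "all"                    # "none" | "leader" | "quorum" | "all"

@dataclass
class FetchRequest:
    topic: str
    partition: int
    offset: int                          # first offset the consumer wants
    max_bytes: int = 1 << 20             # caps response size (flow control)
```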
High-level architecture
Components:
Brokers: store partition logs, serve produce/fetch requests.
Controller / Cluster Manager: manages metadata (topic → partitions → leader mapping), partition creation, leader elections.
Metadata Store (coordinator): strongly consistent store (e.g., ZooKeeper, etcd, or an embedded Raft group) holding cluster state and coordinating leader election.
Storage Layer: append-only logs on local disk per partition, segmented files for efficient deletion/compaction.
Replicator: mechanism to copy segment data from leader to followers (pull or push).
Producer/Consumer Clients: client libraries implement batching, retries, idempotence, and group coordination.
Coordinator service for consumer groups: computes partition assignments (range/round-robin/sticky).
Monitoring and Admin UI.
Core data model & storage
Topic → N partitions. Each partition = ordered append-only log.
Messages appended with monotonically increasing offset (per-partition).
Log split into segment files, rolled by size (e.g., 1 GB) or by time window. Old segments are deleted per the retention policy.
Each message stored with metadata: timestamp, key (optional), headers, payload, sequence id (for idempotence), checksum.
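A minimal sketch of this per-partition layout, assuming size-based segment rolling only (real logs add index files, checksums, timestamps, and an explicit fsync policy):

```python
import os

class SegmentedLog:
    """Append-only partition log split into size-rolled segment files."""

    def __init__(self, dir_path: str, max_segment_bytes: int = 1 << 30):
        os.makedirs(dir_path, exist_ok=True)
        self.dir = dir_path
        self.max_segment_bytes = max_segment_bytes
        self.next_offset = 0             # monotonically increasing, per partition
        self.active = None
        self._roll(base_offset=0)

    def _roll(self, base_offset: int):
        # Each segment file is named after the offset of its first record,
        # so retention can delete whole files and lookups can binary-search.
        if self.active:
            self.active.close()
        path = os.path.join(self.dir, f"{base_offset:020d}.log")
        self.active = open(path, "ab")

    def append(self, record: bytes) -> int:
        if self.active.tell() + len(record) + 4 > self.max_segment_bytes:
            self._roll(base_offset=self.next_offset)
        # Length-prefix each record so the segment can be scanned sequentially.
        self.active.write(len(record).to_bytes(4, "big") + record)
        offset = self.next_offset
        self.next_offset += 1
        return offset
```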
Partitioning & ordering
Partition selection: keyed (hash(key) mod P) for ordering by key, or round-robin for load balancing.
Ordering guarantee is per-partition only.
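A sketch of the selection rule, using md5 as a stand-in for a fast non-cryptographic hash such as murmur2:

```python
import hashlib
from typing import Optional

def choose_partition(key: Optional[bytes], num_partitions: int,
                     rr_state: list) -> int:
    """Keyed records hash to a stable partition (preserving per-key order);
    keyless records round-robin for even load. rr_state is a one-element
    list holding the round-robin counter."""
    if key is not None:
        digest = hashlib.md5(key).digest()
        return int.from_bytes(digest[:8], "big") % num_partitions
    rr_state[0] = (rr_state[0] + 1) % num_partitions
    return rr_state[0]
```

Note that hash(key) mod P remaps keys whenever P changes, so increasing the partition count breaks per-key ordering across the boundary.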
Replication & consistency
Each partition has replicas (replication factor R). One replica is leader; others are followers.
Write path:
Producer sends to leader.
Leader appends to local log, returns ack depending on acks config:
acks=0: no ack (low latency, at-most-once).
acks=1 (leader): leader ack after durable append to leader.
acks=all or acks=quorum: ack after followers replicate (sync) → stronger durability.
Use leader-based replication with ISR (in-sync replicas) tracked by controller.
For durability, require acks=all so a write is acknowledged only after the in-sync replicas have replicated it; this tolerates leader failover without data loss.
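A leader-side sketch of that ack logic. The followers here are stand-in stubs with a synchronous replicate() call for clarity; real brokers pipeline follower fetches asynchronously:

```python
def handle_produce(leader_log, followers, batch, acks: str):
    """Append a batch on the leader, then ack per the requested level.
    'leader_log' is a SegmentedLog-like object; 'followers' are replica stubs."""
    base_offset = None
    for record in batch:
        off = leader_log.append(record)
        if base_offset is None:
            base_offset = off

    if acks == "none":
        return None                          # fire-and-forget: at-most-once
    if acks == "leader":
        return base_offset                   # durable on the leader only
    # "quorum": a majority of all replicas; the leader already counts as one.
    needed = (len(followers) + 1) // 2 if acks == "quorum" else len(followers)
    acked = sum(1 for f in followers if f.replicate(batch))
    if acked >= needed:
        return base_offset
    raise TimeoutError("not enough replicas acknowledged the write")
```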
Consumer groups & offset management
Consumers in a group get disjoint partitions to process.
Offsets committed by consumer (synchronously or asynchronously). Two models:
Consumer-managed offset: stored in a special internal topic (durable, replicated).
Broker-managed offset: broker stores offsets per group.
On rebalance, the controller coordinates partition reassignment.
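A consumer-side sketch of at-least-once offset handling ('client' and its fetch/commit_offset methods are hypothetical stubs):

```python
def consume_loop(client, handler, group: str, topic: str,
                 partition: int, start_offset: int):
    """Process first, commit after: a crash between handler() and the commit
    replays the batch (duplicates possible) rather than losing it."""
    offset = start_offset
    while True:
        records = client.fetch(topic, partition, offset, max_bytes=1 << 20)
        for rec in records:
            handler(rec)                 # may raise -> offset stays uncommitted
        if records:
            offset = records[-1].offset + 1
            client.commit_offset(group, topic, partition, offset)
```

Committing before processing instead would flip this to at-most-once.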
Delivery semantics & exactly-once
At-most-once: the producer does not retry, and the consumer auto-commits before processing.
At-least-once: consumer processes and then commits; duplicates possible.
Exactly-once (EOS): Achieved with idempotent producers + transactions:
Producers assign a unique producer id (PID) and monotonic sequence numbers per partition. Broker rejects out-of-order duplicates.
Transactional writes across partitions/topics: two-phase commit-like protocol; transactional markers written to log allow consumers to read only committed transactions (Kafka’s transactional API).
This is more complex and has throughput/cost trade-offs.
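A broker-side sketch of the per-partition idempotence check at the heart of this (the transactional layer on top is elided):

```python
class SequenceTracker:
    """Accept a producer's batch only if its sequence number is exactly
    last_seq + 1 for that producer id; duplicate retries are dropped."""

    def __init__(self):
        self.last_seq = {}               # producer_id -> last accepted seq

    def try_accept(self, producer_id: int, seq: int) -> bool:
        last = self.last_seq.get(producer_id, -1)
        if seq <= last:
            return False                 # retry of an already-appended batch
        if seq != last + 1:
            raise ValueError("sequence gap: out-of-order batch rejected")
        self.last_seq[producer_id] = seq
        return True
```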
Rebalance & failure handling
Leader failure: controller detects leader death, picks new leader from ISR (must be in-sync).
Replica catch-up: followers fetch missing segments; once caught up, they rejoin the ISR.
Consumer rebalance: controller/coordinator triggers reassignments; use “sticky” assignment to reduce movement.
Backpressure: Brokers limit producer throughput; clients implement retries with exponential backoff; producers should batch.
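A client-side retry sketch with exponential backoff and full jitter, so retrying producers don't stampede a recovering broker:

```python
import random
import time

def retry_with_backoff(send, batch, max_attempts: int = 5,
                       base_delay: float = 0.05, cap: float = 5.0):
    """Call send(batch) until it succeeds or attempts run out; delays grow
    exponentially and are randomized ("full jitter") to spread retries."""
    for attempt in range(max_attempts):
        try:
            return send(batch)
        except (TimeoutError, ConnectionError):
            if attempt == max_attempts - 1:
                raise
            delay = min(cap, base_delay * (2 ** attempt))
            time.sleep(random.uniform(0, delay))
```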
Compaction & retention
Time/size-based retention: delete old segments after retention window or size threshold.
Log compaction: retain only latest message per key (useful for changelogs/state stores).
Implement compaction as background maintenance per partition segment.
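The core of that maintenance pass, sketched over (key, payload) pairs in offset order; real compactors also retain tombstones for a grace period so deletes propagate to consumers:

```python
def compact_segment(records):
    """Keep only the newest payload per key; a None payload is a tombstone
    that deletes the key. 'records' is an iterable of (key, payload)."""
    latest = {}
    for key, payload in records:
        latest[key] = payload            # later offsets overwrite earlier ones
    return [(k, v) for k, v in latest.items() if v is not None]
```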
Scaling & placement
Scale by partitions: add more partitions per topic and spread across brokers.
Storage scaling: add brokers and rebalance partitions.
Hot partitions: if skewed keys cause hotspots, use mitigations such as key salting or extra partitions for high-volume topics (see the salting sketch after this list).
Multi-tenancy: quotas per tenant/topic on bytes/sec and storage.
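A salting sketch for the hot-partition case above. The trade-off is explicit: a salted key loses per-key ordering across its buckets, and consumers must merge them:

```python
import hashlib
import random

def salted_partition(key: bytes, num_partitions: int,
                     salt_buckets: int = 8) -> int:
    """Spread one hot key across up to 'salt_buckets' partitions by
    prefixing a random salt byte before hashing."""
    salt = random.randrange(salt_buckets).to_bytes(1, "big")
    digest = hashlib.md5(salt + key).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions
```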
Metrics & observability
Broker: CPU, disk I/O, free disk, replication lag, ISR size, network throughput.
Topic: throughput (msg/sec, bytes/sec), consumer lag per partition, segment count, retention pressure.
Alerts: replication lag > threshold, high unclean leader election frequency, disk filling.
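Consumer lag, the key alerting signal here, is just the gap between each partition's log-end offset and the group's committed offset:

```python
def group_lag(end_offsets: dict, committed: dict) -> dict:
    """Per-partition lag for a consumer group; partitions with no commit
    yet are treated as lagging from offset 0."""
    return {p: max(0, end_offsets[p] - committed.get(p, 0))
            for p in end_offsets}
```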
Security
TLS encryption for client-broker and inter-broker comms.
Authentication: mTLS, OAuth bearer tokens.
Authorization: ACLs per topic/consumer group (produce/consume/create).
Auditing: admin actions, topic changes.
Back-of-the-envelope capacities (example)
Assume: 1M msgs/sec aggregate, avg msg 1 KB → 1 GB/sec ≈ 86.4 TB/day raw.
With replication factor 3 → ~259 TB/day. (So design for compaction, retention policy, compression.)
If each broker has 4 TB SSD, need ~65 brokers to store one day of data; real systems compress and use larger drives.
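The same estimate as a quick script, using decimal units and the stated assumptions (1M msgs/sec, 1 KB average, RF=3, 4 TB per broker):

```python
msgs_per_sec = 1_000_000
msg_bytes = 1_000                            # ~1 KB average message
raw_per_day = msgs_per_sec * msg_bytes * 86_400   # 86,400 seconds per day

replicated = raw_per_day * 3                 # replication factor 3
broker_disk = 4 * 10**12                     # 4 TB per broker

print(raw_per_day / 1e12)                    # ~86.4 TB/day raw
print(replicated / 1e12)                     # ~259.2 TB/day replicated
print(replicated / broker_disk)              # ~64.8 -> ~65 brokers per day retained
```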
Trade-offs & choices
Leader-based replication (simpler) vs consensus-per-write (Raft per partition): leader-based gives good throughput but relies on a separate controller; Raft gives stronger per-partition consistency at the cost of operational complexity.
Sync replication (acks=all) provides durability at cost of latency.
Exactly-once semantics adds complexity & throughput overhead.
A large partition count increases parallelism but also increases metadata churn and controller load.
Evolution path
MVP: single-controller + brokers, leader-based replication, partitioned topics, simple offset commits.
Add consumer group coordinator, admin APIs, monitoring.
Add idempotent producers & transactions for EOS.
Introduce tiered storage (hot SSD + cold object store) to reduce cost.
Multi-cluster geo-replication for global reads/writes.
Example simple RPC flow (produce)
Producer batches M messages and chooses partition P (by key hash or round-robin).
Send Produce(topic, P, batch, acks=all) to leader L.
L appends the batch to its local segment, fsyncs (or defers, depending on durability config), and assigns offsets.
L replicates to followers; once a quorum acks, L responds to the producer with the assigned offsets.
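Tying the client side together, reusing choose_partition() and retry_with_backoff() from the sketches above ('client' and 'metadata' are hypothetical stubs, with metadata tracking partition counts and current leaders):

```python
def produce(client, metadata, topic: str, batch):
    """Pick a partition, find its leader, and send with retries.
    Returns the base offset assigned by the leader on success."""
    # Fresh rr_state here assumes keyed messages; a real client keeps
    # the round-robin counter across calls for keyless traffic.
    part = choose_partition(batch[0].key, metadata.num_partitions(topic), [0])
    leader = metadata.leader(topic, part)    # refresh on not-leader errors
    send = lambda b: client.send_produce(leader, topic, part, b, acks="all")
    return retry_with_backoff(send, batch)
```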
Final notes (interview tips)
Start by clarifying required delivery semantics and scale (throughput & retention) — these drive choices (replication factor, storage layout, whether to support EOS).
Emphasize partitioning (scaling) and replicas (durability).
Discuss how consumer groups and offsets work; show you understand rebalances and failure scenarios.
Be ready to defend trade-offs: latency vs durability, complexity vs features.
Want a time-boxed 1-hour script for presenting this in an interview (what to say in each 5–10 minute block) or pseudocode for producer/replicator logic?