System Design Interview — Study Notes XII — Designing Data Intensive Applications Book Notes
Notes on Concepts & Components to be used in a System Design Interview
Chapter 11 — Stream Processing
To reduce the delay (as in batch processes), we can run the processing more frequently — processing a second’s worth of data at the end of every second — or even continuously, abandoning the fixed time slices entirely and simply processing every event as it happens. That is the idea behind stream processing.
Transmitting Event Streams
In a stream processing context, a record is known as an event — a small, self-contained, immutable object containing the details of something that happened at some point in time. An event usually contains a timestamp indicating when it happened according to a time-of-day clock.
An event might be an action that a user took, such as viewing a page or making a purchase. It might also originate from a machine, such as a periodic measurement from a temperature sensor or a CPU utilization metric. Each line of a web server log is an event.
An event may be encoded as a text string, or JSON, or perhaps in some binary form. This encoding allows you to store an event as well as send the event over the network.
An event is generated once by a producer (publisher, sender) and then potentially processed by multiple consumers (subscribers, recipients). Related events are usually grouped together into a topic or stream.
The more often you poll, the higher the overheads become. It is better for consumers to be notified when new events appear.
Databases have triggers, but they are very limited in what they can do; this kind of notification mechanism has been an afterthought in database design. Specialized tools have been developed for the purpose of delivering event notifications.
For notifying consumers about new events — use a messaging system.
A messaging system allows multiple producer nodes to send messages to the same topic and allows multiple consumer nodes to receive messages in a topic.
1. What happens if the producers send messages faster than the consumers can process them? There are three options: drop messages, buffer them in a queue, or apply backpressure (flow control), which blocks the producer from sending more messages. Unix pipes and TCP also use backpressure.
2. What happens if the nodes crash or temporarily go offline — are any messages lost?
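The first question can be made concrete with a bounded in-process buffer. The following is a minimal sketch, not a real messaging system: the `send` function and its `policy` parameter are hypothetical names, and Python's standard `queue.Queue` stands in for the channel between producer and consumer.

```python
import queue

def send(buffer, message, policy):
    """Hand a message to a bounded buffer; return True if it was accepted.

    policy="drop"         -> discard the message when the buffer is full
    policy="backpressure" -> block the producer until there is room
    (both policy names are illustrative, not from any library)
    """
    if policy == "drop":
        try:
            buffer.put_nowait(message)
            return True
        except queue.Full:
            return False
    if policy == "backpressure":
        buffer.put(message)  # blocks while the buffer is full
        return True
    raise ValueError(f"unknown policy: {policy}")

# A buffer with room for two messages and no consumer draining it.
buffer = queue.Queue(maxsize=2)
results = [send(buffer, n, "drop") for n in range(4)]
```

With the "drop" policy the third and fourth messages are discarded once the buffer is full. With "backpressure" the same producer would instead block on the third message until a consumer removed something from the buffer.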
Direct (network communication) messaging from producers to consumers
- UDP multicast: widely used in the financial industry for streams such as stock market feeds, where low latency is important. UDP itself is unreliable, but application-level protocols can recover lost packets.
- Some brokerless messaging libraries implement publish/subscribe messaging over TCP or IP multicast.
- If the consumer exposes a service on the network, producers can make a direct HTTP or RPC request to push messages to the consumer — webhooks.
All these require the application code to be aware of the possibility of message loss.
Send messages via a message broker (message queue): a kind of database that is optimized for handling message streams. It runs as a server, with producers and consumers connecting to it as clients. Producers write messages to the broker, and consumers receive them by reading them from the broker. Faced with slow consumers, brokers generally allow unbounded queueing. A consequence of queueing is that consumers are generally asynchronous: when a producer sends a message, it only waits for the broker to confirm that it has buffered the message and does not wait for the message to be processed by consumers.
AMQP/JMS-style message broker:
- The broker assigns individual messages to consumers.
- Consumers acknowledge individual messages when they have been successfully processed.
- Messages are deleted from the broker once they have been acknowledged.
- Appropriate when the exact order of message processing is not important and there is no need to go back and read old messages again after they have been processed.
Log-based message broker:
- The broker assigns all messages in a partition to the same consumer node, and always delivers messages in the same order.
- Parallelism is achieved through partitioning, and consumers track their progress by checkpointing the offset of the last message they have processed.
- The broker retains messages on disk, so it is possible to jump back and reread old messages if necessary.
The log-based approach has similarities to the replication logs found in databases and log-structured storage engines. This approach is appropriate for stream processing systems that consume input streams and generate derived state or derived output streams.
Message brokers compared to databases
Some message brokers can even participate in two-phase commit protocols using XA or JTA.
Differences between message brokers and databases:
- Most message brokers automatically delete a message when it has been successfully delivered to its consumers
- Databases often support secondary indexes, while message brokers often support some way of subscribing to a subset of topics matching some pattern.
Traditional message brokers are encapsulated in standards like JMS and AMQP and implemented in software like RabbitMQ, ActiveMQ, and Google Cloud Pub/Sub.
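When multiple consumers read messages in the same topic, two main patterns are used:
Load balancing: each message is delivered to one of the consumers. In AMQP, you can implement load balancing by having multiple clients consuming from the same queue, and in JMS it is called a shared subscription.
Fan-out: each message is delivered to all of the consumers. This is provided by topic subscriptions in JMS and exchange bindings in AMQP.
The two patterns can be combined: two separate groups of consumers may each subscribe to a topic, so that each group collectively receives all messages, but within each group only one of the nodes receives each message.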
Acknowledgements and redelivery
Message brokers use acknowledgements — a client must explicitly tell the broker when it has finished processing a message so that the broker can remove it from the queue.
It can happen that the message actually was fully processed, but the acknowledgement was lost in the network. Handling this case requires an atomic commit protocol. When combined with load balancing, redelivery behaviour leads to messages being reordered. To avoid this issue, you can use a separate queue per consumer (that is, not use the load balancing feature).
Message brokers are built around a transient messaging mindset.
AMQP/JMS-style messaging — receiving a message is destructive if the acknowledgement causes it to be deleted from the broker, so you cannot run the same consumer again and expect to get the same result.
If you add a new consumer to a messaging system, it typically only starts receiving messages sent after the time it was registered; any prior messages are already gone and cannot be recovered.
Using logs for message storage
log: an append-only sequence of records on disk. The log can be partitioned. Each partition is a separate log that can be read and written independently from other partitions. A topic can then be defined as a group of partitions that all carry messages of the same type.
Within each partition, the broker assigns a monotonically increasing sequence number — offset, to every message. Because a partition is append-only, messages within a partition are totally ordered.
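A partitioned log with per-partition offsets can be sketched in a few lines. This is an in-memory illustration under stated assumptions: `PartitionedLog` is a hypothetical class, and routing by a CRC32 hash of the key is just one possible partitioning scheme, not how any particular broker does it.

```python
import zlib

class PartitionedLog:
    """In-memory sketch of a topic made of append-only partitions."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, key, value):
        # Messages with the same key always land in the same partition.
        partition = zlib.crc32(key.encode()) % len(self.partitions)
        self.partitions[partition].append((key, value))
        # The offset is the position in that partition; it only ever grows.
        return partition, len(self.partitions[partition]) - 1

    def read(self, partition, offset):
        # Reading does not remove anything, so any offset can be re-read.
        return self.partitions[partition][offset:]

log = PartitionedLog(num_partitions=4)
p1, o1 = log.append("user-1", "page_view")
p2, o2 = log.append("user-1", "purchase")
replayed = log.read(p1, 0)
```

Both messages for `user-1` end up in the same partition with consecutive offsets, so their relative order is preserved. Messages in different partitions have no ordering guarantee relative to each other.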
Apache Kafka, Amazon Kinesis Streams, and Twitter’s DistributedLog are log-based message brokers that work like this. Even though these message brokers write all messages to disk, they are able to achieve throughput of millions of messages per second by partitioning across multiple machines, and fault tolerance by replicating messages.
Logs compared to traditional messaging
The log-based approach trivially supports fan-out messaging. Consumers can read independently and reading does not delete the message from the log. To achieve load balancing across a group of consumer clients, the broker can assign entire partitions to nodes in the consumer group.
Each client then consumes all the messages in the partitions it has been assigned, reading sequentially, in a straight-forward single-threaded manner.
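- Messages within the same partition are delivered to the same node, so the number of nodes sharing the work of consuming a topic can be at most the number of log partitions in that topic.
- If a single message is slow to process, it holds up the processing of subsequent messages in that partition (head-of-line blocking).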
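The first downside follows directly from assigning whole partitions to nodes. A minimal sketch, assuming a simple round-robin assignment (the function name and strategy are illustrative; real brokers offer several assignment strategies):

```python
def assign_partitions(num_partitions, consumers):
    """Assign each partition to exactly one consumer in the group."""
    assignment = {consumer: [] for consumer in consumers}
    for partition in range(num_partitions):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment

# Three partitions shared among four consumer nodes.
assignment = assign_partitions(3, ["node-a", "node-b", "node-c", "node-d"])
```

With three partitions and four nodes, `node-d` receives nothing: adding consumers beyond the number of partitions does not add parallelism.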
For expensive to process messages and where message ordering is not so important — parallelize processing — JMS/AMQP style of message broker is preferable.
For fast to process messages and where the message ordering is important and there is high message throughput, the log-based approach works well.
offset — very similar to the log sequence number that is commonly found in single-leader database replication.
The message broker behaves like a leader database, and the consumer like a follower.
Disk space usage
The log is actually divided into segments, and from time to time old segments are deleted or moved to archive storage.
The log implements a bounded-size buffer that discards old messages when it gets full — circular buffer or ring buffer.
In practice, the log can typically keep a buffer of several days’ or even weeks’ worth of messages.
When consumers cannot keep up with producers
The log-based approach is a form of buffering with a large but fixed-size buffer (limited by the available disk space).
You can monitor how far a consumer is behind the head of the log, and raise an alert if it falls behind significantly. Only that consumer is affected; it does not disrupt the service for other consumers.
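Monitoring lag amounts to comparing each consumer's committed offset with the head of the log. A small sketch for a single partition, with hypothetical consumer names and an arbitrary threshold:

```python
def lagging_consumers(head_offset, committed, threshold):
    """Return {consumer: lag} for consumers more than `threshold` messages behind."""
    return {
        name: head_offset - offset
        for name, offset in committed.items()
        if head_offset - offset > threshold
    }

alerts = lagging_consumers(
    head_offset=10_000,
    committed={"search-indexer": 9_990, "warehouse-loader": 2_500},
    threshold=1_000,
)
```

Only the slow consumer shows up in `alerts`; the other consumer keeps reading near the head of the log unaffected.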
Replaying old messages
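With AMQP- and JMS-style message brokers, processing and acknowledging messages is a destructive operation, since it causes the messages to be deleted on the broker. In a log-based message broker, consuming messages is more like reading from a file: it is a read-only operation that does not change the log.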
The offset is under the consumer’s control, so it can easily be manipulated if necessary.
Databases and Streams
replication log — a stream of database write events, produced by the leader as it processes transactions. The followers apply that stream of writes to their own copy of the database and thus end up with an accurate copy of the same data. — the state machine replication principle.
Keeping Systems in Sync
No single system can satisfy all data storage, querying, and processing needs.
As the same or related data appears in several different places, they need to be kept in sync with one another: if an item is updated in the database, it also needs to be updated in the cache, search indexes, and data warehouse. Data warehouses’ synchronization is performed by ETL processes — often taking a full copy of the database. If periodic full database dumps are too slow — dual writes — first writing to the database, then updating the search index, then invalidating the cache entries.
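Dual writes suffer from a race condition: client1 wants to set the value to A, and client2 wants to set it to B. If the two clients' writes arrive at the database and the search index in different orders, the two systems end up permanently inconsistent with each other. Dual writes also have a fault-tolerance problem: one of the writes may fail while the other succeeds.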
Ensuring that they either both succeed or both fail is a case of the atomic commit problem, which is expensive to solve.
There isn’t a single leader; the database may have a leader and the search index may have a leader, but neither follows the other, and so conflicts can occur.
Make the search index a follower of the database.
Change Data Capture
change data capture (CDC) — the process of observing all data changes written to a database and extracting them in a form in which they can be replicated to other systems.
You can capture the changes in a database and continually apply the same changes to a search index. If the log of changes is applied in the same order, you can expect the data in the search index to match the data in the database.
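The difference between dual writes and a change log can be shown with plain dictionaries. This is a sketch with a hard-coded, hypothetical interleaving of two clients; the `write` helper and variable names are illustrative only.

```python
# Dual writes: each client writes to the database, then to the search index.
# The requests of the two clients interleave unluckily.
database, search_index = {}, {}
database["x"] = "A"      # client1 writes to the database
database["x"] = "B"      # client2 writes to the database
search_index["x"] = "B"  # client2 writes to the index
search_index["x"] = "A"  # client1's index write arrives last
dual_write_consistent = database["x"] == search_index["x"]

# Change data capture: the database decides the order of writes, and the
# index is a follower that applies the change log in that same order.
database, changelog, cdc_index = {}, [], {}

def write(key, value):
    database[key] = value
    changelog.append((key, value))

write("x", "A")  # client1
write("x", "B")  # client2
for key, value in changelog:
    cdc_index[key] = value
cdc_consistent = database["x"] == cdc_index["x"]
```

In the dual-write run the database ends with B and the index with A, and nothing will ever repair that. In the CDC run the index cannot diverge, because it has no write order of its own.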
Implementing change data capture
derived data systems = log consumers
A log-based message broker is well suited for transporting the change events from the source database, since it preserves the ordering of messages (avoiding the reordering issue).
Database triggers can be used to implement change data capture by registering triggers that observe all changes to data tables and add corresponding entries to a changelog table — significant performance overheads. Parsing the replication log — a more robust approach; challenges such as handling schema changes.
LinkedIn’s Databus, Facebook’s Wormhole, Yahoo’s Sherpa. Bottled Water implements CDC for PostgreSQL (decodes the write-ahead log). Maxwell and Debezium for MySQL (parsing the bin log). Mongoriver reads the MongoDB oplog. GoldenGate for Oracle.
Like message brokers, change data capture is usually asynchronous: the system of record database does not wait for the change to be applied to consumers before committing it. Downside is that all issues of replication lag apply.
Building a new full-text index, you need to start with a consistent snapshot.
The snapshot of the database must correspond to a known position or offset in the change log, so that you know at which point to start applying changes after the snapshot has been processed. Some CDC tools integrate this snapshot facility.
Log compaction: a log-structured storage engine periodically looks for log records with the same key, throws away any duplicates, and keeps only the most recent update for each key. This runs in the background.
An update with a special null value — a tombstone indicates that a key was deleted.
The same idea works in the context of log-based message brokers and change data capture. If the CDC system is set up such that every change has a primary key, and every update for a key replaces the previous value for that key, then it’s sufficient to keep just the most recent write for a particular key.
To rebuild a derived system such as a search index, you can start a new consumer from offset 0 of the log-compacted topic, and sequentially scan over all messages in the log.
This log compaction feature is supported by Apache Kafka. It allows the message broker to be used for durable storage, not just for transient messaging.
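Log compaction as described above can be sketched as a function over a list of `(key, value)` records. This is a simplification: it drops tombstones immediately and returns a new list without the original offsets, whereas a real broker compacts segments in the background. The `compact` name and `TOMBSTONE` constant are illustrative.

```python
TOMBSTONE = None  # a null value marks the key as deleted

def compact(log):
    """Keep only the most recent record per key, preserving log order."""
    latest = {}
    for offset, (key, value) in enumerate(log):
        latest[key] = (offset, value)  # later records replace earlier ones
    survivors = sorted(
        (offset, key, value)
        for key, (offset, value) in latest.items()
        if value is not TOMBSTONE
    )
    return [(key, value) for _, key, value in survivors]

log = [("a", 1), ("b", 2), ("a", 3), ("b", TOMBSTONE), ("c", 4)]
compacted = compact(log)
```

The compacted log contains one record per live key, so its size depends on the contents of the database rather than on the number of writes that ever occurred.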
API support for change streams
RethinkDB allows queries to subscribe to notifications when the results of a query change.
Firebase and CouchDB provide data synchronization based on a change feed that is also made available to applications.
Meteor uses the MongoDB oplog to subscribe to data changes and update the user interface.
VoltDB allows transactions to continuously export data from a database in the form of a stream.
Kafka Connect is an effort to integrate change data capture tools for a wide range of database systems with Kafka.
event sourcing — a technique that was developed in the domain-driven design (DDD) community.
Event sourcing involves storing all changes to the application state as a log of change events.
- In change data capture, the application uses the database in a mutable way, updating and deleting records at will. The log of changes is extracted from the database at a low level.
- In event sourcing, the application logic is explicitly built on the basis of immutable events that are written to an event log. In this case, the event store is append-only, and updates or deletes are discouraged or prohibited. Events are designed to reflect things that happened at the application level, rather than low-level state changes.
Event sourcing records the user's actions as immutable events, which helps with debugging because it makes it easier to understand why something happened. For example, it stores the event “student cancelled their course enrollment” rather than “one entry was deleted from the enrollments table, and one cancellation reason was added to the student feedback table”.
The approach is independent of any particular tool.
Deriving current state from the event log
- A CDC event for the update of a record contains the newest version of the record — the current value for a primary key is entirely determined by the most recent event for that primary key, and log compaction can discard previous events for the same key.
- With event sourcing, an event expresses the intent of a user action, not the mechanics of the state update that occurred as a result of the action. Later events do not override prior events, so you need the full history of events to reconstruct the final state. Log compaction is not possible in the same way.
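The contrast between the two bullets can be illustrated with a shopping cart. This is a hypothetical example: the event names and the `replay` function are assumptions made for illustration.

```python
def replay(events):
    """Rebuild the current cart contents from the full event history."""
    cart = {}
    for kind, item, quantity in events:
        if kind == "item_added":
            cart[item] = cart.get(item, 0) + quantity
        elif kind == "item_removed":
            cart[item] = cart.get(item, 0) - quantity
            if cart[item] <= 0:
                del cart[item]
    return cart

events = [
    ("item_added", "book", 1),
    ("item_added", "pen", 2),
    ("item_removed", "pen", 1),
    ("item_added", "book", 1),
]
state = replay(events)

# Keeping only the latest event per item, as CDC-style compaction would,
# loses information: the last "book" event alone says quantity 1, not 2.
latest_only = replay([events[2], events[3]])
```

Replaying the whole history yields two books and one pen. Replaying only the most recent event per item yields a different, wrong state, which is why event-sourced logs cannot be compacted by key.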
Commands and events
The event sourcing philosophy is careful to distinguish between events and commands. When a request from a user first arrives, it is initially a command: at this point it may still fail. The application must first validate that it can execute the command. If the validation succeeds and the command is accepted, it becomes an event, which is durable and immutable.
When the event is generated, it becomes a fact.
A consumer of the event stream is not allowed to reject an event.
Reserving a seat — two events; first a tentative reservation, then a separate confirmation event. The split allows the validation to take place in an asynchronous process.
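The command/event distinction can be sketched with the seat reservation example, here in its simpler synchronous form where validation happens before the event is written. The class and method names are hypothetical.

```python
class SeatReservations:
    """Validates reserve commands and records accepted ones as events."""

    def __init__(self, seats):
        self.free = set(seats)
        self.event_log = []  # append-only; entries are never changed

    def handle_reserve(self, user, seat):
        # A command may still be rejected at this point.
        if seat not in self.free:
            return False
        # Once accepted, the command becomes an event: a fact that
        # downstream consumers are not allowed to reject.
        self.free.remove(seat)
        self.event_log.append(("seat_reserved", user, seat))
        return True

reservations = SeatReservations(["1A"])
first = reservations.handle_reserve("alice", "1A")
second = reservations.handle_reserve("bob", "1A")
```

The second command is rejected and leaves no trace in the event log. Only the accepted reservation becomes part of the history.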
State, Streams, and Immutability
This principle of immutability is also what makes event sourcing and change data capture so powerful.
Whenever you have state that changes, that state is the result of the events that mutated it over time.
The log of all changes — changelog — represents the evolution of state over time.
Deriving several views from the same event log
Kafka Connect sinks can export data from Kafka to various different databases and indexes.
Translation step from an event log to a database. You can use the event log to build a separate read-optimized view for a new feature that presents your existing data in some new way.
Running old and new systems side by side is easier than a migration.
You gain a lot of flexibility by separating the form in which data is written from the form it is read, and by allowing several different read views. This idea is sometimes known as command query responsibility segregation (CQRS).
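Because consumers of the event log are usually asynchronous, a user may write to the log and then read from a derived view that does not yet reflect the write. One solution is to perform the updates of the read view synchronously with appending the event to the log. This requires a transaction to combine the writes into an atomic unit.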
Deriving the current state from an event log also simplifies some aspects of concurrency control — a single user action requiring data to be changed in several different places. With event sourcing, you can design an event that is a self-contained description of a user action.
If the event log and the application state are partitioned in the same way, a straightforward single-threaded log consumer needs no concurrency control for writes.
Limitations of immutability
Version control systems such as Git, Mercurial, and Fossil also rely on immutable data to preserve version history of files.
Data sometimes has to be deleted for administrative reasons despite immutability: privacy regulations and data protection legislation may require deleting a user's personal information after they close their account.
You actually want to rewrite history and pretend the data was never written in the first place — excision, shunning.
Truly deleting data is surprisingly hard — storage engines, filesystems, and SSDs often write to a new location rather than overwriting, backups are immutable to prevent accidental deletion or corruption. Deletion is more a matter of “making it harder to retrieve the data” than actually “making it impossible to retrieve the data”.
Streams come from user activity events, sensors, and writes to databases, and streams are transported through direct messaging, via message brokers, and in event logs.
- You can take the data in the events and write it to a database, cache, search index, or similar storage system— consumer is the storage system.
- Sending email alerts or push notifications, or by streaming the events to a real-time dashboard where they are visualized — consumer is a human.
- You can process one or more input streams to produce one or more derived output streams. A piece of code that processes streams like this is known as an operator or a job.
Partitioning and parallelization in stream processors — similar to those in MapReduce and the dataflow engines. Basic mapping operations such as transforming and filtering records also work the same.
Uses of Stream Processing
- Fraud detection systems
- Trading systems — price changes in a financial market
- Monitoring in manufacturing systems
- Military and intelligence systems
These require pattern matching and correlation.
Complex event processing (CEP) — searching for certain event patterns.
CEP allows you to specify rules to search for certain patterns of events in a stream. Internally maintains a state machine that performs the required matching. A complex event is emitted when a match is found.
CEP engines — queries are stored long-term, and events from the input streams continuously flow past them in search of a query that matches an event pattern.
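As a rough illustration of pattern matching over a stream, the sketch below hard-codes one rule: three failed logins by the same user within 60 seconds. The rule, event shape, and function name are all hypothetical; real CEP engines take declarative queries rather than hand-written loops.

```python
from collections import defaultdict, deque

def detect_failed_login_bursts(events, threshold=3, window=60):
    """Emit (user, timestamp) when `threshold` failures fall within `window` seconds.

    `events` is an iterable of (timestamp, user, kind) in timestamp order.
    """
    recent = defaultdict(deque)  # per-user matching state
    complex_events = []
    for timestamp, user, kind in events:
        if kind != "login_failed":
            continue
        failures = recent[user]
        failures.append(timestamp)
        # Forget failures that have fallen out of the time window.
        while failures[0] <= timestamp - window:
            failures.popleft()
        if len(failures) >= threshold:
            complex_events.append((user, timestamp))
            failures.clear()  # start matching afresh after emitting
    return complex_events

events = [
    (0, "bob", "login_failed"),
    (10, "bob", "login_failed"),
    (15, "amy", "login_failed"),
    (20, "bob", "login_failed"),
    (100, "bob", "login_failed"),
]
matches = detect_failed_login_bursts(events)
```

The rule is the long-lived part and the events flow past it, which is the inversion relative to a database query that the text describes.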
Aggregations and statistical metrics over a large number of events.
- Measuring the rate of some type of event (how often it occurs per time interval)
- Calculating the rolling average of a value over some time period
- Comparing current statistics to previous time intervals (e.g., to detect trends or to alert on metrics that are unusually high or low compared to the same time last week)
The time interval over which you aggregate is known as a window.
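A rolling average over a time window can be kept incrementally with a buffer of recent events. The sketch assumes events arrive in timestamp order; `RollingAverage` is an illustrative name.

```python
from collections import deque

class RollingAverage:
    """Average of the values seen in the last `window` time units."""

    def __init__(self, window):
        self.window = window
        self.events = deque()  # (timestamp, value), oldest first
        self.total = 0.0

    def add(self, timestamp, value):
        self.events.append((timestamp, value))
        self.total += value
        # Expire events that are no longer inside the window.
        while self.events[0][0] <= timestamp - self.window:
            _, expired = self.events.popleft()
            self.total -= expired
        return self.total / len(self.events)

average = RollingAverage(window=10)
readings = [average.add(0, 10), average.add(5, 20), average.add(12, 30)]
```

By the third event, the reading at time 0 has left the 10-unit window, so the average covers only the values 20 and 30.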
Stream analytics systems sometimes use probabilistic algorithms, such as Bloom filters.
Many open source distributed stream processing frameworks are designed with analytics in mind: for example, Apache Storm, Spark Streaming, Flink, Concord, Samza, Kafka Streams, Google Cloud Dataflow, Azure Stream Analytics.
Maintaining materialized views
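Using a stream of database changes to keep derived data systems (caches, search indexes, data warehouses) up to date with a source database is a specific case of maintaining a materialized view.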
Search on streams
Besides CEP, which allows searching for patterns consisting of multiple events, there is also sometimes a need to search for individual events based on complex criteria, such as full-text search queries.
For example, media monitoring services subscribe to feeds of news articles and broadcasts from media outlets, and search for any news mentioning companies, products, or topics of interest.
Users of real estate websites can ask to be notified when a new property matching their search criteria appears on the market. The percolator feature of Elasticsearch is one option for implementing this kind of stream search.
The queries are stored, and documents are run past the queries, like in CEP.
It is possible to index the queries.
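Indexing the queries means that each incoming document is only tested against queries that could plausibly match. The sketch below is a toy illustration of the idea using exact keyword matching; it is not the Elasticsearch percolator API, and the class and method names are hypothetical.

```python
from collections import defaultdict

class StoredQueries:
    """Stored keyword queries, indexed by the terms they require."""

    def __init__(self):
        self.by_term = defaultdict(set)  # term -> ids of queries using it
        self.terms = {}                  # query id -> all required terms

    def register(self, query_id, terms):
        self.terms[query_id] = {term.lower() for term in terms}
        for term in self.terms[query_id]:
            self.by_term[term].add(query_id)

    def match(self, document):
        words = set(document.lower().split())
        # Only queries sharing at least one term with the document are checked.
        candidates = set()
        for word in words:
            candidates |= self.by_term.get(word, set())
        return sorted(q for q in candidates if self.terms[q] <= words)

queries = StoredQueries()
queries.register("q1", ["garden", "london"])
queries.register("q2", ["garage"])
hits = queries.match("Two bedroom flat with garden in London")
```

A new listing is the "document" and each saved search is a stored query, mirroring the real estate example above.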
Message passing and RPC
- Stream processing is primarily a data management technique.
- Event logs are durable and multi-subscriber.
Apache Storm has a feature called distributed RPC, which allows user queries to be farmed out to a set of nodes that also process event streams; these queries are then interleaved with events from the input streams, and results can be aggregated and sent back to the user.
Reasoning About Time
Stream processors often need to deal with time, especially for analytics purposes, which frequently use time windows such as “the average over the last five minutes”.
Many stream processing frameworks use the local system clock on the processing machine (the processing time) to determine windowing.
Event time versus processing time
Confusing event time and processing time leads to bad data.
Knowing when you’re ready
You can never be sure when you have received all of the events for a particular window, or whether there are some events still to come.
straggler events — that arrive after the window has already been declared complete.
You have two options:
- Ignore — track the number of dropped events as a metric, alert if you start dropping a significant amount of data.
- Publish a correction
Whose clock are you using, anyway?
The app may be used while the device is offline, in which case it will buffer events locally on the device and send them to a server when an internet connection is next available. To consumers of the stream, these events appear as extremely delayed stragglers.
One approach is to log three timestamps:
- Event occurred — device clock
- Event was sent to the server — device clock
- Event was received by the server — server clock
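Subtracting the second timestamp from the third gives an estimate of the offset between the device clock and the server clock, which can then be applied to the first. A minimal sketch, assuming network delay is negligible compared to the required accuracy (the function name is illustrative):

```python
def estimate_event_time(occurred_device, sent_device, received_server):
    """Estimate when the event really happened, in server time."""
    # How far the device clock is from the server clock at send time.
    clock_offset = received_server - sent_device
    return occurred_device + clock_offset

# A device whose clock runs 100 seconds fast: the event truly happened at
# server time 1000 and was uploaded ten minutes later.
estimated = estimate_event_time(
    occurred_device=1100, sent_device=1700, received_server=1600
)
```

The estimate recovers the true event time even though the device clock was wrong and the upload was delayed.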
Types of windows
Windows can be used for aggregations — to count events or calculate average, etc. Decide how windows over time periods should be defined:
Tumbling window — has a fixed length
Hopping window — has a fixed length, but also allows windows to overlap in order to provide some smoothing.
Sliding window: contains all the events that occur within some interval of each other. It can be implemented by keeping a buffer of events sorted by time and removing old events when they expire from the window.
Session window — no fixed duration. Grouping together all events for the same user that occur closely together in time. Window ends when the user has been inactive for some time. Sessionization is a common requirement for website analytics.
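The window types differ mainly in how an event timestamp maps to windows. The following sketch uses integer timestamps and half-open windows `[start, end)`; the function names and the choice to align windows to multiples of the hop size are assumptions for illustration.

```python
def tumbling_window(timestamp, size):
    """The single fixed-length window that contains the timestamp."""
    start = timestamp - timestamp % size
    return (start, start + size)

def hopping_windows(timestamp, size, hop):
    """All overlapping windows (starting at multiples of `hop`) containing it."""
    windows = []
    start = timestamp - timestamp % hop
    while start > timestamp - size:
        windows.append((start, start + size))
        start -= hop
    return sorted(windows)

def session_windows(timestamps, gap):
    """Group one user's events; a session ends after `gap` of inactivity."""
    sessions = []
    for timestamp in sorted(timestamps):
        if sessions and timestamp - sessions[-1][-1] <= gap:
            sessions[-1].append(timestamp)
        else:
            sessions.append([timestamp])
    return sessions

tumbling = tumbling_window(12, size=10)
hopping = hopping_windows(12, size=10, hop=5)
sessions = session_windows([1, 2, 3, 20, 21, 50], gap=5)
```

An event belongs to exactly one tumbling window, to `size / hop` hopping windows, and to a session whose length is only known once the user goes quiet.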
The fact that new events can appear at any time on a stream makes joins on streams more challenging than in batch jobs. There are three different types of joins: stream-stream joins, stream-table joins, and table-table joins.
Stream-stream join (window join)
The join operator searches for related events that occur within some window of time. The two join inputs can be the same stream — a self-join.
Search feature — to calculate the click-through rate for each URL, you need to bring together the events for the search action and the click action, which are connected by having the same session ID. Similar analyses are needed in advertising systems.
A stream processor needs to maintain state — all the events that occurred in the last hour, indexed by session ID. If there is a matching event, you emit an event saying which search result was clicked. If the search event expires without you seeing a matching click event, you emit an event saying which search results were not clicked.
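The search/click join can be sketched as a single loop that keeps recent searches indexed by session ID. Simplifying assumptions: events arrive in timestamp order on one merged stream, each session has at most one pending search, and the event shapes and function name are hypothetical.

```python
def join_searches_and_clicks(events, window=3600):
    """Join search and click events that share a session ID within `window` seconds.

    `events` is an iterable of (timestamp, kind, session_id, payload).
    """
    pending = {}  # session_id -> (search timestamp, query, was it clicked?)
    output = []
    for timestamp, kind, session, payload in events:
        # Expire searches that have fallen out of the join window.
        expired = [s for s, (t, _, _) in pending.items() if t <= timestamp - window]
        for s in expired:
            _, query, clicked = pending.pop(s)
            if not clicked:
                output.append(("not_clicked", s, query, None))
        if kind == "search":
            pending[session] = (timestamp, payload, False)
        elif kind == "click" and session in pending:
            searched_at, query, _ = pending[session]
            pending[session] = (searched_at, query, True)
            output.append(("clicked", session, query, payload))
    return output

events = [
    (0, "search", "s1", "cats"),
    (10, "search", "s2", "dogs"),
    (30, "click", "s1", "cats.example/1"),
    (4000, "search", "s3", "birds"),
]
joined = join_searches_and_clicks(events)
```

The search in session `s2` is only reported as not clicked once a later event shows that its hour has passed, which is why the processor has to hold state for the whole window.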
Stream-table join (stream enrichment)
One input stream consists of activity events, while the other is a database changelog. The changelog keeps a local copy of the database up to date.
The output is a stream of activity events in which the user ID has been augmented with profile information about the user — enriching the activity events with information from the database.
Load a copy of the database into the stream processor so that it can be queried locally without a network round-trip. The local copy of the database might be an in-memory hash table if it is small enough, or an index on the local disk.
The stream processor’s local copy of the database needs to be kept up to date. This issue can be solved by change data capture. The stream processor updates its local copy. Thus, we obtain a join between two streams: the activity events and the profile updates.
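Stream enrichment with a locally maintained table can be sketched as follows. The two inputs are shown as one merged, ordered stream for simplicity, and the event shapes and `enrich` function are hypothetical.

```python
def enrich(stream):
    """Join activity events against a local copy of the profile table."""
    profiles = {}  # local in-memory copy, kept current by the changelog
    enriched = []
    for kind, user_id, data in stream:
        if kind == "profile_update":
            profiles[user_id] = data  # apply the change data capture event
        elif kind == "activity":
            # Local lookup: no network round-trip to the database.
            enriched.append(
                {"user_id": user_id, "action": data, "profile": profiles.get(user_id)}
            )
    return enriched

stream = [
    ("profile_update", 1, {"name": "Ann"}),
    ("activity", 1, "view"),
    ("profile_update", 1, {"name": "Anna"}),
    ("activity", 1, "buy"),
]
enriched = enrich(stream)
```

Each activity event is joined with whatever version of the profile the processor holds at that moment, so the result depends on the order in which the two streams are interleaved.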
Table-table join (materialized view maintenance)
Both input streams are database changelogs. Every change on one side is joined with the latest state of the other side. The result is a stream of changes to the materialized view of the join between two tables.
We want a timeline cache: a kind of per-user “inbox” to which tweets are written as they are sent, so that the timeline is a single lookup.
This requires streams of events for tweets (sending and deleting) and for follow relationships (following and unfollowing). The stream processor needs to maintain a database containing the set of followers for each user so that it knows which timelines need to be updated when a new tweet arrives.
The timelines are effectively a cache of the result of a select query, updated every time the underlying tables change.
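The timeline cache can be sketched as a materialized view updated by both changelogs. This is a simplified in-memory illustration: tweet deletion is omitted, timelines are not sorted by time, and the class and method names are hypothetical.

```python
from collections import defaultdict

class TimelineCache:
    """Maintains per-user timelines from tweet and follow event streams."""

    def __init__(self):
        self.followers = defaultdict(set)   # followee -> set of followers
        self.tweets = defaultdict(list)     # sender -> tweets sent so far
        self.timelines = defaultdict(list)  # user -> (sender, text) entries

    def on_tweet(self, sender, text):
        self.tweets[sender].append(text)
        for follower in self.followers[sender]:
            self.timelines[follower].append((sender, text))

    def on_follow(self, follower, followee):
        self.followers[followee].add(follower)
        # Existing tweets by the followee join the follower's timeline.
        self.timelines[follower].extend((followee, t) for t in self.tweets[followee])

    def on_unfollow(self, follower, followee):
        self.followers[followee].discard(follower)
        self.timelines[follower] = [
            entry for entry in self.timelines[follower] if entry[0] != followee
        ]

cache = TimelineCache()
cache.on_follow("bob", "alice")
cache.on_tweet("alice", "hello")
cache.on_follow("carol", "alice")
bob_before = list(cache.timelines["bob"])
cache.on_unfollow("bob", "alice")
```

A change on either side (a new tweet or a new follow) is joined against the current state of the other side, and reading a timeline is then a single lookup.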
Time-dependence of joins
If events on different streams happen around a similar time, in which order are they processed? Which activity events are joined with the old profile and which are joined with the new profile?
In data warehouses, slowly changing dimension (SCD) — often addressed by using a unique identifier for a particular version of the joined record. Log compaction is not possible, since all versions of the records in the table need to be retained.
Microbatching and checkpointing
microbatching: break the stream into small blocks, and treat each block like a miniature batch process. It is used in Spark Streaming. The batch size is typically around one second.
It implicitly provides a tumbling window equal to the batch size (windowed by processing time, not event timestamps).
Apache Flink — periodically generate rolling checkpoints of state and write them to durable storage.
Atomic commit revisited
Exactly-once processing in the presence of faults.
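All outputs and side effects of processing an event must take effect if and only if the processing is successful. This includes email or push notifications, any database writes, any changes to operator state, and any acknowledgement of input messages, including moving the consumer offset forward in a log-based message broker.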
Those things either all need to happen atomically, or none of them must happen, but they should not go out of sync with each other.
idempotence — safely retried without taking effect twice.
An idempotent operation is one that you can perform multiple times, and it has the same effect as if you performed it only once.
An operation can be made idempotent with a bit of extra metadata. When consuming Kafka, every message has a persistent, monotonically increasing offset. When writing a value to an external database, you can include the offset of the message that triggered the last write with the value.
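Storing the offset alongside the value lets the writer detect and skip an update it has already applied. A minimal in-memory sketch with hypothetical names; in a real database the value and the offset would have to be written together atomically.

```python
class CounterStore:
    """A stored value plus the offset of the message that last updated it."""

    def __init__(self):
        self.value = 0
        self.last_offset = -1

    def apply(self, offset, increment):
        # A message at or below the recorded offset was already applied,
        # so a redelivery after a crash must not take effect again.
        if offset <= self.last_offset:
            return False
        self.value += increment
        self.last_offset = offset
        return True

store = CounterStore()
applied = [
    store.apply(0, 5),
    store.apply(1, 3),
    store.apply(1, 3),  # the same message, redelivered
]
```

Incrementing a counter is not idempotent on its own, but with the offset check the duplicate delivery is ignored and the value stays at 8.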
When failing over from one processing node to another, fencing may be required.
Rebuilding state after a failure
One option is to keep state local to the stream processor and replicate it periodically. When the stream processor recovers from a failure, the new task can read the replicated state and resume processing without data loss.
Flink periodically captures snapshots of operator state and writes them to a durable storage such as HDFS; Samza and Kafka Streams replicate state changes by sending them to a dedicated Kafka topic with log compaction, similar to change data capture.
If the state consists of aggregations over a fairly short window, replay the input events corresponding to that window. A local replica of a database, maintained by change data capture — the database can also be rebuilt from the log-compacted change stream.