
Notes on "Designing Data-Intensive Applications" (O'Reilly)


Preface

  • businesses have large volumes of data & traffic to deal with
  • businesses must be fast & reactive
  • open-source has large adoption
  • parallelism is the only way to scale
  • IaaS is a key component of any infra
  • downtime (even for maintenance) is increasingly unacceptable
  • a data-intensive app is an app where data (its volume, complexity, or rate of change) is the bottleneck, rather than CPU
  • every choice is a trade-off
  • don’t do premature optimization regarding performance and scalability

Foundations of Data Systems

Reliable, Scalable & Maintainable

  • the main challenge for modern apps is the volume, complexity & speed of change of data
  • common needs for data system:
    • have DB to store data
    • have cache to speed-up reads
    • have searching capabilities via search index
    • have async communication via messages & streaming
    • have batch processing of data
    • ---
    • often one system can cover several needs (but more often we combine several general-purpose components into a single system via application code)
  • common concerns for data system:
    • reliability - system should withstand system/human errors
      • app works as expected, tolerates load & human errors, system disallows abuse & unauthorized access
      • we mainly focus on fault tolerance (can the system keep working when some part starts deviating from expected behavior?)
        • proper error handling & chaos testing can help
      • faults:
        • hardware
          • add redundancy as back-up for each component
          • make it possible to swap your software between machines
            • makes it possible to do patches, rollouts & changes without downtime
        • software (often more disruptive, because we have copies of software all over the hardware)
          • be careful with assumptions about env
          • test
          • allow system to restart, monitor & measure
        • human errors - human operators OR human-written configs are error-prone
          • build proper interfaces that encourage correct behavior (without being overly restrictive)
          • provide testing envs
          • test
          • provide quick rollback of data/config changes & gradual deployment roll-outs
          • use metrics
          • train your people
      • we can cut corners here, when doing MVP/PoC, but be careful with it
    • scalability - system must be adaptable to higher loads
      • scalability is about how to cope with different load scenarios
      • flow
        • measure (choose critical measurements for your system)
          • measure over a window of many requests, because values differ even under identical conditions
          • measure in percentiles to see what your users experience (see the sketch below)
            • p50 is often a good metric, BUT p90+ shows more about outliers
              • don’t focus on very high percentiles too much, they are often not worth it
          • always measure from client’s perspective
        • increase load (with additional resource, without additional resource given to system)
      • distribute work on write & read times
      • Latency and response time are often used synonymously, but they are not the same. The response time is what the client sees: besides the actual time to process the request (the service time), it includes network delays and queueing delays. Latency is the duration that a request is waiting to be handled—during which it is latent, awaiting service
      • SLOs & SLAs often define what service performance is expected
      • note that a bad p90 may be OK for a low-load service, but is bad at high load, because more requests will hit the bad values
      • note that if one operation requires several services, the final response time is dominated by the slowest call (tail latency amplification)
      • note that slow operations in queue will slow down all consumers (head-of-line blocking)
      • percentiles must be calculated dynamically on dashboards (or at least approximated)
      • coping with load (try to rethink architecture, when load increases)
        • combine horizontal & vertical scaling
        • it is ok to have manually scaled system (if it has low load changes over time)
        • The architecture of systems that operate at large scale is usually highly specific to the application - there is no such thing as a generic, one-size-fits-all scalable architecture
        • An architecture that scales well for a particular application is built around assumptions of which operations will be common and which will be rare - the load parameters. If those assumptions turn out to be wrong, the engineering effort for scaling is at best wasted, and at worst counterproductive.
        • use common patterns for particular scenarios
    • maintainability - system must be easily changeable (most of cost is maintaining, not creating)
      • maintain the system continuously to avoid accumulating legacy
      • design system with maintenance in mind
        • keep system easily operatable
          • operations involve: health & monitoring, restores & restarts, tracking problems, patching & updating, monitor how services affect each other, plan for future problem fixing, establishing platform tooling, complex migration, keep prod stable, writing docs
          • operations must be done easily so system should: be easily monitorable, integrate platform tooling, env agnostic, proper documentation & understandable execution flow, configurable (with sane defaults), self-healing, controllable
        • keep system newcomer-friendly & simple (avoid a big ball of mud; non-simple systems are slow to modify)
          • symptoms of complexity: explosion of the state space, tight coupling of modules, tangled dependencies, inconsistent naming and terminology, hacks aimed at solving performance problems, special-casing to work around issues elsewhere, and many more.
          • fix accidental complexity (such, that created by implementation)
          • use proper abstractions
        • keep system easy to evolve & adaptable for unexpected use-cases
          • constantly refactor
          • keep system agile
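
A minimal sketch of the percentile idea above, assuming response times have been collected into a plain Python list; real monitoring systems approximate percentiles with streaming histograms rather than sorting every sample:

```python
def percentile(samples, p):
    """Nearest-rank percentile: the value below which roughly p% of samples fall."""
    ordered = sorted(samples)
    rank = max(0, round(p / 100 * len(ordered)) - 1)
    return ordered[rank]

# hypothetical response times in milliseconds
response_times_ms = [12, 15, 14, 200, 16, 13, 18, 950, 17, 14]
for p in (50, 90, 99):
    print(f"p{p} = {percentile(response_times_ms, p)} ms")
```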

Data Models & Query Languages

data model is about how we think & represent our problem in code

  • can be layered (often each layer hides the complexity of the one below it)
    • each layer is connected via some abstraction
  • you need to choose proper data model for you app

relational model (SQL):

  • data organized in relations (tables), which are unordered collections of tuples (rows)
  • relations form an understandable interface
  • query language acts as abstraction over accessing data (thus it is easier than manual access and allows for underlying optimizations)

NoSQL model (collection of non relational technologies)

  • driving forces:
    • need for higher scalability or write throughput
    • specialized querying operations
    • desire for less restrictions

object-relational mismatch - SQL must be mapped onto programming language’s objects

  • ORM (object-relational mapping) reduces boilerplate, but has its own downsides
  • using JSON reduces the mismatch, but we now have the problem of being schemaless

dictionaries

  • why use them:
    • consistent style and spelling across profiles
    • avoiding ambiguity (e.g., if there are several cities with the same name)
    • ease of updating - the name is stored in only one place, so it is easy to update across the board if it ever needs to be changed (e.g., change of a city name due to political events)
    • localization support - when the site is translated into other languages, the standardized lists can be localized, so the region and industry can be displayed in the viewer’s language
    • better search - e.g., a search for philanthropists in the state of Washington can match this profile, because the list of regions can encode the fact that Seattle is in Washington (which is not apparent from the string “Greater Seattle Area”)
    • deduplication of stored data
    • note that normalization isn’t always great, it has its own tradeoffs
  • having dictionaries forces a 1-to-N relationship, which requires joins (by the DB or via multiple queries & application code)

relational vs document DB

  • different fault tolerance
  • different concurrency handling
  • different data models (thus performance, schema flexibility, mapping, joins, relationship management)
    • document is useful for
      • not very deep tree-like 1toN data
      • schemaless data (no explicit schema enforced by the DB)
        • relational can handle it via frequent migrations OR a JSONB-type column
      • cases when you use entire document (ex: render whole document on single page)
        • performance boost & no need for aggregation
        • if you often need only part of the data, avoid this data model OR use small documents
        • in some relational DBs you can describe locality to boost performance
    • relational is useful for
      • NtoN data
      • join dependent data
        • you can simulate joins in the app, but it will be less efficient than in SQL

data querying

  • often done in a declarative manner via some language like SQL
    • you avoid implementation details, keep API simple & allow for optimizations and parallelization
  • techniques to parallelize & optimize query execution can also be introduced from outside, like the MapReduce pattern (which is rarely exposed as a querying API in modern solutions)
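
A small illustration of the declarative-vs-imperative point: the imperative version fixes the access path, while the declarative form (SQL, shown as a comment) only states the desired result and leaves the "how" to the engine. The `animals` data is hypothetical.

```python
# Imperative: the code dictates exactly how the data is scanned.
def sharks_imperative(animals):
    result = []
    for animal in animals:
        if animal["family"] == "Sharks":
            result.append(animal)
    return result

# Declarative: we only state *what* we want; the engine is free to pick
# an index scan, a parallel scan, a different evaluation order, etc.
#   SELECT * FROM animals WHERE family = 'Sharks';
```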

graph-like model

  • common for complex NtoN data-sets
  • data consists of nodes (vertices) connected by edges
    • nodes & edges can have the same OR different types/meanings
  • sub-models
    • property graph
      • each node has: ID, incoming/outgoing edges, properties
      • each edge has: ID, tail+head vertex, label, properties
      • has no restrictions on connections
      • we can traverse in any direction
      • easy to extend & evolve
      • great for complex queries, due to ease of traversal
        • you need an abstract query language to simplify traversal & allow for optimization (traverse from the more selective part of the graph to the less selective one)
    • triple stores (conceptually similar to the property graph)
      • consist of subject (Jim) predicate (likes) object (bananas)
      • the subject is always a vertex, while the object is either another vertex OR a primitive value used to describe a property of the subject
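
A toy sketch of the property-graph idea in plain Python (vertices and edges as dicts, plus a traversal along one edge label); real graph databases expose query languages such as Cypher or SPARQL for this instead. The data here is made up.

```python
from collections import deque

# vertices and edges keyed by ID; edges carry a label (they could also carry properties)
vertices = {
    "v1": {"name": "Lucy"},
    "v2": {"name": "Idaho", "type": "state"},
    "v3": {"name": "United States", "type": "country"},
}
edges = [
    {"tail": "v1", "head": "v2", "label": "born_in"},
    {"tail": "v2", "head": "v3", "label": "within"},
]

def reachable(start, label):
    """All vertices reachable from `start` by following edges with `label`."""
    seen, queue = set(), deque([start])
    while queue:
        v = queue.popleft()
        for e in edges:
            if e["tail"] == v and e["label"] == label and e["head"] not in seen:
                seen.add(e["head"])
                queue.append(e["head"])
    return seen

print([vertices[v]["name"] for v in reachable("v2", "within")])  # ['United States']
```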

Storage & Retrieval

  • fundamental job of any DB
  • you need to understand it in order to understand the trade-offs of different solutions (and their settings) and match them to your case

most basic engine can be viewed as a log (an append-only sequence of records stored in some file: writes append to the tail, reads scan the file)

  • getting data is inefficient here, so we can utilize indexes
    • it is additional structure, derived from main data & used as kind of metadata for optimization purposes
    • it will often boost reads & downgrade writes
    • types:
      • hash index (same as hash map, but stored in memory & on disk, in form of snapshots) - common building block for more complex things
  • the log file must be compacted over time (removing duplicate keys, keeping only the latest value per key)
    • to avoid one huge file we break the log into fixed-size segments & merge them into new segments while compacting
  • to make it more usable you need to have: concurrency management, recovery mechanisms etc
  • performance gains:
    • sequential writes
    • easier concurrency & crash management
  • problems:
    • hash index must fit in-memory to be performant enough
    • range queries are hard
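
A minimal sketch of the log-plus-hash-index engine described above: every write appends to a file, and an in-memory dict maps each key to the byte offset of its latest record. Compaction, concurrency control and crash recovery of the index are omitted; the file name and record format are made up.

```python
import os

class LogDB:
    """Naive key-value store: an append-only log file + an in-memory hash index."""

    def __init__(self, path="toy.log"):
        self.path = path
        self.index = {}                       # key -> byte offset of its latest record
        open(self.path, "ab").close()         # make sure the log file exists

    def set(self, key, value):
        record = f"{key},{value}\n".encode()  # naive CSV-ish record format
        with open(self.path, "ab") as f:
            f.seek(0, os.SEEK_END)
            offset = f.tell()
            f.write(record)                   # sequential, append-only write
        self.index[key] = offset              # index always points at the newest record

    def get(self, key):
        if key not in self.index:
            return None
        with open(self.path, "rb") as f:
            f.seek(self.index[key])
            line = f.readline().decode().rstrip("\n")
            return line.partition(",")[2]

db = LogDB()
db.set("42", "San Francisco")
db.set("42", "SF")          # the old record stays in the log until compaction
print(db.get("42"))         # -> SF
```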

SSTables with LSM-Trees

  • Sorted String Tables (SSTables) are basically similar to log files, but we require the keys to be kept sorted
    • advantages over log with hash index:
      • more efficient merging (like mergesort)
      • you only need to keep a partial (sparse) index, because data is sorted and you can scan from a nearby key
        • we can compress these chunks before writing, trading CPU for I/O
    • as the in-memory data structure you can use red-black or AVL trees, to write data in any order & read it back ordered
      • basically you write into memory for some time, then write out the tree as an SSTable segment
      • we keep a simple append-only log of writes to restore the in-memory tree in case of a crash
  • Log-Structured Merge-Tree (LSM-Tree) is basically built upon SSTables
    • problems:
      • querying for keys that don’t exist is slow - use Bloom filters
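
A compact sketch of the memtable/SSTable flow: writes go to an in-memory table (a stand-in for a red-black tree), which is flushed as a sorted segment once it grows too big; reads check the memtable, then segments from newest to oldest. A real LSM engine would add a write-ahead log, Bloom filters, and background compaction that merges segments mergesort-style.

```python
import bisect

class MiniLSM:
    def __init__(self, memtable_limit=4):
        self.memtable = {}        # stand-in for a balanced tree (red-black / AVL)
        self.segments = []        # sorted runs ("SSTables"), newest last
        self.memtable_limit = memtable_limit

    def put(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            # flush: write the memtable out as a sorted segment
            self.segments.append(sorted(self.memtable.items()))
            self.memtable = {}

    def get(self, key):
        if key in self.memtable:
            return self.memtable[key]
        for segment in reversed(self.segments):   # newest segment first
            keys = [k for k, _ in segment]
            i = bisect.bisect_left(keys, key)
            if i < len(keys) and keys[i] == key:
                return segment[i][1]
        return None
```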

B-Trees (most common indexing data structure)

  • stores sorted key-value pairs (for range queries & fast lookups)
  • breaks storage into fixed-size blocks (pages), that refer to each other in a tree-like manner (but on disk)
    • these blocks are traversed by key ranges to find the needed value
    • the number of child references per page (the branching factor) is limited by the page size
    • updates are quite fast; additions may require splitting pages & restructuring the tree
  • to keep it crash resilient we store all commands in write-ahead log
    • alternative is to copy modified chunks, instead of updating & keeping WAL
      • great for version control
  • locks need to be done to prevent concurrent writes
  • for performance tree is laid sequentially on disk
  • for faster scanning we can include more pointers
  • keys can be abbreviated to save space (raising the branching factor)
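
A minimal sketch of a B-tree lookup only (no page splits, balancing, or WAL): each page holds sorted keys, interior pages hold child references chosen by key range, leaf pages hold values. The data and layout are made up.

```python
import bisect

class Page:
    def __init__(self, keys, children=None, values=None):
        self.keys = keys            # sorted keys within this fixed-size page
        self.children = children    # child pages (interior page) ...
        self.values = values        # ... or values (leaf page)

def lookup(page, key):
    i = bisect.bisect_left(page.keys, key)
    if page.children is None:                       # leaf page
        if i < len(page.keys) and page.keys[i] == key:
            return page.values[i]
        return None
    if i < len(page.keys) and page.keys[i] == key:  # keys equal to a separator go right
        i += 1
    return lookup(page.children[i], key)            # descend into the covering child

leaf1 = Page(keys=[100, 150], values=["alice", "bob"])
leaf2 = Page(keys=[200, 250], values=["carol", "dave"])
root = Page(keys=[200], children=[leaf1, leaf2])
print(lookup(root, 250))   # -> dave
```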

B-Trees vs LSM-Trees

  • read-optimized (B-Trees) vs write-optimized (LSM-Trees)
    • B-Trees wear down SSDs faster
    • LSM-Trees are faster on HDDs due to their sequential-write nature
      • modern B-Trees can mitigate this
    • B-Trees have higher fragmentation thus take more storage
  • LSM problems
    • compaction takes down I/O & CPU performance
      • often negligible, but high percentiles are unpredictable
    • compactions can downgrade performance at scale
      • if misconfigured, compaction can be even slower than incoming writes, eventually running out of disk
    • a key may live in several segments, thus it is impossible to bind locks to the tree (unlike in B-Trees, where each key lives in exactly one place)

other indexing structures

  • for secondary indexes we can use both LSM- & B-trees, BUT we need to account for keys not being unique, so we either append the row ID to each entry or store a list of matching rows per key
    • such indexes are needed for faster joins
    • values are often stored in a heap file, while each secondary index just stores a pointer, to avoid data duplication
      • great for in-place updates, but if a new value is larger it has to move within the heap and pointers must be updated (or a forwarding pointer left behind)
      • alternative to heap is clustered index, where you just store values in-place to boost reads
        • costs more storage; writes are slower & transactional integrity requires additional effort
  • to index multiple columns you can:
    • concatenate their keys (you can’t look up just one of the keys this way, BUT it’s great when you logically query several values together)
  • fuzzy index for efficient fuzzy searches (can be done via: algorithms for distance search, ML, document classifications)

it is easier to work with RAM, so some DBs are in-memory (mainly because many datasets are simply not that large & fit in memory)

  • often it is acceptable to lose data in such DBs, but it can be somewhat negated via logs on disk
    • great for backups & analysis
  • main advantages:
    • faster reads
    • faster writes (we don’t need to manage complex data structures on disk)
      • can be async for cost of durability
    • can be used to store complex data-structures

DBs often operate with transactions (a group of reads and writes, not necessarily ACID)

  • note that analytics tasks are problematic with classic transactions, because we often query only some columns within huge number of rows with need for some aggregations & transformations
  • we can use one DB for transactions & analytics, but analytic queries are often quite costly, so it is better to have a dedicated solution like a data warehouse
    • data is dumped or streamed onto separate system in read-only format
    • we can do pre-transformation of data (extract transform load - ETL) OR post-transformation
  • we need to use specialized DSs for analytics data for performance
    • schemas
      • star - we have central analytics goal (fact, ex: sales) & tables (dimensions), that are referenced by central table for additional data
        • dates often can be encoded to store metadata, like: holidays
      • snowflake - variation of star, but we break dimensions into sub-dimensions to avoid data duplication and just reference (ex: products)
      • notes:
        • tables are often wide to include all comprehensive info
    • column based storage is used, because main use-case is to query only several out of 100+ columns, but in large amounts
      • unlike transactional DBs, where we query complete rows at once
      • all files have same ordering of data, so you don’t need to store keys
      • because the data is repetitive & contains a small number of distinct values, we can use bitmaps & run-length encoding to compress it (see the sketch below)
        • some queries can even directly operate with compressed data
      • we can introduce sorting to speed-up reads (even sort several columns)
      • if you do data duplication you can also sort it in different ways for different queries
      • B-trees are ineffective here, so LSM is used
    • for aggregate operations we can create materialized views (a copy of data with the aggregate calculations baked in, which might even affect how we store data)
      • downgrades writes
      • not flexible, BUT has great read performance
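
A small sketch of the bitmap + run-length-encoding idea for a low-cardinality column, using a made-up `product_sk` column: one bitmap per distinct value, each bitmap stored as alternating run lengths.

```python
def column_to_bitmaps(column):
    """One bitmap per distinct value: bitmap[i] == 1 where column[i] == value."""
    return {
        value: [1 if v == value else 0 for v in column]
        for value in set(column)
    }

def run_length_encode(bits):
    """Alternating run lengths, starting with the number of leading zeros."""
    runs, current, count = [], 0, 0
    for bit in bits:
        if bit == current:
            count += 1
        else:
            runs.append(count)
            current, count = bit, 1
    runs.append(count)
    return runs

product_sk = [69, 69, 69, 74, 31, 31, 31, 68, 69]      # hypothetical column values
bitmaps = column_to_bitmaps(product_sk)
print(bitmaps[31])                     # [0, 0, 0, 0, 1, 1, 1, 0, 0]
print(run_length_encode(bitmaps[31]))  # [4, 3, 2] -> 4 zeros, 3 ones, 2 zeros
```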

Encoding & Evolution

Data evolves (gets extended OR reformatted)

  • ideally system & updates should be both backwards & forwards compatible

data encoding

  • data can be either
    • in-memory (CPU optimized, allows for DSs)
    • encoded (changed to some self-contained format for transfer OR storage)
  • formats
    • lang-specific formats (encode DS to bytes)
      • tied to one lang
      • have a security problem, because an attacker can manipulate bytes & instantiate arbitrary objects
      • bad performance, bloated, has bad versioning
    • standardized (JSON, XML, binary)
      • They are widely known, widely supported, and almost as widely disliked

      • text based (XML, JSON, CSV)
        • human-readable & simple, widely adopted & often good enough
        • problems with: number encoding, binary data support (base64 helps a bit, at the cost of size), lack of schema, optimization problems (text is often larger than binary)
          • CSV is separate beast, because it is too vague
      • binary
        • binary is often more optimized (even JSON can be changed to BSON to be more optimized)
        • great internally, because you don’t need to conform to standard
        • protobuf & similar to them formats have strict schema
          • we avoid encoding field names by using numbers (field tags as aliases) and referencing the schema
            • aliases can never be reused
        • your code is easily forward compatible, because older code will just skip unknown fields, while backwards compatibility allows appending only optional fields (deletion has the reverse concerns); see the encoding sketch below
          • changing data types is risky but possible, beware of value truncation due to byte-size differences
          • you can evolve single to multi values in protobuf
        • some format like Avro allows for two schemas (reader & writer), that may differ, but must be compatible to do translation
          • writer schema can be stored directly with data, referenced or exchanged
          • works great for dynamically generated schemas, because we don’t need to manage aliases
        • for statically typed langs you need codegen to get typed access to data, BUT for dynamically typed langs (or just to inspect data) you can convert it on the fly to something operable / readable
        • schema is great docs & can be statically checked for breaking changes & inside programing language
  • data flow - how data transferred from one thing to other
    • via DBs - writer encodes data to DB and reader decodes it from
      • forward & backward compatibility is required
      • be careful with updates, you need to preserve unknown fields
      • for evolution you can: do migration, add defaults, add computed defaults
      • try to normalize schema & add analytically adapted data when archiving
    • via REST/RPC - common way to transfer data over the network
      • REST is common for the web, but may be used anywhere (basically your app needs to become a client to connect to the server)
        • allows for microservice architecture, where you chain requests+responses
          • each service can impose business rules & restrictions on its clients
          • each service must be owned by some team & evolve independently from other services
        • an alternative to REST is SOAP, which usually runs over HTTP but aims not to depend on it and uses a custom XML format, with the API described via the Web Services Description Language (WSDL)
          • allows for codegen
          • impossible to operate with without tooling
          • have many flavours
          • often more complex, than just REST
      • RPC
        • problems:
          • the main problem is that it looks like calling a local function while actually going over the network
            • hides network layer under abstraction, while we still need to handle network problems
            • a response may never be returned due to a timeout or error
              • in general function can take a lot of time
            • no built-in idempotence (a retried request may be executed twice)
            • translation between languages is messy
          • more complex, harder to debug, problems with support, can’t be used as public API
      • versioning:
        • by number inside URL
        • by number inside HTTP Accept Header
        • by mapping client <-> version, if endpoint is authorized
    • via messaging
      • data is transferred over the network & managed by intermediate entity, called message broker
      • allows for async communication
        • note that this implies one-way communication, HOWEVER we can still build request/response flows on top of it
      • benefit:
        • improves availability by acting as buffer
        • auto-redirect messages to active processes
        • acts as API-GW
        • message duplication for several receivers
        • low coupling
      • actor model - a model where we avoid dealing with concurrency directly, by working with a set of entities (actors) that communicate via messages and each process one message at a time
        • this can be transformed to distributed actor model via network messaging
      • notes:
        • preserve unknown fields for compatibility
        • message is just bytes with any encoding on top
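
A hedged sketch of the field-tag idea behind formats like Protocol Buffers/Thrift (referenced above): fields go on the wire as (numeric tag, length, bytes), and a decoder simply skips tags it doesn't know, which is what keeps old readers forward compatible with new writers. This is a simplified made-up wire format, not the real protobuf encoding.

```python
import struct

def encode(fields):
    """Each field: 1-byte tag, 4-byte big-endian length, then the raw value bytes."""
    out = b""
    for tag, value in fields.items():
        out += struct.pack(">BI", tag, len(value)) + value
    return out

def decode(data, known_tags):
    """Old readers skip tags they don't know instead of failing (forward compat)."""
    fields, pos = {}, 0
    while pos < len(data):
        tag, length = struct.unpack_from(">BI", data, pos)
        pos += 5
        if tag in known_tags:
            fields[tag] = data[pos:pos + length]
        pos += length
    return fields

# writer on schema v2 (tag 3 is a new optional field)
record = encode({1: b"Martin", 2: b"idea", 3: b"new-field"})
# reader still on schema v1 only knows tags 1 and 2 -> tag 3 is skipped
print(decode(record, known_tags={1, 2}))
```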

notes:

  • compatibility is important because you can’t force clients to update & because rolling updates are required

Distributed Data

We need to distribute data over several machines for:

  • scalability
  • fault tolerance
  • latency reduction

it is possible to scale vertically OR have an array of disks, but you will often hit bottlenecks, so distributing data might become a necessity

ways:

  • replication - data is copied
    • improves availability, latency & throughput
    • single leader
      • each node with data is replica
      • only one replica can do writes & tell other replicas via some stream of logs to do same action
        • this can be done in sync, by blocking the response from leader OR in async manner
          • sync can be quite slow, because some replicas can be under high load OR recovering from failure (thus needing to reapply a huge log)
          • with sync replication data is always in sync, so the leader can fail & we have no consistency issues
          • you can mix approaches, by having 1+ sync followers & the others async, so data is always in sync on at least one follower, while keeping normal speed
            • sync followers can be rotated with async
        • to add follower you need to take consistent snapshot of leader, apply snapshot to follower, start applying changes from log after snapshot
      • follower replicas can be used for reads
      • recovery:
        • follower failure - follower keeps track of last operation, so it can re-apply needed operations from leader’s log
        • leader’s failure (failover) - some follower is promoted to leader with reconfigurations of other followers
          • can be detected by timeouts (need to be properly configured)
          • all nodes need to agree on new leader via consensus (often the most up-to-date node wins)
          • all nodes need to listen for writes from the new leader (& ignore the old one if it reboots and still thinks it is the leader)
          • notes:
            • in async setups, writes that hadn’t replicated before failover are often discarded, because we often can’t save them OR merge them with new writes
            • data in the system might become inconsistent, because the new leader might have older state than some cache etc
            • split brain might happen, when you have several leaders in single-leader system (meaning you don’t have any resolution of conflict writes)
              • ideally system must downgrade one leader, BUT not both
      • methods:
        • statement-based: the leader replicates the raw SQL strings
          • will basically work, BUT be aware that:
            • if a query depends on data (WHERE smth), it must be executed in the same order as on the leader
            • if query has non-deterministic function (NOW()), they must be replaced by values from leader
            • if query has side-effects, they must be predictable
        • write-ahead log (WAL) shipping - the leader shares its log over the network
          • great way, but the main problem is that this protocol is low level & often disallows version mismatch between replicas (thus zero-downtime upgrades are impossible)
          • alternative is logical log, which is somewhat similar, but describes in higher level changes to each row
            • also such log can be used to share data internally
        • via triggers - we can execute code that changes our write in some way before replicating it
          • quite complex & error-prone, but great for flexibility
      • replication lag - to allow for read-heavy workflows you need many followers & async replication, which will lead to replication lag inside the system due to eventual consistency
        • common problems:
          • data is not seen for some time after write
          • data appears inconsistently (first you see the data, then you refresh & see no data, then you refresh & see it unordered)
        • to mitigate (not all solutions can work for multi-device setups due to local state):
          • read your write - user, who made a write, is guaranteed to read same data
            • great for info, like user profile, that only user can edit, so we always know what requests to route to leader
            • read from the leader for a short period of time after a write to mitigate replication lag (suitable for small lags)
            • client remembers timestamp of write & requests only up-to-date data
              • query can be held, until replication lag is mitigated (suitable for small lags)
          • monotonic reads - to mitigate problem, when different requests are sourced from different replicas, you can assign replica per user
          • consistent prefix reads - to mitigate data being seen out of order, ensure causally related writes are written in order (e.g. batched into a single operation or routed to the same partition)
          • distributed transactions - execute distributed operations in transaction-like manner with guarantees from DB (not application code), that operation is fully done
    • multi-leader replication
      • each node acts as leader & also follower to other leaders
      • use-case (often redundant, due to complexity spike):
        • multi-datacenter setup (leader per datacenter)
          • in comparison with single-leader:
            • faster writes due to lower delay
            • higher fault tolerance, because each center is independent
            • higher network problems tolerance, because local network of center is more reliable
        • local-first applications (each app instance is basically a leader with its state, which later must be synced with the “true” leader, the app’s backend)
          • sub-variation is app that needs collaborative-editing support
            • to keep it simpler, you can force locking of documents and convert it to a single-leader situation
      • problems:
        • write conflicts between leaders (in the single-leader case we resolve conflicts at write time, while in multi-leader we need to resolve them asynchronously; if you try to do it synchronously, you lose all the benefits)
          • resolution algorithms
            • avoidance - if data can only be changed by one user, pin that user to a fixed leader that doesn’t change, so no conflict can occur
            • last write wins - identify writes via timestamp & save last one
            • some replicas always win
            • merge values (depends on business needs)
            • save all values and later ask user to resolve conflicts
            • use two-way or three-way automatic merges
          • DBs often allow to resolve conflicts:
            • on write - when conflict occurs you resolve it via some function automatically
            • on read - when data is read and conflict detected you resolve it via some function automatically or by prompting the user
            • ---
            • often stays at level of row, not transaction
          • conflicts can be just conflicting writes OR business conflicts, because some rule is broken
        • plus the general problems of working with mutable state & impure functions
      • topologies - for 2+ reader cases we have to decide how to propagate changes:
        • all-to-all - common approach
          • less error prone, because replication won’t break if some (or even one) node fails
          • it is possible that several writes conflict in such a way that older writes are processed after newer ones, so we need to resolve this (ex: via version vectors)
        • star/tree
        • circular
        • ---
        • both star & tree need to keep track of applied changes to avoid re-application in case of duplication
    • leaderless - the client sends writes directly to all replicas OR to a coordinator node that relays them to several replicas
      • writes will be unordered
      • if some replicas are unavailable we can ignore them (to get ok from operation, we need to have N number of ok responses from replicas)
        • this means that some replicas can become stale, thus you also need to read from several replicas and resolve the most recent value via versioning
        • to fix stale replicas client can either:
          • write the correct value back if a read returns stale data (read repair; great for frequently read data)
          • have a bg process to fix stale data (anti-entropy; replication lag can be quite large)
        • to avoid stale reads we need to configure the DB so that it must achieve a quorum before an operation succeeds; a quorum (with n replicas, w write oks, r read oks) requires (see the sketch below):
          • n - odd
          • w + r > n
          • commonly w = r = (n+1) / 2 (rounded up), but we can reduce w OR r for write-heavy or read-heavy cases
          • if fewer than the configured w or r nodes respond OK, the operation fails
            • if a node temporarily fails, all operations may fail, so be careful with low numbers
            • an alternative, for higher availability, is to allow w+r <= n; this way we might get stale data, but can operate with many failed nodes
          • edge-cases:
            • sloppy quorum
            • concurrent writes need to be resolved (note that timestamp ordering won’t work due to possible clock skew)
            • concurrent read & write will have undetermined behavior
            • if a write fails on some replicas, it is not rolled back on the replicas where it succeeded
            • if a node carrying a fresh value fails & is restored from a stale replica, the number of up-to-date copies may drop below w, breaking the quorum rule
            • many other shitty things
    • notes:
      • monitor the health of the DB by checking data staleness across nodes
        • easy for leader cases, because we have log, BUT problematic for leaderless
      • sloppy quorums - a strict quorum implies that the w & r responses come from the designated “home” nodes for a value, while a sloppy quorum allows temporarily storing data on other nodes
        • we need to sync data via hinted handoff, after connection to main cluster is restored
      • for leaderless setup, in case of multi-datacenter, you can have two n values, global and local, so you can mitigate latency via local quorum, which implies async sync process of data-centers
      • handling concurrent writes in leaderless setup
        • last write wins (to achieve it we can attach some form of timestamp to write, because we don’t have natural ordering)
          • durability is lost
        • concurrent means that both operations can happen in any order
          • if operations aren’t concurrent, we can build an ordering from the knowledge that one operation depends on the other
          • the time of occurrence can’t tell whether operations are concurrent, because network delays, clock differences etc can give different times to operations that happened simultaneously
        • algorithm for handling concurrent writes (single-node case first)
          • the server keeps a version number per key, incremented on every write
          • a client first reads the key (receiving all current values + the version number), merges that state with its own change, then writes back together with that version number
          • when the server receives a write with version X, it can overwrite all values with version <= X, but must keep newer values as siblings
          • when merging data you can either use the last write or merge values as a union (note that deleted values can’t just be removed, they need to be marked as removed via a tombstone to avoid misinterpretation with a not-selected value)
          • to do this in a leaderless setup you need a second version number per replica (which, in combination with the per-key version, gives a version vector)
  • partitioning/sharding - splitting data into separate chunks
    • simple replication might not be sufficient for high-load cases OR when we just need to store too much data
      • potentially queries can be parallelized
      • replication is often used in combination with partitioning
    • commonly:
      • each piece of data is stored on one partition
      • each partition is a small DB of its own, but the system can operate on multiple partitions
    • partitioning must be properly balanced to distribute load across all nodes & avoid hot spots (ideally partition should rebalance itself)
      • simplest approach is random distribution, BUT you won’t be able to query data back
      • you can partition by ranges of data (alphabetic, numeric etc)
        • improper ranges may lead to load going to only one partition (ex: if you partition time records by day, all of today’s writes go to a single partition)
        • great for range queries
      • partition by hash of a key (great distribution)
        • you also can use consistent hashing for better rebalancing
        • we can’t query by ranges
          • there are exceptions for compound keys (first part is hash, other parts are plain values, that can be range queried)
      • “celebrity problem” still can cause hot spots, so you may additionally partition “celebrities” separately from other entities OR place one “celebrity” in several partitions
        • if you split one entity, you now need to do additional queries across several partitions & combine the results
    • it is problematic to partition a DB with secondary indexes
      • ways:
        • by document (local index) - keep separate secondary index per partition
          • read will become expensive, because you need to query each partition
          • ideally values for a single attribute would live in only one partition, but that is often impossible with several secondary indexes
        • by term (global index)
          • we have single secondary index, that partitioned similarly to main index and can be efficiently queried to retrieve data
          • this makes writes less efficient, because we may need to write to several partitions
          • it might have transactional integrity OR can be done in async manner
    • rebalancing - changing how data is partitioned
      • properties: evenly distribute load between set of nodes, zero downtime, it must be as fast as possible and avoid excessive data transfer
      • strategies:
        • hash % N - the worst strategy, because a change in N remaps most keys, thus rebalancing is very expensive
        • static number of partitions - have more partitions than nodes, so each node can hold several partitions
          • in case of transfer, each node can give some number of partitions to other node
          • we transfer only partitions
          • more powerful machines can handle more partitions
          • too high a number is inefficient due to per-partition overhead, too low a number can become a bottleneck, because not all DBs allow changing it in the future
        • dynamic number of partitions - partitions are split and merged when meeting some thresholds
          • useful for key range partitions
          • each node can have multiple partitions & each partition can be transferred between them
          • we can pre-split data from the start to distribute the load
        • number of partitions by node - we have fixed number of partitions per node, which changes only by changing number of nodes
      • be careful with auto-rebalancing, because, if done poorly, it can self-DDoS the system
        • it may be reasonable to have human in the rebalancing loop
    • routing - common service discovery problem
      • solutions:
        • client can call any node via simple load-balancer, node can accept or forward request to other node
        • use partition-aware load balancer to properly route requests
        • make client directly connect to any node and route by itself
      • other problem is to have consensus over where to route request
        • industry standard is ZooKeeper, that manages metadata of whole distributed system of nodes
        • alternative is gossip, so each node knows info about other nodes
        • also simple DNS or static configs (for no dynamic rebalancing) can be used
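
A tiny sketch of the quorum condition used by leaderless replication above: with n replicas, w write acks and r read acks, every read set overlaps every write set in at least one replica when w + r > n.

```python
def quorum_ok(n, w, r):
    """True if every read quorum must overlap every write quorum by at least one replica."""
    return w + r > n

# common configuration: n odd, w = r = (n + 1) // 2
n = 5
w = r = (n + 1) // 2          # 3 and 3
print(quorum_ok(n, w, r))     # True: any 3 readers overlap any 3 writers
print(quorum_ok(n, w=2, r=2)) # False: reads may miss the latest write
```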

Transactions

  • transaction is a concept that groups several writes & reads into one operation
    • transactions can be retried & never execute partially
  • transactionality can be neglected or replaced with simpler concept to improve performance
    • it is all about tradeoffs
  • transactions are often described via:
    • Atomicity - operation can’t be broken into smaller parts, if part of it fails we roll back whole operation
    • Consistency - operation starts in valid state and ends in valid state (valid means all invariants are passed by data)
    • Isolation - concurrently executing transactions don’t interfere with each other (each behaves as if it were running alone)
      • if you need strong isolation you can use serializable flow (guaranteed to execute one-after-other)
      • for weaker guarantees snapshot isolation can be used
    • Durability - data is successfully preserved and stored
      • depending on context may mean: transferred to HDD/SSD OR replicated by number of nodes
    • but note that different implementations may interpret these guarantees differently
  • multiple reads and writes are grouped into the same transaction via BEGIN TRANSACTION and COMMIT statements, typically tied to a single TCP connection
  • isolation and atomicity is also relevant to single write (write won’t be lost, partially done or interrupted via other write)
    • some DBs allow to do “compare and set” or “increment” operations as single write
    • such single-object writes aren’t real transactions, they merely resemble them, because real transactions group more than one operation
  • transactions are needed to:
    • preserve relations between data
    • keeping denormalized data in sync
  • transactions can be safely retried, so this logic can be included into your client
    • note that if a transaction actually succeeded but its acknowledgment was lost in the network, retrying it will cause duplication
    • failures due to DB overload will worsen overload
    • retries due to constraint failure is pointless
    • transactions with side-effects will trigger side-effect twice on retry
      • to mitigate it you can use two-phase commit
  • isolation can be weakened for some cases, where concurrency is less of a concern to gain performance, BUT be careful with that
    • read committed (most basic level)
      • you will only read committed (clear) data
        • DB stores old value to be read AND new, while transaction is ongoing
        • locking is too inefficient here
      • you will only overwrite committed (clear) data
        • locking is done on per-row level
      • problems:
        • one write may happen in between two reads, resulting in an inconsistent result (read skew)
          • critical when performing backups OR large-scale queries
          • fixed via snapshot isolation, where you read only committed data for this snapshot, not the committed data of different snapshots
            • implemented via multi-version concurrency control mechanism to ensure data for each snapshot (avoids locking)
            • under MVCC a transaction sees a version of the data if it was created before the transaction began AND not yet deleted at that point
              • deleted data marked as deleted and later removed from DB via GC
        • write skew - each transaction reads data, verifies a condition, then conditionally writes; two such transactions can together violate the condition
          • common fix is to lock rows FOR UPDATE, BUT it won’t help if we check for absence of data, because there is nothing to lock
            • to fix, use serializable transactions OR materialize conflicts (reverse the DB structure, ex: store available slots instead of reserved slots, so you have something to lock)
      • preventing lost updates - it is a common problem to do concurrent read-modify-write operations without losing data
        • implementations:
          • atomic write - read+write in single operation
            • done via exclusive locks OR single-threading writes
          • explicit locking (similar to DB locking, BUT on application level)
            • SQL allows it via adding FOR UPDATE to your query
          • automatic detection - DB automatically tries to detect lost update and abort offending operation
          • compare and set - check that the current value is the same as the value you read and only then update (see the sketch below)
            • may not work, depending on DB implementation
          • conflict resolution on application level
  • serializability - the concept of strict transactions: the outcome is equivalent to executing transactions one by one as a series, giving the highest isolation level
    • DB prevents all race conditions
    • implementations (for single node):
      • actual series execution - literally execute transactions 1by1
        • long-running queries are still better be ran on snapshots
        • suffers from efficiency
        • only practical with stored procedures, meaning we first collect all needed data and submit the whole transaction at once rather than interactively (otherwise waiting on the client would overwhelm the DB)
        • potentially you can scale beyond a single CPU core by creating a partition per core
          • note that you need to partition data to avoid multi-partition operations
        • in-memory data only
      • two-phase locking (2PL) - multiple reads can happen concurrently, while a write requires exclusive access, blocking both other writes and reads
        • reading snapshot is not allowed here
        • basically a readers-writer exclusive mutex
        • lock is held in transaction lifespan
        • deadlocks are common, so DB has mechanisms to auto-kill one of the locking process (it must be retried by an application)
        • performance is pain here
          • one slow operation can influence others
          • often deadlocks drain performance
        • for some cases you need predicate locks, to lock not on the row level but all rows that match some condition (even rows that don’t exist yet)
          • fixes phantoms and write skews
          • to simplify checks you can utilize index-range locking - attach locks to index entries instead of exact values (simplifies checks, locks additional values)
            • gains performance due to index speed benefits
      • serializable snapshot isolation (SSI) (new approach that provides serializability with quite low performance overhead)
        • based on optimistic approach (expect everything to go alright, abort if bad situation detected)
          • the optimistic approach is commonly considered bad under contention, because too many aborts and retries drain resources
            • with atomic writes, spare capacity for the overhead & not too many conflicts it performs well
        • reads are based on snapshots
        • writes are aborted on conflicts
          • abort if we used a value from an uncommitted MVCC snapshot and it later became committed (aborting late rather than early allows long-running and read-only queries to proceed efficiently)
          • also verify that values read and used by the transaction weren’t changed before commit
        • performance:
          • great for read-heavy flows
          • predictable latency
          • has room for optimization in future
          • multi-core
        • problematic for long-running writes
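
A sketch of the compare-and-set approach to preventing lost updates (see "preventing lost updates" above): the update is applied only if the value is still what was read, otherwise the read-modify-write cycle is retried. The toy store's lock stands in for the atomicity a real DB would provide.

```python
import threading

class KVStore:
    """Toy store whose compare_and_set is atomic (the lock stands in for the DB)."""
    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()

    def get(self, key):
        return self._data.get(key)

    def compare_and_set(self, key, expected, new):
        with self._lock:
            if self._data.get(key) != expected:
                return False            # someone else wrote in the meantime -> caller retries
            self._data[key] = new
            return True

def increment(store, key):
    while True:                         # read-modify-write, retried on conflict
        current = store.get(key)
        if store.compare_and_set(key, current, (current or 0) + 1):
            return

store = KVStore()
threads = [threading.Thread(target=increment, args=(store, "counter")) for _ in range(50)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(store.get("counter"))   # 50 - no updates lost
```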

Transactions in Distributed Systems

  • anything that can go wrong will go wrong given large-enough scale

  • failures

    • when working with a monolith we expect the system either to work OR not, BUT in distributed systems it might partially fail, bringing edge-case results
      • network, machines etc might fail us
    • failures in distributed systems are mostly non-deterministic
    • problem space
      • most of the time we need the system to be online and serve users with low latency (except something like batch-processing units)
      • hardware & network will fail you
      • a large-enough system will most of the time have at least one broken module
        • your system needs to tolerate at least some partial failures
        • if you aren’t expecting errors, the behavior of software is unreliable
      • you might need to support hops between data centers
      • rolling updates often required
      • you need to build reliable system from unreliable components (common practice in SE space, like: TCP add corrections and reliability on top of IP)
    • problems with network:
      • delays, missing responses (due to several factors)
        • fixed via sane timeouts
      • some software (ex: load balancer) need to know if server is still alive, thus we need to have some way of knowing it (usually by pinging some endpoint with timeout)
        • when killing nodes remember:
          • don’t kill too often
          • kill is not free, you need to distribute existing load
          • some operations have side-effects, that might be duplicated on retries
        • timeouts need to account for network time (both ways) & processing time
          • requests also might get queued
          • all in all, timeouts need to be determined experimentally and (ideally) set up dynamically
      • most networks have unbounded delays, meaning they try to deliver data ASAP, but have no upper bound on how long it takes
        • some networks can establish fixed capacity for delivering data, but then they won’t handle bursty traffic well (and resource utilization is potentially lower)
        • to have more reliable delays you need to have proper scheduling & limiters on clients
    • problems with clocks (we measure time for many reasons, we must be prepared to failure of clocks same as for network failures):
      • communication isn’t sync and takes time (we only know that message will be delivered in future)
      • clocks between machines aren’t 100% in sync (they are synced via the Network Time Protocol, but it is not ideal)
      • if we compare a local value and an external timestamp for any reason (ex: check if the lease to be leader node is still active) we might get problems, when the application checked that it is the leader, but
        • didn’t execute the request within the time given by the lease, because:
          • GC started and caused code to stop for some time
          • requests was too complex
          • VM suspended execution
          • user (on client) closed app
          • thread change on CPU
          • I/O & network latency
          • (all in all, system must have built-in guarantees to avoid freezes in execution, BUT it is often not the case in hardware, except specialized one)
        • clocks were out of sync
    • types of clocks:
      • time of day (measure time, often as a stamp in ms from unix zero time)
        • might not be in sync (often we can receive only an approximate value of time, thus strategies like Last Write Wins might lose sequential data)
        • ignore leap seconds
        • resolution might be coarse
        • might jump back in time as result of sync with NTP
        • NTP server might be misconfigured
        • most systems don’t support leap seconds
        • VM’s clock can jump, because of virtualization
        • client’s clock can’t be trusted at all
      • monotonic (a clock used to calculate the time difference between two points)
        • its absolute value has no meaning on its own
        • often even bound to single CPU core
        • have great resolution
        • speed might be adjusted by NTP (but this won’t cause back in time jumps)
        • great for timeouts
    • living with broken clocks & system pauses
      • monitor clock drift
      • use providers with in-house atomic clocks (or similar instruments) that ensure low time drift & account for clock uncertainty when committing changes to the DB, so sequential writes are properly ordered
      • treat a node as temporarily suspended right before GC kicks in
    • distributed system isn’t ideal, BUT we only need for it to be functional, meaning it must provide set of guarantees
      • majority holds the truth - to achieve agreement between nodes quorum is used (N number of nodes must agree on smth for it to be considered truth)
        • ex: a node that is still alive must accept being declared dead and act on it, if the majority decided so (ex: due to delays)
        • common use-case is leader election, with it’s common pitfall, when some node thinks that it is the leader, while quorum re-elected leader
          • you can’t trust a node claiming it is the actual leader, you need to double-check (ex: use an increasing fencing token to reject older operations (works only if nodes always tell the truth))
            • to work around lie problem you need to build Byzantine fault-tolerant system, that assumes that some parties in system can lie (it only relevant in p2p & high risk systems, where hardware might get damaged, but still need to operate)
            • weak lies, like corruptions, can be handled via: checksum, sanitization & validation, use of multiple sources of truth
      • common system models
        • by time
          • sync - system can have delays and drifts with upper bound (unrealistic in scope of topic)
          • partially-sync - system is sync most of time, but we expect that something wrong might happen
          • async - we can’t make any time assumptions (used in specific cases only)
        • by fault-tolerance
          • crash-stop - node can only crash & never go back to life
          • crash-recovery - node may crash & start responding again (we assume stable storage & in-memory state that may be lost)
          • arbitrary faults (Byzantine) - anything can go wrong, including node trying to compromise the system
        • we often design partially-sync crash-recovery systems
          • we design by creating algorithms, that have some defined properties
            • properties are either about safety (nothing bad ever happens to system) or liveness (we eventually recover from bad state)
          • note that most real world systems are Byzantine systems, BUT we simplify them to be crash-recovery for optimization reasons
  • consistency & consensus

    • the easiest solution is to fail the whole service on any internal failure, BUT better systems can tolerate some failures and keep operating
      • often we achieve this via algorithms that provide guarantees & hide complexity
    • you can choose different consistency levels, provided via algorithms (some of them explained further)
      • always be aware, when working with weak guarantees
      • it is somewhat similar to transaction consistency, but with focus on fault tolerance
    • linearizability - we treat distributed system as single node (ex: we avoid pitfalls like replication lag)
      • basic example: if the value of some data changed, it can’t flip back to the old one (which can happen due to the distributed nature of replicated DBs); it must be possible to arrange all responses into a valid linear sequence
      • use-cases: leader election, single source of truth for state when different processes need to communicate in async-manner, uniqueness constraint (foreign key constraint can be implemented without linearizability)
      • only possible with consensus algorithm
      • problems:
        • in multi-datacenter setup, clients need to be able to connect to both data centers
          • basically your trade-off is between availability and consistency (for linearizability, availability suffers from network delays)
      • linearizability enables ordering of operations, which is important, because some operations may depend on each other & a specific order, OR a consumer might expect events in a certain order
        • weaker form, that allows for ordering, is causality ordering, which allows to compare some operations (via happens-before relationship), while some operations may be concurrent or just not comparable
          • if happens-before requirement is met, all replicas must execute operations in order, otherwise we don’t care
        • a more practical implementation is sequence-number ordering, done via a sequence number or a timestamp (from a logical clock)
          • always implies causality ordering in single-leader systems
          • for multi-leader system different approaches can be used:
            • physical clock timestamps with high resolution
            • sequential number per node WITH embedded unique node id
            • allow only some ranges per node (A - 1-1000, b - 1001-2000)
            • none of the prior approaches imply causality, BUT Lamport’s Timestamps do, by adding a unique node ID and a counter that is increased per operation and fast-forwarded if a request contains a higher counter value than the current one (the client must also remember the current counter and pass it with each request); see the Lamport sketch below
              • edge cases:
                • can be used to implement an “Nth write wins” strategy, BUT only after the fact, once all writes have happened; we can’t do the calculation in the scope of the current request
        • linearizability avoids passing and comparing timestamps between different parts of system
        • total order broadcast - related topic, that allows to order messages in distributed system
          • implies:
            • reliable delivery to each node (messages can be buffered in case of network fault OR operations can be held, depending on use-case)
            • all messages are delivered in the same order to all nodes
          • allows for consistent ordering of operations AND enables some constraints like uniqueness of a key in a distributed system
          • use-cases:
            • fencing token, leader selection, locks
          • differs from proper linearizability, because we have replication lag here (linearizability in writes, but not in reads)
            • reads can be fixed by: queuing reads as writes, reading all latest writes, read from leader
    • distributed transactions & consensus
      • use-cases of consensus: leader election, distributed transactions (including sync of secondary index)
      • most popular algorithm is two-phase commit, BUT it is not the best due to fault-tolerance issues and can be replaced by Raft or similar
      • 2PC
        • flow: receive the transaction, ask all nodes whether they are prepared to commit, then ask all nodes to commit if every node agreed it is prepared
        • flow notes:
          • all transactions are trackable via uniq transaction ID
          • node saying it is prepared means that it WILL commit (even if power goes off, because data already written to storage)
          • the coordinator keeps a log of its decisions
          • if the decision was to commit, the coordinator will retry until it succeeds (if a node crashes we need to restore its state via the log)
          • failure of the coordinator will leave the system temporarily stuck (in doubt), until it restores and can continue operations (if the coordinator has no record of a transaction in its log, the transaction is automatically marked as aborted)
            • this is main pitfall of algorithm
            • note that failure in really broken state may require human intervention to roll-back changes & release locks
        • often the safety isn’t worth the cost of the operational problems & performance penalty
        • problems:
          • coordinator is itself a DB that must be replicated for scalability and fault tolerance
            • this also adds additional state and its management to the system
      • exactly-once message processing
        • alternative way to implement distributed transactions is by introducing messaging queue with retries and idempotency guarantees, so we eventually will have consistent state
          • be careful with idempotency in cases of side-effects (like email service)
        • supports heterogeneous systems (where we have different technologies in place)
          • 2PC also has this support via protocols like XA
      • more robust alternatives are built on top of total order broadcast with leader election
        • basically nodes elect a leader (per epoch) that drives decision making, BUT every decision must be confirmed by a quorum of nodes, and nodes reject decisions from an old leader once a new leader with a higher epoch is elected
      • consensus is great, but can’t be universally used due to:
        • performance penalty
        • fault-tolerance requirements (a strict majority of nodes must be reachable to make progress)
        • often no support for dynamically changing the set of nodes
        • timeout dependency to detect failed leaders (even more performance downgrade)
        • operational problems
      • the most straightforward way to handle the low-level parts (allocating work to nodes, service discovery, leader election, consensus, rebalancing & keeping track of dead/alive nodes) is to outsource them to a membership/coordination service (ex: ZooKeeper)
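
A minimal sketch (my own, not from the book) of the Lamport timestamp scheme from the ordering notes above; class and variable names are illustrative:

    # Lamport clock sketch: (counter, node_id) pairs give a total order that is
    # consistent with causality. Names are illustrative only.
    from dataclasses import dataclass

    @dataclass(frozen=True, order=True)
    class LamportTimestamp:
        counter: int   # compared first
        node_id: int   # tie-breaker, makes the order total

    class LamportClock:
        def __init__(self, node_id: int):
            self.node_id = node_id
            self.counter = 0

        def tick(self) -> LamportTimestamp:
            """Local operation: increment the counter and stamp."""
            self.counter += 1
            return LamportTimestamp(self.counter, self.node_id)

        def observe(self, ts: LamportTimestamp) -> None:
            """A request carried a timestamp: jump forward if it is ahead of us."""
            self.counter = max(self.counter, ts.counter)

    # usage: two nodes, a client forwards the highest timestamp it has seen
    a, b = LamportClock(node_id=1), LamportClock(node_id=2)
    t1 = a.tick()    # (1, 1)
    b.observe(t1)    # node 2 learns about counter 1
    t2 = b.tick()    # (2, 2) > (1, 1), so causal order is preserved
    assert t1 < t2

As the notes say, this gives a total order after the fact, but it cannot enforce constraints like uniqueness at request time.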

Batch Processing

types of systems

  • online service - a typical web service that requires fast response times and high availability
  • batch processing - asynchronously processes a large volume of data; throughput matters most
  • stream processing - near real-time system, similar to batch processing, but processes events as soon as they arrive

batch processing with Unix

  • by the Unix philosophy you can do batch processing by piping data from one command to another, each doing some transformation (a rough Python analogue of such a pipeline follows this list)
    • this allows for ease of modification & composability
    • piping enables optimized processing of large data
    • Unix tools use both in-memory operations & disk I/O (spilling to disk when data doesn't fit in memory), so pipelines avoid memory overflows
  • Unix pushes usage of uniform I/O interfaces for programs in order to establish composability
    • additionally I/O is mostly done via the stdin & stdout concepts, which allows for loose coupling between programs (despite its beauty it can be hard to wire programs with multiple inputs or outputs)
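
A rough Python analogue (my own sketch, not from the book's text) of a Unix-style pipeline such as sort | uniq -c | sort -rn | head, finding the most frequent values in one column of a log read from stdin; the column index and "top 5" are illustrative choices:

    # Reads lines from stdin like a Unix filter and prints the 5 most frequent
    # values found in one whitespace-separated column.
    import sys
    from collections import Counter

    def top_values(lines, column=6, n=5):
        counts = Counter()
        for line in lines:
            fields = line.split()
            if len(fields) > column:          # skip malformed lines
                counts[fields[column]] += 1
        return counts.most_common(n)

    if __name__ == "__main__":
        for value, count in top_values(sys.stdin):
            print(f"{count:>8} {value}")

Unlike the sort-based Unix pipeline, Counter keeps everything in memory, while sort spills to disk when data does not fit in RAM, which is exactly the point the notes above make about handling large inputs.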

MapReduce & distributed file systems

  • a single MapReduce job can be viewed as a single Unix process (jobs are often chained into workflows; input is not mutated in place, which allows for reusability, safe retries and fast reverts); a minimal single-machine sketch of the map → sort → reduce flow follows this list
    • it is quite low-level but powerful, and combines Unix's strong parts with distributed systems
    • a practical building block that enables MapReduce is HDFS, a distributed file system that spreads data over a number of machines with durability (via replication) in mind
    • unlike Unix's untyped text streams, MapReduce jobs often use structured file formats with schemas to store data
  • chaining is often done by saving previous result and running next job over it, which materializes intermediate state
    • benefits:
      • possibility to reuse state
      • ease of wiring different jobs
    • problems:
      • some state is not useful, thus doesn't need to be materialized (it becomes even more overkill when we have a high replication factor)
      • the next job needs to wait until the previous one is finished
  • when doing joins, you can either do one-by-one lookups over the network (often too slow), OR denormalize (often wasteful), OR, for a large enough dataset, copy the whole table into the same distributed file system and join there
    • additionally you can do sort-merge joins: mappers extract the join key from each input, the outputs are sorted by that key and merged, so the reducer sees all records for a given key from both inputs next to each other and can join them easily
  • map-side joins are a more optimized way to join, basically just a map operation with no sorting or reduce stage
    • only possible if you can make assumptions about the data being processed
    • example
      • load one dataset into memory (or index it and keep it on disk) as a lookup table and join it with the data being processed
      • partition both inputs in the same way, so each mapper only needs the matching partition of the lookup table
      • if both inputs are also sorted in the same way, a map-side merge join is possible (the mapper performs the role a reducer normally would)
  • use-cases:
    • heavy analytic queries
    • building search indexes
    • ML/AI training
    • ---
    • often results need to be served on demand via a server, so you need to build a DB from the job output (you could write into a live DB record by record, but that is not optimal given the large volume of job results, SO it is better to build immutable DB files inside the MapReduce job and then copy them to servers for read-only queries)
  • a somewhat-predecessor to MapReduce-based distributed systems is the massively parallel processing (MPP) database, focused on executing heavy analytical queries
    • MapReduce is more general and allows to:
      • execute diverse programs
      • store data in different formats
      • store data in raw format before applying schema
    • MapReduce avoids data moving
    • MapReduce has less focus on in-memory data storage & more on fault tolerance (note that the MPP alternatives have less fault tolerance and keep more in memory to save CPU and I/O, because aggressive fault tolerance is often less of a requirement than it originally was at Google)
  • dataflow engines are an alternative to MapReduce; they operate on the concept of operators (simple processing steps) combined into a workflow (unlike a MapReduce workflow, this one is handled as a single unit of work)
    • while the workflow is a single unit of work, the engine still partitions the input and spreads operators across different machines (which requires transferring data over the network)
      • network transfer is often avoided as much as possible by engine
      • intermediate state storage is mostly avoided
        • engine can sometimes create intermediate state as checkpoints to do re-computation, if failure occurred
    • differences:
      • sorting is only performed where actually required, not by default after every stage
      • no separate map tasks; mapping work is just another operator in the job
      • data flow is pipe-like, no need to wait for the previous operator to finish
    • notes:
      • operators should be deterministic, otherwise a failure causes a cascading effect: downstream results computed from lost non-deterministic output must also be recomputed
  • MapReduce can be implemented to work with & process graph-structured data
    • it is often used in the style of "repeat until done" iterative jobs, which can be implemented with plain MapReduce, but won't be efficient (ex: we can't do partial graph traversals, the whole dataset is re-read every iteration)
    • it is often implemented with a vertex-centric model: each vertex does some computation, holds some state and sends messages to other vertices along its edges
      • the Pregel model makes the system fault tolerant by ensuring each message is processed exactly once & all messages sent in one iteration are delivered before the next iteration starts
        • has similar behavior to modern dataflow engines
      • it has a significant performance penalty, due to the impossibility (as of now) to properly co-locate graph data in a distributed system (if you can fit the graph on a single machine - just process it there)
  • modern MapReduce based frameworks introduce high-level APIs
    • enables:
      • ease of use
      • possibility to introduce optimizations & do underlying algorithm migrations
      • domain focused APIs
  • notes:
    • the output of a mapper is sorted by key before reducing (the framework does this for you)
    • often data is stored on a machine and the map/reduce task code is copied there and executed, to reduce data transfer ("putting the computation near the data")
    • in the classical implementation reducers are chosen based on a hash of the key, BUT, to mitigate hot keys, records for such keys can be spread over several reducers (hot keys are either listed beforehand OR detected on the fly)
      • this will cause reduce to be done in two stages
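
A minimal single-machine sketch (my own, not from the book) of the map → sort/group → reduce flow described above, using a word count as the illustrative job:

    # Map each record to (key, value) pairs, sort/group by key (what the
    # framework's shuffle does across machines), then reduce each group.
    from itertools import groupby
    from operator import itemgetter

    def map_fn(record: str):
        for word in record.split():
            yield (word.lower(), 1)

    def reduce_fn(key: str, values):
        return (key, sum(values))

    def map_reduce(records, map_fn, reduce_fn):
        # map phase
        pairs = [kv for record in records for kv in map_fn(record)]
        # "shuffle": sort by key so equal keys become adjacent, then group
        pairs.sort(key=itemgetter(0))
        # reduce phase
        return [reduce_fn(key, (v for _, v in group))
                for key, group in groupby(pairs, key=itemgetter(0))]

    docs = ["the quick brown fox", "the lazy dog", "the fox"]
    print(map_reduce(docs, map_fn, reduce_fn))
    # [('brown', 1), ('dog', 1), ('fox', 2), ('lazy', 1), ('quick', 1), ('the', 3)]

In a real framework the map and reduce calls run on many machines, mapper output is partitioned by a hash of the key, and the sorted runs are merged on the reducer side, but the shape of the computation is the same.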

stream processing

  • similar to batch processing, but with unbounded input that is processed in near real time
    • the sort-then-reduce approach from MapReduce is impossible in its previous form on an unbounded input
  • most often data flows in constantly, so processing it in batches per some fixed period may introduce too much delay
  • stream sends data in form of small, self-contained objects, called events
    • event can be encoded in any format
    • event is often emitted by many publishers & consumed by many subscribers of topic (way of grouping events)
    • subscribers have a direct channel to receive notifications about new events, to avoid polling
  • questions for system to solve
    • backpressure problem - what to do when producers outpace consumers: drop events, buffer them in a queue (then how to deal with queue growth), or block new events from producers
    • durability & throughput tradeoff
      • do we need to write messages to disk and replicate them, or may they disappear on crash/reboot
      • can we choose UDP (durability can be boosted via application-level algorithms) or is TCP required
    • will pub/sub members communicate directly or via broker
      • direct communication may be faster, BUT implies risk of lost messages in case of crashes or network issues
      • broker is a form of DB to store & manage events
        • notes:
          • will delete processed events
          • might have performance degradation due to queue size increase
          • has simpler querying mechanisms than a classical DB
  • webhooks are a form of direct streaming over public APIs (the consumer registers a callback URL with the producer)
  • consuming stream
    • types:
      • load balancer - queue acts as load balancer and distributes events to subs
        • may lead to out-of-order message processing (can be fixed with additional queuing, because event ordering is often a requirement of the system)
      • fan-out - queue send same event to all subs
    • the consumer should acknowledge when an event is processed, otherwise the broker will retry with the current or another consumer
      • note that an atomic commit protocol must be used to avoid duplication in case of network faults (the consumer processed the event but didn't send the acknowledgement)
    • commonly processed events are deleted, so new consumers can't read old events
      • to change that append-only log can be used
      • the log can be distributed across partitions, BUT we lose ordering between partitions
      • the log is great for fan-out, BUT trickier to load-balance
        • one way is to assign whole partitions to consumers (one consumer per partition), BUT then parallelism is limited by the number of partitions AND we can have a hot partition problem AND a slow message will slow down its whole partition
      • an append-only log reduces acknowledgement overhead by keeping a single offset per consumer
        • in its plain form this might cause a message to be processed a second time, if the offset wasn't synced with the broker before a failure
      • the log still won't be infinite; deletion/archival of old segments is allowed
        • to determine the required size you can alert on consumers that fell behind too much
        • this also means a consumer can safely be stopped or removed without affecting the broker (in other approaches, a missing consumer's growing backlog of unacknowledged messages might trigger safety mechanisms and lock up the broker)
      • allows for easier recovery
  • a modern application requires several tools that each act as a data source (ex: DB & warehouse), and all of them need to be kept in sync
    • ways of syncing (all ways imply replication lag)
      • keep one source of truth and load it out from time to time in bulk to other systems
        • won’t scale after some size
        • isn’t realtime
      • write to all systems at once (multi-leader problem)
        • concurrency issues (writes may arrive at one system in a different order than at the other)
        • fault tolerance (write to one system may succeed and to other fail)
          • can be solved with atomic commits
      • change data capture - mechanism of streaming data changes from DB (leader, single source of truth) to other data storages
        • can be done via parsing the change log (may require hooking into the DB's internal implementation) OR via triggers (performance overhead & fragility)
        • logs are truncated, so to set up new followers you need to start with a proper snapshot
          • the alternative is to use just the log, if it is never truncated or is log-compacted instead (only the latest value per key is kept)
            • this will also allow to use message broker as durable storage
  • storing data as a timeline of immutable events is a separate, useful data model
    • it is commonly used in finance systems, BUT can also be beneficial for ease of modeling (event sourcing in DDD), observability (ex: tracking all user actions for analytics), flexibility (the same events can be used to construct different views of the data) and ease of migrations (potentially you can build new systems from the log and kill the old one)
    • note that non-compacted logs must have snapshots in some form
    • concurrency is main issue
      • BUT it also simplifies multi-object writes, because one write can contain all needed info AND writes are done in order
    • problematic with data that frequently changes AND data that is under data retention policy
  • consuming streams
    • saving changes to some form of storage
    • push events to user (ex: emails, dashboards etc)
    • process stream(s) and output stream
      • often called operator/job
      • it is quite similar to MapReduce functionally & to Unix philosophically
        • except there is no possibility to restart from the beginning or to sort the whole input
      • implementations:
        • store queries/patterns/ML models to match against incoming events
          • similar to this is to do matching against aggregation of events
  • timing (the same problems exist in batch processing; a tumbling-window sketch follows this section)
    • events have an embedded time on them, BUT it can and often will differ from the time the event is processed
    • the timing difference is often negligible, but delays may cause problems (a report says "10 events in the last 5 minutes", but they actually occurred 10 minutes earlier and only arrived late)
      • you can drop such straggler events and just log how many were dropped OR you can re-calculate and publish a correction based on the new info
      • a publisher can notify that it won't send any more events for the current window, but multiple publishers & publisher rotation make this tricky
    • it is problematic to track the true event time, because client-generated events depend on the client clock, which can be wrong
      • we can record timings on both client & server and calculate some meaningful estimate from them
    • window types:
      • tumbling - fixed size
      • hopping - fixed size, with fixed size overlap
      • sliding - contains all events in any period of fixed size
      • session - contains all related events to some app defined “session” (ex: auth session)
  • joins - sometimes we need to join data
    • types
      • stream-stream - data is joined from both streams by some factor in given window
        • often done by storing state in given window
      • stream-table - stream is enriched with existing data
        • similar to MapReduce, BUT we also need to listen for DB changes and update the local copy (if we have one)
      • table-table - data is stored in DBs & listened by stream
        • basically the result is a continuously updated cache of a JOIN query (a kind of materialized view)
    • main problem is that we need to keep track of historical data (ex: apply tax rate on moment of sale, when doing historical re-calculation)
      • if system allows we can keep joins non-deterministic
      • otherwise we need to embed ID or version tag that signifies what data version must be joined with this event
  • use-cases:
    • monitoring & alerting
    • view materialization
  • fault tolerance (similar to batch system, where you need to process event only once, but less straightforward)
    • if we don’t care about external side effects, we can do checkpoints per some time period to restore system’s state OR process fixed window of tasks as one batch and then output it to the world (if it was successful)
    • otherwise we can utilize atomic transactions between stream and external system
      • it is not so costly due to possibility to batch events in single transaction
        • an alternative is to ensure idempotency in the system (concurrency must be handled; some operations may not be naturally idempotent, SO metadata is used to fix that)
    • state is often kept in memory & on local disk, with additional replication
      • alternative is to rebuild state from input stream (in case of short term failure)
  • notes
    • streams allow async style of communication
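
A minimal sketch (my own, not from the book) of a tumbling window over event time, counting events per key; the 5-minute window size and the event shape are illustrative:

    # Each event falls into exactly one fixed-size window, based on the
    # timestamp embedded in the event rather than on processing time.
    from collections import defaultdict

    WINDOW_SECONDS = 300  # 5-minute tumbling windows

    def window_start(event_time: int) -> int:
        """Map an event-time timestamp to the start of its tumbling window."""
        return event_time - (event_time % WINDOW_SECONDS)

    def count_per_window(events):
        """events: iterable of dicts like {"time": unix_ts, "key": str}."""
        counts = defaultdict(int)
        for event in events:
            counts[(window_start(event["time"]), event["key"])] += 1
        return dict(counts)

    events = [
        {"time": 1_000_010, "key": "page_view"},
        {"time": 1_000_150, "key": "page_view"},
        {"time": 1_000_400, "key": "page_view"},   # lands in the next window
    ]
    print(count_per_window(events))
    # {(999900, 'page_view'): 2, (1000200, 'page_view'): 1}

A real stream processor also has to decide when a window can be considered complete, which is exactly the straggler-event problem the timing notes above describe.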

future of data systems (what can be changed to improve current state)

  • modern systems may require several data representations, stored in different formats, so we need to build some system to integrate all of these sources in a consistent & reliable manner
    • it would be nice to have some standardized protocol to do messaging and distributed transactions
    • messaging is easier to implement than distributed transactions, BUT you need to deal with eventual consistency (alternatively you can make the system sync and less robust)
      • messaging that relies on total ordering either doesn't scale OR allows mis-ordering between different replicas/partitions, AND systems split into microservices and/or stateful clients can't guarantee a single global order anyway
    • causality between different systems is not trackable on software level easily
      • conflict resolution algorithms, logical timestamps, recording read as event may help
  • batch processing and streams
    • such frameworks are done in a functional style for ease of reasoning and fault tolerance (a clean, deterministic function is easy to retry)
      • secondary index also might be implemented in async manner
    • lambda architecture - a somewhat historical style of system where all events are saved to a read-optimized log structure, a stream processor produces views from events ASAP, and a heavy batch job reprocesses the data from time to time
      • problems:
        • need to maintain two systems (unified framework can help)
        • stream and batch outputs need to be merged (fault tolerance mechanisms may help)
        • timing problems in batch part (need to have ability to process by event time)
  • unbundling DBs (in future, it might be beneficial to treat all data in system as one large DB, which composed of smaller data management systems, integrated with some framework)
    • to achieve systems must have:
      • high level query language
      • low level unified protocol to access data
    • data in such system can be managed via reactive model (change in one place triggers required recalculations)
      • today's DBs allow running code on change (ex: triggers), but it is more of an afterthought AND not greatly implemented (consider having to deploy your app inside the DB env somehow, instead of in Docker)
      • it is ok to keep code and state separate, BUT your app might potentially subscribe to changes in the DB and update its internal state
      • for such system ordering of events & fault tolerance is key, because getting data out of sync is bad for derived data
      • streaming allows building system, where each service can maintain local copy of external DB and query it, instead of doing API calls
        • time dependency can be hard to solve here
      • if the client has state, it can also be part of the reactive model with its own materialized state
        • it will optimize offline flow and make clients more reactive, BUT it is not so easy due to lack of tooling and paradigm shift
    • system often has two paths (write - computation is done over data and saved, read - data is read with possibility of computation)
      • we can optimize both paths, ex:
        • doing more computation on write (basically building a cache) or more on read
        • common form of optimization is index
    • treating reads as events and storing them enables causal dependency detection & the possibility to reconstruct the reader's state (ex: what the user saw on the page before they did some write)
      • adds I/O cost
  • making DBs “correct”
    • the correctness guarantee is often achieved with transactions, but in distributed systems it is harder to guarantee and often badly understood
      • poor configs can lead to data corruption
    • while we still rely on transactions, some paradigm shifts may apply
      • data corruption examples
        • data deletion (stream-like immutability can help)
        • double writes with side effects (idempotence will help)
        • data duplication (use a unique ID per operation; a deduplication sketch follows at the end of these notes)
          • ex: user is doing operation twice, because response was never received
      • many systems implement low level corrections (TCP orders & deduplicates packets, we have checksums in protocols), BUT it is not sufficient to provide e2e correctness and additional application level mechanisms must be involved
    • to ensure constraints (ex: only positive balance, uniqueness) we rely on consensus
      • solutions (always sync OR with conflict resolution in async model):
        • single leader
        • routing requests to same replica
        • total order broadcast (works for single partition, BUT can also work for multiple, where you have single message, that triggers reads from multiple partitions down the line)
      • constraints is often about consistency, which itself is about:
        • timeliness (can be violated in favor of eventual consistency) - data is viewed in up-to-date state
        • integrity (can’t be violated) - data is not corrupted
        • ---
        • loose constraints in an async system can be achieved via compensating transactions
      • Another way of looking at coordination and constraints: they reduce the number of apologies you have to make due to inconsistencies, but potentially also reduce the performance and availability of your system, and thus potentially increase the number of apologies you have to make due to outages. You cannot reduce the number of apologies to zero, but you can aim to find the best trade-off for your needs—the sweet spot where there are neither too many inconsistencies nor too many availability problems.

    • more about corruptions
      • remember that disks, CPUs etc. can get corrupted, BUT, like many other events, we usually omit them from consideration due to their low probability
      • remember that many things can fail, and past some scale you might need to do data auditing (as end-to-end as possible) AND backups
        • remember that backups may degrade; do chaos testing & backup-restore testing
        • most systems audit from a position of trust (we trust that most of the time the system is correct, while some systems may need more cryptography to verify correctness)
  • privacy
    • remember that data belongs to humans and needs to be treated properly, with respect for the human being behind it
    • be careful with algorithmic predictions, it may cause critical biases towards some groups
      • data-based decisions are great, BUT be careful with them
    • data is valuable to every company, SO it must be treated properly & securely
    • provide control over data to end user
    • delete data completely after some time
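
A minimal sketch (my own, not from the book) of end-to-end deduplication via a unique ID per operation, as mentioned in the notes on data duplication above; the table layout and names are illustrative:

    # The client attaches a unique request ID to each operation; the server
    # records processed IDs under a uniqueness constraint, so a retried request
    # (e.g. after a lost response) is applied only once.
    import sqlite3
    import uuid

    db = sqlite3.connect(":memory:")
    db.executescript("""
        CREATE TABLE processed_requests (request_id TEXT PRIMARY KEY);
        CREATE TABLE accounts (name TEXT PRIMARY KEY, balance INTEGER);
        INSERT INTO accounts VALUES ('alice', 100), ('bob', 0);
    """)

    def transfer(request_id: str, src: str, dst: str, amount: int) -> str:
        """Apply a transfer at most once per request_id."""
        try:
            with db:  # one transaction: dedup record + both balance updates
                db.execute("INSERT INTO processed_requests VALUES (?)", (request_id,))
                db.execute("UPDATE accounts SET balance = balance - ? WHERE name = ?", (amount, src))
                db.execute("UPDATE accounts SET balance = balance + ? WHERE name = ?", (amount, dst))
            return "applied"
        except sqlite3.IntegrityError:
            return "duplicate, ignored"  # same request_id was seen before

    req = str(uuid.uuid4())                     # generated once, on the client
    print(transfer(req, "alice", "bob", 30))    # applied
    print(transfer(req, "alice", "bob", 30))    # duplicate, ignored (client retry)
    print(db.execute("SELECT * FROM accounts ORDER BY name").fetchall())
    # [('alice', 70), ('bob', 30)]

The point, as in the notes above, is that TCP-level deduplication is not enough: the unique ID has to survive the whole path from the user action to the final write.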