% This is "sig-alternate.tex" V1.3 OCTOBER 2002
% This file should be compiled with V1.6 of "sig-alternate.cls" OCTOBER 2002
%
% This example file demonstrates the use of the 'sig-alternate.cls'
% V1.6 LaTeX2e document class file. It is for those submitting
% articles to ACM Conference Proceedings WHO DO NOT WISH TO
% STRICTLY ADHERE TO THE SIGS (PUBS-BOARD-ENDORSED) STYLE.
% The 'sig-alternate.cls' file will produce a similar-looking,
% albeit, 'tighter' paper resulting in, invariably, fewer pages.
%
% ----------------------------------------------------------------------------------------------------------------
% This .tex file (and associated .cls V1.6) produces:
% 1) The Permission Statement
% 2) The Conference (location) Info information
% 3) The Copyright Line with ACM data
% 4) Page numbers
%
% as against the acm_proc_article-sp.cls file which
% DOES NOT produce 1) thru' 3) above.
%
% Using 'sig-alternate.cls' you have control, however, from within
% the source .tex file, over both the CopyrightYear
% (defaulted to 2002) and the ACM Copyright Data
% (defaulted to X-XXXXX-XX-X/XX/XX).
% e.g.
% \CopyrightYear{2003} will cause 2002 to appear in the copyright line.
% \crdata{0-12345-67-8/90/12} will cause 0-12345-67-8/90/12 to appear in the copyright line.
%
% ---------------------------------------------------------------------------------------------------------------
% This .tex source is an example which *does* use
% the .bib file (from which the .bbl file % is produced).
% REMEMBER HOWEVER: After having produced the .bbl file,
% and prior to final submission, you *NEED* to 'insert'
% your .bbl file into your source .tex file so as to provide
% ONE 'self-contained' source file.
%
% ================= IF YOU HAVE QUESTIONS =======================
% Questions regarding the SIGS styles, SIGS policies and
% procedures, Conferences etc. should be sent to
% Adrienne Griscti (griscti@acm.org)
%
% Technical questions _only_ to
% Gerald Murray (murray@acm.org)
% ===============================================================
%
% For tracking purposes - this is V1.3 - OCTOBER 2002
\documentclass{vldb}
\usepackage{xspace,color}
\usepackage{graphicx}
\newcommand{\rows}{Rose\xspace}
\newcommand{\rowss}{Rose's\xspace}
\newcommand{\xxx}[1]{\textcolor{red}{\bf XXX: #1}}
\renewcommand{\xxx}[1]{\xspace}
\begin{document}
\title{{\ttlit \rows}: Compressed, log-structured replication}
%
% You need the command \numberofauthors to handle the "boxing"
% and alignment of the authors under the title, and to add
% a section for authors number 4 through n.
%
\numberofauthors{3}
\author{
\alignauthor
Russell Sears\\
\affaddr{UC Berkeley}
\alignauthor
Mark Callaghan\\
\affaddr{Google}
\alignauthor
Eric Brewer\\
\affaddr{UC Berkeley}
%\author{Russell Sears \and Mark Callaghan \and Eric Brewer}
}
\maketitle
\begin{abstract}
\rows\footnote{Replication Oriented Storage
Engine} is a database storage engine for high-throughput
replication. It targets seek-limited,
write-intensive transaction processing workloads that perform
near real-time decision support and analytical processing queries.
\rows uses {\em log structured merge} (LSM) trees to create full
database replicas using purely sequential I/O, allowing it to provide
orders of magnitude more write throughput than B-tree based replicas.
Also, LSM-trees cannot become fragmented and provide fast, predictable index scans.
\rowss write performance relies on replicas' ability to perform writes without
looking up old values. LSM-tree lookups have
performance comparable to B-tree lookups. If \rows read each value
that it updated then its write throughput would also be comparable
to that of a B-tree.
% are
%roughly twice as expensive as B-tree lookups. Therefore, if \rows
%read a tuple before each update then its write throughput would be
%bound by random I/O performance.
Although we target replication, \rows provides
high write throughput to any application that updates tuples
without reading existing data, such as append-only, streaming and
versioning databases.
We introduce a page compression
format that takes advantage of LSM-trees' sequential, sorted data
layout. It increases replication throughput by reducing sequential
I/O, and enables efficient tree lookups by supporting small page sizes and doubling as an index of the
values it stores. Any scheme that can
compress data in a single pass and provide random access to compressed
values could be used by \rows.
Replication environments have multiple readers but only one writer.
This allows \rows to provide atomicity, consistency and isolation to
concurrent transactions without resorting to rollback, blocking index
requests or interfering with maintenance tasks.
\rows avoids random I/O during replication and scans, leaving more
I/O capacity for queries than existing systems, and providing
scalable, real-time replication of seek-bound workloads.
Analytical models and experiments show that \rows provides orders of
magnitude greater replication bandwidth over larger databases than conventional
techniques.
%(XXX cut next sentence?) Also, a group of \rows replicas provides a
%highly-available, consistent copy of the database. In many Internet-scale
%environments, decision support query availability is more important
%than update availability.
%\rows targets seek-limited update-in-place OLTP applications, and uses
%a {\em log structured merge} (LSM) tree to trade disk bandwidth for
%seeks. LSM-Trees translate random updates into sequential scans and
%bulk loads. Their performance is limited by the sequential I/O
%bandwidth required by a vacuumer analogous to merges in
%sort-merge join. \rows uses column compression to reduce this
%bottleneck.
%\rowss throughput is limited by sequential I/O bandwidth. We use
%compression to reduce this bottleneck by adapting existing column
%compression algorithms to a simple row-oriented data layout. This
%approach to database compression introduces negligible space overhead
%and can be applied to most single-pass, randomly accessible
%compression formats. Our prototype uses lightweight (superscalar)
%column compression algorithms.
\end{abstract}
%% SIGMOD DOESN'T USE THESE
% A category with the (minimum) three required fields
%\category{H.4}{Information Systems Applications}{Miscellaneous}
%A category including the fourth, optional field follows...
%\category{D.2.8}{Software Engineering}{Metrics}[complexity measures, performance measures]
%\terms{Delphi theory}
%\keywords{ACM proceedings, \LaTeX, text tagging}
\section{Introduction}
\rows is a database replication engine for workloads with high volumes
of in-place updates. It is designed to provide high-throughput,
general purpose transactional replication regardless of
database size, query contention and update patterns. In particular, it
is designed to run real-time decision support and analytical
processing queries against some of today's largest TPC-C style online
transaction processing applications.
Traditional database replication technologies provide acceptable
performance if the application write set fits in RAM or if the
storage system is able to update data in place quickly enough to keep
up with the replication workload.
Transaction processing (OLTP)
systems use fragmentation to optimize for small, low-latency reads and writes. % by fragmenting tables.
They scale by adding memory and additional drives, increasing the number of
I/O operations per second provided by the hardware. Data warehousing technologies introduce
latency, giving them time to reorganize data for bulk insertion, and
column stores optimize for scan performance at the expense of random
access to individual tuples.
\rows combines the best features of the above approaches:
\begin{itemize}
\item High throughput writes, regardless of update patterns
\item Scan performance comparable to bulk-loaded structures
\item Low latency lookups and updates
\end{itemize}
We implemented \rows because existing replication technologies
only met two of these three requirements. \rows achieves all three
goals.
\rows is based on LSM-trees, which reflect updates immediately
without performing disk seeks or resorting to fragmentation. This
allows it to provide better write and scan throughput than B-trees.
Unlike existing LSM-tree implementations, \rows makes use of
compression, further increasing replication and scan performance.
\rows provides extremely high write throughput to applications that
perform updates without performing reads, such as replication, append-only, streaming
and versioning databases.
%However, like LSM-trees, \rowss random reads are up to twice as
%expensive as B-tree lookups. If the application read an old value
%each time it performed a write, \rowss replication performance would
%degrade to that of other systems that rely upon random I/O.
%Therefore, we target systems that write data without performing reads.
%We focus on replication, but append-only, streaming and versioning
%databases would also achieve high write throughput with \rows.
%It requires complete transactional semantics and avoids
%reading data during updates regardless of application behavior.
Replication is a particularly attractive application of LSM-trees because
it is common, well-understood and requires complete transactional
semantics. Also, we know of no other scalable replication approach
that provides real-time analytical queries over seek-bound transaction
processing workloads.
%\rows provides much greater write throughput than the database master
%would on comparable hardware, increasing the amount of I/O available to
%read-only queries.
% Alternatively, a single \rows instance
%could replicate the workload of multiple database masters,
%simplifying read-only queries that span databases.
%is designed to service analytical processing queries over
%transaction processing workloads in real time. Such workloads
%typically read data before updating it. However, replication
%envrionments are able to apply updates without reading existing data.
%This allows \rows to apply updates without randomly accessing existing
%data. Because updates perform sequential I/O, write throughput is
%orders of magnitude higher than random read throughput, and is
%independent of the application's working set size.
%Applying these techniques to replication environments provides write
%throughput comparable to high-latency techniques optimized for bulk
%operations, and latency comparable to transaction processing systems.
%\rowss compression techniques provide compression ratios similar to
%those achieved by compressed column stores without sacrificing
%efficient reconstruction of tuples.
%% When faced with random access patterns, traditional database
%% scalability is limited by the size of memory. If the system's working
%% set does not fit in RAM, any attempt to update data in place is
%% limited by the latency of hard disk seeks. This bottleneck can be
%% alleviated by adding more drives, which increases cost and decreases
%% reliability. Alternatively, the database can run on a cluster of
%% machines, increasing the amount of available memory, CPUs and disk
%% heads, but introducing the overhead and complexity of distributed
%% transactions and partial failure.
%% These problems lead to large-scale database installations that partition
%% their workloads across multiple servers, allowing linear scalability,
%% but sacrificing consistency between data stored in different
%% partitions. Fortunately, updates often deal with well-defined subsets
%% of the data; with an appropriate partitioning scheme, one can achieve
%% linear scalability for localized updates.
%% The cost of partitioning is that no globally coherent version of the
%% data exists. In the best case, queries that rely on a global view of
%% the data run against each master database instance, then attempt to
%% reconcile inconsistencies in their results. If the queries are
%% too expensive to run on master database instances they are delegated
%% to data warehousing systems and produce stale results.
%% In order to address the needs of such workloads, \rows gives up the ability to
%% directly process SQL updates. In exchange, it is able to replicate
%% conventional database instances at a small fraction of the cost of a
%% general-purpose database server.
%% Like a data warehousing solution, this decreases the cost of large,
%% read-only analytical processing and decision support queries, and scales to extremely
%% large database instances with high-throughput updates. Unlike data
%% warehousing solutions, \rows does this without introducing significant
%% replication latency.
%While B-tree replicas consume as many I/O operations per second (IOPs)
%as the database master, \rows applies many updates with each
%sequential write that it performs. Our experiments show that \rows
%replica maintenance is orders of magnitude more efficient than
%conventional replication techniques. The remaining IOPs are made
%available to additional database replicas or to read-only queries.
%Furthermore, \rows index scans never resort to random I/O, making them
%considerably less expensive than B-tree index scans.
%However, \rows does not provide highly-optimized single tuple lookups
%required by an OLTP master database. \rowss on-disk tuple lookups are
%approximately two times slower than in a conventional B-tree, and therefore
%up to two to three orders of magnitude slower than \rows updates.
%During replication, writes can be performed without reading modified data. Therefore, the overhead of random index probes can easily be offset by
%\rowss decreased update and range scan costs, especially in situations where the
%database master must resort to partitioning or large disk arrays to
%keep up with the workload.
%Because \rows provides much greater write throughput than the database
%master would on comparable hardware, a single \rows instance can use a
%few disks to replicate multiple database partitions. The techniques
%we use are applicable to any workload that overwrites data without
%performing reads.
%% The expense associated with such systems
%% prevents conventional database replicas from scaling. The additional
%% read throughput they provide is nearly as expensive as read throughput
%% on the master. Because their performance is comparable to that of the
%% master database, they are unable to consolidate multiple database
%% instances for centralized processing.
%% Unlike existing systems, \rows provides inexpensive, low-latency, and
%% scalable replication of write-intensive relational databases,
%% regardless of workload contention, database size, or update patterns.
%% \subsection{Fictional \rows deployment}
%% Imagine a classic, disk-bound TPC-C installation. On modern hardware,
%% such a system would have tens of disks, and would be seek limited.
%% Consider the problem of producing a read-only, low-latency replica of
%% the system for analytical processing, decision support, or some other
%% expensive read-only workload. If the replica uses the same storage
%% engine as the master, its hardware resources would be comparable to
%% (certainly within an order of magnitude) those of the master database
%% instances. Worse, a significant fraction of these resources would be
%% devoted to replaying updates from the master. As we show below,
%% the I/O cost of maintaining a \rows replica can be less than 1\% of
%% the cost of maintaining the master database.
%% Therefore, unless the replica's read-only query workload is seek
%% limited, a \rows replica requires many fewer disks than the
%% master database instance. If the replica must service seek-limited
%% queries, it will likely need to run on a machine similar to the master
%% database, but will use almost none of its (expensive) I/O capacity for
%% replication, increasing the resources available to queries.
%% Furthermore, \rowss indices are allocated sequentially, reducing the
%% cost of index scans, and \rowss buffer pool stores compressed
%% pages, increasing the effective size of system memory.
%% The primary drawback of this approach is that it roughly doubles the
%% cost of each random index lookup. Therefore, the attractiveness of
%% \rows hinges on two factors: the fraction of the workload devoted to
%% random tuple lookups, and the premium one would have paid for a piece
%% of specialized storage hardware that \rows replaces.
%% \subsection{Paper structure}
%% \xxx{ cut subsection }
%% We begin by providing an overview of \rowss system design and then
%% present a simplified analytical model of LSM-tree I/O behavior.
%% Applying this model to our test hardware predicts that \rows will
%% greatly outperform database replicas that store data in B-trees. Next, we
%% present a row-oriented page layout that allows many
%% database compression schemes to be used in \rows.
%% Next, we measure \rowss replication performance on real world data and
%% demonstrate orders of magnitude greater throughput and scalability
%% than a MySQL InnoDB B-tree index. We compare \rowss replication
%% throughput to that of an ideal LSM-tree implementation. We then
%% introduce a hybrid of the TPC-C and TPC-H benchmarks that is
%% appropriate for the environments targeted by \rows. We use this
%% benchmark to compare \rowss and InnoDB's random access times. We
%% defer related work to the end of the paper, as recent research
%% suggests a number of ways in which \rows could be improved.
\section{System overview}
A \rows replica takes a replication log as input and stores the
changes it contains in a {\em log structured merge} (LSM)
tree~\cite{lsm}.
\begin{figure}
\centering \epsfig{file=lsm-tree.pdf, width=3.32in}
\caption{The structure of a \rows LSM-tree}
\label{fig:lsm-tree}
\end{figure}
An LSM-tree is an index that consists of multiple sub-trees called
{\em components} (Figure~\ref{fig:lsm-tree}). The smallest component, $C0$, is a memory resident
binary search tree that is updated in place. The next-smallest component, $C1$, is a bulk-loaded B-tree. $C0$
is merged with $C1$ as it grows. The merge process consists of index scans
and produces a new bulk-loaded version of $C1$ that contains the
updates from $C0$. LSM-trees can have arbitrarily many components,
though three (two on-disk trees) is generally adequate.
All other
components are produced by repeated merges with the next smaller
component. Therefore, LSM-trees are updated without using
random disk I/O.
Lookups are performed by examining tree components, starting with the
in-memory component and moving on
to progressively larger and more out-of-date trees until a match is found.
This involves a maximum of two on-disk tree lookups unless a merge is
in process, in which case three lookups may be required.
\rowss first contribution is to use compression to increase merge
throughput. Compression reduces the amount of sequential I/O
required by merges, trading surplus
computational power for scarce storage bandwidth. Lookup performance
is determined by page cache hit ratios and the number of seeks
provided by the storage system. \rowss compression increases the
effective size of the page cache.
The second contribution is the application of LSM-trees to database
replication workloads. In order to take full advantage of LSM-trees'
write throughput, the replication environment must not force \rows to
obtain pre-images of overwritten values. Therefore, we assume that the
replication log stores each transaction {\tt begin}, {\tt commit},
and {\tt abort} performed by the master database, along with the pre-
and post-images associated with each update. The ordering of
these entries must match the order in which they are applied at the
database master. Workloads that do not contain transactions or do not
update existing data may omit some of the mechanisms described here.
\rows immediately applies updates to $C0$, making them
available to queries that do not require a consistent view of the
data. \rows supports transactional isolation by delaying the
availability of new data for the duration of a few update
transactions. It then produces a new, consistent snapshot that
reflects the new data. Multiple snapshots can coexist, allowing
read-only transactions to run without creating lock contention or being
aborted due to concurrent updates.
Long-running transactions do not block \rowss maintenance tasks or
index operations. However, long-running read-only queries delay
the deallocation of stale data, and long-running updates introduce
replication delay. \rowss concurrency control
mechanisms are described in Section~\ref{sec:isolation}.
%To prevent the in-memory tree from growing without bound, a merge
%process iterates over the (sorted) tree entries, and merges them with
%existing on-disk tree entries. As the database increases in size, the
%on disk tree grows, forcing \rows to compare an ever-increasing number
%of existing tuples with each new tuple. To mitigate this effect, the
%on-disk tree is periodically emptied by merging it with a second,
%larger tree.
\rows merges tree components in background threads, allowing
it to continuously process updates and service index lookup requests.
Index lookups minimize the overhead of thread synchronization by
latching entire tree components at a time. On-disk tree
components are read-only so these latches only block reclamation of
space used by deallocated tree components. $C0$ is
updated in place, preventing inserts from occurring concurrently with
lookups. However, operations on $C0$ are comparatively
fast, reducing contention for $C0$'s latch.
Recovery, allocation and atomic updates to \rowss metadata are
handled by Stasis~\cite{stasis}, an extensible transactional storage
system. \rows is implemented as a set of custom Stasis page formats
and tree structures.
In order to provide inexpensive queries to clients,
\rows assumes the replication log is made durable by some other
mechanism. Rather than commit each replicated transaction
independently, it creates one Stasis transaction each time a tree
component is created. These transactions generate a negligible number of log
entries.
Redo and undo information for tree component metadata and headers is
written to the Stasis log. Data contained in tree components are
never written to the log and tree component contents are sequentially
forced to the page file at Stasis commit. During recovery,
partially written tree components are deallocated and tree headers are
brought to a consistent state.
After Stasis recovery completes, \rows is in a consistent state that
existed at some point in the past. The replication log must be
replayed from that point to finish recovery.
\rows determines where to begin replaying the log by consulting its
metadata, which is updated each time a tree merge
completes. This metadata contains the timestamp of the last update
that is reflected in $C1$, which is simply the timestamp of the
last tuple written to $C0$ before the merge began.
%an extension to the transaction system and stores its
%data in a conventional database page file. \rows does not use the
%underlying transaction system to log changes to its tree components.
%Instead, it force writes tree components to disk after each merge completes, ensuring
%durability without significant logging overhead.
%Stasis force writes \rows tree components to disk at commit, providing
%coarse-grained durability without generating a significant amount of
%log data. Portions of \rows (such as tree component
%positions and index metadata) are updated in place and are stored using preexisting Stasis transactional
%storage primitives. Tree components are allocated, written, and
%registered with \rows within a single Stasis transaction. During
%recovery, any partially written \rows tree components are be
%deallocated and unregistered, leaving \rows in a consistent state.
%Stasis transactions are not visible to end users, and may contain many
%\rows transactions. Therefore, after Stasis recovery completes, \rows
%uses the replication log to reapply any transactions lost because of the
%crash.
\subsection{Tree merging}
%\xxx{This information doesn't belong here... place it throughout this section}
%The remainder of this section provides an overview of LSM-trees and quantifies the
%cost of tuple insertions. It then steps through an analysis of
%LSM-tree performance on current hardware. More thorough analytical discussions of LSM
%performance~\cite{lsm}, and comparisons between LSM-trees and a wide variety of other indexing techniques~\cite{partexp} are available elsewhere.
%Next,
%we explain how our implementation provides transactional isolation and
%exploits hardware parallelism. The adaptation of LSM-trees to
%database replication is an important contribution of this work and is
%the focus of the rest of this section. We defer discussion of
%compression to the next section.
% XXX figures?
%An LSM-tree consists of a number of underlying trees.
\rowss LSM-trees always consist of three components ($C0$, $C1$ and
$C2$), as this provides a good balance between insertion throughput
and lookup cost.
Updates are applied directly to the in-memory tree, and repeated tree merges
limit the size of $C0$. These tree
merges produce a new version of $C1$ by combining tuples from $C0$ with
tuples in the existing version of $C1$. When the merge completes,
$C1$ is atomically replaced with the new tree and $C0$ is atomically
replaced with an empty tree. The process is eventually repeated when
$C1$ and $C2$ are merged.
Replacing entire trees at once introduces a number of problems. It
doubles the number of bytes used to store each component, which is
important for $C0$, as it doubles memory requirements. It is also
important for $C2$, as it doubles the amount of disk space used by
\rows. Finally, ongoing merges force index probes to access both versions of $C1$,
increasing random lookup times.
The original LSM-tree work proposes a more sophisticated scheme that
addresses these issues by replacing one sub-tree at a time. This
reduces peak storage and memory requirements but adds complexity
by requiring in-place updates of tree components.
%Truly atomic replacement of portions of an LSM-tree would cause ongoing
%merges to block insertions and force the mergers to run in lock step.
%We address this issue by allowing data to be inserted into
%the new version of the smaller component before the merge completes.
%This forces \rows to check both versions of components $C0$ and $C1$
%in order to look up each tuple, but it handles concurrency between merge steps
%without resorting to fine-grained latches.
An alternative approach would partition \rows into multiple LSM-trees and
merge a single partition at a time~\cite{partexp}. This would reduce
the frequency of extra lookups caused by ongoing tree merges.
Partitioning also improves write throughput when updates are skewed,
as unchanged portions of the tree do not participate in merges and
frequently changing partitions can be merged more often. We do not
consider these optimizations in the discussion below; including them
would complicate the analysis without providing any new insight.
\subsection{Amortized insertion cost}
This section provides an overview of LSM-tree performance. A more
thorough analytical discussion~\cite{lsm}, and
comparisons between LSM-trees and a wide variety of other indexing
techniques~\cite{partexp} are available elsewhere.
To compute the amortized LSM-tree insertion cost
we consider the cost of comparing the inserted tuple with
existing tuples. Each tuple insertion ultimately causes two rounds of I/O
operations. One merges the tuple into $C1$; the other merges it into
$C2$. Insertions do not initiate more I/O once they reach $C2$.
Write throughput is maximized when the ratio of the
sizes of $C1$ to $C0$ is equal to the ratio between $C2$ and
$C1$~\cite{lsm}. This ratio is called $R$, and:
\[size~of~tree\approx~R^2*|C0|\]
Merges examine an average of $R$ tuples from $C1$ for each tuple
of $C0$ they consume. Similarly, $R$ tuples from $C2$ are examined
each time a tuple in $C1$ is consumed. Therefore: %, in a steady state:
\[insertion~rate*R(t_{C2}+t_{C1})\approx~sequential~i/o~cost\]
where $t_{C1}$ and $t_{C2}$ are the amounts of time it takes to read
from and write to $C1$ and $C2$.
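
As an illustration of these relationships, suppose that $|C0|=1~GB$ and
$R=10$. Both values are assumed here for numerical convenience and are
not measurements. Then:
\[
|C1|\approx~R*|C0|=10~GB,~~|C2|\approx~R*|C1|=R^2*|C0|=100~GB
\]
Under these assumptions $C2$ accounts for most of the tree, and each
tuple consumed from $C0$ causes roughly ten tuples of $C1$ to be
examined by the merge.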
%, insertion rate times the sum of $R *
%cost_{read~and~write~C2}$ and $R * cost_{read~and~write~C1}$ cannot
%exceed the drive's sequential I/O bandwidth. Note that the total size
%of the tree is approximately $R^2 * |C0|$.
% (neglecting the data stored
%in $C0$ and $C1$)\footnote{The proof that keeping R constant across
% our three tree components follows from the fact that the mergers
% compete for I/O bandwidth and $x(1-x)$ is maximized when $x=0.5$.
% The LSM-tree paper proves the general case.}.
\subsection{Replication Throughput}
LSM-trees have different asymptotic performance characteristics than
conventional index structures. In particular, the amortized cost of
insertion is $O(\sqrt{n}~\log~n)$ in the size of the data and is proportional
to the cost of sequential I/O. In a B-tree, this cost is
$O(\log~n)$ but is proportional to the cost of random I/O.
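
A sketch of where the $\sqrt{n}$ factor comes from, assuming that the
size of $C0$ is fixed by the available memory: the tree holds
$n\approx~R^2*|C0|$ bytes, so
\[
R\approx\sqrt{\frac{n}{|C0|}}
\]
Each insertion is charged for sequential I/O proportional to $R$, so
its amortized I/O cost grows with $\sqrt{n}$. For example, under this
model, growing the database by a factor of 100 increases $R$, and
therefore the per-insertion merge cost, by a factor of 10.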
%The relative costs of sequential and random
%I/O determine whether or not \rows is able to outperform B-trees in
%practice.
This section describes the impact of compression on B-tree
and LSM-tree performance.
We use simplified models of each structure's performance
characteristics. In particular, we assume that the leaf nodes do not
fit in memory, that tuples are accessed randomly with equal
probability, and that internal tree nodes fit in RAM. Without a
skewed update distribution, reordering and batching I/O into
sequential writes only helps if a significant fraction of the tree's
data fits in RAM. Therefore, we do not consider B-tree I/O batching
here.
%If we assume uniform access patterns, 4 KB pages and 100 byte tuples,
%this means that an uncompressed B-tree would keep $\sim2.5\%$ of the
%tuples in memory. Prefix compression and a skewed update distribution
%would improve the situation significantly, but are not considered
%here.
In B-trees, each update reads a page and eventually writes it back:
\[
cost_{Btree~update}=2~cost_{random~io}
\]
In \rows, we have:
\[
cost_{LSMtree~update}=2*2*2*R*\frac{cost_{sequential~io}}{compression~ratio} %% not S + sqrt S; just 2 sqrt S.
\]
We multiply by $2R$ because each new
tuple is eventually merged into both on-disk components, and
each merge involves an average of $R$ comparisons with existing tuples.
The second factor of two reflects the fact that the merger must read
existing tuples into memory before writing them back to disk.
An update of a tuple is handled as an insertion of the new
tuple and a deletion of the old tuple. Deletion is an insertion
of a special tombstone tuple that records the deletion of the old version of the tuple, leading to the third factor of two.
Updates that do not modify primary key fields avoid this final factor
of two. The delete and
insert share the same primary key and snapshot number, so the
insertion always supersedes the deletion. Therefore, there is no
need to insert a tombstone.
%
% Merge 1:
% read C1 R times, write C1 R times.
%
% Merge 2:
% read C1 1 times
% read C2 R times, write C2 R times.
%
The $compression~ratio$ is
$\frac{uncompressed~size}{compressed~size}$, so improved compression
leads to less expensive LSM-tree updates. For simplicity, we assume
the compression ratio is the same throughout each component of
the LSM-tree; \rows addresses this at runtime by reasoning in terms
of the number of pages used by each component.
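
To make the accounting in the update cost concrete, the factors can be
grouped as follows. The values $R=10$ and a compression ratio of 4 are
assumed purely for illustration:
\[
\underbrace{2}_{C1~and~C2}*\underbrace{2}_{read+write}*\underbrace{2}_{tombstone}*R=8R=80
\]
\[
cost_{LSMtree~update}=\frac{80}{4}~cost_{sequential~io}=20~cost_{sequential~io}
\]
An update that leaves the primary key unchanged omits the tombstone and
therefore costs half as much, $10~cost_{sequential~io}$.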
Our test hardware's hard drive is a 7200RPM, 750 GB Seagate Barracuda
ES.
%has a manufacturer-reported average rotational latency of
%$4.16~msec$, seek times of $8.5/9.5~msec$ (read/write), and a maximum
%sustained throughput of $78~MB/s$.
Third party
benchmarks~\cite{hdBench} %\footnote{http://www.storagereview.com/ST3750640NS.sr}
report random access times of 12.3/13.5~msec (read/write) and 44.3-78.5~megabytes/sec
sustained throughput. Timing {\tt dd if=/dev/zero of=file; sync} on an
empty ext3 file system suggests our test hardware provides 57.5~megabytes/sec of
storage bandwidth, but running a similar test via Stasis' buffer manager produces
a multimodal distribution ranging from 22 to 47~megabytes/sec.
%We used two hard drives for our tests, a smaller, high performance drive with an average seek time of $9.3~ms$, a
%rotational latency of $4.2~ms$, and a manufacturer reported raw
%throughput of $150~mb/s$. Our buffer manager achieves $\sim 27~mb/s$
%throughput; {\tt dd if=/dev/zero of=file} writes at $\sim 30.5~mb/s$.
Assuming a fixed hardware configuration and measuring cost in disk
time, we have:
%\[
% cost_{sequential~io}=\frac{|tuple|}{30.5*1024^2}=0.000031268~msec
%\]
%% 12.738854
\[
cost_{sequential}=\frac{|tuple|}{78.5MB/s}=12.7~|tuple|~~nsec/tuple~(min)
\]
%% 22.573363
\[
cost_{sequential}=\frac{|tuple|}{44.3MB/s}=22.6~|tuple|~~nsec/tuple~(max)
\]
and
\[
cost_{random}=\frac{12.3+13.5}{2} = 12.9~msec/tuple
\]
Assuming $44.3~MB/s$ of sequential bandwidth:
\[
2~cost_{random}\approx1,000,000\frac{cost_{sequential}}{|tuple|}
\] yields: \[
\frac{cost_{LSMtree~update}}{cost_{Btree~update}}=\frac{2*2*2*R*cost_{sequential}}{compression~ratio*2*cost_{random}}
% \frac{cost_{LSMtree~update}}{cost_{Btree~update}} \approx \frac{(S + \sqrt{S})}{|tuple|~compression~ratio~250,000}
\]
\[
\approx\frac{R*|tuple|}{125,000*compression~ratio}
\]
If tuples are 100 bytes and we assume a compression ratio of 4, which is lower
than we expect to see in practice but numerically convenient, then the
LSM-tree outperforms the B-tree when:
\[
R < \frac{125,000*compression~ratio}{|tuple|}
\]
\[
R < 5,000
\]
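As a check on the approximation used in this derivation, the measured values give $2~cost_{random}=25.8$~msec and, at 44.3~MB/s, $cost_{sequential}/|tuple|=22.6$~nsec, so
\[
\frac{2~cost_{random}}{cost_{sequential}/|tuple|}=\frac{25.8\times10^{6}~nsec}{22.6~nsec}\approx1.14\times10^{6}
\]
which the derivation rounds down to $1,000,000$. The rounding slightly favors the B-tree.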
%750 gb throughput = 1 / (((8 * 27 * 22.6 * 100) / 4) * (ns)) = 8.00198705 khz
% 1 / (((8 * 2.73 * 100 * 22.6) / 4) * (ns))
%on a machine that can store 1 GB in an in-memory tree.
%, this yields a
%maximum ``interesting'' tree size of $R^2*1GB = $ 100 petabytes, well
%above the actual drive capacity of $750~GB$.
A 750~GB tree with a 1~GB $C0$ would have an $R$ of $\sqrt{750}\approx27$.
If tuples are 100 bytes, such a tree's sustained insertion throughput will be
approximately 8,000~tuples/sec, or 800~kilobytes/sec.
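This figure follows from the update cost formula. As a worked check, using the 44.3~MB/s bandwidth figure (22.6~nsec per byte), $R=27$, 100 byte tuples and a compression ratio of 4:
\[
cost_{LSMtree~update}=\frac{2*2*2*27*100*22.6~nsec}{4}\approx122~\mu sec
\]
\[
\frac{1}{122~\mu sec}\approx8,200~tuples/sec
\]
which is consistent with the rounded figure of 8,000 tuples/sec.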
%\footnote{It would
% take 11 days to overwrite every tuple on the drive in random order.}
%given our 100 byte tuples.
Our hard drive's average access time allows the drive to deliver 83 I/O operations/sec.
Therefore, we can expect an insertion throughput of 41.5~tuples/sec from a B-tree
that does not cache leaf pages. With 1~GB of RAM, \rows should outperform the
B-tree by more than two orders of magnitude. Increasing \rowss system
memory to cache 10~GB of tuples would increase write performance by a
factor of $\sqrt{10}$.
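The $\sqrt{10}$ factor follows from the way $R$ scales with memory. As a sketch, assume (as in the examples here) that $R=\sqrt{|tree|/|C0|}$ and that throughput is inversely proportional to $cost_{LSMtree~update}$; the symbols $|tree|$ and $|C0|$ denote component sizes and are introduced only for this illustration:
\[
\frac{throughput_{10~GB}}{throughput_{1~GB}}=\frac{R_{1~GB}}{R_{10~GB}}=\frac{\sqrt{750/1}}{\sqrt{750/10}}=\sqrt{10}\approx3.16
\]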
% 41.5/(1-80/750) = 46.4552239
Increasing memory another tenfold to 100~GB would yield an LSM-tree
with an $R$ of $\sqrt{750/100} \approx 2.74$ and a throughput of 81,000~tuples/sec.
In contrast, the B-tree would cache less than 100~GB of leaf pages
in memory and would write fewer than $\frac{41.5}{1-(100/750)} = 47.9$
tuples/sec. Increasing memory further yields a system that
is no longer disk bound.
Assuming CPUs are fast enough to allow \rows to make use of the bandwidth supplied
by the disks, we conclude that \rows will provide significantly higher
throughput than a seek-bound B-tree.
\subsection{Indexing}
Our analysis ignores the cost of bulk-loading and storing
LSM-trees' internal nodes. Each time the merge process fills a page it
inserts an entry into the rightmost position in the tree, allocating
additional internal nodes if necessary. Our prototype does not
compress internal tree nodes.
The space overhead of building these tree nodes depends on the number
of tuples that fit in each page. If tuples take up a small fraction
of each page then \rows gets good fan-out on internal nodes, reducing
the fraction of storage used by those nodes. Therefore, as page
size increases, tree overhead decreases. Conversely, as tuple sizes
increase, the fraction of storage dedicated to internal tree nodes
increases.
\rows pages are 4KB. Workloads with
larger tuples would save disk and page cache space by using larger
pages for internal nodes. For very large trees, larger internal tree
nodes also avoid seeks during index lookups.
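To make the dependence on fan-out concrete, consider a simplified model (an assumption for illustration, not a measurement): every internal node holds $f$ child pointers and the tree has $N$ leaf pages. Ignoring partially filled nodes, the number of internal pages is then bounded by a geometric series:
\[
\frac{N}{f}+\frac{N}{f^2}+\frac{N}{f^3}+\ldots<\frac{N}{f-1}
\]
so internal nodes add roughly a fraction of $\frac{1}{f-1}$ to the space used by the leaves. In this model, anything that lowers $f$, such as larger tuples, raises that fraction, and anything that raises $f$, such as larger internal pages, lowers it.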
%Consider a tree that can store 10 compressed tuples in each 4K page.
%If an uncompressed tuple is 400 bytes long then roughly a tenth of
%the pages are dedicated to the lowest level of tree nodes, with a
%tenth that number devoted to the next highest level, and so on. With
%a 2x compression ratio, uncompressed tuples occupy 800 bytes and each
%higher level in the tree is only a fifth the size of the level below
%it. Larger page sizes and compression of internal nodes would reduce
%the amount of space dedicated to internal tree nodes.
%Table~\ref{table:treeCreationTwo} provides a comparison of compression
%performance with and without tree creation enabled\footnote{Our
% analysis ignores page headers, per-column, and per-tuple overheads;
% these factors account for the additional indexing overhead.}. The
%data was generated by applying \rowss compressors to randomly
%generated five column, 1,000,000 row tables. Across five runs, in
%Table~\ref{table:treeCreation} RLE's page count had a standard
%deviation of $\sigma=2.35$; the other page counts had $\sigma=0$. In
%Table~\ref{table:treeCreationTwo}, $\sigma < 7.26$ pages.
%Throughput's $\sigma<6MB/s$.
%% As the size of the tuples increases, the number of compressed pages
%% that each internal tree node points to decreases, increasing the
%% overhead of tree creation. In such circumstances, internal tree node
%% compression and larger pages should improve the situation.
\subsection{Isolation}
\label{sec:isolation}
\rows handles two types of transactions: updates from the
master and read-only queries from clients. \rows has no control over the order or
isolation of updates from the master database. Therefore, it must
apply these updates in an order consistent with the master database
in a way that isolates queries from concurrent modifications.
LSM-tree updates do not immediately remove pre-existing data
from the tree. Instead, they provide a simple form of versioning
where data from the newest component ($C0$) is always the most up-to-date.
We extend this idea to provide snapshots. Each transaction is
assigned to a snapshot, and no more than one version of each tuple
exists within a snapshot.
To look up a tuple as it existed at some point in time, we examine all
snapshots that were taken before that point and return the most recent
version of the tuple that we find. This is inexpensive because all versions of the same tuple are
stored in adjacent positions in the LSM-tree. Furthermore, \rowss versioning is meant to be
used for transactional consistency and not time travel, limiting
the number of snapshots.
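One way to state this lookup rule precisely is sketched below; the notation $S(k)$ and $visible$ is introduced here only for illustration. Let $S(k)$ be the set of snapshot numbers for which a version of the tuple with primary key $k$ exists. A query that reads as of snapshot $s$ sees the version belonging to snapshot
\[
visible(k,s)=\max\{s' \in S(k) : s' \leq s\}
\]
and sees no tuple if this set is empty or if the selected version is a tombstone.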
To insert a tuple, \rows first determines to which snapshot it
should belong. At any given point in time up to two snapshots
may accept new updates. One snapshot accepts new
transactions. If the replication log may contain interleaved transactions,
such as when there are multiple master databases, then
a second, older snapshot waits for any pending transactions that
it contains to complete. Once all of those transactions are complete,
the older snapshot stops accepting updates, the first snapshot
stops accepting new transactions, and a snapshot for new
transactions is created.
\rows examines the timestamp and transaction ID associated with the tuple insertion
and marks the tuple with the appropriate snapshot ID. It then
inserts the tuple into $C0$, overwriting any tuple that matches on
primary key and has the same snapshot ID.
The merge thread uses a list of currently accessible snapshots
to decide which versions of the tuple in $C0$ and $C1$ are accessible
to transactions. Such versions are the most recent copy of the tuple that
existed before the end of an accessible snapshot. After finding all
such versions of a tuple (one may exist for each accessible snapshot),
the merge process writes them to the new version of $C1$. Any other
versions of the tuple are discarded. If two matching tuples from the
same snapshot are encountered then one must be from $C0$ and the other
from $C1$. The version from $C0$ is more recent, so the merger
discards the version from $C1$. Merges between $C1$ and $C2$ work the
same way as merges between $C0$ and $C1$.
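The retention rule can be sketched with illustrative notation that is not used elsewhere in this paper. Let $A$ be the set of accessible snapshots (treating snapshots that still accept updates as members of $A$, which is an assumption of this sketch) and let $S(k)$ be the snapshot numbers of the versions of key $k$ found in the components being merged. The merger keeps version $s'$ of $k$ if and only if
\[
\exists a \in A : s' = \max\{s'' \in S(k) : s'' \leq a\}
\]
For example, if $S(k)=\{3,5,8\}$ and $A=\{6,9\}$, then versions 5 and 8 are kept and version 3 is discarded.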
\rows handles tuple deletion by inserting a tombstone. Tombstones
record the deletion event and are handled similarly to insertions. A
tombstone will be kept if it is the newest version of a tuple in at
least one accessible snapshot. Otherwise, the tuple was reinserted
before the next accessible snapshot was taken and the tombstone is
deleted. Unlike insertions, tombstones in $C2$ are deleted if they
are the oldest remaining reference to a tuple.
%% The correctness of \rows snapshots relies on the isolation mechanisms
%% provided by the master database. Within each snapshot, \rows applies
%% all updates in the same order as the primary database and cannot lose
%% sync with the master. However, across snapshots, concurrent
%% transactions can write non-conflicting tuples in arbitrary orders,
%% causing reordering of updates. These updates are guaranteed to be
%% applied in transaction order. If the master misbehaves and applies
%% conflicting updates out of transaction order then \rows and the
%% master's version of the database will lose synchronization. Such out
%% of order updates should be listed as separate transactions in the
%% replication log.
%% If the master database provides snapshot isolation using multiversion
%% concurrency control, \rows can reuse the timestamp the database
%% applies to each transaction. If the master uses two phase locking,
%% the situation becomes more complex, as \rows has to use the commit time
%% (the point in time when the locks are released) of each transaction.
%% Until the commit time is known, \rows would store the transaction id
%% in the LSM-tree. As transactions are committed it would record the
%% mapping from transaction id to snapshot. Eventually, the merger would
%% translate transaction ids to snapshots, preventing the mapping from
%% growing without bound.
\rowss snapshots have minimal performance impact, and provide
transactional concurrency control without rolling back transactions
or blocking the merge and replication processes. However,
long-running updates prevent queries from accessing the results of
recent transactions, leading to stale results. Long-running queries
increase \rowss disk footprint by increasing the number of accessible
snapshots.
%\label{sec:isolation}
%\rows combines replicated transactions into snapshots. Each transaction
%is assigned to a snapshot according to a timestamp; two snapshots are
%active at any given time. \rows assigns incoming transactions to the
%newer of the two active snapshots. Once all transactions in the older
%snapshot have completed, that snapshot is marked inactive, exposing
%its contents to new queries that request a consistent view of the
%data. At this point a new active snapshot is created, and the process
%continues.
%(XXX explain life of tuple here)
%%The timestamp is simply the snapshot number.
%In the case of a tie
%during merging, such as two tuples with the same primary key and
%timestamp, the version from the newer (lower numbered) component is
%taken. If a tuple maintains the same primary key while being updated
%multiple times within a snapshot, this allows \rows to discard all but
%the last update before writing the tuple to disk.
%New snapshots are created in two steps. First, all transactions in
%epoch $t-1$ must complete (commit or abort) so that they are
%guaranteed to never apply updates to the database again. In the
%second step, \rowss current snapshot number is incremented, new
%read-only queries are assigned to snapshot $t-1$, and new updates
%are assigned to snapshot $t+1$. Each such query is granted a
%shared lock on the existence of the snapshot, protecting that version
%of the database from garbage collection. In order to ensure that new
%snapshots are created in a timely and predictable fashion, the time
%between them should be acceptably short, but still slightly longer
%than the longest running transaction. Using longer snapshots
%increases coalescing of repeated updates to the same tuples,
%but increases replication delay.
%
%\subsubsection{Isolation performance impact}
%
%Although \rowss isolation mechanisms never block the execution of
%index operations or merge processes, their behavior degrades in the presence of long
%running transactions.
%
%Long running updates do not prevent tree merges from continuing,
%but they can block the creation of new snapshots. Upon
%encountering such a transaction, \rows can either wait or ask the
%master database to abort the offending transaction, then wait until
%appropriate rollback (or commit) entries appear in the replication
%log. While waiting for a long running transaction in snapshot $t-1$
%to complete, \rows continues to process replication requests by
%extending snapshot $t$, and services requests for consistent data from
%the increasingly stale snapshot $t-2$.
%
%%simply asks the master database to abort the offending update. It
%%then waits until appropriate rollback (or perhaps commit) entries
%%appear in the replication log, and creates the new snapshot. While
%%waiting for the transactions to complete, \rows continues to process
%%replication requests by extending snapshot $t$.
%
%%Of course, proactively aborting long running updates is simply an
%%optimization. Without a surly database administrator to defend it
%%against application developers, \rows does not send abort requests,
%%but otherwise behaves identically. Read-only queries that are
%%interested in transactional consistency continue to read from (the
%%increasingly stale) snapshot $t-2$ until $t-1$'s long running
%%updates commit.
%
%Long running queries present a different set of challenges to \rows.
%%Although \rows provides fairly efficient time-travel support,
%%versioning databases are not our target application. \rows
%%provides each new read-only query with guaranteed access to a
%%consistent version of the database.
%They force \rows to keep old versions of overwritten tuples around
%until the query completes. These tuples increase the size of \rowss
%LSM-trees, increasing merge overhead. If the space consumed by old
%versions of the data is a serious issue, extremely long running
%queries should be disallowed. Alternatively, historical or
%long-running queries could be run against certain snapshots (every
%1000th, or the first one of the day, for example), reducing the
%number of snapshots maintained by \rows.
%
%\subsubsection{Merging and Garbage collection}
%
%\rows merges components by iterating over them in order, garbage collecting
%obsolete and duplicate tuples and writing the rest into a new version
%of the larger component. Because \rows provides snapshot consistency
%to queries, it must be careful not to collect a version of a tuple that
%is visible to any outstanding or future query. \rows
%deletes data by
%inserting special tombstone tuples into the tree. A tombstone's
%purpose is to record the deletion event until all older versions of
%the tuple have been garbage collected. Sometime after that point, the tombstone
%is collected as well.
%
%(XXX clarify this paragraph) In order to determine whether or not a tuple can be collected, \rows
%compares the tuple's timestamp with any matching tombstones, and with any tuples that
%match on primary key. Upon encountering such candidates for garbage collection,
%\rows compares their timestamps with the set of locked snapshots. If
%there are no snapshots between the tuple being examined and the
%updated version, then the tuple can be collected. Tombstone tuples can
%also be collected once they reach $C2$ and any older matching tuples
%have been removed.
%
%Actual reclamation of pages is handled by Stasis; each time a tree
%component is replaced, \rows simply tells Stasis to free the regions of
%pages that contain the obsolete tree.
%
%%the underlying transaction
%%system; once \rows completes its scan over existing components (and
%%registers new ones in their places), it tells the transaction system
%%to reclaim the regions of the page file that stored the old components.
\subsection{Parallelism}
\rows operations are concurrent; readers and writers work
independently, avoiding blocking, deadlock and livelock. Index probes
must latch $C0$ to perform a lookup, but the more costly
probes into $C1$ and $C2$ are against read-only trees. Beyond locating
and pinning tree components against deallocation, probes of these
components do not interact with the merge processes.
Each tree merge runs in a separate thread. This
allows our prototype to exploit two to three processor cores while
inserting and recompressing data during replication. Remaining cores
could be exploited by range partitioning a replica into multiple
LSM-trees, allowing more merge processes to run concurrently.
Therefore, we expect the throughput of \rows replication to increase
with memory size, compression ratios and I/O bandwidth for the foreseeable future.
%[XXX need experimental evidence...] During bulk
%load, the buffer manager, which uses Linux's {\tt sync\_file\_range}
%function allows \rows to asynchronously force regions [XXX currently,
% we do a synchronous force with sync\_file\_range....] of the page
%file to disk. \rows has control over region size; if the system is
%CPU bound \rows can ensure that the time spent waiting for synchronous
%page writes is negligible, by choosing an appropriate region size. On
%the other hand, if the system is disk bound, the same asynchronous
%force mechanism ensures that \rows overlaps computation with I/O. [XXX
% this is probably too detailed; we should just say that \rows uses
% standard techniques to overlap computation and I/O]
%% \subsection{Recovery}
%% Like other log structured storage systems, \rowss recovery process is
%% inexpensive and straightforward. However, \rows does not attempt to
%% ensure that transactions are atomically committed to disk, and is not
%% meant to replace the master database's recovery log.
%% Instead, recovery occurs in two steps. Whenever \rows writes a tree
%% component to disk, it does so by beginning a new transaction in the
%% underlying transaction system. Next, it allocates
%% contiguous regions of disk pages (generating one log entry per
%% region), and performs a bulk load of the new tree into
%% these regions (this bulk load does not produce any log entries).
%% Then, \rows forces the tree's regions to disk, and writes the list
%% of regions used by the tree and the location of the tree's root to
%% normal (write ahead logged) records. Finally, it commits the
%% underlying transaction.
%% After the underlying transaction system completes recovery, \rows
%% will have a set of intact and complete tree components. Space taken
%% up by partially written trees was allocated by an aborted
%% transaction, and has been reclaimed by the transaction system's
%% recovery mechanism. After the underlying recovery mechanisms
%% complete, \rows reads the last committed timestamp from the LSM-tree
%% header, and begins playback of the replication log at the appropriate
%% position. Upon committing new components to disk, \rows allows the
%% appropriate portion of the replication log to be truncated.
\section{Row compression}
\begin{table}
\caption{Compression ratios and index overhead - five columns (20 bytes/tuple)}
\centering
\label{table:treeCreation}
\begin{tabular}{|l|c|c|} \hline
Format & Compression & Page count \\ \hline %& Throughput\\ \hline
PFOR & 1.96x & 2494 \\ \hline %& 133.4 MB/s \\ \hline
PFOR + tree & 1.94x & +80 \\ \hline %& 129.8 MB/s \\ \hline
RLE & 3.24x & 1505 \\ \hline %& 150.6 MB/s \\ \hline
RLE + tree & 3.22x & +21 \\ %& 148.4 MB/s \\
\hline\end{tabular}
\end{table}
\begin{table}
\caption{Compression ratios and index overhead - 100 columns (400 bytes/tuple)}
\centering
\label{table:treeCreationTwo}
\begin{tabular}{|l|c|c|} \hline
Format & Compression & Page count \\ \hline %& Throughput\\ \hline
PFOR & 1.37x & 7143 \\ \hline %& 133.4 MB/s \\ \hline
PFOR + tree & 1.17x & 8335 \\ \hline %& 129.8 MB/s \\ \hline
RLE & 1.75x & 5591 \\ \hline %& 150.6 MB/s \\ \hline
RLE + tree & 1.50x & 6525 \\ %& 148.4 MB/s \\
\hline\end{tabular}
\end{table}
\rows stores tuples in a sorted, append-only format. This
simplifies compression and provides a number of new opportunities for
optimization. Compression reduces sequential I/O, which is \rowss primary bottleneck.
It also
increases the effective size of the buffer pool, allowing \rows to
service larger read sets without resorting to random I/O.
Row-oriented database compression techniques must cope with random,
in-place updates and provide efficient random access to compressed
tuples. In contrast, compressed column-oriented database layouts
focus on high-throughput sequential access, and do not provide in-place
updates or efficient random access. \rows never updates data in
place, allowing it to use append-only compression techniques
from the column database literature. Also, \rowss tuples never span pages and
are stored in sorted order. We modified column compression techniques to provide an
index over each page's contents and efficient random access within
pages.
\rowss compression format is straightforward. Each page is divided into
a header, a set of compressed segments and a single exception section
(Figure~\ref{fig:mc-fmt}). There is one compressed segment per column
and each such segment may be managed by a different compression
algorithm, which we call a {\em compressor}. Some compressors cannot
directly store all values that a column may take on. Instead, they
store such values in the exception section. We call
\rowss compressed page format the {\em multicolumn} format, and have
implemented it to support efficient compression,
decompression and random lookups by slot id and by value.
\subsection{Compression algorithms}
The \rows prototype provides three compressors. The first, {\em NOP},
simply stores uncompressed integers. The second, {\em RLE},
implements run length encoding, which stores values as a list of
distinct values and repetition counts. The third, {\em PFOR}, or
patched frame of reference, stores values as a single per-page
base offset and an array of deltas~\cite{pfor}. The values are reconstructed by
adding the offset and the delta or by traversing a pointer into the
exception section of the page. Currently, \rows only supports integer
data, although its page formats could easily be extended to support strings and
other types. Also, the compression algorithms that we
implemented would benefit from a technique known as {\em bit-packing},
which represents integers with lengths other than 8, 16, 32 and 64
bits. Bit-packing would increase \rowss compression ratios,
and can be implemented with a modest performance overhead~\cite{pfor}.
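As a small illustration (the values and the 8-bit delta width are chosen only for this example), consider a page holding the column values $1000, 1003, 1001, 50000$. With a base offset of $1000$, PFOR stores
\[
deltas = (0, 3, 1, \ast), \qquad exceptions = (50000)
\]
where $\ast$ marks a slot whose value does not fit in a delta and is instead read from the exception section. RLE would store the column $7, 7, 7, 9, 9$ as the (value, count) pairs
\[
(7, 3),~(9, 2)
\]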
%(XXX this paragraph makes no sense; it's been moved out of context...)
%We implemented two compression formats for \rowss multicolumn pages.
%The first is PFOR; the other is {\em run length encoding} (RLE), which
%stores values as a list of distinct values and repetition counts.
We chose these techniques because they are amenable to optimization;
our implementation makes heavy use of C++ templates, static code
generation and g++'s optimizer to keep \rows from becoming
CPU-bound. By hardcoding table formats at compilation time, this
implementation removes length and type checks, and allows the compiler
to inline methods, remove temporary variables and optimize loops.
\rows includes a second, less efficient implementation that uses
dynamic method dispatch to support runtime creation of new table
schemas. A production-quality system could use runtime code
generation to support creation of new schemas while taking full
advantage of the compiler's optimizer.
\subsection{Comparison with other approaches}
Like \rows, the PAX~\cite{PAX} page format stores tuples in a way that never
spans multiple pages and partitions data within pages by column.
Partitioning pages by column
improves processor cache locality for queries that do not need to
examine every column within a page. In particular, many queries
filter based on the value of a few attributes. If data is clustered
into rows then each column from the page will be brought into cache,
even if no tuples match. By clustering data into columns, PAX ensures
that only the necessary columns are brought into cache.
\rowss use of lightweight compression algorithms is similar to
approaches used in column databases~\cite{pfor}. \rows differs from
that work in its focus on small, low-latency updates and efficient random reads on commodity
hardware. Existing work targeted RAID and used page sizes ranging from 8 to 32~MB. Rather
than use large pages to get good sequential I/O performance, \rows
relies upon automatic prefetch and write coalescing
provided by Stasis' buffer manager and the underlying operating
system. This reduces the amount of data handled by \rows during index
probes.
\subsection{Tuple lookups}
Because we expect index lookups to be a frequent operation, our
page format supports efficient lookup by tuple value. The original
``patched'' compression algorithms store exceptions in a linked list
and perform scans of up to 128 tuples in order to materialize compressed
tuples~\cite{pfor}. This reduces the number of branches
required during compression and sequential decompression. We
investigated a number of implementations of tuple lookup by value
including per-column range scans and binary search.
The efficiency of random access within a
page depends on the format used by column compressors. \rows
compressors support two access methods. The first looks up a value by
slot id. This operation is $O(1)$ for frame of reference columns and
$O(\log n)$ in the number of runs of identical values on the page for
run length encoded columns.
The second operation is used to look up tuples by value and is based
on the assumption that the tuples are stored in the
page in sorted order. The simplest way to provide access to tuples by
value would be to perform a binary search, materializing each tuple
that the search needs to examine. Doing so would materialize many
extra column values, potentially performing additional binary searches.
To look up a tuple by value, the second operation takes a range of slot
ids and a value, and returns the offsets of the first and last instances
of the value within the range. This operation is $O(\log n)$ in the
number of slots in the range for frame of reference columns and
$O(\log n)$ in the number of runs on the page for run length encoded
columns. The multicolumn implementation uses this method to look up
tuples by beginning with the entire page in range and calling each
compressor's implementation to narrow the search until the
correct tuple(s) are located or the range is empty.
Partially matching tuples are only partially decompressed during the
search, reducing the amount of data brought into processor cache.
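The following example sketches this narrowing on a hypothetical two-column page; the data is invented for illustration.
\begin{center}
\begin{tabular}{|c|c|c|} \hline
Slot & Column 1 & Column 2 \\ \hline
0 & 1 & 4 \\ \hline
1 & 1 & 7 \\ \hline
2 & 2 & 3 \\ \hline
3 & 2 & 5 \\ \hline
4 & 2 & 8 \\ \hline
5 & 3 & 1 \\
\hline\end{tabular}
\end{center}
To look up the tuple $(2,5)$, the first column's compressor searches slots $[0,5]$ for the value 2 and returns the range $[2,4]$. The second column's compressor then searches only slots $[2,4]$ for the value 5 and returns $[3,3]$, so slot 3 holds the matching tuple. A lookup of $(2,6)$ would produce an empty range at the second step. In both cases the second column is examined only within slots $[2,4]$.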
%(XXX future work:)
%
%Compressed \rows pages provide access to their data in three ways: via
%an iterator, by slot id, and by value. Page iterators store
%information about the next tuple to be returned, minimizing branches
%and guaranteeing O(1) access to the next value. \rowss page format is
%able to support the techniques of~\cite{XXX}, which provide vectors of
%tuples depending on the nature of the operator scanning over the page,
%but we chose not to implement this.
%
%To materialize a tuple based on its slot id (logical position on the
%page), we ask the compression algorithm responsible for each
%column to return the appropriate value. For some compression
%algorithms this is a simple array lookup, while for others (such as
%run length encoding) the procedure involves a binary search.
%
%\rows is also able to lookup tuples by value. Tuples are always
%stored in sorted order, first on the first column, then on the second
%and so on. The simplest way to look up a tuple by value would be to
%perform a binary search, materializing each tuple that the search
%needs to examine. Doing so would materialize many extra column
%values. Since these column values are compressed, each tuple
%materialization could involve multiple binary searches.
%
%Instead, \rowss compression implementations provide search methods
%that take a column value and a range of slot ids. To lookup a tuple,
%we first perform a search for the appropriate value on the entire
%first column. This returns a range of candidate tuple ids. Since each
%of these tuples has the same value in its first column, we know the
%second column is sorted within the range. Therefore, we can
%efficiently search the second column of the candidate range. We
%repeat this procedure until the range becomes empty (and no match is
%found), or until all columns have been matched. This reduces the
%number of operations that must be performed while looking up a tuple,
%and also reduces the amount of data brought into CPU cache.
%% More stuff ->
%\rows uses compression to improve performance. Conserving storage space is of
%%secondary concern. Sequential I/O throughput is \rowss primary
%replication and table scan bottleneck, and is proportional to the
%compression ratio. Compression increases the effective
%size of the buffer pool and therefore, the size of the largest read set
%\rows can service without resorting to random I/O.
%Because \rows never updates data in place, it
%is able to make use of read-only compression formats that cannot be
%efficiently applied to B-trees.
%% Disk heads are the primary
%% storage bottleneck for most OLTP environments, and disk capacity is of
%% secondary concern. Therefore, database compression is generally
%% performed to improve system performance, not capacity. In \rows,
%% sequential I/O throughput is the primary replication bottleneck; and
%% is proportional to the compression ratio.
%This allows us to service small updates using
%techniques from column-oriented databases. This is because, like column-oriented
%databases, \rows can provide sorted, projected data to bulk-loaded indexes.
%\rowss compression formats are based on our
%{\em multicolumn} compression format. In order to store data from
%an $N$ column table, we divide the page into $N+1$ variable length
%regions. $N$ of these regions each contain a compressed column. The
%remaining region contains ``exceptional'' column data (Figure~\ref{fig:mc-fmt}).
\begin{figure}
\centering \epsfig{file=multicolumn-page-format-2.pdf, width=3.3in}
\caption{Multicolumn page format. Many compression algorithms
can coexist on a single page. Tuples never span multiple pages.}
\label{fig:mc-fmt}
\end{figure}
%For example, a column might be encoded using the {\em frame of
% reference} (FOR) algorithm, which stores a column of integers as a
%single offset value and a list of deltas. When a value too different
%from the offset to be encoded as a delta is encountered, an offset
%into the exceptions region is stored. When applied to a page that
%stores data from a single column, the resulting algorithm is MonetDB's
%{\em patched frame of reference} (PFOR)~\cite{pfor}.
%\rowss multicolumn pages extend this idea by allowing multiple columns
%(each with its own compression algorithm) to coexist on each page and preventing tuples from spanning pages.
%This reduces the cost of reconstructing tuples during index lookups.
%This section discusses the computational and storage overhead of our
%multicolumn compression approach. We suspect this technique has been
%used in the past. However, this type of compression format is
%conspicuously absent in the recent literature, and represents a
%different set of tradeoffs than other current approaches.
\subsection{Multicolumn computational overhead}
%sears@davros:~/stasis/benchmarks$ ./rose -n 10 -p 0.01
%Compression scheme: Time trial (multiple engines)
%Page size: 4096
%P(bump): 0.010000
%Random seed: 0
%Column count: 10
%WARNING: Pstar will only use the first column.
%Compression scheme #pages ratio comp gb/s decom gb/s
%Pstar (For) 493 3.96x 0.544 2.982
%Multicolumn (For) 5129 3.81x 0.253 0.712
%Pstar (Rle) 40 48.83x 0.961 1.593
%Multicolumn (Rle) 416 46.95x 0.377 0.692
\begin{table}
\caption{Compressor throughput in mb/s on random data. Mean of 5 runs, $\sigma<5\%$ except where noted.}
\centering
\label{table:perf}
\begin{tabular}{|l|c|c|c|} \hline
Format & Ratio & Compress & Decompress\\ \hline %& Throughput\\ \hline
PFOR - 1 column & 3.96x & 547 & 2959 \\ \hline %& 133.4 MB/s \\ \hline
PFOR - 10 column & 3.86x & 256 & 719 \\ \hline %& 129.8 MB/s \\ \hline
RLE - 1 column & 48.83x & 960 & 1493 $(12\%)$ \\ \hline %& 150.6 MB/s \\ \hline
RLE - 10 column & 47.60x & 358 $(9\%)$ & 659 $(7\%)$ \\ %& 148.4 MB/s \\
\hline\end{tabular}
\end{table}
Our multicolumn page format introduces additional computational
overhead. \rows compresses each column in a separate buffer, then uses
{\tt memcpy()} to gather this data into a single page buffer before
writing it to disk. This happens once per page allocation.
Bit-packing is typically implemented as a post-processing
technique that makes a pass over the compressed data~\cite{pfor}. \rowss
gathering step can be performed for free by such a bit-packing
implementation, so we do not see this as a significant disadvantage
compared to other approaches.
Also, \rows needs to translate tuple insertions into
calls to appropriate page formats and compression implementations.
Unless we hardcode the \rows executable to support a predefined set of
page formats and table schemas, each tuple compression and decompression operation must execute an extra {\tt for} loop
over the columns. The {\tt for} loop's body contains a {\tt switch} statement that chooses between column compressors, since each column can use a different compression algorithm and store a different data type.
This form of multicolumn support introduces significant overhead. In
Table~\ref{table:perf}, the ten column variants of our compression algorithms
compress roughly 2 to 3 times slower, and decompress roughly 2 to 4 times slower,
than versions hard-coded to work with single column data.
Table~\ref{table:perf} compares a fixed-format single column page
layout with \rowss dynamically dispatched (not custom generated code)
multicolumn format.
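
The dispatch loop described above can be pictured with the following
C++ sketch. It is not \rowss code: the {\tt Column} structure, the
{\tt kRle} and {\tt kFor} tags, and the toy encoders (which ignore
exceptions, bit widths and page boundaries) are assumptions made for
illustration only.
\begin{verbatim}
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

enum Kind { kRle, kFor };  // per-column compressor

struct Column {
  Kind kind;
  int64_t base;               // kFor: reference
  std::vector<int64_t> data;  // kRle: value,count
                              // kFor: deltas
};

void rleAppend(Column& c, int64_t v) {
  size_t n = c.data.size();
  if (n >= 2 && c.data[n - 2] == v) {
    c.data[n - 1]++;       // extend current run
  } else {
    c.data.push_back(v);   // start a new run
    c.data.push_back(1);
  }
}

void forAppend(Column& c, int64_t v) {
  c.data.push_back(v - c.base);
}

// Runtime dispatch: one loop iteration and one
// switch per column value.
void tupleAppend(std::vector<Column>& cols,
                 const std::vector<int64_t>& t) {
  for (size_t i = 0; i < cols.size(); i++) {
    switch (cols[i].kind) {
      case kRle: rleAppend(cols[i], t[i]); break;
      case kFor: forAppend(cols[i], t[i]); break;
    }
  }
}
\end{verbatim}
A variant specialized for one fixed schema could replace the loop and
the {\tt switch} with direct calls to the per-column routines, at the
cost of fixing the page format at compile time.
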
% explain how append works
\subsection{The {\tt \large append()} operation}
\rowss compressed pages provide a {\tt tupleAppend()} operation that
takes a tuple as input and returns {\tt false} if the page does not have
room for the new tuple. {\tt tupleAppend()} consists of a dispatch
routine that calls {\tt append()} on each column in turn.
%Each
%column's {\tt append()} routine secures storage space for the column
%value, or returns {\tt false} if no space is available.
{\tt append()} has the
following signature:
%\begin{quote}
\begin{verbatim}
void append(COL_TYPE value, int* exception_offset,
void* exceptions_base, void* column_base,
int* freespace)
\end{verbatim}
%\end{quote}
where {\tt value} is the value to be appended to the column, {\tt
exception\_offset} is a pointer to the offset of the first free byte in the
exceptions region, and {\tt exceptions\_base} and {\tt column\_base} point
to page-sized buffers used to store exceptions and column data as
the page is being written. One copy of these buffers exists for
each LSM-tree component; they do not significantly increase \rowss memory
requirements.
{\tt freespace} is a pointer to the number of free bytes
remaining on the page. The multicolumn format initializes these
values when the page is allocated. As {\tt append()} implementations
are called they update this data accordingly. Each time a column
value is written to a page the column compressor must allocate
space to store the new value. In a naive allocation approach
the compressor would check {\tt freespace} to decide if the page is full
and return an error code if it is. Its caller would then check the
return value and behave appropriately. This would lead to two branches
per column value, greatly decreasing compression throughput.
We avoid these branches by reserving a portion of the page approximately the
size of a single incompressible tuple for speculative allocation.
Instead of checking {\tt freespace}, compressors decrement the current
{\tt freespace} count and append data to the end of their segment.
Once an entire tuple has been written to a page,
{\tt tupleAppend()} checks the value of {\tt freespace} and decides if another tuple may be
safely written. This also simplifies corner cases; if a
compressor cannot store more tuples on a page, but has not run out of
space\footnote{This can happen when a run length encoded page stores a
  very long run of identical tuples.}, it simply sets {\tt
  freespace} to -1 and returns.
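
The following C++ fragment is a minimal sketch of speculative
allocation, not \rowss implementation. The {\tt Page} structure, the
constants, and the use of uncompressed 4-byte columns are assumptions
made for illustration. It tests {\tt freespace} at the start of the
next {\tt tupleAppend()} call, which is equivalent to testing it after
each tuple is written.
\begin{verbatim}
#include <cassert>
#include <cstdint>
#include <cstring>

const int kPageSize = 4096;
const int kCols = 3;
const int kTupleBytes = kCols * 4;

struct Page {
  uint8_t col[kCols][kPageSize];  // scratch
  int used[kCols];  // bytes written per column
  int freespace;    // may go negative
};

void initPage(Page* p) {
  std::memset(p->used, 0, sizeof p->used);
  // Hold back room for one worst-case tuple.
  p->freespace = kPageSize - kTupleBytes;
}

// No capacity check and no return value.
void append(Page* p, int c, int32_t v) {
  std::memcpy(p->col[c] + p->used[c],
              &v, sizeof v);
  p->used[c] += (int)sizeof v;
  p->freespace -= (int)sizeof v;
}

// One capacity check per tuple.
bool tupleAppend(Page* p, const int32_t* t) {
  if (p->freespace < 0) return false;
  for (int c = 0; c < kCols; c++)
    append(p, c, t[c]);
  return true;
}
\end{verbatim}
With these assumed sizes each tuple occupies 12 bytes, so the sketch
accepts 341 tuples (4092 bytes) before {\tt tupleAppend()} returns
{\tt false}. The reserved region guarantees that the last accepted
tuple still fits on the page.
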
%% Initially, our multicolumn module managed these values
%% and the exception space. This led to extra arithmetic operations and
%% conditionals and did not significantly simplify the code. Note that,
%% compared to techniques that store each tuple contiguously on the page,
%% our format avoids encoding the (variable) length of each tuple; instead
%% it encodes the length of each column.
% contrast with prior work
The original PFOR implementation~\cite{pfor} assumes it has access to
a buffer of uncompressed data and is able to make multiple
passes over the data during compression. This allows it to remove
branches from loop bodies, improving compression throughput. We opted
to avoid this approach in \rows because it would increase the complexity
of the {\tt append()} interface and add a buffer to \rowss merge threads.
%% \subsection{Static code generation}
%% % discuss templatization of code
%% After evaluating the performance of a C implementation of \rowss
%% compression routines, we decided to rewrite the compression routines
%% as C++ templates. C++ template instantiation performs compile-time
%% macro substitutions. We declare all functions {\tt inline}, and place
%% them in header files (rather than separate compilation units). This
%% gives g++ the opportunity to perform optimizations such as
%% cross-module constant propagation and branch elimination. It also
%% allows us to write code that deals with integer data types instead of
%% void pointers without duplicating code or breaking encapsulation.
%% Such optimizations are possible in C, but, because of limitations of
%% the preprocessor, would be difficult to express or require separate
%% code-generation utilities. We found that this set of optimizations
%% improved compression and decompression performance by roughly an order
%% of magnitude. Although compressor throughput varies with data distributions
%% and type, optimizations yield a similar performance improvement across
%% varied datasets and random data distributions.
%% We performed one additional set of optimizations. Rather than
%% instantiate each compressor template once for each column type at
%% compile time, we instantiate a multicolumn page format template for
%% each page format we wish to support. This removes the {\tt for} loop
%% and {\tt switch} statement that supporting multiple columns per page
%% introduced, but hardcodes page schemas at compile time.
%% The two approaches could coexist in a single runtime environment,
%% allowing the use of hardcoded implementations for performance critical
%% tables, while falling back on slower, general purpose implementations
%% for previously unseen table layouts.
\subsection{Buffer manager interface extensions}
In the process of implementing \rows, we added a new API to
Stasis' buffer manager implementation. It consists of four
functions: {\tt pageLoaded()}, {\tt pageFlushed()}, {\tt pageEvicted()},
and {\tt cleanupPage()}.
Stasis supports custom page layouts. These
layouts control the byte level format of pages and must register
callbacks that will be invoked by Stasis at appropriate times. The
first three callbacks are invoked by the buffer manager when it loads an
existing page from disk, writes a page to disk, and evicts a page
from memory, respectively.
The fourth is invoked by page allocation
routines immediately before a page is reformatted to use a different
layout. This allows the page's old layout's implementation to
free any in-memory resources that it associated with the page during
initialization or when {\tt pageLoaded()} was called.
% a preexisting, conventional database buffer manager. Each
%page contains an LSN (which is largely unused, as we bulk-load \rowss
%trees) and a page implementation number. This allows it to coexist
%with conventional write ahead logging mechanisms. As mentioned above,
%this greatly simplifies crash recovery without introducing significant
%logging overhead.
%Memory resident pages are stored in a
%hashtable keyed by page number, and replaced using an LRU
%strategy\footnote{LRU is a particularly poor choice, given that \rowss
% I/O is dominated by large table scans. Eventually, we hope to add
% support for explicit eviction of pages read by the merge processes.}.
%In implementing \rows, we made use of a number of generally useful
%callbacks that are of particular interest to \rows and other database
%compression schemes. The first,
%{\tt pageLoaded()} instantiates a new
%multicolumn page implementation when the page is first read into
%memory. The second, {\tt pageFlushed()} informs our multicolumn
%implementation that the page is about to be written to disk, and the
%third {\tt pageEvicted()} invokes the multicolumn destructor.
We register implementations for these functions because
Stasis maintains background threads that control eviction
of \rowss pages from memory. As we mentioned above, multicolumn pages
are split into a number of temporary buffers while they are being
created and are then packed into a contiguous buffer before being
flushed. Multicolumn's {\tt pageFlushed()} callback guarantees
that this happens before the page is written to disk. {\tt pageLoaded()}
parses the page headers and associates statically generated
compression code with each page as it is read into memory.
{\tt pageEvicted()} and {\tt cleanupPage()} free memory that is
allocated by {\tt pageLoaded()}.
%Registering these callbacks provides an extra
%benefit; we parse the page headers, calculate offsets,
%and choose optimized compression routines when a page is read from
%disk instead of each time we access it.
%Although this operation is
%expensive, it does present an opportunity for parallelism. \rows
%provides a per-page operation, {\tt pack()} that performs the
%translation. We can register {\tt pack()} as a {\tt pageFlushed()}
%callback or we can explicitly call it during (or shortly after)
%compression.
\rows provides a per-page operation, {\tt pack()}, that gathers a
page's temporary buffers into a contiguous buffer.
In principle, {\tt pageFlushed()} could safely invoke it from a background thread
with minimal impact on system performance. However, the buffer
manager was written under the assumption that the cost of in-memory
operations is negligible, so it blocks all buffer management
requests while {\tt pageFlushed()} is being executed. In practice,
calling {\tt pack()} from {\tt pageFlushed()} would therefore block multiple
\rows threads.
Also, {\tt pack()} reduces \rowss memory utilization by freeing up
temporary compression buffers.
% Delaying its execution for too long
%might allow this memory to be evicted from processor cache before the
%{\tt memcpy()} can occur.
For these reasons, the merge threads
explicitly invoke {\tt pack()} instead of waiting for {\tt
pageFlushed()} to be called.
%The second interface we added to Stasis' buffer manager is a function
%called {\tt forceRange()}. This function forces a portion of the page
%file to disk before returning, and is used to ensure that \rowss tree
%components reach disk during commit.
\subsection{Storage overhead}
The multicolumn page format is similar to the format of existing
column-wise compression formats. The algorithms we implemented have
page formats that can be divided into two sections.
The first section is a header that contains an encoding of the size of
the compressed region and perhaps a piece of uncompressed data. The second section
typically contains the compressed data.
A multicolumn page contains this information in addition to metadata
describing the position and type of each column. The type and number
of columns could be encoded in the ``page type'' field or be
explicitly represented using a few bytes per page column. If we use
16 bits to represent the page offset and 16 bits for the column
compressor type then the additional overhead for each
column is four bytes plus the size of the compression format headers.
A frame of reference column header consists of a single uncompressed
value and two bytes to record the number of encoded rows. Run
length encoding headers consist of a two byte count of compressed
blocks. Therefore, in the worst case (frame of reference encoding
64-bit integers, and 4KB pages) our prototype's multicolumn format
uses 14 bytes, or about 0.34\% of the page, to store each column header.
Bit-packing and
storing a table that maps page types to lists of column and compressor
types would reduce the size of the page headers.
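
As a check on the worst-case figure, the per-column cost can be
tallied directly. The breakdown below assumes the field sizes given
above (a 16-bit page offset, a 16-bit compressor type, one
uncompressed 64-bit value and a two byte row count) and a 4096 byte
page:
\begin{eqnarray*}
\mbox{FOR header} &=& 2 + 2 + 8 + 2 = 14 \mbox{ bytes},\\
\mbox{fraction of page} &=& 14 / 4096 \approx 0.0034.
\end{eqnarray*}
By the same accounting, a run length encoded column needs $2 + 2 + 2 =
6$ bytes. A hypothetical ten column page that used frame of
reference on 64-bit values throughout would spend $10 \times 14 = 140$
bytes, or roughly 3.4\% of the page, on column headers.
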
In cases where the data does not compress well and tuples are large, additional
storage is wasted because \rows does not split tuples across
pages. Table~\ref{table:treeCreationTwo} illustrates this; it was
generated in the same way as Table~\ref{table:treeCreation}, except
that 400 byte tuples were used instead of 20 byte tuples. Larger
pages would reduce the impact of this problem. We chose 4KB pages
because they are large enough for the schemas we use to benchmark
\rows, but small enough to illustrate the overheads of our page
format.
% XXX graph of some sort to show this?
%% Breaking pages into smaller compressed blocks changes the compression
%% ratio in another way; the compressibility of the data varies with the
%% size of each compressed block. For example, when frame of reference
%% is applied to sorted data, incoming values eventually drift too far
%% from the page offset, causing them to be stored as exceptional values.
%% Therefore (neglecting header bytes), smaller frame of reference blocks
%% provide higher compression ratios.
%% Of course, conventional compression algorithms are free to divide
%% their compressed data into blocks to maximize compression ratios.
%% Although \rowss smaller compressed block size benefits some
%% compression implementations (and does not adversely impact either of
%% the algorithms we implemented), it creates an additional constraint,
%% and may interact poorly with some compression algorithms.
%\subsection{Supporting Random Access}
%% and that our binary searches
%% within a column should have better cache locality than searches of
%% row-oriented page layouts.
%% We have not examined the tradeoffs between different implementations
%% of tuple lookups. Currently, rather than using binary search to find
%% the boundaries of each range, our compressors simply iterate over the
%% compressed representation of the data in order to progressively narrow
%% the range of tuples to be considered. It is possible that (because of
%% expensive branch mispredictions and \rowss small pages) that our
%% linear search implementation will outperform approaches based upon
%% binary search.
\section{Evaluation}
\subsection{Raw write throughput}
\begin{table}
\caption{Weather data schema}
\centering
\label{tab:schema}
\begin{tabular}{|l|c|c|} \hline
Column Name & Compression Format & Key \\ \hline
Longitude & RLE & * \\ \hline
Latitude & RLE & * \\\hline
Timestamp & PFOR & * \\\hline
Weather conditions& RLE & \\\hline
Station ID & RLE & \\\hline
Elevation & RLE & \\\hline
Temperature & PFOR & \\\hline
Wind Direction & PFOR & \\\hline
Wind Speed & PFOR & \\\hline
Wind Gust Speed & RLE & \\
\hline\end{tabular}
\end{table}
To evaluate \rowss raw write throughput, we used it to index
weather data. The data set ranges from May 1,
2007 to Nov 2, 2007 and contains readings from ground stations around
the world~\cite{nssl}. Our implementation assumes these values will never be deleted or modified. Therefore, for this experiment the tree merging threads do not perform versioning or snapshotting. This data is approximately 1.3GB when stored in an
uncompressed tab delimited file. We duplicated the data by changing
the date fields to cover ranges from 2001 to 2009, producing a 12GB
ASCII dataset that contains approximately 132 million tuples.
Duplicating the data should have a limited effect on \rowss
compression ratios. We index on geographic position, placing
all readings from a particular station in a contiguous range. We then
index on date, separating duplicate versions of the same tuple
from each other.
\rows only supports integer data types. We store ASCII columns for this benchmark by
packing each character into 5 bits (the strings consist of
A-Z, ``+,'' ``-'' and ``*'') and storing them as integers. Floating point columns in
the original data set are always represented with two digits of precision;
we multiply them by 100, yielding an integer. The data source uses
nonsensical readings such as -9999.00 to represent NULL. Our
prototype does not understand NULL, so we leave these fields intact.
We represent each integer column as a 32-bit integer, even when we could use a 16-bit value.
The ``weather conditions'' field is packed into a
64-bit integer. Table~\ref{tab:schema} lists the columns and
compression algorithms we assigned to each column. The ``Key'' column indicates
that the field was used as part of a B-tree primary key.
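
The 5-bit packing can be sketched in C++ as follows. The paper does
not specify the code assigned to each character, so the mapping, the
zero padding, the 12 character limit (60 of the 64 bits), and the
names {\tt code5()} and {\tt pack5()} are all assumptions made for
illustration.
\begin{verbatim}
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Assumed codes: 0 pads short strings,
// 'A'..'Z' map to 1..26, and '+', '-', '*'
// map to 27, 28, 29.
uint64_t code5(char ch) {
  if (ch >= 'A' && ch <= 'Z')
    return 1 + (ch - 'A');
  if (ch == '+') return 27;
  if (ch == '-') return 28;
  if (ch == '*') return 29;
  return 0;  // outside the alphabet: padding
}

// Packs the first 12 characters into a 64-bit
// integer, earliest character most significant.
uint64_t pack5(const std::string& s) {
  uint64_t out = 0;
  for (size_t i = 0; i < 12; i++) {
    uint64_t c = i < s.size() ? code5(s[i]) : 0;
    out = (out << 5) | c;
  }
  return out;
}
\end{verbatim}
The 29 symbols plus the padding code fit in the $2^5 = 32$ values that
5 bits provide. Under this assumed layout, comparing two packed
integers orders the strings by their code sequences, with shorter
strings sorting before their extensions.
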
%InnoDB performance tuning guides suggest limiting the length of the
%table's primary key. \rows does not support this optimization, so we
%indexed the \rows table on all columns.
%\rows targets seek limited applications; we assign a (single) random
%order to the tuples, and insert them in this order.
In this experiment, we randomized the order of the tuples and inserted
them into the index. We compare \rowss performance with the MySQL
InnoDB storage engine. We chose InnoDB because it has been tuned for
bulk load performance. We avoid the overhead of SQL insert
statements and MySQL transactions by using MySQL's bulk load
interface. Data is loaded 100,000 tuples at a time so that
MySQL inserts values into its tree throughout the run and does not
sort and bulk load the data all at once.
%If we did not do this, MySQL would simply sort the tuples, and then
%bulk load the index. This behavior is unacceptable in low-latency
%environments. Breaking the bulk load into multiple chunks
%forces MySQL to make intermediate results available as the bulk load
%proceeds\footnote{MySQL's {\tt concurrent} keyword allows access to
% {\em existing} data during a bulk load; new data is still exposed
% atomically.}.
InnoDB's buffer pool size was 2GB and its log file size was 1GB.
We enabled InnoDB's doublewrite buffer, which writes a copy of each updated
page to a sequential log. The doublewrite buffer increases the total amount of
I/O performed, but decreases the frequency with
which InnoDB calls {\tt fsync()} while writing dirty pages to disk. This
increases replication throughput for this workload.
We compiled \rowss C components with ``-O2'', and the C++ components
with ``-O3''. The latter compiler flag is crucial, as compiler
inlining and other optimizations improve \rowss compression throughput
significantly. \rows was set to allocate 1GB to $C0$ and another
1GB to its buffer pool. In this experiment, \rowss buffer pool is
essentially wasted once its page file size exceeds 1GB; \rows
accesses the page file sequentially and evicts pages using LRU,
leading to a cache hit rate near zero.
Our test hardware has two dual core 64-bit 3GHz Xeon processors with
2MB of cache (Linux reports 4 CPUs) and 8GB of RAM. We disabled the
swap file and unnecessary system services. Datasets large enough to
become disk bound on this system are unwieldy, so we {\tt mlock()} 5.25GB of
RAM to prevent it from being used by experiments. Of the 2.75GB that is left,
approximately 2GB is used by the system under test and the remaining 750MB is used to cache
binaries and to provide Linux with enough page cache to prevent it
from unexpectedly evicting portions of the \rows binary. We monitored
\rows throughout the experiment, confirming that its resident memory
size was approximately 2GB.
All software used during our tests was compiled for 64-bit
architectures. We used a 64-bit Ubuntu Gutsy (Linux
2.6.22-14-generic) installation and its prebuilt MySQL package
(5.0.45-Debian\_1ubuntu3). All page files and logs
were stored on a dedicated drive. The operating system, executables
and input files were stored on another drive.
\subsection{Comparison with conventional techniques}
\begin{figure}
\centering \epsfig{file=average-throughput.pdf, width=3.32in}
\caption{Mean insertion throughput (log-log). We truncated InnoDB's
trend line; throughput was 20kb/s ($\frac{1}{57}$th of \rowss)
after 50 million insertions.}
\label{fig:avg-thru}
\end{figure}
\rows provides roughly 4.7 times more throughput than InnoDB at the
beginning of the experiment~(Figure~\ref{fig:avg-thru}). InnoDB's
throughput remains constant until it starts to perform random
I/O, which causes throughput to drop sharply.
\rowss performance begins to fall off earlier due to merging and
because its page cache is half the size of InnoDB's. However, \rows does
not fall back on random I/O and maintains significantly higher
throughput than InnoDB throughout the run. InnoDB's peak write
throughput was 1.8 mb/s and dropped to 20 kb/s after 50 million tuples
were inserted. The InnoDB trend line in Figure~\ref{fig:avg-thru} has
been truncated for readability. In contrast, \rows maintained an
average throughput of 1.13 mb/sec over the entire 132 million tuple
dataset.
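
As a rough consistency check on these figures, and assuming that
1~mb denotes 1000~kb:
\[
\frac{1.13~\mbox{mb/s}}{20~\mbox{kb/s}} = \frac{1130}{20} = 56.5,
\]
which is in line with the factor of 57 quoted in the caption of
Figure~\ref{fig:avg-thru}.
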
\rows merged $C0$ and $C1$ 59 times and merged $C1$ and $C2$ 15 times.
At the end of the run (132 million tuple insertions) $C2$ took up
2.8GB and $C1$ was 250MB. The actual page
file was 8.0GB, and the minimum possible size was 6GB. InnoDB used
5.3GB after 53 million tuple insertions.
%\begin{figure}
%\centering
%\epsfig{file=instantaneous-throughput.pdf, width=3.33in}
%\caption{Instantaneous insertion throughput (average over 100,000 tuple windows).}
%\label{fig:inst-thru}
%\end{figure}
%\begin{figure}
%\centering
%\epsfig{file=instantaneous-ms-tup.pdf, width=3.33in}
%\caption{Instantaneous tuple insertion time (average over 100,000 tuple windows).}
%\end{figure}
\subsection{Comparison with analytical model}
In this section we measure update latency and compare
measured write throughput with the analytical model's predicted
throughput.
Figure~\ref{fig:avg-tup} shows tuple insertion times for \rows and InnoDB.
The ``\rows (instantaneous)'' line reports insertion times
averaged over 100,000 insertions, while the other lines are averaged
over the entire run.
The periodic spikes in instantaneous tuple
insertion times occur when an insertion blocks waiting
for a tree merge to complete. This happens when one copy of $C0$ is
full and the other one is being merged with $C1$. Admission control
would provide consistent insertion times.
\begin{figure}
\centering
\epsfig{file=average-ms-tup.pdf, width=3.32in}
\caption{Tuple insertion time (log). ``Instantaneous'' is the mean over 100,000
tuples. \rowss cost increases with $\sqrt{n}$; InnoDB's increases linearly.}
\label{fig:avg-tup}
\end{figure}
% First, $C0$ accepts insertions at a much
%greater rate than $C1$ or $C2$ can accept them. Over 100,000 tuples
%fit in memory, so multiple samples are taken before each new $C0$
%component blocks on the disk bound mergers. Second, \rows merges
%entire trees at once, occasionally blocking smaller components
%for long periods of time while larger components complete a merge
%step. Both of these problems could be masked by rate limiting the
%updates presented to \rows.
%This paper has mentioned a number of limitations in our prototype
%implementation.
Figure~\ref{fig:4R} compares our prototype's performance to that of an
ideal uncompressed LSM-tree. The cost of an insertion is $4R$ times
the number of bytes written. We can approximate an optimal value for
$R$ by taking $\sqrt{\frac{|C2|}{|C0|}}$, where tree component size is
measured in number of tuples. This factors out compression and memory
overheads associated with $C0$.
We use this value to
generate Figure~\ref{fig:4R} by multiplying \rowss replication
throughput by $1 + 4R$. The additional $1$ is the cost of reading the
inserted tuple from $C1$ during the merge with $C2$.
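
Written out, the conversion is as follows. The symbols $T$ (measured
replication throughput) and $B$ (the disk bandwidth an uncompressed
LSM-tree would need to match it) are introduced here only for
illustration:
\[
R \approx \sqrt{\frac{|C2|}{|C0|}}, \qquad B = (1 + 4R)\,T.
\]
For instance, a purely illustrative size ratio of $|C2|/|C0| = 16$
gives $R = 4$, so each byte of replication throughput would correspond
to $1 + 4 \times 4 = 17$ bytes of disk bandwidth.
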
\rows achieves 2x compression on the real data set. For comparison,
we ran the experiment with compression disabled, and with
run length encoded synthetic datasets that lead to 4x and 8x compression
ratios. Initially, all runs exceed optimal throughput as they
populate memory and produce the initial versions of $C1$ and $C2$.
Eventually, each run converges to a constant effective disk
utilization roughly proportional to its compression ratio. This shows
that \rows continues to be I/O bound with higher compression ratios.
The analytical model predicts that, given the hard drive's 57.5~mb/s
write bandwidth, an ideal \rows implementation would perform about
twice as fast as our prototype. We believe the difference is largely
due to Stasis' buffer manager, which delivers between 22 and 47 mb/s
of sequential bandwidth on our test hardware. The remaining
difference in throughput is probably due to \rowss imperfect
overlapping of computation with I/O and coarse-grained control over
component sizes and $R$.
Despite these limitations, compression allows \rows to deliver significantly
higher throughput than would be possible with an uncompressed LSM-tree. We expect
\rowss throughput to increase with the size of RAM and storage
bandwidth for the foreseeable future.
%Our compressed runs suggest that \rows is
%I/O bound throughout our tests, ruling out compression and other cpu overheads. We stopped running the
%comparison at 8x because \rows currently requires tuples to be unique;
%to ensure uniqueness we use one column of each tuple to store a unique
%value. Neither FOR nor RLE compress this value particularly well.
%Since our schema only has ten columns, the maximum achievable
%compression ratio is approximately 10x.
%% A number of factors contribute to the discrepancy between our model
%% and our prototype's performance. First, the prototype's
%% whole-tree-at-a-time approach to merging forces us to make extremely
%% coarse and infrequent runtime adjustments to the ratios between tree
%% components. This prevents \rows from reliably keeping the ratios near
%% the current target value for $R$. Second, \rows currently
%% synchronously forces tree components to disk. Given our large buffer
%% pool, a significant fraction of each new tree component is in the
%% buffer pool or operating system cache when the merge thread forces it
%% to disk. This prevents \rows from overlapping I/O with computation.
%% Finally, our analytical model neglects some minor sources of storage
%% overhead.
%One other factor significantly limits our prototype's performance.
\begin{figure}
\centering
\epsfig{file=4R-throughput.pdf, width=3.32in}
\caption{Disk bandwidth an uncompressed LSM-tree would
require to match \rowss throughput. The buffer manager provides
22-45 mb/s.}
\label{fig:4R}
\end{figure}
\subsection{TPC-C / H}
TPC-H is an analytical processing benchmark that targets periodically
bulk-loaded data warehousing systems. In particular, compared to
TPC-C, it de-emphasizes transaction processing and rollback and
allows database vendors to permute the dataset off-line. In real-time
database replication environments faithful reproduction of
transaction processing schedules is important and there is no
opportunity to sort data before making it available to queries.
Therefore, we insert data in chronological order.
Our \rows prototype is far from a complete storage engine
implementation. Rather than implement relational algebra operations
and attempt to process and appropriately optimize SQL queries on top
of \rows, we chose a small subset of the TPC-H and C benchmark
queries and wrote custom code to invoke appropriate \rows tuple
modifications, table scans and index lookup requests. For simplicity,
updates and queries are performed by a single thread.
When modifying TPC-H and C for our experiments, we follow an existing
approach~\cite{entropy,bitsForChronos} and start with a pre-computed
join and projection of the TPC-H dataset. We use the schema described
in Table~\ref{tab:tpc-schema}, and populate the table by using a scale
factor of 30 and following the random distributions dictated by the
TPC-H specification. The schema for this experiment is designed to
have poor update locality.
Updates from customers are grouped by
order id, but the index is sorted by product and date.
This forces the database to permute these updates
into an order that would provide suppliers with
% more interesting to suppliers
%the index is sorted by
%product and date,
inexpensive access to lists of orders to
be filled and historical sales information for each product.
We generate a dataset containing a list of product orders, and insert
tuples for each order (one for each part number in the order), then
add a number of extra transactions. The following updates are applied in chronological order:
\begin{itemize}
\item Following TPC-C's lead, 1\% of orders are immediately cancelled
and rolled back. This is handled by inserting a tombstone for the
order.
\item Remaining orders are delivered in full within the next 14 days.
The order completion time is chosen uniformly at random between 0
and 14 days after the order was placed.
\item The status of each line item is changed to ``delivered'' at a time
chosen uniformly at random before the order completion time.
\end{itemize}
The following read-only transactions measure the performance of \rowss
access methods:
\begin{itemize}
\item Every 100,000 orders we initiate a table scan over the entire
data set. The per-order cost of this scan is proportional to the
number of orders processed so far.
\item 50\% of orders are checked with order status queries. These are
  simply index probes that look up a single line item (tuple) from the
  order. Orders that are checked with status queries are checked 1, 2
  or 3 times with equal probability. Line items are chosen randomly
  with equal probability.
\item Order status queries happen with a uniform random delay of up to 1.3
  times the order processing time. For example, if an order
  is fully delivered 10 days after it is placed, then order status queries are
  timed uniformly at random within the 13 days after the order is
  placed.
\end{itemize}
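
The per-order random choices listed above can be sketched in C++ as
follows. This is not the generator script used for our experiments;
the {\tt OrderPlan} structure and both function names are
hypothetical, and the sketch assumes that cancelled orders complete at
time zero and that status queries are assigned independently of
cancellation.
\begin{verbatim}
#include <cassert>
#include <random>

struct OrderPlan {
  bool cancelled;      // tombstone inserted
  double completion;   // days after placement
  int statusQueries;   // 0 if never checked
  double queryWindow;  // days queries may span
};

OrderPlan planOrder(std::mt19937& rng) {
  std::uniform_real_distribution<> u(0.0, 1.0);
  std::uniform_int_distribution<int> q(1, 3);
  OrderPlan p;
  p.cancelled = u(rng) < 0.01;   // 1% of orders
  p.completion =
      p.cancelled ? 0.0 : 14.0 * u(rng);
  p.statusQueries =              // 50% of orders
      u(rng) < 0.5 ? q(rng) : 0;
  p.queryWindow = 1.3 * p.completion;
  return p;
}

// Delivery time of one line item: uniform
// before the order completion time.
double lineItemDelivery(std::mt19937& rng,
                        double completion) {
  std::uniform_real_distribution<> u(0.0, 1.0);
  return completion * u(rng);
}
\end{verbatim}
Each status query for an order would then be issued at a time drawn
uniformly from the interval between zero and {\tt queryWindow} days
after the order is placed.
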
The script that we used to generate our dataset is publicly available,
along with Stasis' and \rowss source code (Section~\ref{sec:avail}).
This dataset is not easily compressible using the algorithms provided
by \rows. Many columns fit in a single byte, rendering \rowss version
of PFOR useless. These fields change frequently enough to limit the
effectiveness of run length encoding. Both of these issues would be
addressed by bit packing. Also, occasionally re-evaluating and modifying
compression strategies is known to improve compression of TPC-H data.
TPC-H dates are clustered during weekdays, from 1995-2005, and around
Mother's Day and the last few weeks of each year.
\begin{table}
\caption{TPC-C/H schema}
\centering
\label{tab:tpc-schema}
\begin{tabular}{|l|c|c|} \hline
Column Name & Compression Format & Data type\\ \hline
Part \# + Supplier & RLE & int32 \\ \hline
Year & RLE & int16 \\\hline
Week & NONE & int8 \\\hline
Day of week & NONE & int8 \\\hline
Order number & NONE & int64 \\\hline
Quantity & NONE & int8 \\\hline
Delivery Status & RLE & int8 \\\hline
\end{tabular}
\end{table}
\begin{figure}
\centering \epsfig{file=query-graph.pdf, width=3.32in}
\caption{\rows TPC-C/H order throughput}
\label{fig:tpch}
\end{figure}
%% We generated a dataset based on this schema then
%% added transaction rollbacks, line item delivery transactions and
%% order status queries. Every 100,000 orders we initiate a table scan
%% over the entire dataset. Following TPC-C's lead, 1\% of new orders are immediately cancelled
%% and rolled back; the remainder are delivered in full within the
%% next 14 days. We choose an order completion time uniformly at
%% random within the next 14 days then choose part delivery times
%% uniformly at random within that range.
%% We decided that $50\%$ of orders would be checked
%% using order status queries; the number of status queries for such
%% transactions was chosen uniformly at random from one to four, inclusive.
%% Order status queries happen with a uniform random delay of up to 1.3 times
%% the order processing time (if an order takes 10 days
%% to arrive then we perform order status queries within the 13 day
%% period after the order was initiated).
Order status queries have excellent temporal locality and generally
succeed after accessing $C0$. These queries simply increase the
amount of CPU time between tuple insertions and have minimal impact on
replication throughput. \rows overlaps their processing with the
asynchronous I/O performed by merges.
We also run experiments with two modified versions of the order status
query. In the first, which we call ``Lookup C0,'' the order status query
only examines $C0$. In the second, which we call ``Lookup all
components,'' we force \rows to become seek bound by making each order
status query examine every tree
component. This keeps \rows from exploiting the fact that most order
status queries can be serviced from $C0$. Finally, \rows provides
versioning for this test; though its garbage collection code is
executed, it never collects overwritten or deleted tuples.
%% The other type of query we process is a table scan that could be used
%% to track the popularity of each part over time. We know that \rowss
%% replication throughput is significantly lower than its sequential
%% table scan throughput, so we expect to see good scan performance for
%% this query. However, these sequential scans compete with merge
%% processes for I/O bandwidth, so we expect them to have a measurable impact on
%% replication throughput.
Figure~\ref{fig:tpch} plots the number of orders processed by \rows
per second against the total number of orders stored in the \rows
replica. For this experiment, we configure \rows to reserve 1GB for
the page cache and 2GB for $C0$. We {\tt mlock()} 4.5GB of RAM, leaving
500MB for the kernel, system services, and Linux's page cache.
%% In order to
%% characterize query performance, we re-ran the experiment with various
%% read-only queries disabled. In the figure, ``All queries'' line contains all
%% the queries mentioned above, while ``None'' only contains updates.
%% The ``Lookup: All components'' line plots the performance of the \rows
%% replica when performing index probes that access each tree component,
%% while ``Lookup: C0'' measures performance when the index probes match
%% data in $C0$.
The cost of searching $C0$ is negligible, while randomly
accessing the larger components is quite expensive. The overhead of
index scans increases as the table increases in size, leading to a
continuous downward slope throughout runs that perform scans.
Surprisingly, periodic table scans improve lookup
performance for $C1$ and $C2$. The effect is most pronounced after
3 million orders are processed. That is approximately
when Stasis' page file exceeds the size of the buffer pool, which is
managed using LRU. After each merge, half the pages it read
become obsolete. Index scans rapidly replace these pages with live
data using sequential I/O. This increases the likelihood that index
probes will be serviced from memory. A more sophisticated page
replacement policy would further improve performance by evicting
obsolete pages before accessible pages.
We use InnoDB to measure the cost of performing B-tree style updates.
In this experiment, we limit InnoDB's page cache to 1GB and {\tt mlock()} 6GB
of RAM. InnoDB does not perform any order status queries or scans (Figure~\ref{fig:tpch-innodb}).
Although \rows has a total of 3GB for this
experiment, it only uses 1GB for queries, so this setup causes both
systems to begin performing random I/O at the same time.
InnoDB's performance degrades rapidly once it begins using random I/O
to update its B-tree. This is because the sort order of its B-tree
does not match the ordering of the updates it processes. Ignoring
repeated accesses to the same tuple within the same order, InnoDB
performs two B-tree operations per line item (a read and a write),
leading to many page accesses per order.
In contrast, \rows performs an index probe 50\% of the time. \rowss
index probes read $C1$ and $C2$, so it accesses one page per order on
average. However, by the time the experiment concludes, pages in $C1$
are accessed $R$ times more often ($R \approx 6.6$) than those in $C2$, and
the page file is 3.9GB. This allows \rows to keep $C1$ cached in
memory, so each order uses approximately half a disk seek. At larger
scale factors, \rowss access time should double, but remain well
below the time a B-tree would spend applying updates.
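
The estimate above can be written out as follows, taking the figures in the
text at face value (a probe for 50\% of orders, with each probe reading one
page from each of $C1$ and $C2$). This is a back-of-the-envelope sketch
rather than a measurement:
\[
0.5~\mbox{probes/order} \times 2~\mbox{pages/probe} = 1~\mbox{page/order}.
\]
With $C1$ cached in memory, only the $C2$ access requires a seek:
\[
0.5~\mbox{probes/order} \times 1~\mbox{seek/probe} = 0.5~\mbox{seeks/order}.
\]
If $C1$ no longer fit in memory, both accesses would seek, giving
$0.5 \times 2 = 1$ seek per order, which is the doubling mentioned above.
If every such seek paid the full $\sim$13.5ms drive access time reported in
Figure~\ref{fig:tpch-innodb}, half a seek per order would correspond to
roughly $0.5 \times 13.5 \approx 6.75$ms of seek time per order.
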
After terminating the InnoDB run, we allowed MySQL to quiesce, then
performed an index scan over the table. At this point, it had
processed 11.8 million orders, and index scans took 19 minutes and 20
seconds. Had we performed index scans during the InnoDB run, this
would translate to an 11.5 ms cost per order. The overhead of \rows
scans at that point in the run was approximately 2.2 ms per order.
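
The per-order figure for InnoDB follows from amortizing one scan over the
100,000 orders between scans, as in the workload description. Using the
rounded scan time quoted above:
\[
\frac{19 \times 60 + 20~\mbox{s}}{100{,}000~\mbox{orders}}
  = \frac{1160~\mbox{s}}{100{,}000~\mbox{orders}}
  \approx 11.6~\mbox{ms/order},
\]
which agrees with the figure quoted above to within rounding, and is roughly
five times the 2.2ms per-order overhead of \rows scans at the same point in
the run.
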
%The number of parts and concurrent orders increase with TPC-H's scale
%factor. In the worst case, \rows keeps 2-3 data pages in memory for
%each order status query, while InnoDB only keeps one. \rows pages are
%4KB and InnoDB's are 16KB. Furthermore, $C1$ is smaller than $C2$;
%its pages are more likely to contain multiple outstanding orders.
%
%Therefore, \rows has a smaller working set size than InnoDB. By the
%time we terminated the experiment, InnoDB had processed approximately
%11 million orders, and was performing approximately 3.5 page file
%accesses per order. Over 100,000 new orders may be placed in the time it
%%takes to for an order to be completed. This means that InnoDB would dirty approximately 100,000 * 16KB = 1.6GB of pages before re-accessing the tuples created by the order.
%Eventually $1GB$ will not be able to store this working set,
%and both systems will begin performing disk seeks during order status
%queries. InnoDB will also perform seeks when marking line items
%delivered.
%As scale size increases further, the internal tree nodes will no
%longer fit in memory. At this point, InnoDB's larger page size will
%be a significant advantage, as its tree probes will be less expensive
%than \rowss. We could avoid this problem by using large pages for
%internal nodes.
\begin{figure}
\centering \epsfig{file=query-innodb.pdf, width=3.32in}
\caption{\rows and InnoDB TPC-C/H order times, averaged starting after
3.3 million orders. Drive access time is $\sim$13.5ms. Reading
data during updates dominates the cost of queries.}
\label{fig:tpch-innodb}
\end{figure}
%% Finally, note that the analytical model's predicted throughput
%% increases with \rowss compression ratio. Sophisticated, high-speed
%% compression routines achieve 4-32x compression ratios on TPC-H data,
%% while \rowss simplistic compression routines provide approximately 2x
%% compression. Given ample processing power, these algorithms should
%% improve \rowss throughput by four to sixteen times. XXX check cited ratios
%% Our performance figures show that \rows significantly outperforms a
%% popular, production quality B-tree implementation. Our experiments
%% reveal a number of deficiencies in our prototype implementation,
%% suggesting that further implementation efforts would improve its
%% performance significantly. Finally, though our prototype could be
%% improved, it already performs at roughly $\frac{1}{4}$th of its ideal
%% throughput. Our analytical models suggest that it will significantly
%% outperform any B-tree implementation when applied to appropriate
%% update workloads.
\section{Related Work}
\subsection{LSM-trees}
The original LSM-tree work~\cite{lsm} provides a more detailed
analytical model than the one presented above. It focuses on update
intensive OLTP (TPC-A) workloads and hardware provisioning for steady
state workloads.
LHAM is an adaptation of LSM-trees for hierarchical storage
systems~\cite{lham}. It stores higher numbered
components on archival media such as CD-R or tape drives, and
scales LSM-trees beyond the capacity of high-performance storage
media. It also supports efficient time travel queries over archived, historical
data~\cite{lham}.
Partitioned exponential files are similar to LSM-trees, except that
they range partition data into smaller indices~\cite{partexp}. This solves a number
of issues that are left unaddressed by \rows, most notably
skewed update patterns and merge storage
overhead.
\rows is optimized for uniform random insertion patterns
and does not attempt to take advantage of skew in the replication
workload. If most updates are to a particular partition then
partitioned exponential files merge that partition frequently,
skipping merges of unmodified partitions.
Also, smaller merges duplicate smaller components, wasting less space.
Multiple merges can run in parallel, improving concurrency.
Partitioning a \rows
replica into multiple LSM-trees would enable similar optimizations.
Partitioned exponential files avoid a few other pitfalls of LSM-tree
implementations. \rows addresses these problems in different ways.
One issue is page file fragmentation. Partitioned exponential files
make use of maintenance tasks to move partitions, ensuring that there
is enough contiguous space to hold a new partition. \rows avoids this
problem by allowing tree components to become fragmented. It does
this by using Stasis to allocate contiguous regions of pages that
are long enough to guarantee good sequential scan performance.
\rows always allocates regions of the same length, guaranteeing that
Stasis can reuse all freed regions before extending the page file.
This can waste nearly an entire region per component, which does not
matter in \rows, but could be significant to systems with
many small partitions.
Some LSM-tree implementations do not support concurrent insertions,
merges and queries. This causes such implementations to block during
merges that take $O(n)$ time in the tree size.
\rows overlaps insertions and queries with tree merges. Therefore,
admission control would provide predictable and uniform latencies.
Partitioning can be used to limit the number of tree components. We
have argued that allocating two unpartitioned on-disk components is adequate for
\rowss target applications.
Reusing existing B-tree implementations as
the underlying storage mechanism for LSM-trees has been proposed~\cite{cidrPartitionedBTree}. Many
standard B-tree optimizations, such as prefix compression and bulk insertion,
would benefit LSM-tree implementations. However, \rowss custom bulk-loaded tree
implementation benefits compression. Unlike B-tree compression, \rowss
compression does not need to support
efficient, in-place updates of tree nodes.
Recent work optimizes B-trees for write intensive workloads by dynamically
relocating regions of B-trees during
writes~\cite{bTreeHighUpdateRates}. This reduces index fragmentation
but still relies upon random I/O in the worst case. In contrast,
LSM-trees never use disk seeks to service write requests and produce
perfectly laid out B-trees.
Online B-tree merging~\cite{onlineMerging} is closely related to
LSM-trees' merge process. B-tree merging
addresses situations where the contents of a single table index have
been split across two physical B-trees that now need to be reconciled.
This situation arises, for example, during rebalancing of partitions
within a cluster of database machines.
One B-tree merging approach lazily piggybacks merge
operations on top of tree access requests. To service an index
probe or range scan, the system must read leaf nodes from both B-trees.
Rather than simply evicting the pages from cache, this approach merges
the portion of the tree that has already been brought into
memory.
LSM-trees can service delayed
index scans without performing additional I/O: queries that request table scans wait for
the merge processes to make a pass over the index.
By combining this idea with lazy merging, an LSM-tree implementation
could service
range scans immediately without significantly increasing the amount of
I/O performed by the system.
\subsection{Row-based database compression}
Row-oriented database compression techniques compress each tuple
individually and sometimes ignore similarities between adjacent
tuples. One such approach compresses low cardinality data by building
a table-wide mapping between short identifier codes and longer string
values. The mapping table is stored in memory for convenient
compression and decompression. Other approaches include NULL
suppression, which stores runs of NULL values as a single count, and
leading zero suppression, which stores integers in a variable length
format that does not store zeros before the first non-zero digit of each
number. Row-oriented compression schemes typically provide efficient random access to
tuples, often by explicitly storing tuple offsets at the head of each page.
Another approach is to compress page data using a generic compression
algorithm, such as gzip. The primary drawback of this approach is
that the size of the compressed page is not known until after
compression. Also, general purpose compression techniques typically
do not provide random access within pages and are often more processor
intensive than specialized database compression
techniques~\cite{rowImplementationPerf}.
\subsection{Column-oriented database compression}
Some column compression techniques are based on the observation that sorted
columns of data are often easier to compress than sorted tuples. Each
column contains a single data type, and sorting decreases the
cardinality and range of data stored on each page. This increases the
effectiveness of simple, special purpose, compression schemes.
PFOR was introduced as an extension to
MonetDB~\cite{pfor}, a column-oriented database, along with two other
formats. PFOR-DELTA is similar to PFOR, but encodes the differences (deltas)
between successive values instead of the values themselves. PDICT encodes
columns as keys and a dictionary that
maps those keys to the original values. We plan to add both these formats to
\rows in the future. We chose to implement RLE and PFOR because they
provide high compression and decompression bandwidth. Like MonetDB,
each \rows table is supported by custom-generated code.
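
As a small illustration of the difference between the two encodings (the
values are invented for this example and the layout is simplified;
see~\cite{pfor} for the actual formats), consider the sorted column values
$1000, 1003, 1004, 1010$. A frame-of-reference encoding stores a base value
once, followed by small offsets from that base:
\[
(1000,\, 1003,\, 1004,\, 1010) \;\rightarrow\; 1000 + (0,\, 3,\, 4,\, 10).
\]
A delta-based encoding instead stores the first value followed by the
differences between successive values:
\[
(1000,\, 1003,\, 1004,\, 1010) \;\rightarrow\; 1000,\; (+3,\, +1,\, +6).
\]
In both cases the small integers can be stored in fewer bits than the
original values. Values that do not fit in the chosen width are stored
separately as exceptions.
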
C-Store, another column-oriented database, has relational operators
that have been optimized to work directly on compressed
data~\cite{compExec}. For example, when joining two run length encoded
columns, it is unnecessary to explicitly represent each row during the
join. This optimization would be useful in \rows, as its
merge processes perform repeated joins over compressed data.
Search engines make use of similar techniques and apply column compression to conserve
disk and bus bandwidth. Updates are performed by storing the index in
partitions and replacing entire partitions at a
time. Partitions are rebuilt offline~\cite{searchengine}.
A recent paper~\cite{bitsForChronos} provides a survey of database compression techniques
and characterizes the interaction between compression algorithms,
processing power and memory bus bandwidth. The formats within their
classification scheme either split tuples across pages or group
information from the same tuple in the same portion of the
page.
\rows, which does not split tuples across pages, takes a different
approach and stores each column separately within a page. Our
column-oriented page layouts incur different types of per-page overhead and
have fundamentally different processor
cache behaviors and instruction-level parallelism properties than the
schemes they consider.
In addition to supporting compression, column databases typically
optimize for queries that project away columns during processing.
They do this by precomputing the projection and potentially resorting
and recompressing the data. This reduces the size of the uncompressed
data and can improve compression ratios, reducing the amount of I/O
performed by the query. \rows can support these optimizations by
computing the projection of the data during replication. This
provides \rows replicas optimized for a set of queries.
Unlike column-oriented databases, \rows provides
high-throughput, low-latency, in-place updates and tuple lookups comparable to row-oriented storage.
%However, many column storage techniques are applicable to \rows. Any
%column index that supports efficient bulk-loading, provides scans over data in an order appropriate for bulk-loading and can be emulated by an
%updatable data structure can be implemented within
%\rows. This allows us to apply other types of index structures
%to real-time replication scenarios.
%This property does not come without cost; compared to a column
%store, \rows must merge replicated data more often, achieves lower
%compression ratios, and performs index lookups that are roughly twice
%as expensive as a B-tree lookup.
\subsection{Snapshot consistency}
\rows relies upon the correctness of the master database's concurrency
control algorithms to provide snapshot consistency to queries. \rows
is compatible with most popular approaches to concurrency control in
OLTP environments, including two-phase locking, optimistic concurrency
control and multiversion concurrency control.
\rows only supports read-only queries. Therefore, its concurrency
control algorithms need only address read-write conflicts.
Well-understood techniques protect against such conflicts
without causing requests to block, deadlock or
livelock~\cite{concurrencyControl}.
%% \subsection{Log shipping}
%% Log shipping mechanisms are largely outside the scope of this paper;
%% any protocol that provides \rows replicas with up-to-date, intact
%% copies of the replication log will do. Depending on the desired level
%% of durability, a commit protocol could be used to ensure that the
%% \rows replica receives updates before the master commits.
%% \rows is already bound by sequential I/O throughput, and because the
%% replication log might not be appropriate for database recovery, large
%% deployments would probably opt to store recovery logs on machines
%% that are not used for replication.
\section{Conclusion}
Compressed LSM-trees are practical on modern hardware. Hardware trends such as increased memory size and sequential disk bandwidth will further improve \rowss performance. In addition
to developing new compression formats optimized for LSM-trees, we presented a new approach to
database replication that leverages the strengths of LSM-trees by
avoiding index probing during updates. We also introduced the idea of
using snapshot consistency to provide concurrency control for
LSM-trees.
%% Our prototype's LSM-tree recovery mechanism is extremely
%% straightforward, and makes use of a simple latching mechanism to
%% maintain our LSM-trees' consistency. It can easily be extended to
%% more sophisticated LSM-tree implementations that perform incremental
%% tree merging.
Our implementation is a first cut at a working version of \rows.
Our prototype's random read performance compares favorably to that of a production-quality B-tree, and we have
bounded the increases in \rowss replication throughput that could be obtained without improving compression ratios.
By avoiding disk seeks for all operations except
random index probes, uncompressed LSM-trees can provide orders of magnitude more write throughput than
B-trees. With real-world
database compression ratios ranging from 5x to 20x, \rows
can outperform B-tree based database replicas by an
additional factor of ten.
% We implemented \rows to address scalability issues faced by large
%scale database installations. \rows addresses
%applications that perform real-time analytical and decision
%support queries over large, frequently updated data sets.
%Such applications are becoming increasingly common.
\section{Availability}
\label{sec:avail}
\rowss source code and the tools used to generate our TPC-C/H workload
are available at:
{\tt http://www.cs.berkeley.edu/$\sim$sears/stasis}
\section{Acknowledgements}
We would like to thank Petros Maniatis, Tyson Condie and the
anonymous reviewers for their feedback. Portions of this work were
performed at Intel Research, Berkeley.
\bibliographystyle{abbrv}
\bibliography{rose} % sigproc.bib is the name of the Bibliography in this case
% You must have a proper ".bib" file
% and remember to run:
% latex bibtex latex latex
% to resolve all references
%
% ACM needs 'a single self-contained file'!
%
\balancecolumns % GM July 2000
% That's all folks!
\end{document}