stasis-aries-wal/doc/rosePaper/rose.tex

1071 lines
52 KiB
TeX

% This is "sig-alternate.tex" V1.3 OCTOBER 2002
% This file should be compiled with V1.6 of "sig-alternate.cls" OCTOBER 2002
%
% This example file demonstrates the use of the 'sig-alternate.cls'
% V1.6 LaTeX2e document class file. It is for those submitting
% articles to ACM Conference Proceedings WHO DO NOT WISH TO
% STRICTLY ADHERE TO THE SIGS (PUBS-BOARD-ENDORSED) STYLE.
% The 'sig-alternate.cls' file will produce a similar-looking,
% albeit, 'tighter' paper resulting in, invariably, fewer pages.
%
% ----------------------------------------------------------------------------------------------------------------
% This .tex file (and associated .cls V1.6) produces:
% 1) The Permission Statement
% 2) The Conference (location) Info information
% 3) The Copyright Line with ACM data
% 4) Page numbers
%
% as against the acm_proc_article-sp.cls file which
% DOES NOT produce 1) thru' 3) above.
%
% Using 'sig-alternate.cls' you have control, however, from within
% the source .tex file, over both the CopyrightYear
% (defaulted to 2002) and the ACM Copyright Data
% (defaulted to X-XXXXX-XX-X/XX/XX).
% e.g.
% \CopyrightYear{2003} will cause 2002 to appear in the copyright line.
% \crdata{0-12345-67-8/90/12} will cause 0-12345-67-8/90/12 to appear in the copyright line.
%
% ---------------------------------------------------------------------------------------------------------------
% This .tex source is an example which *does* use
% the .bib file (from which the .bbl file % is produced).
% REMEMBER HOWEVER: After having produced the .bbl file,
% and prior to final submission, you *NEED* to 'insert'
% your .bbl file into your source .tex file so as to provide
% ONE 'self-contained' source file.
%
% ================= IF YOU HAVE QUESTIONS =======================
% Questions regarding the SIGS styles, SIGS policies and
% procedures, Conferences etc. should be sent to
% Adrienne Griscti (griscti@acm.org)
%
% Technical questions _only_ to
% Gerald Murray (murray@acm.org)
% ===============================================================
%
% For tracking purposes - this is V1.3 - OCTOBER 2002
\documentclass{sig-alternate-sigmod08}
\usepackage{xspace,color}
\newcommand{\rows}{Rows\xspace}
\newcommand{\rowss}{Rows'\xspace}
\begin{document}
%
% --- Author Metadata here ---
\conferenceinfo{ACM SIGMOD}{'08 Vancouver, BC, Canada}
%\CopyrightYear{2001} % Allows default copyright year (2000) to be over-ridden - IF NEED BE.
%\crdata{0-12345-67-8/90/01} % Allows default copyright data (0-89791-88-6/97/05) to be over-ridden - IF NEED BE.
% --- End of Author Metadata ---
\title{{\ttlit \rows}: Compressed, log-structured replication}
%
% You need the command \numberofauthors to handle the "boxing"
% and alignment of the authors under the title, and to add
% a section for authors number 4 through n.
%
\numberofauthors{3}
\author{}
%\author{Russell Sears \and Mark Callaghan \and Eric Brewer}
\maketitle
\begin{abstract}
This paper describes \rows\footnote{[Clever acronym here]}, a database
storage engine designed for high-throughput replication. It targets
applications with write-intensive (seek limited) transaction
processing workloads and near-realtime decision support and analytical
processing queries. \rows uses {\em log structured merge} (LSM) trees
to create full database replicas using purely sequential I/O. It
provides access to inconsistent data in real-time and consistent data
with a few seconds delay. \rows was written to support micropayment
transactions. Here, we apply it to archival of weather data.
A \rows replica serves two purposes. First, by avoiding seeks, \rows
reduces the load on the replicas' disks, leaving surplus I/O capacity
for read-only queries and allowing inexpensive hardware to handle
workloads produced by database machines with tens of disks. This
allows decision support and OLAP queries to scale linearly with the
number of machines, regardless of lock contention and other
bottlenecks associated with distributed transactions. Second, \rows
replica groups provide highly available copies of the database. In
Internet-scale environments, decision support queries may be more
important than update availability.
%\rows targets seek-limited update-in-place OLTP applications, and uses
%a {\em log structured merge} (LSM) tree to trade disk bandwidth for
%seeks. LSM-trees translate random updates into sequential scans and
%bulk loads. Their performance is limited by the sequential I/O
%bandwidth required by a vacuumer analogous to merges in
%sort-merge join. \rows uses column compression to reduce this
%bottleneck.
\rowss throughput is limited by sequential I/O bandwidth. We use
column compression to reduce this bottleneck. Rather than reassemble
rows from a column-oriented disk layout, we adapt existing column
compression algorithms to a new row-oriented data layout. This
introduces negligible space overhead and can be applied to most
single-pass, randomly accessible compression formats. Our prototype
uses lightweight (superscalar) column compression algorithms.
Existing analytical models and our experiments show that, for disk
bound workloads, \rows provides significantly higher replication
throughput than conventional replication techniques. Finally, we
introduce an easily measured metric that predicts replication
performance of \rows implementations in a variety of deployment
scenarios.
\end{abstract}
%% SIGMOD DOESN'T USE THESE
% A category with the (minimum) three required fields
%\category{H.4}{Information Systems Applications}{Miscellaneous}
%A category including the fourth, optional field follows...
%\category{D.2.8}{Software Engineering}{Metrics}[complexity measures, performance measures]
%\terms{Delphi theory}
%\keywords{ACM proceedings, \LaTeX, text tagging}
\section{Introduction}
\rows is a database replication engine for workloads with high volumes
of in-place updates. Traditional database updates are difficult to
scale beyond a certain size. Once data exceeds the size of RAM, any
attempt to update data in place is severely limited by the cost of
drive head seeks. This can be addressed by adding more drives, which
increases cost and decreases reliability. Alternatively, the database
can be run on a cluster of machines, providing improved scalability at
great expense.
These problems lead large-scale database installations to partition
their workloads across multiple servers, allowing linear scalability,
but sacrificing consistency between data stored on different
partitions. Fortunately, updates often deal with well-defined subsets
of the data; with an appropriate partitioning scheme, one can achieve
linear scalability for localized updates.
The cost of partitioning is that no single coherent version of the
data exists; queries that rely on disparate portions of the database
must either run multiple queries, or, if they are too expensive to run
on master database instances, are delegated to data warehousing
systems.
Although \rows cannot process SQL update queries directly, it is able
to replicate conventional database instances at a fraction of the cost
of the master database server. Like a data warehousing solution, this
decreases the cost of large, read-only OLAP and decision support
queries. \rows does this without introducing significant replication
latency.
Therefore, we think of \rows as a compromise between data warehousing
solutions (which provide extremely efficient access to data after a
significant delay), and database replication systems (which cost
nearly as much as the database instances they replicate). The low cost
of \rows replicas also allows database administrators to replicate
multiple master databases on the same machine, simplifying queries
that span partitions.
\subsection{System design}
A \rows replica takes a {\em replication log} as input. The
replication log should record each transaction begin, commit, and
abort performed by the master database, along with the pre and post
images associated with each tuple update. The ordering of these
entries should match the order in which they are applied at the
database master.
Upon receiving a log entry, \rows applies it to an in-memory tree, and
the update is immediately reflected by inconsistent reads. \rows
provides snapshot consistency to readers that require transactional
isolation. It does so in a lock-free manner; transactions' reads and
writes are not tracked, and no \rows transaction can ever force
another to block or abort. When given appropriate workloads, \rows
provides extremely low-latency replication. Transactionally
consistent data becomes available after a delay on the order of the
duration of a few update transactions.
To prevent the in-memory tree from growing without bound, a merge
process iterates over the (sorted) tree entries, and merges them with
existing on-disk tree entries. As the database increases in size, the
on disk tree grows, forcing \rows to compare an ever-increasing number
of existing tuples with each new tuple. To mitigate this effect, the
on-disk tree is periodically emptied by merging it with a second,
larger tree.
In order to look up a tuple stored in \rows, a query must examine all
three trees, typically starting with the in-memory (fastest, and most
up-to-date) component, and then moving on to progressively larger and
out-of-date trees. In order to perform a range scan, the query can
either iterate over the trees manually, or wait until the next round
of merging occurs, and apply the scan to tuples as the mergers examine
them. By waiting until the tuples are due to be merged, the
range-scan can occur with zero I/O cost, at the expense of significant
delay.
Merge throughput is bounded by sequential I/O bandwidth, and index
probe performance is limited by the amount of available memory. \rows
uses compression to trade surplus computational power for scarce
storage resources.
XXX provide a quick paper outline here. 1-2 paragraphs describing the flow of the paper.
\section{Related Work}
XXX This section isn't really written. Instead, it just contains
notes about what it should contain.
\subsection{Row-based schemes, MySQL page compression}
MySQL compression: Guess compression ratio. If fits, great. If not,
split b-tree page. Based on conventional compression algs. Lots of
database compression literature. Generally accepted that compression
increases throughput when memory / bandwidth savings is more important than
CPU overhead.
\subsection{Column-based compression}
There are a few column oriented compression papers to cite. Gather up
the references. Biggest distinction: We're near real-time regardless
up update access patterns. Prior perf studies focus on compression
kernel throughput. At least for \rows, the results don't apply, as
other operations now dominate. (Kernels for merges, etc could help.)
\subsection{Snapshot consistency}
cite '81 survey; two broad approaches: locking, timestamp / mvcc. The
later makes serialized order explicit. (Useful for replication
systems)
\subsection{Replication techniques and log shipping}
Large body of work on replication techniques and topologies. Largely
orthogonal to \rows. We place two restrictions on the master. First,
it must ship writes (before and after images) for each update, not
queries, as some DB's do. \rows must be given, or be able to infer
the order in which each transaction's data should be committed to the
database, and such an order must exist. If there is no such order,
and the offending transactions are from different, \rows will commit
their writes to the database in a different order than the master.
\section{\rowss I/O performance}
As far as we know, \rows is the first LSM-tree implementation. This
section provides an overview of LSM-trees, and steps through a rough
analysis of LSM-tree performance on current hardware (we refer the
reader to the original LSM work for a thorough analytical
discussion of LSM performance). We defer discussion of the CPU
overhead of compression to later sections, and simply account for I/O
and memory bottlenecks in this section.
\subsection{Tree merging}
% XXX mention Sql server `bulk loaded' mode.
% figures?
% xxx am I counting right when I say three level lsm trees?
An LSM-tree consists of a number of underlying trees. For simplicity,
this paper considers three component LSM-trees. Component zero ($C0$)
is an in-memory binary search tree. Components one and two ($C1$,
$C2$) are read-only, bulk-loaded B-trees. Only $C0$ is updated in
place. Each update is handled in three stages. In the first stage,
the update is applied to the in-memory tree. Next, once enough
updates have been applied, the tuple is merged with existing tuples in
$C1$. The merge process performs a sequential scan over the in-memory
tree and $C1$, producing a new version of $C1$.
Conceptually, when the merge is complete, $C1$ is atomically replaced
with the new tree, and $C0$ is atomically replaced with an empty tree.
The process is then eventually repeated when $C1$ and $C2$ are merged.
At that point, the insertion will not cause any more I/O operations.
Although our prototype replaces entire trees at once, this approach
introduces a number of performance problems. The original LSM work
proposes a more sophisticated scheme that addresses some of these
issues. Instead of replacing entire trees at once, it replaces one
subtree at a time. This reduces peak storage and memory requirements.
Atomic replacement of portions of an LSM-tree would cause ongoing
merges to block insertions, and force the mergers to run in lock step.
(This is the ``crossing iterator'' problem mentioned in the LSM
paper.) We address this issue by allowing data to be inserted into
the new version of the smaller component before the merge completes.
This forces \rows to check both versions of components $C0$ and $C1$
in order to look up each tuple, but it addresses the crossing iterator
problem without resorting to fine-grained concurrency control.
Applying this approach to subtrees reduces the impact of these extra
probes, which could be filtered with a range comparison in the common
case.
In a populated LSM-tree $C2$ is the largest component, and $C0$ is the
smallest component. The original LSM-tree work proves that throughput
is maximized when the ratio of the sizes of $C1$ to $C0$ is equal to
the ratio between $C2$ and $C1$. They call this ratio $R$. Note that
(on average in a steady state) for every $C0$ tuple consumed by a
merge, $R$ tuples from $C1$ must be examined. Similarly, each time a
tuple in $C1$ is consumed, $R$ tuples from $C2$ are examined.
Therefore, in a steady state, insertion throughput cannot exceed $R *
cost_{read~and~write~C2}$, or $R * cost_{read~and~write~C1}$. Note
that the total size of the tree is approximately $R^2$ (neglecting the
data stored in $C0$ and $C1$)\footnote{The proof that keeping R
constant across our three tree components follows from the fact that
the mergers compete for I/O bandwidth and $x(1-x)$ is maximized when
$x=0.5$. The LSM-tree paper proves the general case.}.
\subsection{Replication Throughput}
LSM-trees have different asymptotic performance characteristics than
conventional index structures. In particular, the amortized cost of
insertion is $O(\sqrt{n})$ in the size of the data. This cost is
$O(log~n)$ for a B-tree. The relative costs of sequential
and random I/O determine whether or not \rows is able to outperform
B-trees in practice.
Starting with the (more familiar) B-tree case, in the steady state, we
can expect each index update to perform two random disk accesses (one
evicts a page, the other reads a page):
\[
cost_{Btree~update}=2~cost_{random~io}
\]
(We assume that the upper levels of the B-tree are memory resident.) If
we assume uniform access patterns, 4 KB pages and 100 byte tuples,
this means that an uncompressed B-tree would keep $\sim2.5\%$ of the
tuples in memory. Prefix compression and a skewed update distribution
would improve the situation significantly, but are not considered
here. Without a skewed update distribution, batching I/O into
sequential writes only helps if a significant fraction of the tree's
data fits in RAM.
In \rows, we have:
\[
cost_{LSMtree~update}=2*2*2*R*\frac{cost_{sequential~io}}{compression~ratio} %% not S + sqrt S; just 2 sqrt S.
\]
where $R$ is the ratio of adjacent tree component sizes
($R^2=\frac{|tree|}{|mem|}$). We multiply by $2R$ because each new
tuple is eventually merged into both of the larger components, and
each merge involves $R$ comparisons with existing tuples on average.
An update of a tuple is handled as a deletion of the old tuple (an
insertion of a tombstone), and an insertion of the new tuple, leading
to a second factor of two. The third reflects the fact that the
merger must read existing tuples into memory before writing them back
to disk.
The $compression~ratio$ is
$\frac{uncompressed~size}{compressed~size}$. For simplicity, we
assume that the compression ratio is the same throughout each
component of the LSM-tree; \rows addresses this at run time by
dynamically adjusting component sizes\footnote{todo implement this}.
Our test hardware's hard drive is a 7200RPM, 750 GB Seagate Barracuda
ES.
%has a manufacturer-reported average rotational latency of
%$4.16~msec$, seek times of $8.5/9.5~msec$ (read/write), and a maximum
%sustained throughput of $78~MB/s$.
Third party
benchmarks\footnote{http://www.storagereview.com/ST3750640NS.sr}
report random access times of $12.3/13.5~msec$ and $44.3-78.5~MB/s$
sustained throughput. Timing {\tt dd if=/dev/zero of=file; sync} on an
empty ext3 file system suggests our test hardware provides $57.5MB/s$ of
storage bandwidth.
%We used two hard drives for our tests, a smaller, high performance drive with an average seek time of $9.3~ms$, a
%rotational latency of $4.2~ms$, and a manufacturer reported raw
%throughput of $150~mb/s$. Our buffer manager achieves $\sim 27~mb/s$
%throughput; {\tt dd if=/dev/zero of=file} writes at $\sim 30.5~mb/s$.
Assuming a fixed hardware configuration, and measuring cost in disk
time, we have:
%\[
% cost_{sequential~io}=\frac{|tuple|}{30.5*1024^2}=0.000031268~msec
%\]
%% 12.738854
\[
cost_{sequential}=\frac{|tuple|}{78.5MB/s}=12.7~|tuple|~~nsec/tuple~(min)
\]
%% 22.573363
\[
cost_{sequential}=\frac{|tuple|}{44.3MB/s}=22.6~|tuple|~~nsec/tuple~(max)
\]
and
\[
cost_{random}=\frac{12.3+13.5}{2} = 12.9~msec/tuple
\]
Pessimistically setting
\[
2~cost_{random}\approx1,000,000\frac{cost_{sequential}}{|tuple|}
\] yields: \[
\frac{cost_{LSMtree~update}}{cost_{Btree~update}}=\frac{2*2*2*R*cost_{sequential}}{compression~ratio*2*cost_{random}}
% \frac{cost_{LSMtree~update}}{cost_{Btree~update}} \approx \frac{(S + \sqrt{S})}{|tuple|~compression~ratio~250,000}
\]
\[
\approx\frac{R*|tuple|}{250,000*compression~ratio}
\]
If tuples are 100 bytes and we assume a compression ratio of 4 (lower
than we expect to see in practice, but numerically convenient), the
LSM-tree outperforms the B-tree when:
\[
R < \frac{250,000*compression~ratio}{|tuple|}
\]
\[
R < 10,000
\]
%750 gb throughput = 1 / (((8 * 27 * 22.6 * 100) / 4) * (ns)) = 8.00198705 khz
% 1 / (((8 * 2.73 * 100 * 22.6) / 4) * (ns))
on a machine that can store 1 GB in an in-memory tree, this yields a
maximum ``interesting'' tree size of $R^2*1GB = $ 100 petabytes, well
above the actual drive capacity of $750~GB$. A $750~GB$ tree would
have a $C2$ component 750 times larger than the 1GB $C0$ component.
Therefore, it would have an R of $\sqrt{750}\approx27$; we would
expect such a tree to have a sustained insertion throughput of
approximately 8000 tuples / second, or 800 kbyte/sec\footnote{It would
take 11 days to overwrite every tuple on the drive in random
order.}; two orders of magnitude above the 83 I/O operations that
the drive can deliver per second, and well above the 41.5 tuples / sec
we would expect from a B-tree with a $18.5~GB$ buffer pool.
Increasing \rowss system memory to cache 10 GB of tuples would
increase write performance by a factor of $\sqrt{10}$.
% 41.5/(1-80/750) = 46.4552239
Increasing memory another ten fold to 100GB would yield an LSM-tree
with an R of $\sqrt{750/100} = 2.73$ and a throughput of 81,000
tuples/sec. In contrast, the B-tree could cache roughly 80GB of leaf pages
in memory, and write approximately $\frac{41.5}{(1-(80/750)} = 46.5$
tuples/sec. Increasing memory further yields a system that
is no longer disk bound.
Assuming that the system CPUs are fast enough to allow \rows
compression and merge routines to keep up with the bandwidth supplied
by the disks, we conclude that \rows will provide significantly higher
replication throughput for disk bound applications.
\subsection{Indexing}
Our analysis ignores the cost of allocating and initializing our
LSM-trees' internal nodes. The compressed data constitutes the leaf
pages of the tree. Each time the compression process fills a page, it
inserts an entry into the leftmost entry in the tree, allocating
additional nodes if necessary. Our prototype does not compress
internal tree nodes\footnote{This is a limitation of our prototype;
not our approach. Internal tree nodes are append-only and, at the
very least, the page id data is amenable to compression. Like B-tree
compression, this would decrease the memory used by index probes.},
so it writes one tuple into the tree's internal nodes per compressed
page. \rows inherits a default page size of 4KB from the transaction
system we based it upon. Although 4KB is fairly small by modern
standards, \rows is not particularly sensitive to page size; even with
4KB pages, \rowss per-page overheads are negligible. Assuming tuples
are 100 bytes, $\sim\frac{1}{40}$th of our pages are dedicated to the
lowest level of tree nodes, with $\frac{1}{40}$th that number devoted
to the next highest level, and so on. See
Table~\ref{table:treeCreation} for a comparison of compression
performance with and without tree creation enabled. The data was
generated by applying \rowss compressors to randomly generated five
column, 1,000,000 row tables. Across five runs, RLE's page count had
a standard deviation of $\sigma=2.35$; the other values had
$\sigma=0$.
(XXX the tables should match the text. Measure 100 byte tuples?)
%Throughput's $\sigma<6MB/s$.
\begin{table}
\caption{Tree creation overhead - five column (20 bytes/column)}
\centering
\label{table:treeCreation}
\begin{tabular}{|l|c|c|c|} \hline
Format & Compression & Page count \\ \hline %& Throughput\\ \hline
FOR & 1.96x & 2494 \\ \hline %& 133.4 MB/s \\ \hline
FOR + tree & 1.94x & +80 \\ \hline %& 129.8 MB/s \\ \hline
RLE & 3.24x & 1505 \\ \hline %& 150.6 MB/s \\ \hline
RLE + tree & 3.22x & +21 \\ %& 148.4 MB/s \\
\hline\end{tabular}
\end{table}
\begin{table}
\caption{Tree creation overhead - 100 columns (400 bytes/column)}
\centering
\label{table:treeCreationTwo}
\begin{tabular}{|l|c|c|c|} \hline
Format & Compression & Page count \\ \hline %& Throughput\\ \hline
FOR & 1.37x & 7143 \\ \hline %& 133.4 MB/s \\ \hline
FOR + tree & 1.17x & 8335 \\ \hline %& 129.8 MB/s \\ \hline
RLE & 1.75x & 5591 \\ \hline %& 150.6 MB/s \\ \hline
RLE + tree & 1.50x & 6525 \\ %& 148.4 MB/s \\
\hline\end{tabular}
\end{table}
As the size of the tuples increases, as in
Table~\ref{table:treeCreationTwo} ($N=5$; $\sigma < 7.26$ pages), the
number of compressed pages that each internal tree node points to
decreases, increasing the overhead of tree creation. In such
circumstances, internal tree node compression and larger pages should
improve the situation.
\subsection{Isolation}
\rows groups replicated transactions into snapshots. Each transaction
is assigned to a snapshot according to a timestamp; two snapshots are
active at any given time. \rows assigns incoming transactions to the
newer of the two active snapshots. Once all transactions in the older
snapshot have completed, that snapshot is marked inactive, exposing
its contents to new queries that request a consistent view of the
data. At this point a new active snapshot is created, and the process
continues.
The timestamp is simply the snapshot number. In the case of a tie
during merging (such as two tuples with the same primary key and
timestamp), the version from the newest (lower numbered) component is
taken.
This ensures that, within each snapshot, \rows applies all updates in the
same order as the primary database. Across snapshots, concurrent
transactions (which can write non-conflicting tuples in arbitrary
orders) lead to reordering of updates. However, these updates are
guaranteed to be applied in transaction order. The correctness of
this scheme hinges on the correctness of the timestamps applied to
each transaction.
If the master database provides snapshot isolation using multiversion
concurrency control (as is becoming increasingly popular), we can
simply reuse the timestamp it applies to each transaction. If the
master uses two phase locking, the situation becomes more complex, as
we have to use the commit time of each transaction\footnote{This works
if all transactions use transaction-duration write locks, and lock
release and commit occur atomically. Transactions that obtain short
write locks can be treated as a set of single action transactions.}.
Until the commit time is known, \rows stores the transaction id in the
LSM-tree. As transactions are committed, it records the mapping from
transaction id to snapshot. Eventually, the merger translates
transaction id's to snapshots, preventing the mapping from growing
without bound.
New snapshots are created in two steps. First, all transactions in
epoch $t-1$ must complete (commit or abort) so that they are
guaranteed to never apply updates to the database again. In the
second step, \rowss current snapshot number is incremented, and new
read-only transactions are assigned to snapshot $t-1$. Each such
transaction is granted a shared lock on the existence of the snapshot,
protecting that version of the database from garbage collection. In
order to ensure that new snapshots are created in a timely and
predictable fashion, the time between them should acceptably short,
but still slightly longer than the longest running transaction.
\subsubsection{Isolation performance impact}
Although \rowss isolation mechanisms never block the execution of
index operations, their performance degrades in the presence of long
running transactions. Long running updates block the creation of new
snapshots. Ideally, upon encountering such a transaction, \rows
simply asks the master database to abort the offending update. It
then waits until appropriate rollback (or perhaps commit) entries
appear in the replication log, and creates the new snapshot. While
waiting for the transactions to complete, \rows continues to process
replication requests by extending snapshot $t$.
Of course, proactively aborting long running updates is simply an
optimization. Without a surly database administrator to defend it
against application developers, \rows does not send abort requests,
but otherwise behaves identically. Read-only queries that are
interested in transactional consistency continue to read from (the
increasingly stale) snapshot $t-2$ until $t-1$'s long running
updates commit.
Long running queries present a different set of challenges to \rows.
Although \rows provides fairly efficient time-travel support,
versioning databases are not our target application. \rows
provides each new read-only query with guaranteed access to a
consistent version of the database. Therefore, long-running queries
force \rows to keep old versions of overwritten tuples around until
the query completes. These tuples increase the size of \rowss
LSM-trees, increasing merge overhead. If the space consumed by old
versions of the data is a serious issue, long running queries should
be disallowed. Alternatively, historical, or long-running queries
could be run against certain snapshots (every 1000th, or the first
one of the day, for example), reducing the overhead of preserving
old versions of frequently updated data.
\subsubsection{Merging and Garbage collection}
\rows merges components by iterating over them in order, removing
obsolete and duplicate tuples and writing the rest into a new version
of the largest component. In order to determine whether or not a
tuple is obsolete, \rows compares the tuple's timestamp with any
matching tombstones (or record creations, if the tuple is a
tombstone), and with any tuples that match on primary key. Upon
encountering such candidates for deletion, \rows compares their
timestamps with the set of locked snapshots. If there are no
snapshots between the tuple being examined and the updated version,
then the tuple can be deleted. Tombstone tuples can also be deleted once
they reach $C2$ and any older matching tuples have been removed.
Once \rows completes its scan over existing components (and registers
new ones in their places), it frees the regions of pages that stored
the components.
\subsection{Parallelism}
\rows provides ample opportunities for parallelism. All of its
operations are lock-free; concurrent readers and writers work
independently, avoiding blocking, deadlock and livelock. Index probes
must latch $C0$ in order to perform a lookup, but the more costly
probes into $C1$ and $C2$ are against read-only trees; beyond locating
and pinning tree components against deallocation, probes of these
components do not interact with the merge processes.
Our prototype exploits replication's piplelined parallelism by running
each component's merge process in a separate thread. In practice,
this allows our prototype to exploit two to three processor cores
during replication.[XXX need experimental evidence...] During bulk
load, the buffer manager, which uses Linux's {\tt sync\_file\_range}
function allows \rows to asynchronously force regions [XXX currently,
we do a synchronous force with sync\_file\_range....] of the page
file to disk. \rows has control over region size; if the system is
CPU bound \rows can ensure that the time spent waiting for synchronous
page writes is negligible, by choosing an appropriate region size. On
the other hand, if the system is disk bound, the same asynchronous
force mechanism ensures that \rows overlaps computation with I/O. [XXX
this is probably too detailed; we should just say that \rows uses
standard techniques to overlap computation and I/O]
Remaining cores can be used by queries, or (as hardware designers
increase the number of processor cores per package) by using data
parallelism to split each merge across multiple threads. Therefore,
given ample storage bandwidth, we expect the throughput of \rows
replication to increase with Moore's law for the foreseeable future.
\subsection{Recovery}
Like other log structured storage systems, \rowss recovery process is
inexpensive and straightforward. However, \rows does not attempt to
ensure that transactions are atomically committed to disk, and is not
meant to supplement or replace the master database's recovery log.
Instead, recovery occurs in two steps. Whenever \rows writes a tree
component to disk, it does so by beginning a new transaction in the
transaction manager that \rows is based upon. Next, it allocates
contiguous regions of storage space (generating one log entry per
region), and performs a B-tree style bulk load of the new tree into
these regions (this bulk load does not produce any log entries).
Finally, \rows forces the tree's regions to disk, and writes the list
of regions used by the tree and the location of the tree's root to
normal (write ahead logged) records. Finally, it commits the
underlying transaction.
After the underlying transaction manager completes recovery, \rows
will have a set of intact and complete tree components. Space taken
up by partially written trees was allocated by an aborting
transaction, and has been reclaimed by the transaction manager's
recovery mechanism. After the underlying recovery mechanisms
complete, \rows reads the last committed timestamp from the LSM-tree
header, and begins playback of the replication log at the appropriate
position. Upon committing new components to disk, \rows allows the
appropriate portion of the replication log to be truncated.
\section{Row compression}
Disk heads are the primary storage bottleneck for most OLTP
environments, and disk capacity is of secondary concern. Therefore,
database compression is generally performed to improve system
performance, not capacity. In \rows, sequential I/O throughput is the
primary replication bottleneck; and is proportional to the compression
ratio. Furthermore, compression increases the effective size of the buffer
pool, which is the primary bottleneck for \rowss random index probes.
Although \rows targets row-oriented workloads, its compression
routines are based upon column-oriented techniques and rely on the
assumption that pages are indexed in an order that yields easily
compressible columns. \rowss compression formats are based on our
{\em multicolumn} compression format. In order to store data from
an $N$ column table, we divide the page into $N+1$ variable length
regions. $N$ of these regions each contain a compressed column in an
existing column-based compression format[XXX cite them]. The
remaining region contains ``exceptional'' column data (potentially
from more than one columns).
For example, a column might be encoded using the {\em frame of
reference} (FOR) algorithm, which stores a column of integers as a
single offset value and a list of deltas. When a value too different
from the offset to be encoded as a delta is encountered, an offset
into the exceptions region is stored. The resulting algorithm is
called {\em patched frame of reference} (PFOR) in the literature.
\rowss multicolumn pages extend this idea by allowing multiple columns
(each with its own compression algorithm) to coexist on each page.
This section discusses the computational and storage overhead of the
multicolumn compression approach.
\subsection{Multicolumn computational overhead}
\rows builds upon compression algorithms that are amenable to
superscalar optimization, and can achieve throughputs in excess of
1GB/s on current hardware. Although multicolumn support introduces
significant overhead, \rowss variants of these approaches run within
an order of magnitude of published speeds.
Additional computational overhead is introduced in two areas. First,
\rows compresses each column in a separate buffer, then uses {\tt
memcpy()} to gather this data into a single page buffer before
writing it to disk. This {\tt memcpy()} occurs once per page
allocation.
Second, we need a way to translate requests to write a tuple into
calls to appropriate page formats and compression implementations.
Unless we hardcode our \rows executable to support a predefined set of
page formats (and table schemas), this invokes an extra {\tt for} loop
(over the columns) whose body contains a {\tt switch} statement (in
order to choose between column compressors) to each tuple compression
request.
% explain how append works
\subsection{The {\tt \large append()} operation}
\rowss compressed pages provide an {\tt tupleAppend()} operation that
takes a tuple as input, and returns {\tt false} if the page does not have
room for the new tuple. {\tt tupleAppend()} consists of a dispatch
routine that calls {\tt append()} on each column in turn. Each
column's {\tt append()} routine secures storage space for the column
value, or returns {\tt false} if no space is available. {\tt append()} has the
following signature:
\begin{quote}
{\tt append(COL\_TYPE value, int* exception\_offset,
void* exceptions\_base, void* column\_base, int* freespace) }
\end{quote}
where {\tt value} is the value to be appended to the column, {\tt
exception\_offset} is a pointer to the first free byte in the
exceptions region, {\tt exceptions\_base} and {\tt column\_base} point
to (page sized) buffers used to store exceptions and column data as
the page is being written to. One copy of these buffers exist for
each page that \rows is actively writing to (one per disk-resident
LSM-tree component); they do not significantly increase \rowss memory
requirements. Finally, {\tt freespace} is a pointer to the number of
free bytes remaining on the page. The multicolumn format initializes
these values when the page is allocated.
As {\tt append()} implementations are called they update this data
accordingly. Initially, our multicolumn module
managed these values and the exception space. This led to
extra arithmetic operations and conditionals and did not
significantly simplify the code.
% contrast with prior work
Existing superscalar compression algorithms assume they have access to
a buffer of uncompressed data and that they are able to make multiple
passes over the data during compression. This allows them to remove
branches from loop bodies, improving compression throughput. We opted
to avoid this approach in \rows, as it would increase the complexity
of the {\tt append()} interface, and add a buffer to \rowss merge processes.
\subsection{Static code generation}
% discuss templatization of code
After evaluating the performance of a C implementation of \rowss
compression routines, we decided to rewrite the compression routines
as C++ templates. C++ template instantiation performs compile-time
macro substitutions. We declare all functions {\tt inline}, and place
them in header files (rather than separate compilation units). This
gives g++ the opportunity to perform optimizations such as
cross-module constant propagation and branch elimination. It also
allows us to write code that deals with integer data types instead of
void pointers without duplicating code or breaking encapsulation.
Such optimizations are possible in C, but, because of limitations of
the preprocessor, would be difficult to express or require separate
code-generation utilities. We found that this set of optimizations
improved compression and decompression performance by roughly an order
of magnitude. To illustrate this, Table~\ref{table:optimization}
compares compressor throughput with and without compiler optimizations
enabled. Although compressor throughput varies with data distributions
and type, optimizations yield a similar performance improvement across
varied datasets and random data distributions.
We performed one additional set of optimizations. Rather than
instantiate each compressor template once for each column type at
compile time, we instantiate a multicolumn page format template for
each page format we wish to support. This removes the {\tt for} loop
and {\tt switch} statement that supporting multiple columns per page
introduced, but hardcodes page schemas at compile time.
The two approaches could coexist in a single runtime environment,
allowing the use of hardcoded implementations for performance critical
tables, while falling back on slower, general purpose implementations
for previously unseen table layouts.
\subsection{Buffer manager interface extensions}
\rows uses a preexisting, conventional database buffer manager. Each
page contains an LSN (which is largely unused, as we bulk-load \rowss
trees) and a page implementation number. Pages are stored in a
hashtable keyed by page number, and replaced using an LRU
strategy\footnote{LRU is a particularly poor choice, given that \rowss
I/O is dominated by large table scans. Eventually, we hope to add
support for a DBMIN[xxx cite]-style page replacement policy.}.
In implementing \rows, we made use of a number of non-standard (but
generally useful) callbacks. The first, {\tt pageLoaded()}
instantiates a new multicolumn page implementation when the page is
first read into memory. The second, {\tt pageFlushed()} informs our
multicolumn implementation that the page is about to be written to
disk, and the third {\tt pageEvicted()} invokes the multicolumn
destructor. (XXX are these really non-standard?)
As we mentioned above, pages are split into a number of temporary
buffers while they are being written, and are then packed into a
contiguous buffer before being flushed. Although this operation is
expensive, it does present an opportunity for parallelism. \rows
provides a per-page operation, {\tt pack()} that performs the
translation. We can register {\tt pack()} as a {\tt pageFlushed()}
callback or we can explicitly call it during (or shortly after)
compression.
{\tt pageFlushed()} could be safely executed in a background thread
with minimal impact on system performance. However, the buffer
manager was written under the assumption that the cost of in-memory
operations is negligible. Therefore, it blocks all buffer management
requests while {\tt pageFlushed()} is being executed. In practice,
this causes multiple \rows threads to block on each {\tt pack()}.
Also, {\tt pack()} reduces \rowss memory utilization by freeing up
temporary compression buffers. Delaying its execution for too long
might cause this memory to be evicted from processor cache before the
{\tt memcpy()} can occur. For all these reasons, our merge processes
explicitly invoke {\tt pack()} as soon as possible.
{\tt pageLoaded()} and {\tt pageEvicted()} allow us to amortize page
sanity checks and header parsing across many requests to read from a
compressed page. When a page is loaded from disk, {\tt pageLoaded()}
associates the page with the appropriate optimized multicolumn
implementation (or with the slower generic multicolumn implementation,
if necessary), and then allocates and initializes a small amount of
metadata containing information about the number, types and positions
of columns on a page. Requests to read records or append data to the
page use this cached data rather than re-reading the information from
the page.
\subsection{Storage overhead}
The multicolumn page format is quite similar to the format of existing
column-wise compression formats. The algorithms we implemented have
page formats that can be (broadly speaking) divided into two sections.
The first section is a header that contains an encoding of the size of
the compressed region, and perhaps a piece of uncompressed exemplar
data (as in frame of reference compression). The second section
typically contains the compressed data.
A multicolumn page contains this information in addition to metadata
describing the position and type of each column. The type and number
of columns could be encoded in the ``page type'' field, or be
explicitly represented using a few bytes per page column. Allocating
16 bits for the page offset and 16 bits for the column type compressor
uses 4 bytes per column. Therefore, the additional overhead for an N
column page is
\[
(N-1) * (4 + |average~compression~format~header|)
\]
% XXX the first mention of RLE is here. It should come earlier.
bytes. A frame of reference column header consists of 2 bytes to
record the number of encoded rows and a single uncompressed
value. Run length encoding headers consist of a 2 byte count of
compressed blocks. Therefore, in the worst case (frame of reference
encoding 64-bit integers, and \rowss 4KB pages) our prototype's
multicolumn format uses $14/4096\approx0.35\%$ of the page to store
each column header. If the data does not compress well, and tuples
are large, additional storage may be wasted because \rows does not
split tuples across pages. Tables~\ref{table:treeCreation}
and~\ref{table:treeCreationTwo}, which draw column values from
independent, identical distributions, show that \rowss compression
ratio can be significantly impacted by large tuples.
% XXX graph of some sort to show this?
Breaking pages into smaller compressed blocks changes the compression
ratio in another way; the compressibility of the data varies with the
size of each compressed block. For example, when frame of reference
is applied to sorted data, incoming values eventually drift too far
from the page offset, causing them to be stored as exceptional values.
Therefore (neglecting header bytes), smaller frame of reference blocks
provide higher compression ratios.
Of course, conventional compression algorithms are free to divide
their compressed data into blocks to maximize compression ratios.
Although \rowss smaller compressed block size benefits some
compression implementations (and does not adversely impact either of
the algorithms we implemented), it creates an additional constraint,
and may interact poorly with some compression algorithms. [XXX saw paper that talks about this]
\subsection{Supporting Random Access}
The multicolumn page format is designed to allow efficient
row-oriented access to data. The efficiency of random access within a
page depends on the format used by individual compressors. \rows
compressors support two access methods. The first looks up a value by
slot id. This operation is $O(1)$ for frame of reference columns, and
$O(log~n)$ (in the number of runs of identical values on the page) for
run length encoded columns.
The second operation is used to look up tuples by value, and is based
on the assumption that the the tuples are stored in the page in sorted
order. It takes a range of slot ids and a value, and returns the
offset of the first and last instance of the value within the range.
This operation is $O(log~n)$ (in the number of values in the range)
for frame of reference columns, and $O(log~n)$ (in the number of runs
on the page) for run length encoded columns. The multicolumn
implementation uses this method to look up tuples by beginning with
the entire page in range, and calling each compressor's implementation
in order to narrow the search until the correct tuple(s) are located
or the range is empty. Note that partially-matching tuples are only
partially examined during the search, and that our binary searches
within a column should have better cache locality than searches of
row-oriented page layouts.
We have not examined the tradeoffs between different implementations
of tuple lookups. Currently, rather than using binary search to find
the boundaries of each range, our compressors simply iterate over the
compressed representation of the data in order to progressively narrow
the range of tuples to be considered. It is possible that (given
expensive branch mispredictions and \rowss small pages) that our
linear search implementation will outperform approaches based upon
binary search.
\section{Evaluation}
(XXX graphs go here)
\begin{figure}
\centering
\epsfig{file=MySQLthroughput.pdf, width=3.33in}
\caption{InnoDB insertion throughput (average over 100,000 tuple windows).}
\end{figure}
\begin{figure}
\centering
\epsfig{file=mysql-ms-tuple.pdf, width=3.33in}
\caption{InnoDB tuple insertion time (average over 100,000 tuple windows).}
\end{figure}
\subsection{The data set}
In order to evaluate \rowss performance, we used it to index
information reported by weather stations worldwide. We obtained the
data from\footnote{XXX National Severe Storms Laboratory Historical
Weather Data Archives, Norman, Oklahoma, from their Web site at
http://data.nssl.noaa.gov}. The data we used ranges from May 1,
2007 to Nov 2, 2007, and contains reaidngs from ground stations around
the world. This data is approximately $1.3GB$ when stored in an
uncompressed tab delimited file. We duplicated the data by changing
the date fields to cover ranges from 2001 to 2009, producing a 12GB
dataset.
Duplicating the data should have a limited effect on \rowss
compression ratios. Although we index on geographic position, placing
all readings from a particular station in a contiguous range, we then
index on date, seperating nearly identical tuples from each other.
\rows only supports integer data types. We encode the ASCII columns
in the data by packing each character into 5 bits (the strings only
contain the characters A-Z, +, -, and *). Floating point columns in
the raw data set are always represented with two digits of precision;
we multiply them by 100, yielding an integer. The datasource uses
nonsensical readings (such as -9999.00) to represent NULL. Our
prototype does not understand NULL, so we leave these fields intact.
We represent each column as a 32-bit integer (even when a 16-bit value
would do), except current weather condititons, which is packed into a
64-bit integer. Table~[XXX] lists the columns and compression
algorithms we assigned to each column.
\rows targets seek limited applications; we assign a (single) random
order to the tuples, and insert them in this order. We compare \rowss
performance with the MySQL InnoDB storage engine's bulk
loader\footnote{We also evaluated MySQL's MyISAM table format.
Predictably, performance degraded as the tree grew; ISAM indices do not
support node splits.}. This avoids the overhead of SQL insert
statements. To force InnoDB to update its B-tree index in place, we
break the dataset into 100,000 tuple chunks, and bulk load each one in
succession.
If we did not do this, MySQL would simply sort the tuples, and then
bulk load the index. This behavior is unacceptable in a low-latency
replication environment. Breaking the bulk load into multiple chunks
forces MySQL to make intermediate results available as the bulk load
proceeds\footnote{MySQL's {\tt concurrent} keyword allows access to
{\em existing} data during a bulk load; new data is still exposed
atomically.}.
XXX more information on mysql setup:
Discuss graphs (1) relative performance of rose and mysql (2) compression ratio / R over time (3) merge throughput?
\subsection{Merge throughput in practice}
XXX what purpose does this section serve?
RB <-> LSM tree merges contain different code and perform different
I/O than LSM <-> LSM mergers. The former must perform random memory
accesses, and performs less I/O. They run at different speeds. Their
relative speeds are unpredictable.
\subsection{Choosing R}
The original LSM tree work eexplains how to calculate $R$ for steady
state systems; essentially, the ratios between the sizes of adjacent
compoents should be equal. \rows provides two runtime heuristics for
setting $R$. The first follows the lead of the LSM work, and sets R
to be the square root of the ratio of the sizes of the smallest and
largest tree components, $\sqrt{|C2|/|C0|}$. However, computing this
value is complicated by two factors. First, \rows stores data
uncompressed in memory, while it uses compression on disk. Second,
the red-black tree we use has significant memory overhead. We take
compression into account by dividing the number of items in the tree
by the compression ratio. Tree overhead is taken into account when we
bound the number of tuples in $C0$.
The second approach to choosing tree sizes assumes that merge
processes do not compete for resources. Therefore, each time a merge
process completes, it immediately requests a new batch of tuples for
merge. This minimizes the size of $C0$ and $C1$ when the system is
under capacity, allowing it to gracefully handle bursts of load.
However, by shrinking $C0$ and $C1$, it decreases the efficiency of
$C2$'s merges. As load increases, this creates a backlog of tuples
for merge in $C1$ and $C0$ until the tree ratios equalize. By
allowing merges to continue as quickly as possible, this approach also
takes superscalar (the two merge threads take different types of trees
as input) and other runtime effects into account.
A hybrid between this greedy strategy and explicitly trying to balance
$R$ across tree components might yield a system that is more tolerant
of bursty workloads without decreasing maximum sustainable throughput.
XXX either repeat r varying experiments or cut this section.
\section{Conclusion}
Compressed LSM trees are practical on modern hardware. As CPU
resources increase XXX ... Our implementation is a first cut at a
working system; we have mentioned a number of implementation
limitations throughput this paper. In particular, while superscalar
compression algorithms provide significant benefits to \rows, their
real world performance (whether reading data from a parser, or working
within a \rows merge) is significantly lower than their performance in
isolation. On the input end of things, this is primarily due to the
expense of parsing, random memory I/O and moving uncompressed data
over the bus. Merge processes suffer from none of these limitations,
but are CPU bound due to the cost of decompressing, comparing, and
recompressing each tuple. Existing work to perform relational
operations (particularly join) on compressed representations of data
may improve the situation. In particular, it might be possible to
adapt column-oriented optimizations to \rowss multicolumn page layout.
[real conclusion here; the prior paragraph is future work]
XXX\cite{bowman:reasoning}
\bibliographystyle{abbrv}
\bibliography{rose} % sigproc.bib is the name of the Bibliography in this case
% You must have a proper ".bib" file
% and remember to run:
% latex bibtex latex latex
% to resolve all references
%
% ACM needs 'a single self-contained file'!
%
\balancecolumns % GM July 2000
% That's all folks!
\end{document}