\documentclass{sig-alternate-sigmod08}
\usepackage{xspace,color}
\newcommand{\rows}{Rows\xspace}
\newcommand{\rowss}{Rows'\xspace}
\begin{document}
% --- Author Metadata here ---
\conferenceinfo{ACM SIGMOD}{'08 Vancouver, BC, Canada}
% --- End of Author Metadata ---
\title{{\ttlit \rows}: Compressed, log-structured replication}
\numberofauthors{3}
\author{}
%\author{Russell Sears \and Mark Callaghan \and Eric Brewer}
\maketitle
\begin{abstract}
This paper describes \rows\footnote{[Clever acronym here]}, a database storage engine designed for high-throughput replication. It targets applications with write-intensive (seek-limited) transaction processing workloads and near-real-time decision support and analytical processing queries. \rows uses {\em log structured merge} (LSM) trees to create full database replicas using purely sequential I/O. It provides access to inconsistent data in real time, and to consistent data with a few seconds' delay. \rows was written to support micropayment transactions. Here, we apply it to archival of weather data.

A \rows replica serves two purposes. First, by avoiding seeks, \rows reduces the load on the replicas' disks, leaving surplus I/O capacity for read-only queries and allowing inexpensive hardware to handle workloads produced by database machines with tens of disks.
This allows decision support and OLAP queries to scale linearly with the number of machines, regardless of lock contention and other bottlenecks associated with distributed transactions. Second, \rows replica groups provide highly available copies of the database. In Internet-scale environments, decision support queries may be more important than update availability.

\rowss throughput is limited by sequential I/O bandwidth. We use column compression to reduce this bottleneck. Rather than reassembling rows from a column-oriented disk layout, we adapt existing column compression algorithms to a new row-oriented data layout. This introduces negligible space overhead, and can be applied to most single-pass, randomly accessible compression formats. Our prototype uses lightweight (superscalar) column compression algorithms.

Existing analytical models and our experiments show that, for disk-bound workloads, \rows provides significantly higher replication throughput than conventional replication techniques. Finally, we introduce an easily measured metric that predicts the replication performance of \rows implementations in a variety of deployment scenarios.
\end{abstract}
\section{Introduction}

\rows is a database replication engine for workloads with high volumes of in-place updates. Traditional database updates are difficult to scale beyond a certain data size: once the data exceeds the size of RAM, any attempt to update it in place is severely limited by the cost of drive head seeks. This can be addressed by adding more drives, which increases cost and decreases reliability. Alternatively, the database can be run on a cluster of machines, providing improved scalability at great expense.

These problems lead large-scale database installations to partition their workloads across multiple servers, allowing linear scalability, but sacrificing consistency between data stored on different partitions. Fortunately, updates often deal with well-defined subsets of the data; with an appropriate partitioning scheme, one can achieve linear scalability for localized updates. The cost of partitioning is that no single coherent version of the data exists; queries that rely on disparate portions of the database must either run multiple queries or, if they are too expensive to run on master database instances, be delegated to data warehousing systems.

Although \rows cannot process SQL update queries directly, it is able to replicate conventional database instances at a fraction of the cost of the master database server. Like a data warehousing solution, this decreases the cost of large, read-only OLAP and decision support queries. \rows does this without introducing significant replication latency. Therefore, we think of \rows as a compromise between data warehousing solutions (which provide extremely efficient access to data after a significant delay) and database replication systems (which cost nearly as much as the database instances they replicate).
The low cost of \rows replicas also allows database administrators to replicate multiple master databases on the same machine, simplifying queries that span partitions.

\subsection{System design}

A \rows replica takes a {\em replication log} as input. The replication log should record each transaction begin, commit, and abort performed by the master database, along with the pre- and post-images associated with each tuple update. The ordering of these entries should match the order in which they are applied at the database master.

Upon receiving a log entry, \rows applies it to an in-memory tree, and the update is immediately reflected by inconsistent reads. \rows provides snapshot consistency to readers that require transactional isolation. It does so in a lock-free manner; transactions' reads and writes are not tracked, and no \rows transaction can ever force another to block or abort. When given appropriate workloads, \rows provides extremely low-latency replication. Transactionally consistent data becomes available after a delay on the order of the duration of a few update transactions.

To prevent the in-memory tree from growing without bound, a merge process iterates over the (sorted) tree entries, and merges them with existing on-disk tree entries. As the database increases in size, the on-disk tree grows, forcing \rows to compare an ever-increasing number of existing tuples with each new tuple. To mitigate this effect, the on-disk tree is periodically emptied by merging it with a second, larger tree.

In order to look up a tuple stored in \rows, a query must examine all three trees, typically starting with the in-memory (fastest, and most up-to-date) component, and then moving on to progressively larger and more out-of-date trees. In order to perform a range scan, the query can either iterate over the trees manually, or wait until the next round of merging occurs and apply the scan to tuples as the mergers examine them.
By waiting until the tuples are due to be merged, the range scan can occur with zero I/O cost, at the expense of significant delay.

Merge throughput is bounded by sequential I/O bandwidth, and index probe performance is limited by the amount of available memory. \rows uses compression to trade surplus computational power for scarce storage resources.

XXX provide a quick paper outline here. 1-2 paragraphs describing the flow of the paper.

\section{Related Work}

XXX This section isn't really written. Instead, it just contains notes about what it should contain.

\subsection{Row-based schemes, MySQL page compression}

MySQL compression: guess the compression ratio; if the page fits, great; if not, split the B-tree page. Based on conventional compression algorithms. There is a large database compression literature. It is generally accepted that compression increases throughput when memory and bandwidth savings are more important than CPU overhead.

\subsection{Column-based compression}

There are a few column-oriented compression papers to cite. Gather up the references. Biggest distinction: we are near real-time regardless of update access patterns. Prior performance studies focus on compression kernel throughput. At least for \rows, the results don't apply, as other operations now dominate. (Kernels for merges, etc.\ could help.)

\subsection{Snapshot consistency}

Cite the '81 survey; two broad approaches: locking, and timestamp / MVCC. The latter makes the serialized order explicit. (Useful for replication systems.)

\subsection{Replication techniques and log shipping}

There is a large body of work on replication techniques and topologies, largely orthogonal to \rows. We place two restrictions on the master. First, it must ship writes (before and after images) for each update, not queries, as some DBs do. Second, \rows must be given, or be able to infer, the order in which each transaction's data should be committed to the database, and such an order must exist.
If no such order exists, \rows may commit the offending transactions' writes to the database in a different order than the master.

\section{\rowss I/O performance}

As far as we know, \rows is the first LSM-tree implementation. This section provides an overview of LSM-trees, and steps through a rough analysis of LSM-tree performance on current hardware (we refer the reader to the original LSM work for a thorough analytical discussion of LSM performance). We defer discussion of the CPU overhead of compression to later sections, and simply account for I/O and memory bottlenecks in this section.

\subsection{Tree merging}

% XXX mention Sql server `bulk loaded' mode.

An LSM-tree consists of a number of underlying trees. For simplicity, this paper considers three-component LSM-trees. Component zero ($C0$) is an in-memory binary search tree. Components one and two ($C1$, $C2$) are read-only, bulk-loaded B-trees. Only $C0$ is updated in place.

Each update is handled in three stages. In the first stage, the update is applied to the in-memory tree. Next, once enough updates have been applied, the tuple is merged with existing tuples in $C1$. The merge process performs a sequential scan over the in-memory tree and $C1$, producing a new version of $C1$. Conceptually, when the merge is complete, $C1$ is atomically replaced with the new tree, and $C0$ is atomically replaced with an empty tree. The process is eventually repeated when $C1$ and $C2$ are merged. At that point, the insertion will not cause any more I/O operations.

Our prototype replaces entire trees at once; this approach introduces a number of performance problems. The original LSM work proposes a more sophisticated scheme that addresses some of these issues. Instead of replacing entire trees at once, it replaces one subtree at a time. This reduces peak storage and memory requirements.
Atomic replacement of portions of an LSM-tree would cause ongoing merges to block insertions, and force the mergers to run in lock step. (This is the ``crossing iterator'' problem mentioned in the LSM paper.) We address this issue by allowing data to be inserted into the new version of the smaller component before the merge completes. This forces \rows to check both versions of components $C0$ and $C1$ in order to look up each tuple, but it addresses the crossing iterator problem without resorting to fine-grained concurrency control. Applying this approach to subtrees would reduce the impact of these extra probes, which could be filtered with a range comparison in the common case.

In a populated LSM-tree, $C2$ is the largest component, and $C0$ is the smallest component. The original LSM-tree work proves that throughput is maximized when the ratio of the sizes of $C1$ to $C0$ is equal to the ratio between $C2$ and $C1$. They call this ratio $R$. Note that (on average, in a steady state) for every $C0$ tuple consumed by a merge, $R$ tuples from $C1$ must be examined. Similarly, each time a tuple in $C1$ is consumed, $R$ tuples from $C2$ are examined. Therefore, in a steady state, each insertion eventually incurs a cost proportional to $R * cost_{read~and~write~C2}$ plus $R * cost_{read~and~write~C1}$. Note that the total size of the tree is approximately $R^2 * |C0|$ (neglecting the data stored in $C0$ and $C1$)\footnote{The proof that throughput is maximized by keeping $R$ constant across our three tree components follows from the fact that the mergers compete for I/O bandwidth, and that $x(1-x)$ is maximized when $x=0.5$. The LSM-tree paper proves the general case.}.

\subsection{Replication Throughput}

LSM-trees have different asymptotic performance characteristics than conventional index structures. In particular, the amortized cost of insertion is $O(\sqrt{n})$ in the size of the data. This cost is $O(log~n)$ for a B-tree.
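The $O(\sqrt{n})$ bound can be read directly off the sizing argument above. Holding the in-memory component at a fixed size,
\[
|tree|\approx R^2*|C0| \implies R=\sqrt{\frac{|tree|}{|C0|}}
\]
and since each insertion eventually incurs merge work proportional to $R$ in each of the two on-disk components, the amortized insertion cost grows with $\sqrt{|tree|}$.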
The relative costs of sequential and random I/O determine whether or not \rows is able to outperform B-trees in practice. Starting with the (more familiar) B-tree case, in the steady state, we can expect each index update to perform two random disk accesses (one evicts a page, the other reads a page):
\[
cost_{Btree~update}=2~cost_{random~io}
\]
(We assume that the upper levels of the B-tree are memory resident.) If we assume uniform access patterns, 4 KB pages and 100-byte tuples, this means that an uncompressed B-tree would keep $\sim2.5\%$ of the tuples in memory. Prefix compression and a skewed update distribution would improve the situation significantly, but are not considered here. Without a skewed update distribution, batching I/O into sequential writes only helps if a significant fraction of the tree's data fits in RAM.

In \rows, we have:
\[
cost_{LSMtree~update}=2*2*2*R*\frac{cost_{sequential~io}}{compression~ratio}
\]
where $R$ is the ratio of adjacent tree component sizes ($R^2=\frac{|tree|}{|mem|}$). We multiply by $2R$ because each new tuple is eventually merged into both of the larger components, and each merge involves $R$ comparisons with existing tuples on average. An update of a tuple is handled as a deletion of the old tuple (an insertion of a tombstone) and an insertion of the new tuple, leading to a second factor of two. The third factor of two reflects the fact that the merger must read existing tuples into memory before writing them back to disk. The $compression~ratio$ is $\frac{uncompressed~size}{compressed~size}$. For simplicity, we assume that the compression ratio is the same throughout each component of the LSM-tree; \rows addresses this at run time by dynamically adjusting component sizes\footnote{todo implement this}.

Our test hardware's hard drive is a 7200 RPM, 750 GB Seagate Barracuda ES.
Third party benchmarks\footnote{http://www.storagereview.com/ST3750640NS.sr} report random access times of $12.3/13.5~msec$ and $44.3-78.5~MB/s$ sustained throughput. Timing {\tt dd if=/dev/zero of=file; sync} on an empty ext3 file system suggests our test hardware provides $57.5~MB/s$ of storage bandwidth.

Assuming a fixed hardware configuration, and measuring cost in disk time, we have:
\[
cost_{sequential}=\frac{|tuple|}{78.5~MB/s}=12.7\cdot|tuple|~~nsec~(min)
\]
\[
cost_{sequential}=\frac{|tuple|}{44.3~MB/s}=22.6\cdot|tuple|~~nsec~(max)
\]
and
\[
cost_{random}=\frac{12.3+13.5}{2}=12.9~msec
\]
Pessimistically setting
\[
2~cost_{random}\approx1,000,000~\frac{cost_{sequential}}{|tuple|}
\]
yields:
\[
\frac{cost_{LSMtree~update}}{cost_{Btree~update}}=\frac{2*2*2*R*cost_{sequential}}{compression~ratio*2*cost_{random}}
\]
\[
\approx\frac{R*|tuple|}{250,000*compression~ratio}
\]
If tuples are 100 bytes and we assume a compression ratio of 4 (lower than we expect to see in practice, but numerically convenient), the LSM-tree outperforms the B-tree when:
\[
R < \frac{250,000*compression~ratio}{|tuple|}
\]
\[
R < 10,000
\]
On a machine that can store
1 GB in an in-memory tree, this yields a maximum ``interesting'' tree size of $R^2*1~GB=$ 100 petabytes, well above the actual drive capacity of $750~GB$. A $750~GB$ tree would have a $C2$ component 750 times larger than the 1 GB $C0$ component. Therefore, it would have an $R$ of $\sqrt{750}\approx27$; we would expect such a tree to have a sustained insertion throughput of approximately 8000 tuples / second, or 800 kbyte/sec\footnote{It would take 11 days to overwrite every tuple on the drive in random order.}. This is two orders of magnitude above the 83 I/O operations per second that the drive can deliver, and well above the 41.5 tuples / sec we would expect from a B-tree with an $18.5~GB$ buffer pool.

Increasing \rowss system memory to cache 10 GB of tuples would increase write performance by a factor of $\sqrt{10}$. Increasing memory another ten-fold to 100 GB would yield an LSM-tree with an $R$ of $\sqrt{750/100}=2.73$ and a throughput of 81,000 tuples/sec. In contrast, the B-tree could cache roughly 80 GB of leaf pages in memory, and write approximately $\frac{41.5}{1-(80/750)}=46.5$ tuples/sec. Increasing memory further yields a system that is no longer disk bound.

Assuming that the system CPUs are fast enough to allow \rowss compression and merge routines to keep up with the bandwidth supplied by the disks, we conclude that \rows will provide significantly higher replication throughput for disk-bound applications.

\subsection{Indexing}

Our analysis ignores the cost of allocating and initializing our LSM-trees' internal nodes. The compressed data constitutes the leaf pages of the tree. Each time the compression process fills a page, it inserts an entry for the new page into the tree, allocating additional internal nodes if necessary. Our prototype does not compress internal tree nodes\footnote{This is a limitation of our prototype, not our approach.
Internal tree nodes are append-only and, at the very least, the page id data is amenable to compression. Like B-tree compression, this would decrease the memory used by index probes.}, so it writes one tuple into the tree's internal nodes per compressed page. \rows inherits a default page size of 4KB from the transaction system we based it upon. Although 4KB is fairly small by modern standards, \rows is not particularly sensitive to page size; even with 4KB pages, \rowss per-page overheads are negligible. Assuming tuples are 100 bytes, $\sim\frac{1}{40}$th of our pages are dedicated to the lowest level of tree nodes, with $\frac{1}{40}$th that number devoted to the next highest level, and so on. See Table~\ref{table:treeCreation} for a comparison of compression performance with and without tree creation enabled. The data was generated by applying \rowss compressors to randomly generated five column, 1,000,000 row tables. Across five runs, RLE's page count had a standard deviation of $\sigma=2.35$; the other values had $\sigma=0$. (XXX the tables should match the text. Measure 100 byte tuples?) %Throughput's $\sigma<6MB/s$. 
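The tree-node overhead above can be summed in closed form. Assuming, as in the text, 100-byte tuples, 4KB pages, and one uncompressed internal-node entry per compressed page, each internal level is $\frac{1}{40}$th the size of the level below it, so the total overhead is the geometric series
\[
\frac{1}{40}+\frac{1}{40^2}+\frac{1}{40^3}+\cdots=\frac{1}{39}\approx 2.6\%
\]
of the leaf page count, which is of the same order as the page-count increases reported in Table~\ref{table:treeCreation}.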
\begin{table}
\caption{Tree creation overhead -- five columns (20 bytes/column)}
\centering
\label{table:treeCreation}
\begin{tabular}{|l|c|c|} \hline
Format & Compression & Page count \\ \hline
FOR & 1.96x & 2494 \\ \hline
FOR + tree & 1.94x & +80 \\ \hline
RLE & 3.24x & 1505 \\ \hline
RLE + tree & 3.22x & +21 \\ \hline
\end{tabular}
\end{table}

\begin{table}
\caption{Tree creation overhead -- 100 columns (400 bytes/tuple)}
\centering
\label{table:treeCreationTwo}
\begin{tabular}{|l|c|c|} \hline
Format & Compression & Page count \\ \hline
FOR & 1.37x & 7143 \\ \hline
FOR + tree & 1.17x & 8335 \\ \hline
RLE & 1.75x & 5591 \\ \hline
RLE + tree & 1.50x & 6525 \\ \hline
\end{tabular}
\end{table}

As the size of the tuples increases, as in Table~\ref{table:treeCreationTwo} ($N=5$; $\sigma<7.26$ pages), the number of compressed pages that each internal tree node points to decreases, increasing the overhead of tree creation. In such circumstances, internal tree node compression and larger pages should improve the situation.

\subsection{Isolation}

\rows groups replicated transactions into snapshots. Each transaction is assigned to a snapshot according to a timestamp; two snapshots are active at any given time. \rows assigns incoming transactions to the newer of the two active snapshots. Once all transactions in the older snapshot have completed, that snapshot is marked inactive, exposing its contents to new queries that request a consistent view of the data. At this point, a new active snapshot is created, and the process continues.

The timestamp is simply the snapshot number. In the case of a tie during merging (such as two tuples with the same primary key and timestamp), the version from the newest (lower-numbered) component is taken.
This ensures that, within each snapshot, \rows applies all updates in the same order as the primary database. Across snapshots, concurrent transactions (which can write non-conflicting tuples in arbitrary orders) lead to reordering of updates. However, these updates are guaranteed to be applied in transaction order.

The correctness of this scheme hinges on the correctness of the timestamps applied to each transaction. If the master database provides snapshot isolation using multiversion concurrency control (as is becoming increasingly popular), we can simply reuse the timestamp it applies to each transaction. If the master uses two-phase locking, the situation becomes more complex, as we have to use the commit time of each transaction\footnote{This works if all transactions use transaction-duration write locks, and lock release and commit occur atomically. Transactions that obtain short write locks can be treated as a set of single-action transactions.}. Until the commit time is known, \rows stores the transaction id in the LSM-tree. As transactions are committed, it records the mapping from transaction id to snapshot. Eventually, the merger translates transaction ids to snapshots, preventing the mapping from growing without bound.

New snapshots are created in two steps. First, all transactions in epoch $t-1$ must complete (commit or abort) so that they are guaranteed never to apply updates to the database again. In the second step, \rowss current snapshot number is incremented, and new read-only transactions are assigned to snapshot $t-1$. Each such transaction is granted a shared lock on the existence of the snapshot, protecting that version of the database from garbage collection. In order to ensure that new snapshots are created in a timely and predictable fashion, the time between them should be acceptably short, but still slightly longer than the longest-running transaction.
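To make the two-active-snapshot scheme concrete, the following is a minimal sketch of snapshot assignment and advancement; the class and method names are invented for illustration and are not \rowss actual interface:

```cpp
// Illustrative sketch of two-active-snapshot bookkeeping (names invented).
// Incoming transactions join the newer active snapshot; a snapshot becomes
// readable only after every transaction assigned to it has completed.
#include <cassert>
#include <map>

struct SnapshotManager {
    long current = 1;              // newer active snapshot, t
    std::map<long, int> active;    // snapshot -> outstanding transactions

    long begin_update() {          // assign an incoming txn to snapshot t
        active[current]++;
        return current;
    }
    void end_update(long snap) {   // commit or abort
        if (--active[snap] == 0 && snap < current) active.erase(snap);
    }
    // Advance once every transaction in t-1 has completed; afterwards the
    // active snapshots are t and t+1, and t-1 is exposed to readers.
    bool try_advance() {
        long older = current - 1;
        if (active.count(older)) return false;  // t-1 still running
        ++current;
        return true;
    }
    // Consistent readers see the most recent fully-completed snapshot.
    long consistent_read_snapshot() const { return current - 2; }
};
```

Here `try_advance()` fails until every transaction in the older active snapshot has completed, mirroring the two-step creation protocol above.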
\subsubsection{Isolation performance impact}

Although \rowss isolation mechanisms never block the execution of index operations, their performance degrades in the presence of long-running transactions. Long-running updates block the creation of new snapshots. Ideally, upon encountering such a transaction, \rows simply asks the master database to abort the offending update. It then waits until appropriate rollback (or perhaps commit) entries appear in the replication log, and creates the new snapshot. While waiting for the transactions to complete, \rows continues to process replication requests by extending snapshot $t$.

Of course, proactively aborting long-running updates is simply an optimization. Without a surly database administrator to defend it against application developers, \rows does not send abort requests, but otherwise behaves identically. Read-only queries that are interested in transactional consistency continue to read from (the increasingly stale) snapshot $t-2$ until $t-1$'s long-running updates commit.

Long-running queries present a different set of challenges to \rows. Although \rows provides fairly efficient time-travel support, versioning databases are not our target application. \rows provides each new read-only query with guaranteed access to a consistent version of the database. Therefore, long-running queries force \rows to keep old versions of overwritten tuples around until the query completes. These tuples increase the size of \rowss LSM-trees, increasing merge overhead. If the space consumed by old versions of the data is a serious issue, long-running queries should be disallowed. Alternatively, historical or long-running queries could be run against certain snapshots (every 1000th, or the first one of the day, for example), reducing the overhead of preserving old versions of frequently updated data.
\subsubsection{Merging and Garbage collection}

\rows merges components by iterating over them in order, removing obsolete and duplicate tuples and writing the rest into a new version of the largest component. In order to determine whether or not a tuple is obsolete, \rows compares the tuple's timestamp with any matching tombstones (or record creations, if the tuple is a tombstone), and with any tuples that match on primary key. Upon encountering such candidates for deletion, \rows compares their timestamps with the set of locked snapshots. If no locked snapshot falls between the timestamps of the tuple being examined and its updated version, the tuple can be deleted. Tombstone tuples can also be deleted once they reach $C2$ and any older matching tuples have been removed. Once \rows completes its scan over existing components (and registers new ones in their places), it frees the regions of pages that stored the old components.

\subsection{Parallelism}

\rows provides ample opportunities for parallelism. All of its operations are lock-free; concurrent readers and writers work independently, avoiding blocking, deadlock and livelock. Index probes must latch $C0$ in order to perform a lookup, but the more costly probes into $C1$ and $C2$ are against read-only trees; beyond locating and pinning tree components against deallocation, probes of these components do not interact with the merge processes.

Our prototype exploits replication's pipelined parallelism by running each component's merge process in a separate thread. In practice, this allows our prototype to exploit two to three processor cores during replication. [XXX need experimental evidence...] During bulk load, the buffer manager, which uses Linux's {\tt sync\_file\_range} function, allows \rows to asynchronously force regions of the page file to disk. [XXX currently, we do a synchronous force with sync\_file\_range....]
\rows has control over region size; if the system is CPU-bound, \rows can ensure that the time spent waiting for synchronous page writes is negligible by choosing an appropriate region size. On the other hand, if the system is disk-bound, the same asynchronous force mechanism ensures that \rows overlaps computation with I/O. [XXX this is probably too detailed; we should just say that \rows uses standard techniques to overlap computation and I/O]

Remaining cores can be used by queries, or (as hardware designers increase the number of processor cores per package) by using data parallelism to split each merge across multiple threads. Therefore, given ample storage bandwidth, we expect the throughput of \rows replication to increase with Moore's law for the foreseeable future.

\subsection{Recovery}

Like other log-structured storage systems, \rowss recovery process is inexpensive and straightforward. However, \rows does not attempt to ensure that transactions are atomically committed to disk, and is not meant to supplement or replace the master database's recovery log.

Instead, recovery occurs in two steps. Whenever \rows writes a tree component to disk, it does so by beginning a new transaction in the transaction manager that \rows is based upon. Next, it allocates contiguous regions of storage space (generating one log entry per region), and performs a B-tree style bulk load of the new tree into these regions (this bulk load does not produce any log entries). Finally, \rows forces the tree's regions to disk, and writes the list of regions used by the tree and the location of the tree's root to normal (write-ahead logged) records. It then commits the underlying transaction.

After the underlying transaction manager completes recovery, \rows will have a set of intact and complete tree components. Space taken up by partially written trees was allocated by an aborting transaction, and has been reclaimed by the transaction manager's recovery mechanism.
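The component-write protocol just described boils down to a fixed sequence of steps. The functions below are invented stand-ins for the underlying transaction manager's API (not \rowss actual code); each one merely records itself, making the ordering that recovery depends on explicit:

```cpp
// Hypothetical outline of the tree-component write protocol; every function
// is a placeholder that logs its step. If a crash occurs before the final
// commit, normal recovery aborts the transaction and reclaims the regions.
#include <cassert>
#include <string>
#include <vector>

static std::vector<std::string> steps_;
static void xbegin()        { steps_.push_back("begin"); }
static void region_alloc()  { steps_.push_back("alloc (one log entry per region)"); }
static void bulk_load()     { steps_.push_back("bulk load (no log entries)"); }
static void force_regions() { steps_.push_back("force regions to disk"); }
static void log_tree_root() { steps_.push_back("log region list + tree root"); }
static void xcommit()       { steps_.push_back("commit"); }

static void write_component() {
    xbegin();
    region_alloc();
    bulk_load();
    force_regions();
    log_tree_root();
    xcommit();
}
```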
After the underlying recovery mechanisms complete, \rows reads the last committed timestamp from the LSM-tree header, and begins playback of the replication log at the appropriate position. Upon committing new components to disk, \rows allows the appropriate portion of the replication log to be truncated.

\section{Row compression}

Disk heads are the primary storage bottleneck for most OLTP environments, and disk capacity is of secondary concern. Therefore, database compression is generally performed to improve system performance, not capacity. In \rows, sequential I/O throughput is the primary replication bottleneck, and it is proportional to the compression ratio. Furthermore, compression increases the effective size of the buffer pool, which is the primary bottleneck for \rowss random index probes.

Although \rows targets row-oriented workloads, its compression routines are based upon column-oriented techniques, and rely on the assumption that pages are indexed in an order that yields easily compressible columns.

\rowss compression formats are based on our {\em multicolumn} compression format. In order to store data from an $N$ column table, we divide the page into $N+1$ variable length regions. $N$ of these regions each contain a compressed column in an existing column-based compression format [XXX cite them]. The remaining region contains ``exceptional'' column data (potentially from more than one column).

For example, a column might be encoded using the {\em frame of reference} (FOR) algorithm, which stores a column of integers as a single offset value and a list of deltas. When a value too different from the offset to be encoded as a delta is encountered, an index into the exceptions region is stored in its place, and the value itself is stored in that region. The resulting algorithm is called {\em patched frame of reference} (PFOR) in the literature. \rowss multicolumn pages extend this idea by allowing multiple columns (each with its own compression algorithm) to coexist on each page.
This section discusses the computational and storage overhead of the multicolumn compression approach. \subsection{Multicolumn computational overhead} \rows builds upon compression algorithms that are amenable to superscalar optimization, and can achieve throughputs in excess of 1GB/s on current hardware. Although multicolumn support introduces significant overhead, \rowss variants of these approaches run within an order of magnitude of published speeds. Additional computational overhead is introduced in two areas. First, \rows compresses each column in a separate buffer, then uses {\tt memcpy()} to gather this data into a single page buffer before writing it to disk. This {\tt memcpy()} occurs once per page allocation. Second, we need a way to translate requests to write a tuple into calls to the appropriate page formats and compression implementations. Unless we hardcode our \rows executable to support a predefined set of page formats (and table schemas), this adds an extra {\tt for} loop (over the columns) whose body contains a {\tt switch} statement (to choose between column compressors) to each tuple compression request. \subsection{The {\tt \large append()} operation} \rowss compressed pages provide a {\tt tupleAppend()} operation that takes a tuple as input, and returns {\tt false} if the page does not have room for the new tuple. {\tt tupleAppend()} consists of a dispatch routine that calls {\tt append()} on each column in turn. Each column's {\tt append()} routine secures storage space for the column value, or returns {\tt false} if no space is available.
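The generic dispatch path (a {\tt for} loop over columns with a {\tt switch} over compressors) can be sketched as follows; the types and names are illustrative toys, not \rowss actual interface, and the encoder bodies are elided.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Illustrative sketch: tupleAppend() loops over the page's columns, and each
// column append switches on its compressor tag.
enum Compressor { FOR_COMPRESSOR, RLE_COMPRESSOR };

struct Column {
    Compressor comp;
    std::vector<int64_t> values;  // stands in for the compressed column buffer
    std::size_t capacity;         // slots this column can still absorb
};

bool col_append(Column& col, int64_t value) {
    if (col.values.size() >= col.capacity) return false;  // out of space
    switch (col.comp) {           // choose between column compressors
        case FOR_COMPRESSOR: /* delta-encode against the column's base */ break;
        case RLE_COMPRESSOR: /* extend the current run, or start one */  break;
    }
    col.values.push_back(value);  // placeholder for the real encoder's output
    return true;
}

// Dispatch routine: returns false if the page has no room for the tuple.
bool tupleAppend(std::vector<Column>& page, const std::vector<int64_t>& tuple) {
    for (std::size_t i = 0; i < page.size(); i++)
        if (!col_append(page[i], tuple[i])) return false;
    return true;
}
```

The hardcoded page formats discussed later eliminate exactly this loop and switch at compile time.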
{\tt append()} has the following signature: \begin{quote} {\tt append(COL\_TYPE value, int* exception\_offset, void* exceptions\_base, void* column\_base, int* freespace) } \end{quote} where {\tt value} is the value to be appended to the column, {\tt exception\_offset} is a pointer to the first free byte in the exceptions region, and {\tt exceptions\_base} and {\tt column\_base} point to (page sized) buffers used to store exceptions and column data as the page is being written. One copy of these buffers exists for each page that \rows is actively writing to (one per disk-resident LSM-tree component); they do not significantly increase \rowss memory requirements. Finally, {\tt freespace} is a pointer to the number of free bytes remaining on the page. The multicolumn format initializes these values when the page is allocated. As {\tt append()} implementations are called, they update this data accordingly. Initially, our multicolumn module managed these values and the exception space itself; this led to extra arithmetic operations and conditionals, and did not significantly simplify the code. Existing superscalar compression algorithms assume they have access to a buffer of uncompressed data and that they are able to make multiple passes over the data during compression. This allows them to remove branches from loop bodies, improving compression throughput. We opted to avoid this approach in \rows, as it would increase the complexity of the {\tt append()} interface, and add a buffer to \rowss merge processes. \subsection{Static code generation} After evaluating the performance of a C implementation of \rowss compression routines, we decided to rewrite the compression routines as C++ templates. C++ templates are instantiated at compile time, acting as type-safe macro substitutions. We declare all functions {\tt inline}, and place them in header files (rather than separate compilation units).
This gives g++ the opportunity to perform optimizations such as cross-module constant propagation and branch elimination. It also allows us to write code that deals with integer data types instead of void pointers without duplicating code or breaking encapsulation. Such optimizations are possible in C but, because of limitations of the preprocessor, would be difficult to express or would require separate code-generation utilities. We found that this set of optimizations improved compression and decompression performance by roughly an order of magnitude. To illustrate this, Table~\ref{table:optimization} compares compressor throughput with and without compiler optimizations enabled. Although compressor throughput varies with data distribution and type, these optimizations yield similar performance improvements across varied datasets and random data distributions. We performed one additional set of optimizations. Rather than instantiate each compressor template once for each column type at compile time, we instantiate a multicolumn page format template for each page format we wish to support. This removes the {\tt for} loop and {\tt switch} statement that supporting multiple columns per page introduced, but hardcodes page schemas at compile time. The two approaches could coexist in a single runtime environment, allowing the use of hardcoded implementations for performance critical tables, while falling back on slower, general purpose implementations for previously unseen table layouts. \subsection{Buffer manager interface extensions} \rows uses a preexisting, conventional database buffer manager. Each page contains an LSN (which is largely unused, as we bulk-load \rowss trees) and a page implementation number. Pages are stored in a hash table keyed by page number, and replaced using an LRU strategy\footnote{LRU is a particularly poor choice, given that \rowss I/O is dominated by large table scans.
Eventually, we hope to add support for a DBMIN[xxx cite]-style page replacement policy.}. In implementing \rows, we made use of a number of non-standard (but generally useful) callbacks. The first, {\tt pageLoaded()}, instantiates a new multicolumn page implementation when the page is first read into memory. The second, {\tt pageFlushed()}, informs our multicolumn implementation that the page is about to be written to disk, and the third, {\tt pageEvicted()}, invokes the multicolumn destructor. As we mentioned above, pages are split into a number of temporary buffers while they are being written, and are then packed into a contiguous buffer before being flushed. Although this operation is expensive, it does present an opportunity for parallelism. \rows provides a per-page operation, {\tt pack()}, that performs the translation. We can register {\tt pack()} as a {\tt pageFlushed()} callback, or we can explicitly call it during (or shortly after) compression. {\tt pageFlushed()} could be safely executed in a background thread with minimal impact on system performance. However, the buffer manager was written under the assumption that the cost of in-memory operations is negligible. Therefore, it blocks all buffer management requests while {\tt pageFlushed()} is being executed. In practice, this causes multiple \rows threads to block on each {\tt pack()}. Also, {\tt pack()} reduces \rowss memory utilization by freeing up temporary compression buffers. Delaying its execution for too long might cause this memory to be evicted from processor cache before the {\tt memcpy()} can occur. For all these reasons, our merge processes explicitly invoke {\tt pack()} as soon as possible. {\tt pageLoaded()} and {\tt pageEvicted()} allow us to amortize page sanity checks and header parsing across many requests to read from a compressed page.
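The three callbacks and {\tt pack()} can be sketched as follows, with a toy page structure standing in for the buffer manager's actual types; {\tt pack()} gathers the temporary buffers into a contiguous image with one {\tt memcpy()} per region, then frees them.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Toy page: two temporary compression buffers plus the packed disk image.
// Illustrative only; the real page holds one buffer per column.
struct Page {
    std::vector<uint8_t> col_buf;     // temporary column buffer
    std::vector<uint8_t> exc_buf;     // temporary exceptions buffer
    std::vector<uint8_t> disk_image;  // contiguous on-disk representation
    bool packed = false;
};

// pack(): gather the scattered buffers into one contiguous page image and
// release the temporary memory. Idempotent, so it may run either eagerly
// from a merge thread or as a last-chance pageFlushed() callback.
void pack(Page& p) {
    if (p.packed) return;
    p.disk_image.resize(p.col_buf.size() + p.exc_buf.size());
    std::memcpy(p.disk_image.data(), p.col_buf.data(), p.col_buf.size());
    std::memcpy(p.disk_image.data() + p.col_buf.size(),
                p.exc_buf.data(), p.exc_buf.size());
    p.col_buf.clear(); p.col_buf.shrink_to_fit();  // free temp buffers
    p.exc_buf.clear(); p.exc_buf.shrink_to_fit();
    p.packed = true;
}

void pageFlushed(Page& p) { pack(p); }        // about to be written to disk
void pageLoaded(Page& p)  { /* parse header, cache column metadata */ }
void pageEvicted(Page& p) { /* run the multicolumn destructor */ }
```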
When a page is loaded from disk, {\tt pageLoaded()} associates the page with the appropriate optimized multicolumn implementation (or with the slower generic multicolumn implementation, if necessary), and then allocates and initializes a small amount of metadata describing the number, types and positions of the columns on the page. Requests to read records or append data to the page use this cached data rather than re-reading the information from the page. \subsection{Storage overhead} The multicolumn page format is quite similar to existing column-wise compression formats. The algorithms we implemented have page formats that can be (broadly speaking) divided into two sections. The first section is a header that contains an encoding of the size of the compressed region, and perhaps a piece of uncompressed exemplar data (as in frame of reference compression). The second section typically contains the compressed data. A multicolumn page contains this information in addition to metadata describing the position and type of each column. The type and number of columns could be encoded in the ``page type'' field, or be explicitly represented using a few bytes per page column. Allocating 16 bits for each column's offset within the page and 16 bits for its compressor type uses 4 bytes per column. Therefore, the additional overhead for an $N$ column page is \[ (N-1) \times (4 + |\mbox{average compression format header}|) \] bytes. A frame of reference column header consists of 2 bytes to record the number of encoded rows and a single uncompressed value. Run length encoding headers consist of a 2 byte count of compressed blocks. Therefore, in the worst case (frame of reference encoding 64-bit integers, and \rowss 4KB pages), our prototype's multicolumn format uses $14/4096\approx0.35\%$ of the page to store each column header.
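The worst-case figure can be broken down explicitly (a restatement of the numbers above: a frame of reference column of 64-bit integers on a 4KB page):

```latex
% Per-column cost: FOR header (row count + uncompressed exemplar) plus the
% multicolumn metadata (16-bit offset + 16-bit compressor type).
\[
  \underbrace{2}_{\mbox{\scriptsize row count}}
  + \underbrace{8}_{\mbox{\scriptsize exemplar}}
  + \underbrace{2+2}_{\mbox{\scriptsize offset, type}}
  = 14 \mbox{ bytes per column},
  \qquad \frac{14}{4096} \approx 0.35\%.
\]
```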
If the data does not compress well and tuples are large, additional storage may be wasted because \rows does not split tuples across pages. Tables~\ref{table:treeCreation} and~\ref{table:treeCreationTwo}, which draw column values from independent, identical distributions, show that \rowss compression ratio can be significantly impacted by large tuples. Breaking pages into smaller compressed blocks changes the compression ratio in another way: the compressibility of the data varies with the size of each compressed block. For example, when frame of reference is applied to sorted data, incoming values eventually drift too far from the page offset, causing them to be stored as exceptional values. Therefore (neglecting header bytes), smaller frame of reference blocks provide higher compression ratios. Of course, conventional compression algorithms are free to divide their compressed data into blocks to maximize compression ratios. Although \rowss smaller compressed block size benefits some compression implementations (and does not adversely impact either of the algorithms we implemented), it creates an additional constraint, and may interact poorly with some compression algorithms. \subsection{Supporting Random Access} The multicolumn page format is designed to allow efficient row-oriented access to data. The efficiency of random access within a page depends on the format used by individual compressors. \rows compressors support two access methods. The first looks up a value by slot id. This operation is $O(1)$ for frame of reference columns, and $O(\log n)$ (in the number of runs of identical values on the page) for run length encoded columns. The second operation is used to look up tuples by value, and is based on the assumption that the tuples are stored in the page in sorted order.
It takes a range of slot ids and a value, and returns the offset of the first and last instance of the value within the range. This operation is $O(\log n)$ (in the number of values in the range) for frame of reference columns, and $O(\log n)$ (in the number of runs on the page) for run length encoded columns. The multicolumn implementation uses this method to look up tuples by beginning with the entire page in range, and calling each compressor's implementation in order to narrow the search until the correct tuple(s) are located or the range is empty. Note that partially matching tuples are only partially examined during the search, and that our binary searches within a column should have better cache locality than searches of row-oriented page layouts. We have not examined the tradeoffs between different implementations of tuple lookup. Currently, rather than using binary search to find the boundaries of each range, our compressors simply iterate over the compressed representation of the data in order to progressively narrow the range of tuples to be considered. It is possible (given expensive branch mispredictions and \rowss small pages) that our linear search implementation will outperform approaches based upon binary search. \section{Evaluation} (graphs go here) \subsection{The data set} Our experiments use weather data\footnote{National Severe Storms Laboratory Historical Weather Data Archives, Norman, Oklahoma, from their Web site at http://data.nssl.noaa.gov}. \subsection{Merge throughput in practice} Merges between the in-memory red-black tree and an on-disk tree component execute different code and perform different I/O than merges between two on-disk components: the former perform random memory accesses, but less I/O. As a result, the two kinds of merge run at different, and difficult to predict, relative speeds. \subsection{Choosing R} We want to maximize the rate of insertion into $C0$, meaning we need to maximize the rate at which tuples are removed from $C0$.
This means that the rate of tuple consumption by the $C2$ merger should match that of the $C1$ merger\footnote{The LSM work uses I/O cost as a proxy for throughput; with compression and superscalar effects, this is less than ideal.}. Otherwise, $C2$ will back up, causing $C1$ to block, or $C2$ will starve, implying that $C1$ could run more quickly. The $C1$ and $C2$ merge processes execute different code: the $C1$ merger consumes tuples from a red-black tree and an on-disk tree, while the $C2$ merger consumes tuples from two on-disk trees. \subsection{Implementation performance constant} We measure peak merge throughput as a multiple $n$ of a single drive's sequential throughput; at a given compression ratio, a merge process can then produce $n$ drives' worth of replicated output. \section{Conclusion} Compressed LSM-trees are practical on modern hardware. As CPU resources increase, we expect the replication throughput they support to increase with them. Our implementation is a first cut at a working system; we have mentioned a number of implementation limitations throughout this paper. In particular, while superscalar compression algorithms provide significant benefits to \rows, their real world performance (whether reading data from a parser, or working within a \rows merge) is significantly lower than their performance in isolation. On the input side, this is primarily due to the expense of parsing, random memory I/O, and moving uncompressed data over the bus. Merge processes suffer from none of these limitations, but are CPU bound due to the cost of decompressing, comparing, and recompressing each tuple. Existing work on performing relational operations (particularly join) on compressed representations of data may improve the situation. In particular, it might be possible to adapt column-oriented optimizations to \rowss multicolumn page layout.
[real conclusion here; the prior paragraph is future work] XXX\cite{bowman:reasoning} \bibliographystyle{abbrv} \bibliography{rose} % sigproc.bib is the name of the Bibliography in this case % You must have a proper ".bib" file % and remember to run: % latex bibtex latex latex % to resolve all references % % ACM needs 'a single self-contained file'! % \balancecolumns % GM July 2000 % That's all folks! \end{document}