initial write-ahead log (WAL) implementation

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@2434 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
sears 2011-04-20 20:17:26 +00:00
parent e5058da393
commit 0508f6cb40
4 changed files with 95 additions and 6 deletions

@@ -4,6 +4,9 @@
#include <stasis/transactional.h>
#include <stasis/bufferManager.h>
#include <stasis/bufferManager/bufferHash.h>
#include <stasis/logger/logger2.h>
#include <stasis/logger/logHandle.h>
#include <stasis/logger/safeWrites.h>
#include "mergeStats.h"
#undef try
@@ -22,6 +25,7 @@ static inline double tv_to_double(struct timeval tv)
template<class TUPLE>
logtable<TUPLE>::logtable(pageid_t max_c0_size, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size)
{
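// Start in recovery mode; replayLog() clears this once the write-ahead log has been replayed.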
recovering = true;
this->max_c0_size = max_c0_size;
this->mean_c0_run_length = max_c0_size;
this->num_c0_mergers = 0;
@@ -54,6 +58,17 @@ logtable<TUPLE>::logtable(pageid_t max_c0_size, pageid_t internal_region_size, p
this->internal_region_size = internal_region_size;
this->datapage_region_size = datapage_region_size;
this->datapage_size = datapage_size;
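// Open a dedicated write-ahead log for the table. With soft commit, individual writes are not synced; the group-force timer below (10 msec) batches log syncs.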
const char * lsm_log_file_name = "lsm.logfile";
log_file = stasis_log_safe_writes_open(lsm_log_file_name,
stasis_log_file_mode,
stasis_log_file_permissions,
stasis_log_softcommit);
log_file->group_force =
stasis_log_group_force_init(log_file, 10 * 1000 * 1000); // timeout in nsec; want 10msec.
printf("Warning enable group force in logstore.cpp\n");
}
template<class TUPLE>
@@ -98,7 +113,6 @@ void logtable<TUPLE>::deinit_stasis() { Tdeinit(); }
template<class TUPLE>
recordid logtable<TUPLE>::allocTable(int xid)
{
table_rec = Talloc(xid, sizeof(tbl_header));
mergeStats * stats = 0;
//create the big tree
@@ -113,6 +127,7 @@ recordid logtable<TUPLE>::allocTable(int xid)
tree_c0 = new memTreeComponent<datatuple>::rbtree_t;
tbl_header.merge_manager = merge_mgr->talloc(xid);
tbl_header.log_trunc = 0;
update_persistent_header(xid);
return table_rec;
@@ -131,8 +146,52 @@ void logtable<TUPLE>::openTable(int xid, recordid rid) {
merge_mgr->new_merge(0);
}
template<class TUPLE>
void logtable<TUPLE>::update_persistent_header(int xid) {
void logtable<TUPLE>::logUpdate(datatuple * tup) {
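// Append the serialized tuple to the log as a single update entry; it is not tied to any page or operation.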
LogEntry * e = stasis_log_write_update(log_file, 0, INVALID_PAGE, 0/*Page**/, 0/*op*/, tup->to_bytes(), tup->byte_length());
log_file->write_entry_done(log_file,e);
}
template<class TUPLE>
void logtable<TUPLE>::replayLog() {
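// Scan the log from the last truncation point and re-insert each logged tuple into c0, without logging it again.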
lsn_t start = tbl_header.log_trunc;
LogHandle * lh = start ? getLSNHandle(log_file, start) : getLogHandle(log_file);
const LogEntry * e;
while((e = nextInLog(lh))) {
switch(e->type) {
case UPDATELOG: {
datatuple * tup = datatuple::from_bytes((byte*)stasis_log_entry_update_args_cptr(e));
// assert(e->update.funcID == 0/*LSMINSERT*/);
insertTuple(tup, false);
datatuple::freetuple(tup);
} break;
case INTERNALLOG: { } break;
default: assert(e->type == UPDATELOG); abort();
}
}
recovering = false;
printf("\nLog replay complete.\n");
}
template<class TUPLE>
lsn_t logtable<TUPLE>::get_log_offset() {
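// The next available LSN is the point a completed merge may later truncate to; during recovery there is no usable offset.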
if(recovering) { return INVALID_LSN; }
return log_file->next_available_lsn(log_file);
}
template<class TUPLE>
void logtable<TUPLE>::truncate_log() {
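// Discard log entries below the truncation point recorded by the last committed merge.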
if(recovering) {
printf("Not truncating log until recovery is complete.\n");
} else {
printf("truncating log to %lld\n", tbl_header.log_trunc);
log_file->truncate(log_file, tbl_header.log_trunc);
}
}
template<class TUPLE>
void logtable<TUPLE>::update_persistent_header(int xid, lsn_t trunc_lsn) {
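// When a merge supplies a truncation LSN, persist it alongside the tree roots so the log can be trimmed after commit.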
tbl_header.c2_root = tree_c2->get_root_rid();
tbl_header.c2_dp_state = tree_c2->get_datapage_allocator_rid();
@@ -143,6 +202,11 @@ void logtable<TUPLE>::update_persistent_header(int xid) {
merge_mgr->marshal(xid, tbl_header.merge_manager);
if(trunc_lsn != INVALID_LSN) {
printf("\nsetting log truncation point to %lld\n", trunc_lsn);
tbl_header.log_trunc = trunc_lsn;
}
Tset(xid, table_rec, &tbl_header);
}
@@ -506,6 +570,11 @@ void logtable<TUPLE>::insertManyTuples(datatuple ** tuples, int tuple_count) {
for(int i = 0; i < tuple_count; i++) {
merge_mgr->read_tuple_from_small_component(0, tuples[i]);
}
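// Log the whole batch first, then force the log tail once so the batch becomes durable with a single sync.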
for(int i = 0; i < tuple_count; i++) {
logUpdate(tuples[i]);
}
log_file->force_tail(log_file, LOG_FORCE_COMMIT);
pthread_mutex_lock(&rb_mut);
int num_old_tups = 0;
pageid_t sum_old_tup_lens = 0;
@@ -522,8 +591,12 @@
}
template<class TUPLE>
void logtable<TUPLE>::insertTuple(datatuple *tuple)
void logtable<TUPLE>::insertTuple(datatuple *tuple, bool should_log)
{
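// Recovery passes should_log = false so tuples replayed from the log are not logged a second time.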
if(should_log) {
logUpdate(tuple);
log_file->force_tail(log_file, LOG_FORCE_COMMIT);
}
//lock the red-black tree
merge_mgr->read_tuple_from_small_component(0, tuple); // has to be before rb_mut, since it calls tick with block = true, and that releases header_mut.
datatuple * pre_t = 0; // this is a pointer to any data tuples that we'll be deleting below. We need to update the merge_mgr statistics with it, but have to do so outside of the rb_mut region.

@@ -49,7 +49,7 @@ private:
datatuple * insertTupleHelper(datatuple *tuple);
public:
void insertManyTuples(struct datatuple **tuples, int tuple_count);
void insertTuple(struct datatuple *tuple);
void insertTuple(struct datatuple *tuple, bool should_log = true);
/** This test and set has strange semantics on two fronts:
*
* 1) It is not atomic with respect to non-testAndSet operations (which is fine in theory, since they have no barrier semantics, and we don't have a use case to support the extra overhead)
@@ -62,6 +62,9 @@ public:
void openTable(int xid, recordid rid);
void flushTable();
void replayLog();
void logUpdate(datatuple * tup);
static void init_stasis();
static void deinit_stasis();
@@ -92,7 +95,10 @@ public:
bool get_c0_is_merging() { return c0_is_merging; }
void set_c0_is_merging(bool is_merging) { c0_is_merging = is_merging; }
void set_tree_c0_mergeable(memTreeComponent<datatuple>::rbtree_ptr_t newtree){tree_c0_mergeable = newtree; bump_epoch(); }
void update_persistent_header(int xid);
lsn_t get_log_offset();
void truncate_log();
void update_persistent_header(int xid, lsn_t log_trunc = INVALID_LSN);
inline tuplemerger * gettuplemerger(){return tmerger;}
@@ -106,6 +112,7 @@ public:
recordid c1_state;
recordid c1_dp_state;
recordid merge_manager;
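// LSN below which the log has been absorbed into the persistent trees and may be truncated.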
lsn_t log_trunc;
};
rwlc * header_mut;
pthread_mutex_t tick_mut;
@@ -117,6 +124,9 @@ public:
mergeManager * merge_mgr;
stasis_log_t * log_file;
bool recovering;
bool accepting_new_requests;
inline bool is_still_running() { return !shutting_down_; }
inline void stop() {

@@ -85,6 +85,8 @@ void * merge_scheduler::memMergeThread() {
stats->starting_merge();
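// Record the log tail before merging; once this merge commits, everything below it is redundant.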
lsn_t merge_start = ltable_->get_log_offset();
printf("\nstarting memory merge. log offset is %lld\n", merge_start);
// 3: Begin transaction
xid = Tbegin();
@@ -139,9 +141,11 @@ void * merge_scheduler::memMergeThread() {
double new_c1_size = stats->output_size();
pthread_cond_signal(&ltable_->c0_needed);
ltable_->update_persistent_header(xid);
ltable_->update_persistent_header(xid, merge_start);
Tcommit(xid);
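// The new truncation point is durable now, so the logged c0 insertions can be dropped from the log.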
ltable_->truncate_log();
//TODO: this is simplistic for now
//6: if c1' is too big, signal the other merger

@@ -1,4 +1,5 @@
#include <stasis/transactional.h>
#include <stasis/logger/safeWrites.h>
#undef end
#undef try
#undef begin
@@ -59,6 +60,7 @@ int main(int argc, char *argv[])
Tcommit(xid);
merge_scheduler * mscheduler = new merge_scheduler(&ltable);
mscheduler->start();
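// Re-apply any tuples logged before the last shutdown or crash, before the server starts taking requests.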
ltable.replayLog();
simpleServer *lserver = new simpleServer(&ltable);