diff --git a/logstore.cpp b/logstore.cpp index d3b9fad..93eb901 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -4,6 +4,9 @@ #include #include #include +#include +#include +#include #include "mergeStats.h" #undef try @@ -22,6 +25,7 @@ static inline double tv_to_double(struct timeval tv) template logtable::logtable(pageid_t max_c0_size, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size) { + recovering = true; this->max_c0_size = max_c0_size; this->mean_c0_run_length = max_c0_size; this->num_c0_mergers = 0; @@ -54,6 +58,17 @@ logtable::logtable(pageid_t max_c0_size, pageid_t internal_region_size, p this->internal_region_size = internal_region_size; this->datapage_region_size = datapage_region_size; this->datapage_size = datapage_size; + + + const char * lsm_log_file_name = "lsm.logfile"; + + log_file = stasis_log_safe_writes_open(lsm_log_file_name, + stasis_log_file_mode, + stasis_log_file_permissions, + stasis_log_softcommit); + log_file->group_force = + stasis_log_group_force_init(log_file, 10 * 1000 * 1000); // timeout in nsec; want 10msec. + printf("Warning enable group force in logstore.cpp\n"); } template @@ -98,7 +113,6 @@ void logtable::deinit_stasis() { Tdeinit(); } template recordid logtable::allocTable(int xid) { - table_rec = Talloc(xid, sizeof(tbl_header)); mergeStats * stats = 0; //create the big tree @@ -113,6 +127,7 @@ recordid logtable::allocTable(int xid) tree_c0 = new memTreeComponent::rbtree_t; tbl_header.merge_manager = merge_mgr->talloc(xid); + tbl_header.log_trunc = 0; update_persistent_header(xid); return table_rec; @@ -131,8 +146,52 @@ void logtable::openTable(int xid, recordid rid) { merge_mgr->new_merge(0); } + template -void logtable::update_persistent_header(int xid) { +void logtable::logUpdate(datatuple * tup) { + LogEntry * e = stasis_log_write_update(log_file, 0, INVALID_PAGE, 0/*Page**/, 0/*op*/, tup->to_bytes(), tup->byte_length()); + log_file->write_entry_done(log_file,e); +} + +template +void logtable::replayLog() { + lsn_t start = tbl_header.log_trunc; + LogHandle * lh = start ? getLSNHandle(log_file, start) : getLogHandle(log_file); + const LogEntry * e; + while((e = nextInLog(lh))) { + switch(e->type) { + case UPDATELOG: { + datatuple * tup = datatuple::from_bytes((byte*)stasis_log_entry_update_args_cptr(e)); + // assert(e->update.funcID == 0/*LSMINSERT*/); + insertTuple(tup, false); + datatuple::freetuple(tup); + } break; + case INTERNALLOG: { } break; + default: assert(e->type == UPDATELOG); abort(); + } + } + recovering = false; + printf("\nLog replay complete.\n"); + +} + +template +lsn_t logtable::get_log_offset() { + if(recovering) { return INVALID_LSN; } + return log_file->next_available_lsn(log_file); +} +template +void logtable::truncate_log() { + if(recovering) { + printf("Not truncating log until recovery is complete.\n"); + } else { + printf("truncating log to %lld\n", tbl_header.log_trunc); + log_file->truncate(log_file, tbl_header.log_trunc); + } +} + +template +void logtable::update_persistent_header(int xid, lsn_t trunc_lsn) { tbl_header.c2_root = tree_c2->get_root_rid(); tbl_header.c2_dp_state = tree_c2->get_datapage_allocator_rid(); @@ -143,6 +202,11 @@ void logtable::update_persistent_header(int xid) { merge_mgr->marshal(xid, tbl_header.merge_manager); + if(trunc_lsn != INVALID_LSN) { + printf("\nsetting log truncation point to %lld\n", trunc_lsn); + tbl_header.log_trunc = trunc_lsn; + } + Tset(xid, table_rec, &tbl_header); } @@ -506,6 +570,11 @@ void logtable::insertManyTuples(datatuple ** tuples, int tuple_count) { for(int i = 0; i < tuple_count; i++) { merge_mgr->read_tuple_from_small_component(0, tuples[i]); } + for(int i = 0; i < tuple_count; i++) { + logUpdate(tuples[i]); + } + log_file->force_tail(log_file, LOG_FORCE_COMMIT); + pthread_mutex_lock(&rb_mut); int num_old_tups = 0; pageid_t sum_old_tup_lens = 0; @@ -522,8 +591,12 @@ void logtable::insertManyTuples(datatuple ** tuples, int tuple_count) { } template -void logtable::insertTuple(datatuple *tuple) +void logtable::insertTuple(datatuple *tuple, bool should_log) { + if(should_log) { + logUpdate(tuple); + log_file->force_tail(log_file, LOG_FORCE_COMMIT); + } //lock the red-black tree merge_mgr->read_tuple_from_small_component(0, tuple); // has to be before rb_mut, since it calls tick with block = true, and that releases header_mut. datatuple * pre_t = 0; // this is a pointer to any data tuples that we'll be deleting below. We need to update the merge_mgr statistics with it, but have to do so outside of the rb_mut region. diff --git a/logstore.h b/logstore.h index 94165a9..c47cded 100644 --- a/logstore.h +++ b/logstore.h @@ -49,7 +49,7 @@ private: datatuple * insertTupleHelper(datatuple *tuple); public: void insertManyTuples(struct datatuple **tuples, int tuple_count); - void insertTuple(struct datatuple *tuple); + void insertTuple(struct datatuple *tuple, bool should_log = true); /** This test and set has strange semantics on two fronts: * * 1) It is not atomic with respect to non-testAndSet operations (which is fine in theory, since they have no barrier semantics, and we don't have a use case to support the extra overhead) @@ -62,6 +62,9 @@ public: void openTable(int xid, recordid rid); void flushTable(); + void replayLog(); + void logUpdate(datatuple * tup); + static void init_stasis(); static void deinit_stasis(); @@ -92,7 +95,10 @@ public: bool get_c0_is_merging() { return c0_is_merging; } void set_c0_is_merging(bool is_merging) { c0_is_merging = is_merging; } void set_tree_c0_mergeable(memTreeComponent::rbtree_ptr_t newtree){tree_c0_mergeable = newtree; bump_epoch(); } - void update_persistent_header(int xid); + lsn_t get_log_offset(); + void truncate_log(); + + void update_persistent_header(int xid, lsn_t log_trunc = INVALID_LSN); inline tuplemerger * gettuplemerger(){return tmerger;} @@ -106,6 +112,7 @@ public: recordid c1_state; recordid c1_dp_state; recordid merge_manager; + lsn_t log_trunc; }; rwlc * header_mut; pthread_mutex_t tick_mut; @@ -117,6 +124,9 @@ public: mergeManager * merge_mgr; + stasis_log_t * log_file; + bool recovering; + bool accepting_new_requests; inline bool is_still_running() { return !shutting_down_; } inline void stop() { diff --git a/merger.cpp b/merger.cpp index 5affb60..5bc96aa 100644 --- a/merger.cpp +++ b/merger.cpp @@ -85,6 +85,8 @@ void * merge_scheduler::memMergeThread() { stats->starting_merge(); + lsn_t merge_start = ltable_->get_log_offset(); + printf("\nstarting memory merge. log offset is %lld\n", merge_start); // 3: Begin transaction xid = Tbegin(); @@ -139,9 +141,11 @@ void * merge_scheduler::memMergeThread() { double new_c1_size = stats->output_size(); pthread_cond_signal(<able_->c0_needed); - ltable_->update_persistent_header(xid); + ltable_->update_persistent_header(xid, merge_start); Tcommit(xid); + ltable_->truncate_log(); + //TODO: this is simplistic for now //6: if c1' is too big, signal the other merger diff --git a/newserver.cpp b/newserver.cpp index 7c9c0fe..11f9614 100644 --- a/newserver.cpp +++ b/newserver.cpp @@ -1,4 +1,5 @@ #include +#include #undef end #undef try #undef begin @@ -59,6 +60,7 @@ int main(int argc, char *argv[]) Tcommit(xid); merge_scheduler * mscheduler = new merge_scheduler(<able); mscheduler->start(); + ltable.replayLog(); simpleServer *lserver = new simpleServer(<able);