refactoring; move init_stasis into logtable, put merge stats into their own module

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@792 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-04-28 19:21:25 +00:00
parent 418bc33805
commit 151373d09a
14 changed files with 46 additions and 120 deletions

View file

@ -17,8 +17,6 @@
#include <stasis/transactional.h> #include <stasis/transactional.h>
#include <stasis/page.h> #include <stasis/page.h>
#include <stasis/page/slotted.h> #include <stasis/page/slotted.h>
#include <stasis/bufferManager.h>
#include <stasis/bufferManager/bufferHash.h>
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
// LOGTREE implementation // LOGTREE implementation
@ -33,20 +31,6 @@ const size_t diskTreeComponent::internalNodes::root_rec_size = sizeof(int64_t);
const int64_t diskTreeComponent::internalNodes::PREV_LEAF = 0; //pointer to prev leaf page const int64_t diskTreeComponent::internalNodes::PREV_LEAF = 0; //pointer to prev leaf page
const int64_t diskTreeComponent::internalNodes::NEXT_LEAF = 1; //pointer to next leaf page const int64_t diskTreeComponent::internalNodes::NEXT_LEAF = 1; //pointer to next leaf page
// TODO move init_stasis to a more appropriate module
void diskTreeComponent::internalNodes::init_stasis() {
bufferManagerFileHandleType = BUFFER_MANAGER_FILE_HANDLE_PFILE;
DataPage<datatuple>::register_stasis_page_impl();
stasis_buffer_manager_factory = stasis_buffer_manager_hash_factory; // XXX workaround stasis issue #22.
Tinit();
}
recordid diskTreeComponent::get_root_rid() { return ltree->get_root_rec(); } recordid diskTreeComponent::get_root_rid() { return ltree->get_root_rec(); }
recordid diskTreeComponent::get_datapage_allocator_rid() { return ltree->get_datapage_alloc()->header_rid(); } recordid diskTreeComponent::get_datapage_allocator_rid() { return ltree->get_datapage_alloc()->header_rid(); }
recordid diskTreeComponent::get_internal_node_allocator_rid() { return ltree->get_internal_node_alloc()->header_rid(); } recordid diskTreeComponent::get_internal_node_allocator_rid() { return ltree->get_internal_node_alloc()->header_rid(); }
@ -76,7 +60,7 @@ void diskTreeComponent::writes_done() {
} }
} }
int diskTreeComponent::insertTuple(int xid, datatuple *t, merge_stats_t *stats) int diskTreeComponent::insertTuple(int xid, datatuple *t, mergeStats *stats)
{ {
int ret = 0; // no error. int ret = 0; // no error.
if(dp==0) { if(dp==0) {
@ -141,7 +125,6 @@ datatuple * diskTreeComponent::findTuple(int xid, datatuple::key_t key, size_t k
return tup; return tup;
} }
void diskTreeComponent::internalNodes::deinit_stasis() { Tdeinit(); }
recordid diskTreeComponent::internalNodes::create(int xid) { recordid diskTreeComponent::internalNodes::create(int xid) {
pageid_t root = internal_node_alloc->alloc_extent(xid, 1); pageid_t root = internal_node_alloc->alloc_extent(xid, 1);

View file

@ -10,6 +10,7 @@
#include "datapage.h" #include "datapage.h"
#include "datatuple.h" #include "datatuple.h"
#include "mergeStats.h"
class diskTreeComponent { class diskTreeComponent {
public: public:
@ -36,7 +37,7 @@ class diskTreeComponent {
recordid get_internal_node_allocator_rid(); recordid get_internal_node_allocator_rid();
internalNodes * get_internal_nodes() { return ltree; } internalNodes * get_internal_nodes() { return ltree; }
datatuple* findTuple(int xid, datatuple::key_t key, size_t keySize); datatuple* findTuple(int xid, datatuple::key_t key, size_t keySize);
int insertTuple(int xid, /*DataPage<datatuple> *dp,*/ datatuple *t, merge_stats_t *stats); int insertTuple(int xid, datatuple *t, mergeStats *stats);
void writes_done(); void writes_done();
@ -72,9 +73,6 @@ class diskTreeComponent {
public: public:
class internalNodes{ class internalNodes{
public: public:
// XXX move these to another module.
static void init_stasis();
static void deinit_stasis();
internalNodes(int xid, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size); internalNodes(int xid, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size);
internalNodes(int xid, recordid root, recordid internal_node_state, recordid datapage_state); internalNodes(int xid, recordid root, recordid internal_node_state, recordid datapage_state);

View file

@ -2,6 +2,9 @@
#include "merger.h" #include "merger.h"
#include <stasis/transactional.h> #include <stasis/transactional.h>
#include <stasis/bufferManager.h>
#include <stasis/bufferManager/bufferHash.h>
#undef try #undef try
#undef end #undef end
@ -58,6 +61,18 @@ logtable<TUPLE>::~logtable()
delete tmerger; delete tmerger;
} }
template<class TUPLE>
void logtable<TUPLE>::init_stasis() {
DataPage<datatuple>::register_stasis_page_impl();
Tinit();
}
template<class TUPLE>
void logtable<TUPLE>::deinit_stasis() { Tdeinit(); }
template<class TUPLE> template<class TUPLE>
recordid logtable<TUPLE>::allocTable(int xid) recordid logtable<TUPLE>::allocTable(int xid)
{ {

View file

@ -7,28 +7,11 @@
#include <vector> #include <vector>
typedef struct merge_stats_t {
int merge_level; // 1 => C0->C1, 2 => C1->C2
pageid_t merge_count; // This is the merge_count'th merge
struct timeval sleep; // When did we go to sleep waiting for input?
struct timeval start; // When did we wake up and start merging? (at steady state with max throughput, this should be equal to sleep)
struct timeval done; // When did we finish merging?
pageid_t bytes_out; // How many bytes did we write (including internal tree nodes)?
pageid_t num_tuples_out; // How many tuples did we write?
pageid_t num_datapages_out; // How many datapages?
pageid_t bytes_in_small; // How many bytes from the small input tree (for C0, we ignore tree overheads)?
pageid_t num_tuples_in_small; // Tuples from the small input?
pageid_t bytes_in_large; // Bytes from the large input?
pageid_t num_tuples_in_large; // Tuples from large input?
} merge_stats_t;
#include "diskTreeComponent.h" #include "diskTreeComponent.h"
#include "memTreeComponent.h" #include "memTreeComponent.h"
#include "tuplemerger.h" #include "tuplemerger.h"
struct logtable_mergedata; struct logtable_mergedata;
template<class TUPLE> template<class TUPLE>
@ -63,8 +46,9 @@ public:
void openTable(int xid, recordid rid); void openTable(int xid, recordid rid);
void flushTable(); void flushTable();
inline recordid & get_table_rec(){return table_rec;} // TODO This is called by merger.cpp for no good reason. (remove the calls) static void init_stasis();
static void deinit_stasis();
inline uint64_t get_epoch() { return epoch; } inline uint64_t get_epoch() { return epoch; }
void registerIterator(iterator * it); void registerIterator(iterator * it);

View file

@ -7,48 +7,6 @@
#undef try #undef try
#undef end #undef end
void merge_stats_pp(FILE* fd, merge_stats_t &stats) {
long long sleep_time = stats.start.tv_sec - stats.sleep.tv_sec;
long long work_time = stats.done.tv_sec - stats.start.tv_sec;
long long total_time = sleep_time + work_time;
double mb_out = ((double)stats.bytes_out) /(1024.0*1024.0);
double mb_ins= ((double)stats.bytes_in_small) /(1024.0*1024.0);
double mb_inl = ((double)stats.bytes_in_large) /(1024.0*1024.0);
double kt_out = ((double)stats.num_tuples_out) /(1024.0);
double kt_ins= ((double)stats.num_tuples_in_small) /(1024.0);
double kt_inl = ((double)stats.num_tuples_in_large) /(1024.0);
double mb_hdd = mb_out + mb_inl + (stats.merge_level == 1 ? 0.0 : mb_ins);
double kt_hdd = kt_out + kt_inl + (stats.merge_level == 1 ? 0.0 : kt_ins);
fprintf(fd,
"=====================================================================\n"
"Thread %d merge %lld: sleep %lld sec, run %lld sec\n"
" megabytes kTuples datapages MB/s (real) kTup/s (real)\n"
"Wrote %7lld %7lld %9lld" " %6.1f %6.1f" " %8.1f %8.1f" "\n"
"Read (small) %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n"
"Read (large) %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n"
"Disk %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n"
".....................................................................\n"
"avg tuple len: %6.2fkb\n"
"effective throughput: (mb/s ; nsec/byte): (%.2f; %.2f) active" "\n"
" (%.2f; %.2f) wallclock" "\n"
".....................................................................\n"
,
stats.merge_level, stats.merge_count,
sleep_time,
work_time,
(long long)mb_out, (long long)kt_out, stats.num_datapages_out, mb_out / (double)work_time, mb_out / (double)total_time, kt_out / (double)work_time, kt_out / (double)total_time,
(long long)mb_ins, (long long)kt_ins, mb_ins / (double)work_time, mb_ins / (double)total_time, kt_ins / (double)work_time, kt_ins / (double)total_time,
(long long)mb_inl, (long long)kt_inl, mb_inl / (double)work_time, mb_inl / (double)total_time, kt_inl / (double)work_time, kt_inl / (double)total_time,
(long long)mb_hdd, (long long)kt_hdd, mb_hdd / (double)work_time, mb_hdd / (double)total_time, kt_hdd / (double)work_time, kt_hdd / (double)total_time,
mb_out / kt_out,
mb_ins / work_time, 1000.0 * work_time / mb_ins, mb_ins / total_time, 1000.0 * total_time / mb_ins
);
}
double merge_stats_nsec_to_merge_in_bytes(merge_stats_t); // how many nsec did we burn on each byte from the small tree (want this to be equal for the two mergers)
int merge_scheduler::addlogtable(logtable<datatuple> *ltable) int merge_scheduler::addlogtable(logtable<datatuple> *ltable)
{ {
@ -235,7 +193,7 @@ void merge_iterators(int xid,
ITB *itrB, ITB *itrB,
logtable<datatuple> *ltable, logtable<datatuple> *ltable,
diskTreeComponent *scratch_tree, diskTreeComponent *scratch_tree,
merge_stats_t *stats, mergeStats *stats,
bool dropDeletes); bool dropDeletes);
@ -274,11 +232,7 @@ void* memMergeThread(void*arg)
while(true) // 1 while(true) // 1
{ {
merge_stats_t stats; mergeStats stats(1, merge_count);
memset((void*)&stats, 0, sizeof(stats));
stats.merge_level = 1;
stats.merge_count = merge_count;
gettimeofday(&stats.sleep,0);
writelock(ltable->header_lock,0); writelock(ltable->header_lock,0);
int done = 0; int done = 0;
// 2: wait for c0_mergable // 2: wait for c0_mergable
@ -410,7 +364,7 @@ void* memMergeThread(void*arg)
unlock(ltable->header_lock); unlock(ltable->header_lock);
gettimeofday(&stats.done, 0); gettimeofday(&stats.done, 0);
merge_stats_pp(stdout, stats); stats.pretty_print(stdout);
//TODO: get the freeing outside of the lock //TODO: get the freeing outside of the lock
} }
@ -434,11 +388,8 @@ void *diskMergeThread(void*arg)
while(true) while(true)
{ {
merge_stats_t stats; mergeStats stats(2, merge_count);
memset((void*)&stats, 0, sizeof(stats));
stats.merge_level = 2;
stats.merge_count = merge_count;
gettimeofday(&stats.sleep,0);
// 2: wait for input // 2: wait for input
writelock(ltable->header_lock,0); writelock(ltable->header_lock,0);
int done = 0; int done = 0;
@ -479,7 +430,7 @@ void *diskMergeThread(void*arg)
// 4: do the merge. // 4: do the merge.
//create the iterators //create the iterators
diskTreeComponent::iterator *itrA = ltable->get_tree_c2()->open_iterator(); //new iterator<datatuple>(ltable->get_tree_c2()->get_root_rec()); diskTreeComponent::iterator *itrA = ltable->get_tree_c2()->open_iterator();
diskTreeComponent::iterator *itrB = ltable->get_tree_c1_mergeable()->open_iterator(); diskTreeComponent::iterator *itrB = ltable->get_tree_c1_mergeable()->open_iterator();
//create a new tree //create a new tree
@ -530,7 +481,7 @@ void *diskMergeThread(void*arg)
unlock(ltable->header_lock); unlock(ltable->header_lock);
gettimeofday(&stats.done, 0); gettimeofday(&stats.done, 0);
merge_stats_pp(stdout, stats); stats.pretty_print(stdout);
} }
return 0; return 0;
@ -543,7 +494,7 @@ void merge_iterators(int xid,
ITA *itrA, //iterator on c1 or c2 ITA *itrA, //iterator on c1 or c2
ITB *itrB, //iterator on c0 or c1, respectively ITB *itrB, //iterator on c0 or c1, respectively
logtable<datatuple> *ltable, logtable<datatuple> *ltable,
diskTreeComponent *scratch_tree, merge_stats_t *stats, diskTreeComponent *scratch_tree, mergeStats *stats,
bool dropDeletes // should be true iff this is biggest component bool dropDeletes // should be true iff this is biggest component
) )
{ {

View file

@ -62,8 +62,6 @@ struct logtable_mergedata
}; };
#include "logstore.h" // XXX hacky include workaround.
class merge_scheduler class merge_scheduler
{ {
std::vector<std::pair<logtable<datatuple> *, logtable_mergedata*> > mergedata; std::vector<std::pair<logtable<datatuple> *, logtable_mergedata*> > mergedata;

View file

@ -37,7 +37,7 @@ void terminate (int param)
printf("Deinitializing stasis...\n"); printf("Deinitializing stasis...\n");
fflush(stdout); fflush(stdout);
diskTreeComponent::internalNodes::deinit_stasis(); logtable<datatuple>::deinit_stasis();
exit(0); exit(0);
} }
@ -50,7 +50,7 @@ int main(int argc, char *argv[])
prev_fn = signal (SIGINT,terminate); prev_fn = signal (SIGINT,terminate);
diskTreeComponent::internalNodes::init_stasis(); logtable<datatuple>::init_stasis();
int xid = Tbegin(); int xid = Tbegin();

View file

@ -30,7 +30,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
sync(); sync();
diskTreeComponent::internalNodes::init_stasis(); logtable<datatuple>::init_stasis();
int xid = Tbegin(); int xid = Tbegin();
@ -118,7 +118,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
Tcommit(xid); Tcommit(xid);
diskTreeComponent::internalNodes::deinit_stasis(); logtable<datatuple>::deinit_stasis();
} }

View file

@ -14,7 +14,7 @@ int main(int argc, char **argv)
sync(); sync();
diskTreeComponent::internalNodes::init_stasis(); logtable<datatuple>::init_stasis();
int xid = Tbegin(); int xid = Tbegin();
@ -33,7 +33,7 @@ int main(int argc, char **argv)
delete ro_alloc; delete ro_alloc;
Tcommit(xid); Tcommit(xid);
diskTreeComponent::internalNodes::deinit_stasis(); logtable<datatuple>::deinit_stasis();

View file

@ -26,7 +26,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
sync(); sync();
diskTreeComponent::internalNodes::init_stasis(); logtable<datatuple>::init_stasis();
int xid = Tbegin(); int xid = Tbegin();
@ -56,7 +56,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
printf("Stage 1: Writing %d keys\n", NUM_ENTRIES); printf("Stage 1: Writing %d keys\n", NUM_ENTRIES);
merge_stats_t *stats = (merge_stats_t*)calloc(sizeof(stats), 1); mergeStats *stats = (mergeStats*)calloc(sizeof(stats), 1);
for(size_t i = 0; i < NUM_ENTRIES; i++) for(size_t i = 0; i < NUM_ENTRIES; i++)
{ {
@ -121,7 +121,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
printf("Random Reads completed.\n"); printf("Random Reads completed.\n");
Tcommit(xid); Tcommit(xid);
diskTreeComponent::internalNodes::deinit_stasis(); logtable<datatuple>::deinit_stasis();
} }
/** @test /** @test

View file

@ -35,7 +35,7 @@ void insertProbeIter_str(int NUM_ENTRIES)
sync(); sync();
diskTreeComponent::internalNodes::init_stasis(); logtable<datatuple>::init_stasis();
int xid = Tbegin(); int xid = Tbegin();
@ -137,7 +137,7 @@ void insertProbeIter_str(int NUM_ENTRIES)
delete it; delete it;
delete ro_alloc; delete ro_alloc;
Tcommit(xid); Tcommit(xid);
diskTreeComponent::internalNodes::deinit_stasis(); logtable<datatuple>::deinit_stasis();
} }

View file

@ -24,7 +24,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
unlink("storefile.txt"); unlink("storefile.txt");
unlink("logfile.txt"); unlink("logfile.txt");
diskTreeComponent::internalNodes::init_stasis(); logtable<datatuple>::init_stasis();
//data generation //data generation
std::vector<std::string> * data_arr = new std::vector<std::string>; std::vector<std::string> * data_arr = new std::vector<std::string>;
@ -147,7 +147,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
Tcommit(xid); Tcommit(xid);
diskTreeComponent::internalNodes::deinit_stasis(); logtable<datatuple>::deinit_stasis();
} }

View file

@ -26,7 +26,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
sync(); sync();
diskTreeComponent::internalNodes::init_stasis(); logtable<datatuple>::init_stasis();
//data generation //data generation
// std::vector<std::string> * data_arr = new std::vector<std::string>; // std::vector<std::string> * data_arr = new std::vector<std::string>;
@ -97,10 +97,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
gettimeofday(&stop_tv,0); gettimeofday(&stop_tv,0);
printf("run time: %6.1f\n", (tv_to_double(stop_tv) - tv_to_double(start_tv))); printf("run time: %6.1f\n", (tv_to_double(stop_tv) - tv_to_double(start_tv)));
//Tcommit(xid); logtable<datatuple>::deinit_stasis();
diskTreeComponent::internalNodes::deinit_stasis();
} }

View file

@ -26,7 +26,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
sync(); sync();
diskTreeComponent::internalNodes::init_stasis(); logtable<datatuple>::init_stasis();
double delete_freq = .05; double delete_freq = .05;
double update_freq = .15; double update_freq = .15;
@ -249,7 +249,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
Tcommit(xid); Tcommit(xid);
diskTreeComponent::internalNodes::deinit_stasis(); logtable<datatuple>::deinit_stasis();
} }