simplified merge_scheduler
git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@1479 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
parent
8dddc7e168
commit
a00531ae6d
14 changed files with 168 additions and 255 deletions
17
logstore.cpp
17
logstore.cpp
|
@ -23,7 +23,7 @@ template<class TUPLE>
|
||||||
logtable<TUPLE>::logtable(pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size)
|
logtable<TUPLE>::logtable(pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size)
|
||||||
{
|
{
|
||||||
|
|
||||||
r_val = MIN_R;
|
r_val = 3.0; // MIN_R
|
||||||
tree_c0 = NULL;
|
tree_c0 = NULL;
|
||||||
tree_c0_mergeable = NULL;
|
tree_c0_mergeable = NULL;
|
||||||
c0_is_merging = false;
|
c0_is_merging = false;
|
||||||
|
@ -36,8 +36,6 @@ logtable<TUPLE>::logtable(pageid_t internal_region_size, pageid_t datapage_regio
|
||||||
this->shutting_down_ = false;
|
this->shutting_down_ = false;
|
||||||
flushing = false;
|
flushing = false;
|
||||||
this->merge_mgr = new mergeManager(this);
|
this->merge_mgr = new mergeManager(this);
|
||||||
this->mergedata = 0;
|
|
||||||
//tmerger = new tuplemerger(&append_merger);
|
|
||||||
tmerger = new tuplemerger(&replace_merger);
|
tmerger = new tuplemerger(&replace_merger);
|
||||||
|
|
||||||
header_mut = rwlc_initlock();
|
header_mut = rwlc_initlock();
|
||||||
|
@ -61,6 +59,8 @@ logtable<TUPLE>::logtable(pageid_t internal_region_size, pageid_t datapage_regio
|
||||||
template<class TUPLE>
|
template<class TUPLE>
|
||||||
logtable<TUPLE>::~logtable()
|
logtable<TUPLE>::~logtable()
|
||||||
{
|
{
|
||||||
|
delete merge_mgr; // shuts down pretty print thread.
|
||||||
|
|
||||||
if(tree_c1 != NULL)
|
if(tree_c1 != NULL)
|
||||||
delete tree_c1;
|
delete tree_c1;
|
||||||
if(tree_c2 != NULL)
|
if(tree_c2 != NULL)
|
||||||
|
@ -85,7 +85,7 @@ template<class TUPLE>
|
||||||
void logtable<TUPLE>::init_stasis() {
|
void logtable<TUPLE>::init_stasis() {
|
||||||
|
|
||||||
DataPage<datatuple>::register_stasis_page_impl();
|
DataPage<datatuple>::register_stasis_page_impl();
|
||||||
stasis_buffer_manager_size = 768 * 1024; // 4GB = 2^10 pages:
|
//stasis_buffer_manager_size = 768 * 1024; // 4GB = 2^10 pages:
|
||||||
// XXX Workaround Stasis' (still broken) default concurrent buffer manager
|
// XXX Workaround Stasis' (still broken) default concurrent buffer manager
|
||||||
// stasis_buffer_manager_factory = stasis_buffer_manager_hash_factory;
|
// stasis_buffer_manager_factory = stasis_buffer_manager_hash_factory;
|
||||||
// stasis_buffer_manager_hint_writes_are_sequential = 0;
|
// stasis_buffer_manager_hint_writes_are_sequential = 0;
|
||||||
|
@ -112,6 +112,8 @@ recordid logtable<TUPLE>::allocTable(int xid)
|
||||||
merge_mgr->new_merge(0);
|
merge_mgr->new_merge(0);
|
||||||
c0_stats->starting_merge();
|
c0_stats->starting_merge();
|
||||||
|
|
||||||
|
tree_c0 = new memTreeComponent<datatuple>::rbtree_t;
|
||||||
|
|
||||||
update_persistent_header(xid, 1);
|
update_persistent_header(xid, 1);
|
||||||
update_persistent_header(xid, 2);
|
update_persistent_header(xid, 2);
|
||||||
|
|
||||||
|
@ -124,6 +126,7 @@ void logtable<TUPLE>::openTable(int xid, recordid rid) {
|
||||||
Tread(xid, table_rec, &tbl_header);
|
Tread(xid, table_rec, &tbl_header);
|
||||||
tree_c2 = new diskTreeComponent(xid, tbl_header.c2_root, tbl_header.c2_state, tbl_header.c2_dp_state, 0);
|
tree_c2 = new diskTreeComponent(xid, tbl_header.c2_root, tbl_header.c2_state, tbl_header.c2_dp_state, 0);
|
||||||
tree_c1 = new diskTreeComponent(xid, tbl_header.c1_root, tbl_header.c1_state, tbl_header.c1_dp_state, 0);
|
tree_c1 = new diskTreeComponent(xid, tbl_header.c1_root, tbl_header.c1_state, tbl_header.c1_dp_state, 0);
|
||||||
|
tree_c0 = new memTreeComponent<datatuple>::rbtree_t;
|
||||||
|
|
||||||
merge_mgr->get_merge_stats(1)->bytes_out = tbl_header.c1_base_size;
|
merge_mgr->get_merge_stats(1)->bytes_out = tbl_header.c1_base_size;
|
||||||
merge_mgr->get_merge_stats(1)->base_size = tbl_header.c1_base_size;
|
merge_mgr->get_merge_stats(1)->base_size = tbl_header.c1_base_size;
|
||||||
|
@ -160,12 +163,6 @@ void logtable<TUPLE>::update_persistent_header(int xid, int merge_level) {
|
||||||
Tset(xid, table_rec, &tbl_header);
|
Tset(xid, table_rec, &tbl_header);
|
||||||
}
|
}
|
||||||
|
|
||||||
template<class TUPLE>
|
|
||||||
void logtable<TUPLE>::setMergeData(logtable_mergedata * mdata){
|
|
||||||
this->mergedata = mdata;
|
|
||||||
bump_epoch();
|
|
||||||
}
|
|
||||||
|
|
||||||
template<class TUPLE>
|
template<class TUPLE>
|
||||||
void logtable<TUPLE>::flushTable()
|
void logtable<TUPLE>::flushTable()
|
||||||
{
|
{
|
||||||
|
|
|
@ -100,9 +100,6 @@ public:
|
||||||
void set_tree_c0_mergeable(memTreeComponent<datatuple>::rbtree_ptr_t newtree){tree_c0_mergeable = newtree; bump_epoch(); }
|
void set_tree_c0_mergeable(memTreeComponent<datatuple>::rbtree_ptr_t newtree){tree_c0_mergeable = newtree; bump_epoch(); }
|
||||||
void update_persistent_header(int xid, int merge_level);
|
void update_persistent_header(int xid, int merge_level);
|
||||||
|
|
||||||
void setMergeData(logtable_mergedata * mdata);
|
|
||||||
logtable_mergedata* getMergeData(){return mergedata;}
|
|
||||||
|
|
||||||
inline tuplemerger * gettuplemerger(){return tmerger;}
|
inline tuplemerger * gettuplemerger(){return tmerger;}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
@ -118,8 +115,6 @@ public:
|
||||||
pageid_t c1_mergeable_size;
|
pageid_t c1_mergeable_size;
|
||||||
pageid_t c1_base_size;
|
pageid_t c1_base_size;
|
||||||
};
|
};
|
||||||
|
|
||||||
logtable_mergedata * mergedata;
|
|
||||||
rwlc * header_mut;
|
rwlc * header_mut;
|
||||||
pthread_mutex_t tick_mut;
|
pthread_mutex_t tick_mut;
|
||||||
pthread_mutex_t rb_mut;
|
pthread_mutex_t rb_mut;
|
||||||
|
|
|
@ -23,13 +23,13 @@ mergeStats* mergeManager:: get_merge_stats(int mergeLevel) {
|
||||||
}
|
}
|
||||||
|
|
||||||
mergeManager::~mergeManager() {
|
mergeManager::~mergeManager() {
|
||||||
delete c0;
|
|
||||||
delete c1;
|
|
||||||
delete c2;
|
|
||||||
still_running = false;
|
still_running = false;
|
||||||
pthread_cond_signal(&pp_cond);
|
pthread_cond_signal(&pp_cond);
|
||||||
pthread_join(pp_thread, 0);
|
pthread_join(pp_thread, 0);
|
||||||
pthread_cond_destroy(&pp_cond);
|
pthread_cond_destroy(&pp_cond);
|
||||||
|
delete c0;
|
||||||
|
delete c1;
|
||||||
|
delete c2;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mergeManager::new_merge(int mergeLevel) {
|
void mergeManager::new_merge(int mergeLevel) {
|
||||||
|
|
|
@ -64,16 +64,14 @@ public:
|
||||||
private:
|
private:
|
||||||
logtable<datatuple>* ltable;
|
logtable<datatuple>* ltable;
|
||||||
double throttle_seconds;
|
double throttle_seconds;
|
||||||
// double elapsed_seconds;
|
|
||||||
double last_throttle_seconds;
|
double last_throttle_seconds;
|
||||||
// double last_elapsed_seconds;
|
|
||||||
mergeStats * c0;
|
mergeStats * c0;
|
||||||
mergeStats * c1;
|
mergeStats * c1;
|
||||||
mergeStats * c2;
|
mergeStats * c2;
|
||||||
bool sleeping[3];
|
bool sleeping[3];
|
||||||
bool still_running;
|
bool still_running;
|
||||||
|
// Needed so that the pretty print thread can be woken up during shutdown.
|
||||||
pthread_cond_t pp_cond;
|
pthread_cond_t pp_cond;
|
||||||
pthread_t pp_thread;
|
pthread_t pp_thread;
|
||||||
|
|
||||||
};
|
};
|
||||||
#endif /* MERGEMANAGER_H_ */
|
#endif /* MERGEMANAGER_H_ */
|
||||||
|
|
|
@ -46,17 +46,13 @@ class mergeStats {
|
||||||
lifetime_elapsed(0),
|
lifetime_elapsed(0),
|
||||||
lifetime_consumed(0),
|
lifetime_consumed(0),
|
||||||
bps(10.0*1024.0*1024.0),
|
bps(10.0*1024.0*1024.0),
|
||||||
print_skipped(0),
|
|
||||||
active(false) {
|
active(false) {
|
||||||
gettimeofday(&sleep,0);
|
gettimeofday(&sleep,0);
|
||||||
gettimeofday(&last,0);
|
gettimeofday(&last,0);
|
||||||
mergeManager::double_to_ts(&last_tick, mergeManager::tv_to_double(&last));
|
mergeManager::double_to_ts(&last_tick, mergeManager::tv_to_double(&last));
|
||||||
mergeManager::double_to_ts(&last_mini_tick, mergeManager::tv_to_double(&last));
|
mergeManager::double_to_ts(&last_mini_tick, mergeManager::tv_to_double(&last));
|
||||||
pthread_mutex_init(&mut,0);
|
|
||||||
}
|
|
||||||
~mergeStats() {
|
|
||||||
pthread_mutex_destroy(&mut);
|
|
||||||
}
|
}
|
||||||
|
~mergeStats() { }
|
||||||
void new_merge2() {
|
void new_merge2() {
|
||||||
if(just_handed_off) {
|
if(just_handed_off) {
|
||||||
bytes_out = 0;
|
bytes_out = 0;
|
||||||
|
@ -151,11 +147,8 @@ class mergeStats {
|
||||||
|
|
||||||
double bps;
|
double bps;
|
||||||
|
|
||||||
int print_skipped; // used by pretty print in mergeManager.
|
|
||||||
|
|
||||||
bool active;
|
bool active;
|
||||||
|
|
||||||
pthread_mutex_t mut; // protects things touched in tick(), and nothing else.
|
|
||||||
public:
|
public:
|
||||||
|
|
||||||
void pretty_print(FILE* fd) {
|
void pretty_print(FILE* fd) {
|
||||||
|
|
250
merger.cpp
250
merger.cpp
|
@ -1,78 +1,29 @@
|
||||||
|
|
||||||
#include <math.h>
|
#include <math.h>
|
||||||
#include "merger.h"
|
#include "merger.h"
|
||||||
|
|
||||||
|
|
||||||
#include <stasis/transactional.h>
|
#include <stasis/transactional.h>
|
||||||
#undef try
|
#undef try
|
||||||
#undef end
|
#undef end
|
||||||
|
|
||||||
int merge_scheduler::addlogtable(logtable<datatuple> *ltable)
|
static void* memMerge_thr(void* arg) {
|
||||||
{
|
return ((merge_scheduler*)arg)->memMergeThread();
|
||||||
|
}
|
||||||
struct logtable_mergedata * mdata = new logtable_mergedata;
|
static void* diskMerge_thr(void* arg) {
|
||||||
|
return ((merge_scheduler*)arg)->diskMergeThread();
|
||||||
// initialize merge data
|
|
||||||
ltable->set_tree_c0_mergeable(NULL);
|
|
||||||
|
|
||||||
mergedata.push_back(std::make_pair(ltable, mdata));
|
|
||||||
return mergedata.size()-1;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
merge_scheduler::~merge_scheduler()
|
merge_scheduler::merge_scheduler(logtable<datatuple> *ltable) : ltable_(ltable), MIN_R(3.0) { }
|
||||||
{
|
merge_scheduler::~merge_scheduler() { }
|
||||||
mergedata.clear();
|
|
||||||
|
|
||||||
|
void merge_scheduler::shutdown() {
|
||||||
|
ltable_->stop();
|
||||||
|
pthread_join(mem_merge_thread_, 0);
|
||||||
|
pthread_join(disk_merge_thread_, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void merge_scheduler::shutdown()
|
void merge_scheduler::start() {
|
||||||
{
|
pthread_create(&mem_merge_thread_, 0, memMerge_thr, this);
|
||||||
//signal shutdown
|
pthread_create(&disk_merge_thread_, 0, diskMerge_thr, this);
|
||||||
for(size_t i=0; i<mergedata.size(); i++)
|
|
||||||
{
|
|
||||||
logtable<datatuple> *ltable = mergedata[i].first;
|
|
||||||
|
|
||||||
ltable->stop();
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
for(size_t i=0; i<mergedata.size(); i++)
|
|
||||||
{
|
|
||||||
logtable_mergedata *mdata = mergedata[i].second;
|
|
||||||
|
|
||||||
pthread_join(mdata->memmerge_thread,0);
|
|
||||||
pthread_join(mdata->diskmerge_thread,0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
|
|
||||||
{
|
|
||||||
|
|
||||||
logtable<datatuple> * ltable = mergedata[index].first;
|
|
||||||
struct logtable_mergedata *mdata = mergedata[index].second;
|
|
||||||
|
|
||||||
//initialize rb-tree
|
|
||||||
ltable->set_tree_c0(new memTreeComponent<datatuple>::rbtree_t);
|
|
||||||
|
|
||||||
//disk merger args
|
|
||||||
#ifdef NO_SNOWSHOVEL
|
|
||||||
ltable->set_max_c0_size(MAX_C0_SIZE);
|
|
||||||
#else
|
|
||||||
ltable->set_max_c0_size(MAX_C0_SIZE*2); // XXX blatant hack.
|
|
||||||
#endif
|
|
||||||
diskTreeComponent ** block1_scratch = new diskTreeComponent*;
|
|
||||||
*block1_scratch=0;
|
|
||||||
|
|
||||||
DEBUG("Tree C1 is %lld\n", (long long)ltable->get_tree_c1()->get_root_rec().page);
|
|
||||||
DEBUG("Tree C2 is %lld\n", (long long)ltable->get_tree_c2()->get_root_rec().page);
|
|
||||||
|
|
||||||
void * (*diskmerger)(void*) = diskMergeThread;
|
|
||||||
void * (*memmerger)(void*) = memMergeThread;
|
|
||||||
|
|
||||||
pthread_create(&mdata->diskmerge_thread, 0, diskmerger, ltable);
|
|
||||||
pthread_create(&mdata->memmerge_thread, 0, memmerger, ltable);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class ITA, class ITB>
|
template <class ITA, class ITB>
|
||||||
|
@ -106,51 +57,49 @@ void merge_iterators(int xid, diskTreeComponent * forceMe,
|
||||||
</pre>
|
</pre>
|
||||||
Merge algorithm: actual order: 1 2 3 4 5 6 12 11.5 11 [7 8 (9) 10] 13
|
Merge algorithm: actual order: 1 2 3 4 5 6 12 11.5 11 [7 8 (9) 10] 13
|
||||||
*/
|
*/
|
||||||
void* memMergeThread(void*arg)
|
void * merge_scheduler::memMergeThread() {
|
||||||
{
|
|
||||||
|
|
||||||
int xid;
|
int xid;
|
||||||
|
|
||||||
logtable<datatuple> * ltable = (logtable<datatuple>*)arg;
|
assert(ltable_->get_tree_c1());
|
||||||
assert(ltable->get_tree_c1());
|
|
||||||
|
|
||||||
int merge_count =0;
|
int merge_count =0;
|
||||||
mergeStats * stats = ltable->merge_mgr->get_merge_stats(1);
|
mergeStats * stats = ltable_->merge_mgr->get_merge_stats(1);
|
||||||
|
|
||||||
while(true) // 1
|
while(true) // 1
|
||||||
{
|
{
|
||||||
rwlc_writelock(ltable->header_mut);
|
rwlc_writelock(ltable_->header_mut);
|
||||||
ltable->merge_mgr->new_merge(1);
|
ltable_->merge_mgr->new_merge(1);
|
||||||
int done = 0;
|
int done = 0;
|
||||||
// 2: wait for c0_mergable
|
// 2: wait for c0_mergable
|
||||||
#ifdef NO_SNOWSHOVEL
|
#ifdef NO_SNOWSHOVEL
|
||||||
while(!ltable->get_tree_c0_mergeable())
|
while(!ltable_->get_tree_c0_mergeable())
|
||||||
{
|
{
|
||||||
pthread_cond_signal(<able->c0_needed);
|
pthread_cond_signal(<able_->c0_needed);
|
||||||
|
|
||||||
if(!ltable->is_still_running()){
|
if(!ltable_->is_still_running()){
|
||||||
done = 1;
|
done = 1;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
DEBUG("mmt:\twaiting for block ready cond\n");
|
DEBUG("mmt:\twaiting for block ready cond\n");
|
||||||
|
|
||||||
rwlc_cond_wait(<able->c0_ready, ltable->header_mut);
|
rwlc_cond_wait(<able_->c0_ready, ltable_->header_mut);
|
||||||
|
|
||||||
DEBUG("mmt:\tblock ready\n");
|
DEBUG("mmt:\tblock ready\n");
|
||||||
|
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
// the merge iterator will wait until c0 is big enough for us to proceed.
|
// the merge iterator will wait until c0 is big enough for us to proceed.
|
||||||
if(!ltable->is_still_running()) {
|
if(!ltable_->is_still_running()) {
|
||||||
done = 1;
|
done = 1;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if(done==1)
|
if(done==1)
|
||||||
{
|
{
|
||||||
pthread_cond_signal(<able->c1_ready); // no block is ready. this allows the other thread to wake up, and see that we're shutting down.
|
pthread_cond_signal(<able_->c1_ready); // no block is ready. this allows the other thread to wake up, and see that we're shutting down.
|
||||||
rwlc_unlock(ltable->header_mut);
|
rwlc_unlock(ltable_->header_mut);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -162,40 +111,40 @@ void* memMergeThread(void*arg)
|
||||||
// 4: Merge
|
// 4: Merge
|
||||||
|
|
||||||
//create the iterators
|
//create the iterators
|
||||||
diskTreeComponent::iterator *itrA = ltable->get_tree_c1()->open_iterator();
|
diskTreeComponent::iterator *itrA = ltable_->get_tree_c1()->open_iterator();
|
||||||
#ifdef NO_SNOWSHOVEL
|
#ifdef NO_SNOWSHOVEL
|
||||||
memTreeComponent<datatuple>::iterator *itrB =
|
memTreeComponent<datatuple>::iterator *itrB =
|
||||||
new memTreeComponent<datatuple>::iterator(ltable->get_tree_c0_mergeable());
|
new memTreeComponent<datatuple>::iterator(ltable_->get_tree_c0_mergeable());
|
||||||
#else
|
#else
|
||||||
// memTreeComponent<datatuple>::revalidatingIterator *itrB =
|
// memTreeComponent<datatuple>::revalidatingIterator *itrB =
|
||||||
// new memTreeComponent<datatuple>::revalidatingIterator(ltable->get_tree_c0(), <able->rb_mut);
|
// new memTreeComponent<datatuple>::revalidatingIterator(ltable_->get_tree_c0(), <able_->rb_mut);
|
||||||
// memTreeComponent<datatuple>::batchedRevalidatingIterator *itrB =
|
// memTreeComponent<datatuple>::batchedRevalidatingIterator *itrB =
|
||||||
// new memTreeComponent<datatuple>::batchedRevalidatingIterator(ltable->get_tree_c0(), <able->tree_bytes, ltable->max_c0_size, <able->flushing, 100, <able->rb_mut);
|
// new memTreeComponent<datatuple>::batchedRevalidatingIterator(ltable_->get_tree_c0(), <able_->tree_bytes, ltable_->max_c0_size, <able_->flushing, 100, <able_->rb_mut);
|
||||||
#endif
|
#endif
|
||||||
const int64_t min_bloom_target = ltable->max_c0_size;
|
const int64_t min_bloom_target = ltable_->max_c0_size;
|
||||||
|
|
||||||
//create a new tree
|
//create a new tree
|
||||||
diskTreeComponent * c1_prime = new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats, (stats->target_size < min_bloom_target ? min_bloom_target : stats->target_size) / 100);
|
diskTreeComponent * c1_prime = new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats, (stats->target_size < min_bloom_target ? min_bloom_target : stats->target_size) / 100);
|
||||||
|
|
||||||
ltable->set_tree_c1_prime(c1_prime);
|
ltable_->set_tree_c1_prime(c1_prime);
|
||||||
|
|
||||||
rwlc_unlock(ltable->header_mut);
|
rwlc_unlock(ltable_->header_mut);
|
||||||
#ifndef NO_SNOWSHOVEL
|
#ifndef NO_SNOWSHOVEL
|
||||||
// needs to be past the rwlc_unlock...
|
// needs to be past the rwlc_unlock...
|
||||||
memTreeComponent<datatuple>::batchedRevalidatingIterator *itrB =
|
memTreeComponent<datatuple>::batchedRevalidatingIterator *itrB =
|
||||||
new memTreeComponent<datatuple>::batchedRevalidatingIterator(ltable->get_tree_c0(), <able->tree_bytes, ltable->max_c0_size, <able->flushing, 100, <able->rb_mut);
|
new memTreeComponent<datatuple>::batchedRevalidatingIterator(ltable_->get_tree_c0(), <able_->tree_bytes, ltable_->max_c0_size, <able_->flushing, 100, <able_->rb_mut);
|
||||||
#endif
|
#endif
|
||||||
//: do the merge
|
//: do the merge
|
||||||
DEBUG("mmt:\tMerging:\n");
|
DEBUG("mmt:\tMerging:\n");
|
||||||
|
|
||||||
merge_iterators<typeof(*itrA),typeof(*itrB)>(xid, c1_prime, itrA, itrB, ltable, c1_prime, stats, false);
|
merge_iterators<typeof(*itrA),typeof(*itrB)>(xid, c1_prime, itrA, itrB, ltable_, c1_prime, stats, false);
|
||||||
|
|
||||||
delete itrA;
|
delete itrA;
|
||||||
delete itrB;
|
delete itrB;
|
||||||
|
|
||||||
// 5: force c1'
|
// 5: force c1'
|
||||||
|
|
||||||
rwlc_writelock(ltable->header_mut);
|
rwlc_writelock(ltable_->header_mut);
|
||||||
|
|
||||||
//force write the new tree to disk
|
//force write the new tree to disk
|
||||||
c1_prime->force(xid);
|
c1_prime->force(xid);
|
||||||
|
@ -208,24 +157,24 @@ void* memMergeThread(void*arg)
|
||||||
// first, we need to move the c1' into c1.
|
// first, we need to move the c1' into c1.
|
||||||
|
|
||||||
// 12: delete old c1
|
// 12: delete old c1
|
||||||
ltable->get_tree_c1()->dealloc(xid);
|
ltable_->get_tree_c1()->dealloc(xid);
|
||||||
delete ltable->get_tree_c1();
|
delete ltable_->get_tree_c1();
|
||||||
|
|
||||||
// 10: c1 = c1'
|
// 10: c1 = c1'
|
||||||
ltable->set_tree_c1(c1_prime);
|
ltable_->set_tree_c1(c1_prime);
|
||||||
ltable->set_tree_c1_prime(0);
|
ltable_->set_tree_c1_prime(0);
|
||||||
|
|
||||||
#ifdef NO_SNOWSHOVEL
|
#ifdef NO_SNOWSHOVEL
|
||||||
// 11.5: delete old c0_mergeable
|
// 11.5: delete old c0_mergeable
|
||||||
memTreeComponent<datatuple>::tearDownTree(ltable->get_tree_c0_mergeable());
|
memTreeComponent<datatuple>::tearDownTree(ltable_->get_tree_c0_mergeable());
|
||||||
// 11: c0_mergeable = NULL
|
// 11: c0_mergeable = NULL
|
||||||
ltable->set_tree_c0_mergeable(NULL);
|
ltable_->set_tree_c0_mergeable(NULL);
|
||||||
#endif
|
#endif
|
||||||
ltable->set_c0_is_merging(false);
|
ltable_->set_c0_is_merging(false);
|
||||||
double new_c1_size = stats->output_size();
|
double new_c1_size = stats->output_size();
|
||||||
pthread_cond_signal(<able->c0_needed);
|
pthread_cond_signal(<able_->c0_needed);
|
||||||
|
|
||||||
ltable->update_persistent_header(xid, 1);
|
ltable_->update_persistent_header(xid, 1);
|
||||||
Tcommit(xid);
|
Tcommit(xid);
|
||||||
|
|
||||||
//TODO: this is simplistic for now
|
//TODO: this is simplistic for now
|
||||||
|
@ -233,28 +182,28 @@ void* memMergeThread(void*arg)
|
||||||
|
|
||||||
// update c0 effective size.
|
// update c0 effective size.
|
||||||
double frac = 1.0/(double)merge_count;
|
double frac = 1.0/(double)merge_count;
|
||||||
ltable->num_c0_mergers = merge_count;
|
ltable_->num_c0_mergers = merge_count;
|
||||||
ltable->mean_c0_effective_size =
|
ltable_->mean_c0_effective_size =
|
||||||
(int64_t) (
|
(int64_t) (
|
||||||
((double)ltable->mean_c0_effective_size)*(1-frac) +
|
((double)ltable_->mean_c0_effective_size)*(1-frac) +
|
||||||
((double)stats->bytes_in_small*frac));
|
((double)stats->bytes_in_small*frac));
|
||||||
ltable->merge_mgr->get_merge_stats(0)->target_size = ltable->mean_c0_effective_size;
|
ltable_->merge_mgr->get_merge_stats(0)->target_size = ltable_->mean_c0_effective_size;
|
||||||
double target_R = *ltable->R();
|
double target_R = *ltable_->R();
|
||||||
|
|
||||||
printf("Merge done. R = %f MemSize = %lld Mean = %lld, This = %lld, Count = %d factor %3.3fcur%3.3favg\n", target_R, (long long)ltable->max_c0_size, (long long int)ltable->mean_c0_effective_size, stats->bytes_in_small, merge_count, ((double)stats->bytes_in_small) / (double)ltable->max_c0_size, ((double)ltable->mean_c0_effective_size) / (double)ltable->max_c0_size);
|
printf("Merge done. R = %f MemSize = %lld Mean = %lld, This = %lld, Count = %d factor %3.3fcur%3.3favg\n", target_R, (long long)ltable_->max_c0_size, (long long int)ltable_->mean_c0_effective_size, stats->bytes_in_small, merge_count, ((double)stats->bytes_in_small) / (double)ltable_->max_c0_size, ((double)ltable_->mean_c0_effective_size) / (double)ltable_->max_c0_size);
|
||||||
|
|
||||||
assert(target_R >= MIN_R);
|
assert(target_R >= MIN_R);
|
||||||
bool signal_c2 = (new_c1_size / ltable->mean_c0_effective_size > target_R);
|
bool signal_c2 = (new_c1_size / ltable_->mean_c0_effective_size > target_R);
|
||||||
DEBUG("\nc1 size %f R %f\n", new_c1_size, target_R);
|
DEBUG("\nc1 size %f R %f\n", new_c1_size, target_R);
|
||||||
if( signal_c2 )
|
if( signal_c2 )
|
||||||
{
|
{
|
||||||
DEBUG("mmt:\tsignaling C2 for merge\n");
|
DEBUG("mmt:\tsignaling C2 for merge\n");
|
||||||
DEBUG("mmt:\tnew_c1_size %.2f\tMAX_C0_SIZE %lld\ta->max_size %lld\t targetr %.2f \n", new_c1_size,
|
DEBUG("mmt:\tnew_c1_size %.2f\tMAX_C0_SIZE %lld\ta->max_size %lld\t targetr %.2f \n", new_c1_size,
|
||||||
ltable->max_c0_size, a->max_size, target_R);
|
ltable_->max_c0_size, a->max_size, target_R);
|
||||||
|
|
||||||
// XXX need to report backpressure here!
|
// XXX need to report backpressure here!
|
||||||
while(ltable->get_tree_c1_mergeable()) {
|
while(ltable_->get_tree_c1_mergeable()) {
|
||||||
rwlc_cond_wait(<able->c1_needed, ltable->header_mut);
|
rwlc_cond_wait(<able_->c1_needed, ltable_->header_mut);
|
||||||
}
|
}
|
||||||
|
|
||||||
xid = Tbegin();
|
xid = Tbegin();
|
||||||
|
@ -262,27 +211,27 @@ void* memMergeThread(void*arg)
|
||||||
// we just set c1 = c1'. Want to move c1 -> c1 mergeable, clean out c1.
|
// we just set c1 = c1'. Want to move c1 -> c1 mergeable, clean out c1.
|
||||||
|
|
||||||
// 7: and perhaps c1_mergeable
|
// 7: and perhaps c1_mergeable
|
||||||
ltable->set_tree_c1_mergeable(ltable->get_tree_c1()); // c1_prime == c1.
|
ltable_->set_tree_c1_mergeable(ltable_->get_tree_c1()); // c1_prime == c1.
|
||||||
stats->handed_off_tree();
|
stats->handed_off_tree();
|
||||||
|
|
||||||
// 8: c1 = new empty.
|
// 8: c1 = new empty.
|
||||||
ltable->set_tree_c1(new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats));
|
ltable_->set_tree_c1(new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats));
|
||||||
|
|
||||||
pthread_cond_signal(<able->c1_ready);
|
pthread_cond_signal(<able_->c1_ready);
|
||||||
pageid_t old_bytes_out = stats->bytes_out;
|
pageid_t old_bytes_out = stats->bytes_out;
|
||||||
stats->bytes_out = 0; // XXX HACK
|
stats->bytes_out = 0; // XXX HACK
|
||||||
ltable->update_persistent_header(xid, 1);
|
ltable_->update_persistent_header(xid, 1);
|
||||||
stats->bytes_out = old_bytes_out;
|
stats->bytes_out = old_bytes_out;
|
||||||
Tcommit(xid);
|
Tcommit(xid);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// DEBUG("mmt:\tUpdated C1's position on disk to %lld\n",ltable->get_tree_c1()->get_root_rec().page);
|
// DEBUG("mmt:\tUpdated C1's position on disk to %lld\n",ltable_->get_tree_c1()->get_root_rec().page);
|
||||||
// 13
|
// 13
|
||||||
|
|
||||||
rwlc_unlock(ltable->header_mut);
|
rwlc_unlock(ltable_->header_mut);
|
||||||
|
|
||||||
ltable->merge_mgr->finished_merge(1);
|
ltable_->merge_mgr->finished_merge(1);
|
||||||
// stats->pretty_print(stdout);
|
// stats->pretty_print(stdout);
|
||||||
|
|
||||||
//TODO: get the freeing outside of the lock
|
//TODO: get the freeing outside of the lock
|
||||||
|
@ -293,43 +242,42 @@ void* memMergeThread(void*arg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void *diskMergeThread(void*arg)
|
void * merge_scheduler::diskMergeThread()
|
||||||
{
|
{
|
||||||
int xid;
|
int xid;
|
||||||
|
|
||||||
logtable<datatuple> * ltable = (logtable<datatuple>*)arg;
|
assert(ltable_->get_tree_c2());
|
||||||
assert(ltable->get_tree_c2());
|
|
||||||
|
|
||||||
|
|
||||||
int merge_count =0;
|
int merge_count =0;
|
||||||
mergeStats * stats = ltable->merge_mgr->get_merge_stats(2);
|
mergeStats * stats = ltable_->merge_mgr->get_merge_stats(2);
|
||||||
|
|
||||||
while(true)
|
while(true)
|
||||||
{
|
{
|
||||||
|
|
||||||
// 2: wait for input
|
// 2: wait for input
|
||||||
rwlc_writelock(ltable->header_mut);
|
rwlc_writelock(ltable_->header_mut);
|
||||||
ltable->merge_mgr->new_merge(2);
|
ltable_->merge_mgr->new_merge(2);
|
||||||
int done = 0;
|
int done = 0;
|
||||||
// get a new input for merge
|
// get a new input for merge
|
||||||
while(!ltable->get_tree_c1_mergeable())
|
while(!ltable_->get_tree_c1_mergeable())
|
||||||
{
|
{
|
||||||
pthread_cond_signal(<able->c1_needed);
|
pthread_cond_signal(<able_->c1_needed);
|
||||||
|
|
||||||
if(!ltable->is_still_running()){
|
if(!ltable_->is_still_running()){
|
||||||
done = 1;
|
done = 1;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
DEBUG("dmt:\twaiting for block ready cond\n");
|
DEBUG("dmt:\twaiting for block ready cond\n");
|
||||||
|
|
||||||
rwlc_cond_wait(<able->c1_ready, ltable->header_mut);
|
rwlc_cond_wait(<able_->c1_ready, ltable_->header_mut);
|
||||||
|
|
||||||
DEBUG("dmt:\tblock ready\n");
|
DEBUG("dmt:\tblock ready\n");
|
||||||
}
|
}
|
||||||
if(done==1)
|
if(done==1)
|
||||||
{
|
{
|
||||||
rwlc_unlock(ltable->header_mut);
|
rwlc_unlock(ltable_->header_mut);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -340,23 +288,23 @@ void *diskMergeThread(void*arg)
|
||||||
|
|
||||||
// 4: do the merge.
|
// 4: do the merge.
|
||||||
//create the iterators
|
//create the iterators
|
||||||
diskTreeComponent::iterator *itrA = ltable->get_tree_c2()->open_iterator();
|
diskTreeComponent::iterator *itrA = ltable_->get_tree_c2()->open_iterator();
|
||||||
#ifdef NO_SNOWSHOVEL
|
#ifdef NO_SNOWSHOVEL
|
||||||
diskTreeComponent::iterator *itrB = ltable->get_tree_c1_mergeable()->open_iterator();
|
diskTreeComponent::iterator *itrB = ltable_->get_tree_c1_mergeable()->open_iterator();
|
||||||
#else
|
#else
|
||||||
diskTreeComponent::iterator *itrB = ltable->get_tree_c1_mergeable()->open_iterator(<able->merge_mgr->cur_c1_c2_progress_delta, 0.05, <able->shutting_down_);
|
diskTreeComponent::iterator *itrB = ltable_->get_tree_c1_mergeable()->open_iterator(<able_->merge_mgr->cur_c1_c2_progress_delta, 0.05, <able_->shutting_down_);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
//create a new tree
|
//create a new tree
|
||||||
diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats, (ltable->max_c0_size * *ltable->R() + stats->base_size)/ 1000);
|
diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats, (ltable_->max_c0_size * *ltable_->R() + stats->base_size)/ 1000);
|
||||||
// diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats);
|
// diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats);
|
||||||
|
|
||||||
rwlc_unlock(ltable->header_mut);
|
rwlc_unlock(ltable_->header_mut);
|
||||||
|
|
||||||
//do the merge
|
//do the merge
|
||||||
DEBUG("dmt:\tMerging:\n");
|
DEBUG("dmt:\tMerging:\n");
|
||||||
|
|
||||||
merge_iterators<typeof(*itrA),typeof(*itrB)>(xid, c2_prime, itrA, itrB, ltable, c2_prime, stats, true);
|
merge_iterators<typeof(*itrA),typeof(*itrB)>(xid, c2_prime, itrA, itrB, ltable_, c2_prime, stats, true);
|
||||||
|
|
||||||
delete itrA;
|
delete itrA;
|
||||||
delete itrB;
|
delete itrB;
|
||||||
|
@ -366,15 +314,15 @@ void *diskMergeThread(void*arg)
|
||||||
|
|
||||||
// (skip 6, 7, 8, 8.5, 9))
|
// (skip 6, 7, 8, 8.5, 9))
|
||||||
|
|
||||||
rwlc_writelock(ltable->header_mut);
|
rwlc_writelock(ltable_->header_mut);
|
||||||
//12
|
//12
|
||||||
ltable->get_tree_c2()->dealloc(xid);
|
ltable_->get_tree_c2()->dealloc(xid);
|
||||||
delete ltable->get_tree_c2();
|
delete ltable_->get_tree_c2();
|
||||||
//11.5
|
//11.5
|
||||||
ltable->get_tree_c1_mergeable()->dealloc(xid);
|
ltable_->get_tree_c1_mergeable()->dealloc(xid);
|
||||||
//11
|
//11
|
||||||
delete ltable->get_tree_c1_mergeable();
|
delete ltable_->get_tree_c1_mergeable();
|
||||||
ltable->set_tree_c1_mergeable(0);
|
ltable_->set_tree_c1_mergeable(0);
|
||||||
|
|
||||||
//writes complete
|
//writes complete
|
||||||
//now atomically replace the old c2 with new c2
|
//now atomically replace the old c2 with new c2
|
||||||
|
@ -382,23 +330,23 @@ void *diskMergeThread(void*arg)
|
||||||
|
|
||||||
merge_count++;
|
merge_count++;
|
||||||
//update the current optimal R value
|
//update the current optimal R value
|
||||||
*(ltable->R()) = std::max(MIN_R, sqrt( ((double)stats->output_size()) / ((double)ltable->mean_c0_effective_size) ) );
|
*(ltable_->R()) = std::max(MIN_R, sqrt( ((double)stats->output_size()) / ((double)ltable_->mean_c0_effective_size) ) );
|
||||||
|
|
||||||
DEBUG("\nR = %f\n", *(ltable->R()));
|
DEBUG("\nR = %f\n", *(ltable_->R()));
|
||||||
|
|
||||||
DEBUG("dmt:\tmerge_count %lld\t#written bytes: %lld\n optimal r %.2f", stats.merge_count, stats.output_size(), *(a->r_i));
|
DEBUG("dmt:\tmerge_count %lld\t#written bytes: %lld\n optimal r %.2f", stats.merge_count, stats.output_size(), *(a->r_i));
|
||||||
// 10: C2 is never too big
|
// 10: C2 is never too big
|
||||||
ltable->set_tree_c2(c2_prime);
|
ltable_->set_tree_c2(c2_prime);
|
||||||
stats->handed_off_tree();
|
stats->handed_off_tree();
|
||||||
|
|
||||||
DEBUG("dmt:\tUpdated C2's position on disk to %lld\n",(long long)-1);
|
DEBUG("dmt:\tUpdated C2's position on disk to %lld\n",(long long)-1);
|
||||||
// 13
|
// 13
|
||||||
ltable->update_persistent_header(xid, 2);
|
ltable_->update_persistent_header(xid, 2);
|
||||||
Tcommit(xid);
|
Tcommit(xid);
|
||||||
|
|
||||||
rwlc_unlock(ltable->header_mut);
|
rwlc_unlock(ltable_->header_mut);
|
||||||
// stats->pretty_print(stdout);
|
// stats->pretty_print(stdout);
|
||||||
ltable->merge_mgr->finished_merge(2);
|
ltable_->merge_mgr->finished_merge(2);
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -413,14 +361,14 @@ static void periodically_force(int xid, int *i, diskTreeComponent * forceMe, sta
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int garbage_collect(logtable<datatuple> * ltable, datatuple ** garbage, int garbage_len, int next_garbage, bool force = false) {
|
static int garbage_collect(logtable<datatuple> * ltable_, datatuple ** garbage, int garbage_len, int next_garbage, bool force = false) {
|
||||||
if(next_garbage == garbage_len || force) {
|
if(next_garbage == garbage_len || force) {
|
||||||
pthread_mutex_lock(<able->rb_mut);
|
pthread_mutex_lock(<able_->rb_mut);
|
||||||
for(int i = 0; i < next_garbage; i++) {
|
for(int i = 0; i < next_garbage; i++) {
|
||||||
datatuple * t2tmp = NULL;
|
datatuple * t2tmp = NULL;
|
||||||
{
|
{
|
||||||
memTreeComponent<datatuple>::rbtree_t::iterator rbitr = ltable->get_tree_c0()->find(garbage[i]);
|
memTreeComponent<datatuple>::rbtree_t::iterator rbitr = ltable_->get_tree_c0()->find(garbage[i]);
|
||||||
if(rbitr != ltable->get_tree_c0()->end()) {
|
if(rbitr != ltable_->get_tree_c0()->end()) {
|
||||||
t2tmp = *rbitr;
|
t2tmp = *rbitr;
|
||||||
if((t2tmp->datalen() == garbage[i]->datalen()) &&
|
if((t2tmp->datalen() == garbage[i]->datalen()) &&
|
||||||
!memcmp(t2tmp->data(), garbage[i]->data(), garbage[i]->datalen())) {
|
!memcmp(t2tmp->data(), garbage[i]->data(), garbage[i]->datalen())) {
|
||||||
|
@ -431,13 +379,13 @@ static int garbage_collect(logtable<datatuple> * ltable, datatuple ** garbage, i
|
||||||
}
|
}
|
||||||
} // close rbitr before touching the tree.
|
} // close rbitr before touching the tree.
|
||||||
if(t2tmp) {
|
if(t2tmp) {
|
||||||
ltable->get_tree_c0()->erase(garbage[i]);
|
ltable_->get_tree_c0()->erase(garbage[i]);
|
||||||
ltable->tree_bytes -= garbage[i]->byte_length();
|
ltable_->tree_bytes -= garbage[i]->byte_length();
|
||||||
datatuple::freetuple(t2tmp);
|
datatuple::freetuple(t2tmp);
|
||||||
}
|
}
|
||||||
datatuple::freetuple(garbage[i]);
|
datatuple::freetuple(garbage[i]);
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(<able->rb_mut);
|
pthread_mutex_unlock(<able_->rb_mut);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
return next_garbage;
|
return next_garbage;
|
||||||
|
|
35
merger.h
35
merger.h
|
@ -8,33 +8,22 @@
|
||||||
#undef try
|
#undef try
|
||||||
#undef end
|
#undef end
|
||||||
|
|
||||||
//TODO: 400 bytes overhead per tuple, this is nuts, check if this is true...
|
class merge_scheduler {
|
||||||
static const int RB_TREE_OVERHEAD = 400;
|
|
||||||
static const double MIN_R = 3.0;
|
|
||||||
|
|
||||||
struct logtable_mergedata
|
|
||||||
{
|
|
||||||
//merge threads
|
|
||||||
pthread_t diskmerge_thread;
|
|
||||||
pthread_t memmerge_thread;
|
|
||||||
};
|
|
||||||
|
|
||||||
class merge_scheduler
|
|
||||||
{
|
|
||||||
std::vector<std::pair<logtable<datatuple> *, logtable_mergedata*> > mergedata;
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
~merge_scheduler();
|
merge_scheduler(logtable<datatuple> * ltable);
|
||||||
|
~merge_scheduler();
|
||||||
|
|
||||||
int addlogtable(logtable<datatuple> * ltable);
|
void start();
|
||||||
void startlogtable(int index, int64_t MAX_C0_SIZE = 100*1024*1024);
|
void shutdown();
|
||||||
|
|
||||||
struct logtable_mergedata *getMergeData(int index){return mergedata[index].second;}
|
void * memMergeThread();
|
||||||
|
void * diskMergeThread();
|
||||||
|
|
||||||
void shutdown();
|
private:
|
||||||
|
pthread_t mem_merge_thread_;
|
||||||
|
pthread_t disk_merge_thread_;
|
||||||
|
logtable<datatuple> * ltable_;
|
||||||
|
const double MIN_R;
|
||||||
};
|
};
|
||||||
|
|
||||||
void* memMergeThread(void* arg);
|
|
||||||
void* diskMergeThread(void* arg);
|
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -21,7 +21,6 @@ int main(int argc, char *argv[])
|
||||||
|
|
||||||
int xid = Tbegin();
|
int xid = Tbegin();
|
||||||
|
|
||||||
merge_scheduler * mscheduler = new merge_scheduler;
|
|
||||||
|
|
||||||
logtable<datatuple> ltable;
|
logtable<datatuple> ltable;
|
||||||
|
|
||||||
|
@ -51,11 +50,9 @@ int main(int argc, char *argv[])
|
||||||
}
|
}
|
||||||
|
|
||||||
Tcommit(xid);
|
Tcommit(xid);
|
||||||
|
ltable.set_max_c0_size(c0_size);
|
||||||
int lindex = mscheduler->addlogtable(<able);
|
merge_scheduler * mscheduler = new merge_scheduler(<able);
|
||||||
ltable.setMergeData(mscheduler->getMergeData(lindex));
|
mscheduler->start();
|
||||||
|
|
||||||
mscheduler->startlogtable(lindex, c0_size);
|
|
||||||
|
|
||||||
simpleServer *lserver = new simpleServer(<able);
|
simpleServer *lserver = new simpleServer(<able);
|
||||||
|
|
||||||
|
|
|
@ -54,7 +54,6 @@ int main(int argc, char *argv[])
|
||||||
|
|
||||||
int xid = Tbegin();
|
int xid = Tbegin();
|
||||||
|
|
||||||
mscheduler = new merge_scheduler;
|
|
||||||
|
|
||||||
logtable<datatuple> ltable;
|
logtable<datatuple> ltable;
|
||||||
|
|
||||||
|
@ -72,8 +71,6 @@ int main(int argc, char *argv[])
|
||||||
|
|
||||||
Tcommit(xid);
|
Tcommit(xid);
|
||||||
|
|
||||||
int lindex = mscheduler->addlogtable(<able);
|
|
||||||
ltable.setMergeData(mscheduler->getMergeData(lindex));
|
|
||||||
|
|
||||||
int64_t c0_size = 1024 * 1024 * 512 * 1;
|
int64_t c0_size = 1024 * 1024 * 512 * 1;
|
||||||
|
|
||||||
|
@ -89,7 +86,9 @@ int main(int argc, char *argv[])
|
||||||
printf("note: running w/ 2GB c0 for benchmarking"); // XXX build a separate test server and deployment server?
|
printf("note: running w/ 2GB c0 for benchmarking"); // XXX build a separate test server and deployment server?
|
||||||
}
|
}
|
||||||
|
|
||||||
mscheduler->startlogtable(lindex, c0_size);
|
ltable.set_max_c0_size(c0_size);
|
||||||
|
mscheduler = new merge_scheduler(<able);
|
||||||
|
mscheduler->start();
|
||||||
|
|
||||||
lserver = new logserver(100, 32432);
|
lserver = new logserver(100, 32432);
|
||||||
|
|
||||||
|
|
|
@ -18,21 +18,21 @@ int main(int argc, char **argv)
|
||||||
|
|
||||||
int xid = Tbegin();
|
int xid = Tbegin();
|
||||||
|
|
||||||
logtable<datatuple> ltable(1000, 10000, 5);
|
logtable<datatuple> *ltable = new logtable<datatuple>(1000, 10000, 5);
|
||||||
|
|
||||||
recordid table_root = ltable.allocTable(xid);
|
recordid table_root = ltable->allocTable(xid);
|
||||||
|
|
||||||
Tcommit(xid);
|
Tcommit(xid);
|
||||||
|
|
||||||
xid = Tbegin();
|
xid = Tbegin();
|
||||||
RegionAllocator * ro_alloc = new RegionAllocator();
|
RegionAllocator * ro_alloc = new RegionAllocator();
|
||||||
|
|
||||||
diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid,ro_alloc, ltable.get_tree_c2()->get_root_rid() );
|
diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid,ro_alloc, ltable->get_tree_c2()->get_root_rid() );
|
||||||
it->close();
|
it->close();
|
||||||
delete it;
|
delete it;
|
||||||
delete ro_alloc;
|
delete ro_alloc;
|
||||||
Tcommit(xid);
|
Tcommit(xid);
|
||||||
|
delete ltable;
|
||||||
logtable<datatuple>::deinit_stasis();
|
logtable<datatuple>::deinit_stasis();
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -50,17 +50,15 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
|
|
||||||
int xid = Tbegin();
|
int xid = Tbegin();
|
||||||
|
|
||||||
merge_scheduler mscheduler;
|
logtable<datatuple> * ltable = new logtable<datatuple>(1000, 10000, 5);
|
||||||
logtable<datatuple> ltable(1000, 10000, 5);
|
ltable->set_max_c0_size(10 * 1024 * 1024);
|
||||||
|
merge_scheduler mscheduler(ltable);
|
||||||
|
|
||||||
recordid table_root = ltable.allocTable(xid);
|
recordid table_root = ltable->allocTable(xid);
|
||||||
|
|
||||||
Tcommit(xid);
|
Tcommit(xid);
|
||||||
|
|
||||||
int lindex = mscheduler.addlogtable(<able);
|
mscheduler.start();
|
||||||
ltable.setMergeData(mscheduler.getMergeData(lindex));
|
|
||||||
|
|
||||||
mscheduler.startlogtable(lindex, 10 * 1024 * 1024);
|
|
||||||
|
|
||||||
printf("Stage 1: Writing %llu keys\n", (unsigned long long)NUM_ENTRIES);
|
printf("Stage 1: Writing %llu keys\n", (unsigned long long)NUM_ENTRIES);
|
||||||
|
|
||||||
|
@ -85,7 +83,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
datasize += newtuple->byte_length();
|
datasize += newtuple->byte_length();
|
||||||
|
|
||||||
gettimeofday(&ti_st,0);
|
gettimeofday(&ti_st,0);
|
||||||
ltable.insertTuple(newtuple);
|
ltable->insertTuple(newtuple);
|
||||||
gettimeofday(&ti_end,0);
|
gettimeofday(&ti_end,0);
|
||||||
insert_time += tv_to_double(ti_end) - tv_to_double(ti_st);
|
insert_time += tv_to_double(ti_end) - tv_to_double(ti_st);
|
||||||
|
|
||||||
|
@ -122,7 +120,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
//rkey[keylen-1]='\0';
|
//rkey[keylen-1]='\0';
|
||||||
|
|
||||||
//find the key with the given tuple
|
//find the key with the given tuple
|
||||||
datatuple *dt = ltable.findTuple(xid, rkey, keylen);
|
datatuple *dt = ltable->findTuple(xid, rkey, keylen);
|
||||||
|
|
||||||
assert(dt!=0);
|
assert(dt!=0);
|
||||||
//if(dt!=0)
|
//if(dt!=0)
|
||||||
|
@ -150,6 +148,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
|
|
||||||
|
|
||||||
Tcommit(xid);
|
Tcommit(xid);
|
||||||
|
delete ltable;
|
||||||
logtable<datatuple>::deinit_stasis();
|
logtable<datatuple>::deinit_stasis();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,17 +44,15 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
|
|
||||||
int xid = Tbegin();
|
int xid = Tbegin();
|
||||||
|
|
||||||
merge_scheduler mscheduler;
|
logtable<datatuple> *ltable = new logtable<datatuple>(1000, 10000, 100);
|
||||||
logtable<datatuple> ltable(1000, 10000, 100);
|
ltable->set_max_c0_size(10*1024*1024);
|
||||||
|
merge_scheduler mscheduler(ltable);
|
||||||
|
|
||||||
recordid table_root = ltable.allocTable(xid);
|
recordid table_root = ltable->allocTable(xid);
|
||||||
|
|
||||||
Tcommit(xid);
|
Tcommit(xid);
|
||||||
|
|
||||||
int lindex = mscheduler.addlogtable(<able);
|
mscheduler.start();
|
||||||
ltable.setMergeData(mscheduler.getMergeData(lindex));
|
|
||||||
|
|
||||||
mscheduler.startlogtable(lindex, 10 * 1024 * 1024);
|
|
||||||
|
|
||||||
printf("Stage 1: Writing %llu keys\n", (unsigned long long)NUM_ENTRIES);
|
printf("Stage 1: Writing %llu keys\n", (unsigned long long)NUM_ENTRIES);
|
||||||
|
|
||||||
|
@ -75,7 +73,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
datasize += newtuple->byte_length();
|
datasize += newtuple->byte_length();
|
||||||
|
|
||||||
gettimeofday(&ti_st,0);
|
gettimeofday(&ti_st,0);
|
||||||
ltable.insertTuple(newtuple);
|
ltable->insertTuple(newtuple);
|
||||||
gettimeofday(&ti_end,0);
|
gettimeofday(&ti_end,0);
|
||||||
insert_time += tv_to_double(ti_end) - tv_to_double(ti_st);
|
insert_time += tv_to_double(ti_end) - tv_to_double(ti_st);
|
||||||
|
|
||||||
|
@ -90,10 +88,10 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
printf("datasize: %llu\n", (unsigned long long)datasize);
|
printf("datasize: %llu\n", (unsigned long long)datasize);
|
||||||
|
|
||||||
mscheduler.shutdown();
|
mscheduler.shutdown();
|
||||||
|
delete ltable;
|
||||||
printf("merge threads finished.\n");
|
printf("merge threads finished.\n");
|
||||||
gettimeofday(&stop_tv,0);
|
gettimeofday(&stop_tv,0);
|
||||||
printf("run time: %6.1f\n", (tv_to_double(stop_tv) - tv_to_double(start_tv)));
|
printf("run time: %6.1f\n", (tv_to_double(stop_tv) - tv_to_double(start_tv)));
|
||||||
|
|
||||||
logtable<datatuple>::deinit_stasis();
|
logtable<datatuple>::deinit_stasis();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -102,16 +102,15 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
|
|
||||||
int xid = Tbegin();
|
int xid = Tbegin();
|
||||||
|
|
||||||
merge_scheduler mscheduler;
|
logtable<datatuple> *ltable = new logtable<datatuple>(1000, 1000, 40);
|
||||||
logtable<datatuple> ltable(1000, 1000, 40);
|
ltable->set_max_c0_size(10 * 1024 * 1024);
|
||||||
|
merge_scheduler mscheduler(ltable);
|
||||||
|
|
||||||
recordid table_root = ltable.allocTable(xid);
|
recordid table_root = ltable->allocTable(xid);
|
||||||
|
|
||||||
Tcommit(xid);
|
Tcommit(xid);
|
||||||
int lindex = mscheduler.addlogtable(<able);
|
|
||||||
ltable.setMergeData(mscheduler.getMergeData(lindex));
|
|
||||||
|
|
||||||
mscheduler.startlogtable(lindex, 10 * 1024 * 1024);
|
mscheduler.start();
|
||||||
|
|
||||||
printf("Stage 1: Writing %llu keys\n", (unsigned long long)NUM_ENTRIES);
|
printf("Stage 1: Writing %llu keys\n", (unsigned long long)NUM_ENTRIES);
|
||||||
|
|
||||||
|
@ -134,7 +133,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
datasize += newtuple->byte_length();
|
datasize += newtuple->byte_length();
|
||||||
|
|
||||||
gettimeofday(&ti_st,0);
|
gettimeofday(&ti_st,0);
|
||||||
ltable.insertTuple(newtuple);
|
ltable->insertTuple(newtuple);
|
||||||
gettimeofday(&ti_end,0);
|
gettimeofday(&ti_end,0);
|
||||||
insert_time += tv_to_double(ti_end) - tv_to_double(ti_st);
|
insert_time += tv_to_double(ti_end) - tv_to_double(ti_st);
|
||||||
|
|
||||||
|
@ -151,7 +150,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
datatuple *deltuple = datatuple::create((*key_arr)[del_index].c_str(), (*key_arr)[del_index].length()+1);
|
datatuple *deltuple = datatuple::create((*key_arr)[del_index].c_str(), (*key_arr)[del_index].length()+1);
|
||||||
|
|
||||||
gettimeofday(&ti_st,0);
|
gettimeofday(&ti_st,0);
|
||||||
ltable.insertTuple(deltuple);
|
ltable->insertTuple(deltuple);
|
||||||
gettimeofday(&ti_end,0);
|
gettimeofday(&ti_end,0);
|
||||||
insert_time += tv_to_double(ti_end) - tv_to_double(ti_st);
|
insert_time += tv_to_double(ti_end) - tv_to_double(ti_st);
|
||||||
|
|
||||||
|
@ -172,7 +171,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
datatuple *uptuple = datatuple::create((*key_arr)[up_index].c_str(), (*key_arr)[up_index].length()+1,
|
datatuple *uptuple = datatuple::create((*key_arr)[up_index].c_str(), (*key_arr)[up_index].length()+1,
|
||||||
ditem.c_str(), ditem.length()+1);
|
ditem.c_str(), ditem.length()+1);
|
||||||
gettimeofday(&ti_st,0);
|
gettimeofday(&ti_st,0);
|
||||||
ltable.insertTuple(uptuple);
|
ltable->insertTuple(uptuple);
|
||||||
gettimeofday(&ti_end,0);
|
gettimeofday(&ti_end,0);
|
||||||
insert_time += tv_to_double(ti_end) - tv_to_double(ti_st);
|
insert_time += tv_to_double(ti_end) - tv_to_double(ti_st);
|
||||||
|
|
||||||
|
@ -207,7 +206,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
memcpy((byte*)rkey, (*key_arr)[ri].c_str(), keylen);
|
memcpy((byte*)rkey, (*key_arr)[ri].c_str(), keylen);
|
||||||
|
|
||||||
//find the key with the given tuple
|
//find the key with the given tuple
|
||||||
datatuple *dt = ltable.findTuple(xid, rkey, keylen);
|
datatuple *dt = ltable->findTuple(xid, rkey, keylen);
|
||||||
|
|
||||||
if(std::find(del_list.begin(), del_list.end(), i) == del_list.end())
|
if(std::find(del_list.begin(), del_list.end(), i) == del_list.end())
|
||||||
{
|
{
|
||||||
|
@ -248,6 +247,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
|
|
||||||
|
|
||||||
Tcommit(xid);
|
Tcommit(xid);
|
||||||
|
delete ltable;
|
||||||
logtable<datatuple>::deinit_stasis();
|
logtable<datatuple>::deinit_stasis();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,7 +27,8 @@
|
||||||
#define NUM_THREADS 128
|
#define NUM_THREADS 128
|
||||||
|
|
||||||
unsigned char vals[NUM_THREADS];
|
unsigned char vals[NUM_THREADS];
|
||||||
logtable<datatuple>* ltbl;
|
|
||||||
|
logtable<datatuple> * ltable;
|
||||||
|
|
||||||
int myucharcmp(const void * ap, const void * bp) {
|
int myucharcmp(const void * ap, const void * bp) {
|
||||||
unsigned char a = *(unsigned char*)ap;
|
unsigned char a = *(unsigned char*)ap;
|
||||||
|
@ -43,7 +44,7 @@ void * worker(void * idp) {
|
||||||
printf("id = %d key = %d\n", (int)id, (int)key);
|
printf("id = %d key = %d\n", (int)id, (int)key);
|
||||||
datatuple * dt = datatuple::create(&key, sizeof(key), &id, sizeof(id));
|
datatuple * dt = datatuple::create(&key, sizeof(key), &id, sizeof(id));
|
||||||
datatuple * dtdelete = datatuple::create(&key, sizeof(key));
|
datatuple * dtdelete = datatuple::create(&key, sizeof(key));
|
||||||
succ = ltbl->testAndSetTuple(dt, dtdelete);
|
succ = ltable->testAndSetTuple(dt, dtdelete);
|
||||||
datatuple::freetuple(dt);
|
datatuple::freetuple(dt);
|
||||||
datatuple::freetuple(dtdelete);
|
datatuple::freetuple(dtdelete);
|
||||||
vals[id] = key;
|
vals[id] = key;
|
||||||
|
@ -60,18 +61,16 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
logtable<datatuple>::init_stasis();
|
logtable<datatuple>::init_stasis();
|
||||||
int xid = Tbegin();
|
int xid = Tbegin();
|
||||||
|
|
||||||
merge_scheduler mscheduler;
|
ltable = new logtable<datatuple>(1000, 10000, 5);
|
||||||
logtable<datatuple> ltable(1000, 10000, 5);
|
ltable->set_max_c0_size(10*1024*1024);
|
||||||
ltbl = <able;
|
|
||||||
|
|
||||||
recordid table_root = ltable.allocTable(xid);
|
merge_scheduler mscheduler(ltable);
|
||||||
|
|
||||||
|
recordid table_root = ltable->allocTable(xid);
|
||||||
|
|
||||||
Tcommit(xid);
|
Tcommit(xid);
|
||||||
|
|
||||||
int lindex = mscheduler.addlogtable(<able);
|
mscheduler.start();
|
||||||
ltable.setMergeData(mscheduler.getMergeData(lindex));
|
|
||||||
|
|
||||||
mscheduler.startlogtable(lindex, 10 * 1024 * 1024);
|
|
||||||
|
|
||||||
pthread_t *threads = (pthread_t*)malloc(NUM_THREADS * sizeof(pthread_t));
|
pthread_t *threads = (pthread_t*)malloc(NUM_THREADS * sizeof(pthread_t));
|
||||||
|
|
||||||
|
@ -92,6 +91,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
||||||
}
|
}
|
||||||
|
|
||||||
mscheduler.shutdown();
|
mscheduler.shutdown();
|
||||||
|
delete ltable;
|
||||||
logtable<datatuple>::deinit_stasis();
|
logtable<datatuple>::deinit_stasis();
|
||||||
|
|
||||||
printf("\npass\n");
|
printf("\npass\n");
|
||||||
|
|
Loading…
Reference in a new issue