added bulk-insert APIs, prevent c1-c2 from outrunning c0-c1 by too much, and fix the overshoot computations in merge manager (roughly corresponds to run 20)

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@1074 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-08-30 22:22:25 +00:00
parent f8248240af
commit a1f3f92cca
16 changed files with 466 additions and 89 deletions

View file

@ -894,9 +894,12 @@ void diskTreeComponent::iterator::init_iterators(datatuple * key1, datatuple * k
} }
} }
diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree) : diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree, double* cur_progress_delta, double target_progress_delta, bool * flushing) :
ro_alloc_(new RegionAllocator()), ro_alloc_(new RegionAllocator()),
tree_(tree ? tree->get_root_rec() : NULLRID) tree_(tree ? tree->get_root_rec() : NULLRID),
cur_progress_delta_(cur_progress_delta),
target_progress_delta_(target_progress_delta),
flushing_(flushing)
{ {
init_iterators(NULL, NULL); init_iterators(NULL, NULL);
init_helper(NULL); init_helper(NULL);
@ -904,7 +907,10 @@ diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree) :
diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree, datatuple* key) : diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree, datatuple* key) :
ro_alloc_(new RegionAllocator()), ro_alloc_(new RegionAllocator()),
tree_(tree ? tree->get_root_rec() : NULLRID) tree_(tree ? tree->get_root_rec() : NULLRID),
cur_progress_delta_(NULL),
target_progress_delta_(0.0),
flushing_(NULL)
{ {
init_iterators(key,NULL); init_iterators(key,NULL);
init_helper(key); init_helper(key);
@ -990,6 +996,16 @@ datatuple * diskTreeComponent::iterator::next_callerFrees()
} }
// else readTuple is null. We're done. // else readTuple is null. We're done.
} }
if(readTuple && cur_progress_delta_) {
// *cur_progress_delta is how far ahead we are, as a fraction of the total merge.
while(*cur_progress_delta_ > target_progress_delta_ && ((!flushing_) || (! *flushing_))) { // TODO: how to pick this threshold?
struct timespec ts;
mergeManager::double_to_ts(&ts, 0.1);
nanosleep(&ts, 0);
}
}
return readTuple; return readTuple;
} }

View file

@ -45,8 +45,8 @@ class diskTreeComponent {
void writes_done(); void writes_done();
iterator * open_iterator() { iterator * open_iterator(double* cur_size = NULL, double target_size = 0, bool * flushing = NULL) {
return new iterator(ltree); return new iterator(ltree, cur_size, target_size, flushing);
} }
iterator * open_iterator(datatuple * key) { iterator * open_iterator(datatuple * key) {
if(key != NULL) { if(key != NULL) {
@ -179,7 +179,7 @@ class diskTreeComponent {
{ {
public: public:
explicit iterator(diskTreeComponent::internalNodes *tree); explicit iterator(diskTreeComponent::internalNodes *tree, double* cur_size = NULL, double target_size = 0, bool * flushing = NULL);
explicit iterator(diskTreeComponent::internalNodes *tree,datatuple *key); explicit iterator(diskTreeComponent::internalNodes *tree,datatuple *key);
@ -188,16 +188,18 @@ class diskTreeComponent {
datatuple * next_callerFrees(); datatuple * next_callerFrees();
private: private:
void init_iterators(datatuple * key1, datatuple * key2); void init_iterators(datatuple * key1, datatuple * key2);
inline void init_helper(datatuple * key1); inline void init_helper(datatuple * key1);
explicit iterator() { abort(); } explicit iterator() { abort(); }
void operator=(iterator & t) { abort(); } void operator=(iterator & t) { abort(); }
int operator-(iterator & t) { abort(); } int operator-(iterator & t) { abort(); }
private:
RegionAllocator * ro_alloc_; // has a filehandle that we use to optimize sequential scans. RegionAllocator * ro_alloc_; // has a filehandle that we use to optimize sequential scans.
recordid tree_; //root of the tree recordid tree_; //root of the tree
double * cur_progress_delta_;
double target_progress_delta_;
bool * flushing_;
diskTreeComponent::internalNodes::iterator* lsmIterator_; diskTreeComponent::internalNodes::iterator* lsmIterator_;

View file

@ -87,7 +87,7 @@ void logtable<TUPLE>::init_stasis() {
DataPage<datatuple>::register_stasis_page_impl(); DataPage<datatuple>::register_stasis_page_impl();
// XXX Workaround Stasis' (still broken) default concurrent buffer manager // XXX Workaround Stasis' (still broken) default concurrent buffer manager
stasis_buffer_manager_size = 1024 * 1024; // 4GB = 2^20 pages: stasis_buffer_manager_size = 1024 * 1024; // 4GB = 2^20 pages:
// stasis_buffer_manager_factory = stasis_buffer_manager_hash_factory; stasis_buffer_manager_factory = stasis_buffer_manager_hash_factory;
Tinit(); Tinit();
@ -506,6 +506,82 @@ datatuple * logtable<TUPLE>::findTuple_first(int xid, datatuple::key_t key, size
} }
/**
 * Insert a single tuple into the in-memory tree (tree_c0), merging it with
 * any tuple that is already stored under the same key.
 *
 * The tuple passed in is never stored directly: either the result of
 * tmerger->merge() or a copy made with create_copy() goes into the tree.
 *
 * The callers visible in this file (insertTuple, insertManyTuples) invoke
 * this while holding rb_mut.
 *
 * @param tuple  The tuple to insert.
 * @return The tuple that was displaced from tree_c0 because it had the same
 *         key, or NULL (0) if the key was not present.  The displaced tuple
 *         has already been erased from the tree; insertManyTuples frees it
 *         with datatuple::freetuple().
 */
template<class TUPLE>
datatuple * logtable<TUPLE>::insertTupleHelper(datatuple *tuple)
{
  // Find the previous tuple with the same key in the memtree, if it exists.
  memTreeComponent<datatuple>::rbtree_t::iterator rbitr = tree_c0->find(tuple);
  datatuple * t = 0;      // the tuple that ends up stored in tree_c0
  datatuple * pre_t = 0;  // the tuple that was displaced (if any)

  if(rbitr != tree_c0->end())
  {
    pre_t = *rbitr;
    // Merge the old and new versions of the tuple.
    datatuple *new_t = tmerger->merge(pre_t, tuple);
    c0_stats->merged_tuples(new_t, tuple, pre_t);
    t = new_t;
    tree_c0->erase(pre_t);  // remove the previous tuple
    tree_c0->insert(new_t); // insert the merged tuple
    // Update the tree size (+ new_t size - pre_t size).  The casts keep the
    // subtraction signed, since the merged tuple may be smaller.
    tree_bytes += ((int64_t)new_t->byte_length() - (int64_t)pre_t->byte_length());
  }
  else // no tuple with the same key exists in the mem-tree
  {
    t = tuple->create_copy();
    // Insert the copy into the rbtree.
    tree_c0->insert(t);
    tsize++;
    tree_bytes += t->byte_length();// + RB_TREE_OVERHEAD;
  }

  merge_mgr->wrote_tuple(0, t);  // needs to be here; doesn't grab a mutex.

#ifdef NO_SNOWSHOVEL
  // Flushing logic.
  if(tree_bytes >= max_c0_size )
  {
    DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes);

    // NOTE: we hold rb_mut across the (blocking on merge) flushTable.  Therefore:
    //  *** Blocking in flushTable is REALLY BAD ***
    // Because it blocks readers and writers.
    // The merge policy does its best to make sure flushTable does not block.
    rwlc_writelock(header_mut);
    // The test of tree size needs to be atomic with the flushTable, and
    // flushTable needs a writelock, so re-check after acquiring it.
    if(tree_bytes >= max_c0_size) {
      flushTable(); // this needs to hold rb_mut if snowshoveling is disabled, but can't hold rb_mut if snowshoveling is enabled.
    }
    rwlc_unlock(header_mut);
  } // end of the outer size check; must close before #endif to keep braces balanced
#endif

  return pre_t;
}
/**
 * Insert a batch of tuples into the in-memory tree under a single
 * acquisition of rb_mut.
 *
 * Merge-manager accounting is done outside the critical section: the
 * "small component" reads are reported per tuple before the lock is taken,
 * and the displaced ("large component") tuples are reported as one
 * aggregate after it is released.
 *
 * @param tuples       Array of tuples to insert.  This function does not
 *                     free them.
 * @param tuple_count  Number of entries in tuples.
 */
template<class TUPLE>
void logtable<TUPLE>::insertManyTuples(datatuple ** tuples, int tuple_count) {
  // Report every incoming tuple to the merge manager before locking.
  int idx = 0;
  while(idx < tuple_count) {
    merge_mgr->read_tuple_from_small_component(0, tuples[idx]);
    ++idx;
  }

  int replaced_count = 0;      // how many existing tuples were displaced
  pageid_t replaced_bytes = 0; // total byte_length() of the displaced tuples

  pthread_mutex_lock(&rb_mut);
  for(int n = 0; n < tuple_count; ++n) {
    datatuple * displaced = insertTupleHelper(tuples[n]);
    if(!displaced) { continue; }
    ++replaced_count;
    replaced_bytes += displaced->byte_length();
    datatuple::freetuple(displaced);
  }
  pthread_mutex_unlock(&rb_mut);

  // One aggregated statistics update for everything that was overwritten.
  merge_mgr->read_tuple_from_large_component(0, replaced_count, replaced_bytes);
}
template<class TUPLE> template<class TUPLE>
void logtable<TUPLE>::insertTuple(datatuple *tuple) void logtable<TUPLE>::insertTuple(datatuple *tuple)
{ {
@ -514,55 +590,7 @@ void logtable<TUPLE>::insertTuple(datatuple *tuple)
datatuple * pre_t = 0; // this is a pointer to any data tuples that we'll be deleting below. We need to update the merge_mgr statistics with it, but have to do so outside of the rb_mut region. datatuple * pre_t = 0; // this is a pointer to any data tuples that we'll be deleting below. We need to update the merge_mgr statistics with it, but have to do so outside of the rb_mut region.
pthread_mutex_lock(&rb_mut); pthread_mutex_lock(&rb_mut);
//find the previous tuple with same key in the memtree if exists pre_t = insertTupleHelper(tuple);
memTreeComponent<datatuple>::rbtree_t::iterator rbitr = tree_c0->find(tuple);
datatuple * t = 0;
if(rbitr != tree_c0->end())
{
pre_t = *rbitr;
//do the merging
datatuple *new_t = tmerger->merge(pre_t, tuple);
c0_stats->merged_tuples(new_t, tuple, pre_t);
t = new_t;
tree_c0->erase(pre_t); //remove the previous tuple
tree_c0->insert(new_t); //insert the new tuple
//update the tree size (+ new_t size - pre_t size)
tree_bytes += ((int64_t)new_t->byte_length() - (int64_t)pre_t->byte_length());
}
else //no tuple with same key exists in mem-tree
{
t = tuple->create_copy();
//insert tuple into the rbtree
tree_c0->insert(t);
tsize++;
tree_bytes += t->byte_length();// + RB_TREE_OVERHEAD;
}
merge_mgr->wrote_tuple(0, t); // needs to be here; doesn't grab a mutex.
#ifdef NO_SNOWSHOVEL
//flushing logic
if(tree_bytes >= max_c0_size )
{
DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes);
// NOTE: we hold rb_mut across the (blocking on merge) flushTable. Therefore:
// *** Blocking in flushTable is REALLY BAD ***
// Because it blocks readers and writers.
// The merge policy does its best to make sure flushTable does not block.
rwlc_writelock(header_mut);
// the test of tree size needs to be atomic with the flushTable, and flushTable needs a writelock.
if(tree_bytes >= max_c0_size) {
flushTable(); // this needs to hold rb_mut if snowshoveling is disabled, but can't hold rb_mut if snowshoveling is enabled.
}
rwlc_unlock(header_mut);
#endif
pthread_mutex_unlock(&rb_mut); pthread_mutex_unlock(&rb_mut);
// XXX is it OK to move this after the NO_SNOWSHOVEL block? // XXX is it OK to move this after the NO_SNOWSHOVEL block?

View file

@ -43,7 +43,11 @@ public:
datatuple * findTuple(int xid, const datatuple::key_t key, size_t keySize); datatuple * findTuple(int xid, const datatuple::key_t key, size_t keySize);
datatuple * findTuple_first(int xid, datatuple::key_t key, size_t keySize); datatuple * findTuple_first(int xid, datatuple::key_t key, size_t keySize);
private:
datatuple * insertTupleHelper(datatuple *tuple);
public:
void insertManyTuples(struct datatuple **tuples, int tuple_count);
void insertTuple(struct datatuple *tuple); void insertTuple(struct datatuple *tuple);
//other class functions //other class functions

View file

@ -159,6 +159,9 @@ void mergeManager::tick_based_on_merge_progress(mergeStats *s) {
overshoot_fudge *= 2; overshoot_fudge *= 2;
overshoot_fudge2 *= 4; overshoot_fudge2 *= 4;
if(overshoot_fudge > 0.01 * s->target_size) { overshoot_fudge = (int64_t)(0.01 * (double)s->target_size); }
if(overshoot_fudge2 > 0.01 * s->target_size) { overshoot_fudge2 = (int64_t)(0.01 * (double)s->target_size); }
const double max_c0_sleep = 0.1; const double max_c0_sleep = 0.1;
const double min_c0_sleep = 0.01; const double min_c0_sleep = 0.01;
const double max_c1_sleep = 0.5; const double max_c1_sleep = 0.5;
@ -185,10 +188,23 @@ void mergeManager::tick_based_on_merge_progress(mergeStats *s) {
rwlc_readlock(ltable->header_mut); rwlc_readlock(ltable->header_mut);
if(s1->active && s->mergeable_size) { if(s1->active && s->mergeable_size) {
raw_overshoot = (int64_t)(((double)s->target_size) * (s->out_progress - s1->in_progress)); raw_overshoot = (int64_t)(((double)s->target_size) * (s->out_progress - s1->in_progress));
overshoot = raw_overshoot + overshoot_fudge;
overshoot2 = raw_overshoot + overshoot_fudge2;
bps = s1->bps; bps = s1->bps;
} }
rwlc_unlock(ltable->header_mut); rwlc_unlock(ltable->header_mut);
} }
if(s->merge_level == 1) {
if(s1->active && s->mergeable_size) {
cur_c1_c2_progress_delta = s1->in_progress - s->out_progress;
} else if(!s->mergeable_size) {
cur_c1_c2_progress_delta = 1;
} else {
// s1 is not active.
cur_c1_c2_progress_delta = 0;
}
}
//#define PP_THREAD_INFO //#define PP_THREAD_INFO
#ifdef PP_THREAD_INFO #ifdef PP_THREAD_INFO
printf("\nMerge thread %d %6f %6f Overshoot: raw=%lld, d=%lld eff=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, c0_out_progress, c0_c1_in_progress, raw_overshoot, overshoot_fudge, overshoot, -1.0, spin, total_sleep); printf("\nMerge thread %d %6f %6f Overshoot: raw=%lld, d=%lld eff=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, c0_out_progress, c0_c1_in_progress, raw_overshoot, overshoot_fudge, overshoot, -1.0, spin, total_sleep);
@ -299,12 +315,12 @@ void mergeManager::read_tuple_from_small_component(int merge_level, datatuple *
tick(s); tick(s);
} }
} }
void mergeManager::read_tuple_from_large_component(int merge_level, datatuple * tup) { void mergeManager::read_tuple_from_large_component(int merge_level, int tuple_count, pageid_t byte_len) {
if(tup) { if(tuple_count) {
mergeStats * s = get_merge_stats(merge_level); mergeStats * s = get_merge_stats(merge_level);
s->num_tuples_in_large++; s->num_tuples_in_large += tuple_count;
s->bytes_in_large += tup->byte_length(); s->bytes_in_large += byte_len;
update_progress(s, tup->byte_length()); update_progress(s, byte_len);
} }
} }
@ -351,6 +367,7 @@ void * merge_manager_pretty_print_thread(void * arg) {
mergeManager::mergeManager(logtable<datatuple> *ltable): mergeManager::mergeManager(logtable<datatuple> *ltable):
UPDATE_PROGRESS_PERIOD(0.005), UPDATE_PROGRESS_PERIOD(0.005),
cur_c1_c2_progress_delta(0.0),
ltable(ltable), ltable(ltable),
c0(new mergeStats(0, ltable ? ltable->max_c0_size : 10000000)), c0(new mergeStats(0, ltable ? ltable->max_c0_size : 10000000)),
c1(new mergeStats(1, (int64_t)(ltable ? ((double)(ltable->max_c0_size) * *ltable->R()) : 100000000.0) )), c1(new mergeStats(1, (int64_t)(ltable ? ((double)(ltable->max_c0_size) * *ltable->R()) : 100000000.0) )),

View file

@ -49,12 +49,18 @@ public:
void tick_based_on_merge_progress(mergeStats * s); void tick_based_on_merge_progress(mergeStats * s);
mergeStats* get_merge_stats(int mergeLevel); mergeStats* get_merge_stats(int mergeLevel);
void read_tuple_from_small_component(int merge_level, datatuple * tup); void read_tuple_from_small_component(int merge_level, datatuple * tup);
void read_tuple_from_large_component(int merge_level, datatuple * tup); void read_tuple_from_large_component(int merge_level, datatuple * tup) {
if(tup)
read_tuple_from_large_component(merge_level, 1, tup->byte_length());
}
void read_tuple_from_large_component(int merge_level, int tuple_count, pageid_t byte_len);
void wrote_tuple(int merge_level, datatuple * tup); void wrote_tuple(int merge_level, datatuple * tup);
void finished_merge(int merge_level); void finished_merge(int merge_level);
void pretty_print(FILE * out); void pretty_print(FILE * out);
void *pretty_print_thread(); void *pretty_print_thread();
double cur_c1_c2_progress_delta;
private: private:
logtable<datatuple>* ltable; logtable<datatuple>* ltable;
double throttle_seconds; double throttle_seconds;

View file

@ -340,7 +340,11 @@ void *diskMergeThread(void*arg)
// 4: do the merge. // 4: do the merge.
//create the iterators //create the iterators
diskTreeComponent::iterator *itrA = ltable->get_tree_c2()->open_iterator(); diskTreeComponent::iterator *itrA = ltable->get_tree_c2()->open_iterator();
#ifdef NO_SNOWSHOVEL
diskTreeComponent::iterator *itrB = ltable->get_tree_c1_mergeable()->open_iterator(); diskTreeComponent::iterator *itrB = ltable->get_tree_c1_mergeable()->open_iterator();
#else
diskTreeComponent::iterator *itrB = ltable->get_tree_c1_mergeable()->open_iterator(&ltable->merge_mgr->cur_c1_c2_progress_delta, 0.05, 0 /*XXX*/);
#endif
//create a new tree //create a new tree
diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats); diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats);

View file

@ -28,25 +28,27 @@ static const network_op_t LOGSTORE_FIRST_RESPONSE_CODE = 1;
static const network_op_t LOGSTORE_RESPONSE_SUCCESS = 1; static const network_op_t LOGSTORE_RESPONSE_SUCCESS = 1;
static const network_op_t LOGSTORE_RESPONSE_FAIL = 2; static const network_op_t LOGSTORE_RESPONSE_FAIL = 2;
static const network_op_t LOGSTORE_RESPONSE_SENDING_TUPLES = 3; static const network_op_t LOGSTORE_RESPONSE_SENDING_TUPLES = 3;
static const network_op_t LOGSTORE_LAST_RESPONSE_CODE = 3; static const network_op_t LOGSTORE_RESPONSE_RECEIVING_TUPLES = 3;
static const network_op_t LOGSTORE_LAST_RESPONSE_CODE = 4;
//client codes //client codes
static const network_op_t LOGSTORE_FIRST_REQUEST_CODE = 8; static const network_op_t LOGSTORE_FIRST_REQUEST_CODE = 8;
static const network_op_t OP_INSERT = 8; // Create, Update, Delete static const network_op_t OP_INSERT = 8; // Create, Update, Delete
static const network_op_t OP_FIND = 9; // Read static const network_op_t OP_FIND = 9; // Read
static const network_op_t OP_SCAN = 10; static const network_op_t OP_SCAN = 10;
static const network_op_t OP_DONE = 11; // Please close the connection. static const network_op_t OP_BULK_INSERT = 11;
static const network_op_t OP_FLUSH = 12; static const network_op_t OP_DONE = 12; // Please close the connection.
static const network_op_t OP_SHUTDOWN = 13; static const network_op_t OP_FLUSH = 13;
static const network_op_t OP_STAT_SPACE_USAGE = 14; static const network_op_t OP_SHUTDOWN = 14;
static const network_op_t OP_STAT_PERF_REPORT = 15; static const network_op_t OP_STAT_SPACE_USAGE = 15;
static const network_op_t OP_STAT_HISTOGRAM = 16; // Return N approximately equal size partitions (including split points + cardinalities) N=1 estimates table cardinality. static const network_op_t OP_STAT_PERF_REPORT = 16;
static const network_op_t OP_STAT_HISTOGRAM = 17; // Return N approximately equal size partitions (including split points + cardinalities) N=1 estimates table cardinality.
static const network_op_t OP_DBG_DROP_DATABASE = 17; static const network_op_t OP_DBG_DROP_DATABASE = 18;
static const network_op_t OP_DBG_BLOCKMAP = 18; static const network_op_t OP_DBG_BLOCKMAP = 19;
static const network_op_t OP_DBG_NOOP = 19; static const network_op_t OP_DBG_NOOP = 20;
static const network_op_t LOGSTORE_LAST_REQUEST_CODE = 19; static const network_op_t LOGSTORE_LAST_REQUEST_CODE = 20;
//error codes //error codes
static const network_op_t LOGSTORE_FIRST_ERROR = 27; static const network_op_t LOGSTORE_FIRST_ERROR = 27;
@ -66,10 +68,10 @@ typedef enum {
static inline int readfromsocket(FILE * sockf, void *buf, ssize_t count) { static inline int readfromsocket(FILE * sockf, void *buf, ssize_t count) {
ssize_t i = fread_unlocked(buf, sizeof(byte), count, sockf); ssize_t i = fread_unlocked(buf, sizeof(byte), count, sockf);
if(i != count) { if(i != count) {
if(feof(sockf)) { if(feof_unlocked(sockf)) {
errno = EOF; errno = EOF;
return EOF; return EOF;
} else if(ferror(sockf)) { } else if(ferror_unlocked(sockf)) {
perror("readfromsocket failed"); perror("readfromsocket failed");
errno = -1; errno = -1;
return -1; return -1;
@ -104,10 +106,10 @@ static inline int readfromsocket(int sockd, void *buf, ssize_t count)
static inline int writetosocket(FILE * sockf, const void *buf, ssize_t count) { static inline int writetosocket(FILE * sockf, const void *buf, ssize_t count) {
ssize_t i = fwrite_unlocked((byte*)buf, sizeof(byte), count, sockf); ssize_t i = fwrite_unlocked((byte*)buf, sizeof(byte), count, sockf);
if(i != count) { if(i != count) {
if(feof(sockf)) { if(feof_unlocked(sockf)) {
errno = EOF; errno = EOF;
return errno; return errno;
} else if(ferror(sockf)) { } else if(ferror_unlocked(sockf)) {
perror("writetosocket failed"); perror("writetosocket failed");
errno = -1; errno = -1;
return -1; return -1;
@ -167,7 +169,7 @@ static inline network_op_t readopfromsocket(FILE * sockf, logstore_opcode_type t
if(!(opisrequest(ret) || opiserror(ret))) { if(!(opisrequest(ret) || opiserror(ret))) {
fprintf(stderr, "Read invalid request code %d\n", (int)ret); fprintf(stderr, "Read invalid request code %d\n", (int)ret);
if(opisresponse(ret)) { if(opisresponse(ret)) {
fprintf(stderr, "(also, the request code is a valid response code)\n"); fprintf(stderr, "(also, the request code is a valid response code)\n");
} }
ret = LOGSTORE_PROTOCOL_ERROR; ret = LOGSTORE_PROTOCOL_ERROR;
} }
@ -176,7 +178,7 @@ static inline network_op_t readopfromsocket(FILE * sockf, logstore_opcode_type t
if(!(opisresponse(ret) || opiserror(ret))) { if(!(opisresponse(ret) || opiserror(ret))) {
fprintf(stderr, "Read invalid response code %d\n", (int)ret); fprintf(stderr, "Read invalid response code %d\n", (int)ret);
if(opisrequest(ret)) { if(opisrequest(ret)) {
fprintf(stderr, "(also, the response code is a valid request code)\n"); fprintf(stderr, "(also, the response code is a valid request code)\n");
} }
ret = LOGSTORE_PROTOCOL_ERROR; ret = LOGSTORE_PROTOCOL_ERROR;
} }
@ -204,7 +206,7 @@ static inline network_op_t readopfromsocket(int sockd, logstore_opcode_type type
if(!(opisrequest(ret) || opiserror(ret))) { if(!(opisrequest(ret) || opiserror(ret))) {
fprintf(stderr, "Read invalid request code %d\n", (int)ret); fprintf(stderr, "Read invalid request code %d\n", (int)ret);
if(opisresponse(ret)) { if(opisresponse(ret)) {
fprintf(stderr, "(also, the request code is a valid response code)\n"); fprintf(stderr, "(also, the request code is a valid response code)\n");
} }
ret = LOGSTORE_PROTOCOL_ERROR; ret = LOGSTORE_PROTOCOL_ERROR;
} }
@ -213,7 +215,7 @@ static inline network_op_t readopfromsocket(int sockd, logstore_opcode_type type
if(!(opisresponse(ret) || opiserror(ret))) { if(!(opisresponse(ret) || opiserror(ret))) {
fprintf(stderr, "Read invalid response code %d\n", (int)ret); fprintf(stderr, "Read invalid response code %d\n", (int)ret);
if(opisrequest(ret)) { if(opisrequest(ret)) {
fprintf(stderr, "(also, the response code is a valid request code)\n"); fprintf(stderr, "(also, the response code is a valid request code)\n");
} }
ret = LOGSTORE_PROTOCOL_ERROR; ret = LOGSTORE_PROTOCOL_ERROR;
} }
@ -224,7 +226,8 @@ static inline network_op_t readopfromsocket(int sockd, logstore_opcode_type type
} }
static inline int writeoptosocket(FILE * sockf, network_op_t op) { static inline int writeoptosocket(FILE * sockf, network_op_t op) {
assert(opiserror(op) || opisrequest(op) || opisresponse(op)); assert(opiserror(op) || opisrequest(op) || opisresponse(op));
return writetosocket(sockf, &op, sizeof(network_op_t)); int ret = writetosocket(sockf, &op, sizeof(network_op_t));
return ret;
} }
static inline int writeoptosocket(int sockd, network_op_t op) { static inline int writeoptosocket(int sockd, network_op_t op) {
assert(opiserror(op) || opisrequest(op) || opisresponse(op)); assert(opiserror(op) || opisrequest(op) || opisresponse(op));

View file

@ -15,6 +15,29 @@ inline int requestDispatch<HANDLE>::op_insert(logtable<datatuple> * ltable, HAND
return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS); return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS);
} }
template<class HANDLE> template<class HANDLE>
inline int requestDispatch<HANDLE>::op_bulk_insert(logtable<datatuple> *ltable, HANDLE fd) {
int err = writeoptosocket(fd, LOGSTORE_RESPONSE_RECEIVING_TUPLES);
datatuple ** tups = (datatuple **) malloc(sizeof(tups[0]) * 100);
int tups_size = 100;
int cur_tup_count = 0;
while((tups[cur_tup_count] = readtuplefromsocket(fd, &err))) {
cur_tup_count++;
if(cur_tup_count == tups_size) {
ltable->insertManyTuples(tups, cur_tup_count);
for(int i = 0; i < cur_tup_count; i++) {
datatuple::freetuple(tups[i]);
}
cur_tup_count = 0;
}
}
ltable->insertManyTuples(tups, cur_tup_count);
for(int i = 0; i < cur_tup_count; i++) {
datatuple::freetuple(tups[i]);
}
if(!err) err = writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS);
return err;
}
template<class HANDLE>
inline int requestDispatch<HANDLE>::op_find(logtable<datatuple> * ltable, HANDLE fd, datatuple * tuple) { inline int requestDispatch<HANDLE>::op_find(logtable<datatuple> * ltable, HANDLE fd, datatuple * tuple) {
//find the tuple //find the tuple
datatuple *dt = ltable->findTuple_first(-1, tuple->key(), tuple->keylen()); datatuple *dt = ltable->findTuple_first(-1, tuple->key(), tuple->keylen());
@ -444,6 +467,9 @@ int requestDispatch<HANDLE>::dispatch_request(network_op_t opcode, datatuple * t
size_t limit = readcountfromsocket(fd, &err); size_t limit = readcountfromsocket(fd, &err);
if(!err) { err = op_scan(ltable, fd, tuple, tuple2, limit); } if(!err) { err = op_scan(ltable, fd, tuple, tuple2, limit); }
} }
else if(opcode == OP_BULK_INSERT) {
err = op_bulk_insert(ltable, fd);
}
else if(opcode == OP_FLUSH) else if(opcode == OP_FLUSH)
{ {
err = op_flush(ltable, fd); err = op_flush(ltable, fd);

View file

@ -16,6 +16,7 @@ private:
static inline int op_insert(logtable<datatuple> * ltable, HANDLE fd, datatuple * tuple); static inline int op_insert(logtable<datatuple> * ltable, HANDLE fd, datatuple * tuple);
static inline int op_find(logtable<datatuple> * ltable, HANDLE fd, datatuple * tuple); static inline int op_find(logtable<datatuple> * ltable, HANDLE fd, datatuple * tuple);
static inline int op_scan(logtable<datatuple> * ltable, HANDLE fd, datatuple * tuple, datatuple * tuple2, size_t limit); static inline int op_scan(logtable<datatuple> * ltable, HANDLE fd, datatuple * tuple, datatuple * tuple2, size_t limit);
static inline int op_bulk_insert(logtable<datatuple> * ltable, HANDLE fd);
static inline int op_flush(logtable<datatuple> * ltable, HANDLE fd); static inline int op_flush(logtable<datatuple> * ltable, HANDLE fd);
static inline int op_shutdown(logtable<datatuple> * ltable, HANDLE fd); static inline int op_shutdown(logtable<datatuple> * ltable, HANDLE fd);
static inline int op_stat_space_usage(logtable<datatuple> * ltable, HANDLE fd); static inline int op_stat_space_usage(logtable<datatuple> * ltable, HANDLE fd);

View file

@ -42,8 +42,12 @@ void * simpleServer::worker(int self) {
} }
pthread_mutex_unlock(&thread_mut[self]); pthread_mutex_unlock(&thread_mut[self]);
FILE * f = fdopen(thread_fd[self], "a+"); FILE * f = fdopen(thread_fd[self], "a+");
int mybufsize =128*1024;
char * bigbuffer = (char*)malloc(mybufsize);
setbuffer(f, bigbuffer, mybufsize);
while(!requestDispatch<FILE*>::dispatch_request(f, ltable)) { } while(!requestDispatch<FILE*>::dispatch_request(f, ltable)) { }
fclose(f); fclose(f);
free(bigbuffer);
pthread_mutex_lock(&thread_mut[self]); pthread_mutex_lock(&thread_mut[self]);
thread_fd[self] = -1; thread_fd[self] = -1;
} }

View file

@ -128,6 +128,26 @@ logstore_client_op_returns_many(logstore_handle_t *l,
return rcode; return rcode;
} }
/**
 * Client side of a tuple stream (e.g. one opened with OP_BULK_INSERT).
 *
 * A non-NULL tuple is written to the server socket and no reply is read.
 * A NULL tuple writes the end-of-iterator marker and then reads the
 * server's response code.
 *
 * @param l  Open connection handle; l->server_fsocket must be non-zero.
 * @param t  Tuple to send, or NULL to terminate the stream.
 * @return LOGSTORE_RESPONSE_SUCCESS after a tuple was written; the server's
 *         response code after the stream was terminated; or
 *         LOGSTORE_CONN_CLOSED_ERROR if a write failed, in which case the
 *         connection has been closed with close_conn().
 */
network_op_t
logstore_client_send_tuple(logstore_handle_t *l, datatuple *t) {
  assert(l->server_fsocket != 0);

  network_op_t rcode = LOGSTORE_RESPONSE_SUCCESS;
  int err;

  if(!t) {
    // End of stream: send the terminator, then wait for the server's verdict.
    err = writeendofiteratortosocket(l->server_fsocket);
    if(err == 0) {
      rcode = readopfromsocket(l->server_fsocket, LOGSTORE_SERVER_RESPONSE);
    }
  } else {
    // Mid-stream: just push the tuple; the server does not reply per tuple.
    err = writetupletosocket(l->server_fsocket, t);
  }

  if(err != 0) {
    close_conn(l);
    return LOGSTORE_CONN_CLOSED_ERROR;
  }
  return rcode;
}
datatuple * datatuple *
logstore_client_next_tuple(logstore_handle_t *l) { logstore_client_next_tuple(logstore_handle_t *l) {
assert(l->server_fsocket != 0); // otherwise, then the client forgot to check a return value... assert(l->server_fsocket != 0); // otherwise, then the client forgot to check a return value...

View file

@ -25,7 +25,7 @@ uint8_t logstore_client_op_returns_many(logstore_handle_t *l,
uint64_t count = (uint64_t)-1); uint64_t count = (uint64_t)-1);
datatuple * logstore_client_next_tuple(logstore_handle_t *l); datatuple * logstore_client_next_tuple(logstore_handle_t *l);
uint8_t logstore_client_send_tuple(logstore_handle_t *l, datatuple *tuple = NULL);
int logstore_client_close(logstore_handle_t* l); int logstore_client_close(logstore_handle_t* l);

View file

@ -8,4 +8,5 @@ IF( HAVE_STASIS )
CREATE_CHECK(check_mergetuple) CREATE_CHECK(check_mergetuple)
CREATE_CHECK(check_rbtree) CREATE_CHECK(check_rbtree)
CREATE_CLIENT_EXECUTABLE(check_tcpclient) # XXX should build this on non-stasis machines CREATE_CLIENT_EXECUTABLE(check_tcpclient) # XXX should build this on non-stasis machines
CREATE_CLIENT_EXECUTABLE(check_tcpbulkinsert) # XXX should build this on non-stasis machines
ENDIF( HAVE_STASIS ) ENDIF( HAVE_STASIS )

View file

@ -0,0 +1,245 @@
/*
* check_tcpbulkinsert.cpp
*
* Created on: Aug 27, 2010
* Author: sears
*/
#include <string>
#include <vector>
#include <iostream>
#include <sstream>
#include "logstore.h"
#include <assert.h>
#include <limits.h>
#include <math.h>
#include <pthread.h>
#include <sys/time.h>
#include <time.h>
#include <sys/types.h>
#include "../tcpclient.h"
#include "../network.h"
#include "check_util.h"
#undef begin
#undef end
static char * svrname = "localhost";
static int svrport = 32432;
/**
 * End-to-end exercise of the bulk-insert protocol against a running server.
 *
 * Steps:
 *   - Generate up to NUM_ENTRIES keys in sorted partitions and merge the
 *     partitions into one list, dropping keys that do not compare greater
 *     than the previous one under mycmp.
 *   - Stage 1: stream one tuple per key to the server with OP_BULK_INSERT.
 *   - Stage 2: look every key up with OP_FIND, in reverse order.
 *   - Stage 3: scan the whole table and check that the keys come back in
 *     the same order as the merged list.
 *
 * Failures are reported through assert().
 *
 * @param NUM_ENTRIES  Number of keys to generate; reduced to the number of
 *                     keys that survive de-duplication.
 */
void insertProbeIter(size_t NUM_ENTRIES)
{
  srand(1000);

  logstore_handle_t * l = logstore_client_open(svrname, svrport, 100);

  //data generation
  typedef std::vector<std::string> key_v_t;
  const static size_t max_partition_size = 100000;
  int KEY_LEN = 100;
  std::vector<key_v_t*> *key_v_list = new std::vector<key_v_t*>;
  size_t list_size = NUM_ENTRIES / max_partition_size + 1;
  for(size_t i = 0; i < list_size; i++)
  {
    key_v_t * partition = new key_v_t;
    // The last partition only holds the remainder.
    if(NUM_ENTRIES < max_partition_size*(i+1))
      preprandstr(NUM_ENTRIES-max_partition_size*i, partition, KEY_LEN, true);
    else
      preprandstr(max_partition_size, partition, KEY_LEN, true);

    std::sort(partition->begin(), partition->end(), &mycmp);
    key_v_list->push_back(partition);
    printf("size partition %llu is %llu\n", (unsigned long long)i+1, (unsigned long long)partition->size());
  }

  // Merge the sorted partitions into key_arr.
  key_v_t * key_arr = new key_v_t;

  // One cursor per partition.
  std::vector<key_v_t::iterator> iters;
  for(size_t i = 0; i < list_size; i++)
  {
    iters.push_back((*key_v_list)[i]->begin());
  }

  size_t lc = 0;
  while(true)
  {
    // Pick the partition whose current key sorts first under mycmp.
    int list_index = -1;
    for(size_t i = 0; i < list_size; i++)
    {
      if(iters[i] == (*key_v_list)[i]->end())
        continue;

      if(list_index == -1 || mycmp(*iters[i], *iters[list_index]))
        list_index = (int)i;
    }

    if(list_index == -1)
      break;  // every partition is exhausted

    // Only keep keys that compare greater than the last one kept.
    if(key_arr->size() == 0 || mycmp(key_arr->back(), *iters[list_index]))
      key_arr->push_back(*iters[list_index]);

    ++iters[list_index];
    lc++;
    if(lc % max_partition_size == 0)
      printf("%llu/%llu completed.\n", (unsigned long long)lc, (unsigned long long)NUM_ENTRIES);
  }

  for(size_t i = 0; i < list_size; i++)
  {
    (*key_v_list)[i]->clear();
    delete (*key_v_list)[i];
  }
  key_v_list->clear();
  delete key_v_list;

  printf("key arr size: %llu\n", (unsigned long long)key_arr->size());

  if(key_arr->size() > NUM_ENTRIES)
    key_arr->erase(key_arr->begin()+NUM_ENTRIES, key_arr->end());

  NUM_ENTRIES=key_arr->size();

  printf("Stage 1: Writing %llu keys\n", (unsigned long long)NUM_ENTRIES);

  struct timeval start_tv, stop_tv, ti_st, ti_end;
  double insert_time = 0;
  int delcount = 0, upcount = 0;  // never incremented here; reported as zero below
  int64_t datasize = 0;

  gettimeofday(&start_tv,0);

  // open a bulk insert stream
  network_op_t ret = logstore_client_op_returns_many(l, OP_BULK_INSERT);
  assert(ret == LOGSTORE_RESPONSE_RECEIVING_TUPLES);

  for(size_t i = 0; i < NUM_ENTRIES; i++)
  {
    //prepare the key (length includes the terminating NUL)
    len_t keylen = (*key_arr)[i].length()+1;

    //prepare the data
    std::string ditem;
    getnextdata(ditem, 8192);
    len_t datalen = ditem.length()+1;

    datatuple* newtuple = datatuple::create((*key_arr)[i].c_str(), keylen,
                                            ditem.c_str(), datalen);

    datasize += newtuple->byte_length();

    gettimeofday(&ti_st,0);

    //send the data
    ret = logstore_client_send_tuple(l, newtuple);
    assert(ret == LOGSTORE_RESPONSE_SUCCESS);

    gettimeofday(&ti_end,0);
    insert_time += tv_to_double(ti_end) - tv_to_double(ti_st);

    datatuple::freetuple(newtuple);

    if(i % 10000 == 0 && i > 0)
      printf("%llu / %llu inserted.\n", (unsigned long long)i, (unsigned long long)NUM_ENTRIES);
  }
  ret = logstore_client_send_tuple(l, NULL); // NULL -> end of stream.
  assert(ret == LOGSTORE_RESPONSE_SUCCESS);

  gettimeofday(&stop_tv,0);
  printf("insert time: %6.1f\n", insert_time);
  printf("insert time: %6.1f\n", (tv_to_double(stop_tv) - tv_to_double(start_tv)));
  printf("#deletions: %d\n#updates: %d\n", delcount, upcount);

  printf("Stage 2: Looking up %llu keys:\n", (unsigned long long)NUM_ENTRIES);

  int found_tuples=0;
  // Walk the keys from last to first; the unsigned countdown avoids
  // narrowing NUM_ENTRIES to int.
  for(size_t i = NUM_ENTRIES; i-- > 0; )
  {
    //get the key
    len_t keylen = (*key_arr)[i].length()+1;

    datatuple* searchtuple = datatuple::create((*key_arr)[i].c_str(), keylen);

    //find the key with the given tuple
    datatuple *dt = logstore_client_op(l, OP_FIND, searchtuple);

    assert(dt!=0);
    assert(!dt->isDelete());
    found_tuples++;
    assert(dt->keylen() == (*key_arr)[i].length()+1);

    //free dt
    datatuple::freetuple(dt);
    dt = 0;

    datatuple::freetuple(searchtuple);
  }
  printf("found %d\n", found_tuples);

  printf("Stage 3: Initiating scan\n");

  ret = logstore_client_op_returns_many(l, OP_SCAN, NULL, NULL, 0); // start = NULL stop = NULL limit = NONE
  assert(ret == LOGSTORE_RESPONSE_SENDING_TUPLES);

  datatuple * tup;
  size_t i = 0;
  while((tup = logstore_client_next_tuple(l))) {
    // Guard the index: if the server returns more tuples than were written,
    // fail here instead of reading past the end of key_arr.
    assert(i < NUM_ENTRIES);
    assert(!tup->isDelete());
    assert(tup->keylen() == (*key_arr)[i].length()+1);
    assert(!memcmp(tup->key(), (*key_arr)[i].c_str(), (*key_arr)[i].length()));
    datatuple::freetuple(tup);
    i++;
  }
  assert(i == NUM_ENTRIES);

  key_arr->clear();
  delete key_arr;

  logstore_client_close(l);

  gettimeofday(&stop_tv,0);
  printf("run time: %6.1f\n", (tv_to_double(stop_tv) - tv_to_double(start_tv)));
}
/** @test
 * Bulk-insert check driver.
 *
 * Usage: check_tcpbulkinsert [server name] [server port]
 * Arguments that are omitted leave svrname / svrport at their defaults.
 */
int main(int argc, char* argv[])
{
  // Positional overrides for the server address.
  if(argc >= 2) { svrname = argv[1]; }
  if(argc >= 3) { svrport = atoi(argv[2]); }

  // Other sizes that are currently disabled:
  //   insertProbeIter(25000);
  //   insertProbeIter(5000);
  //   insertProbeIter(100);
  //   and the sequence 5000, 2500, 1000, 500, 1000, 100, 10
  insertProbeIter(100000);

  return 0;
}

View file

@ -35,8 +35,8 @@ void removeduplicates(std::vector<std::string> *arr)
} }
void scramble(std::vector<std::string> *arr) { void scramble(std::vector<std::string> *arr) {
for(int i = 0; i < arr->size(); i++) { for(unsigned int i = 0; i < arr->size(); i++) {
int other = rand() % arr->size(); unsigned int other = rand() % arr->size();
if(other != i) { if(other != i) {
std::string s = (*arr)[i]; std::string s = (*arr)[i];
(*arr)[i] = (*arr)[other]; (*arr)[i] = (*arr)[other];