diff --git a/diskTreeComponent.cpp b/diskTreeComponent.cpp index a9d1f8c..391ce3d 100644 --- a/diskTreeComponent.cpp +++ b/diskTreeComponent.cpp @@ -894,9 +894,12 @@ void diskTreeComponent::iterator::init_iterators(datatuple * key1, datatuple * k } } -diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree) : +diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree, double* cur_progress_delta, double target_progress_delta, bool * flushing) : ro_alloc_(new RegionAllocator()), - tree_(tree ? tree->get_root_rec() : NULLRID) + tree_(tree ? tree->get_root_rec() : NULLRID), + cur_progress_delta_(cur_progress_delta), + target_progress_delta_(target_progress_delta), + flushing_(flushing) { init_iterators(NULL, NULL); init_helper(NULL); @@ -904,7 +907,10 @@ diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree) : diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree, datatuple* key) : ro_alloc_(new RegionAllocator()), - tree_(tree ? tree->get_root_rec() : NULLRID) + tree_(tree ? tree->get_root_rec() : NULLRID), + cur_progress_delta_(NULL), + target_progress_delta_(0.0), + flushing_(NULL) { init_iterators(key,NULL); init_helper(key); @@ -990,6 +996,16 @@ datatuple * diskTreeComponent::iterator::next_callerFrees() } // else readTuple is null. We're done. } + + if(readTuple && cur_progress_delta_) { + // *cur_progress_delta is how far ahead we are, as a fraction of the total merge. + while(*cur_progress_delta_ > target_progress_delta_ && ((!flushing_) || (! *flushing_))) { // TODO: how to pick this threshold? + struct timespec ts; + mergeManager::double_to_ts(&ts, 0.1); + nanosleep(&ts, 0); + } + } + return readTuple; } diff --git a/diskTreeComponent.h b/diskTreeComponent.h index df237e7..77db51f 100644 --- a/diskTreeComponent.h +++ b/diskTreeComponent.h @@ -45,8 +45,8 @@ class diskTreeComponent { void writes_done(); - iterator * open_iterator() { - return new iterator(ltree); + iterator * open_iterator(double* cur_size = NULL, double target_size = 0, bool * flushing = NULL) { + return new iterator(ltree, cur_size, target_size, flushing); } iterator * open_iterator(datatuple * key) { if(key != NULL) { @@ -179,7 +179,7 @@ class diskTreeComponent { { public: - explicit iterator(diskTreeComponent::internalNodes *tree); + explicit iterator(diskTreeComponent::internalNodes *tree, double* cur_size = NULL, double target_size = 0, bool * flushing = NULL); explicit iterator(diskTreeComponent::internalNodes *tree,datatuple *key); @@ -188,16 +188,18 @@ class diskTreeComponent { datatuple * next_callerFrees(); private: - void init_iterators(datatuple * key1, datatuple * key2); - inline void init_helper(datatuple * key1); + void init_iterators(datatuple * key1, datatuple * key2); + inline void init_helper(datatuple * key1); explicit iterator() { abort(); } void operator=(iterator & t) { abort(); } int operator-(iterator & t) { abort(); } - private: RegionAllocator * ro_alloc_; // has a filehandle that we use to optimize sequential scans. recordid tree_; //root of the tree + double * cur_progress_delta_; + double target_progress_delta_; + bool * flushing_; diskTreeComponent::internalNodes::iterator* lsmIterator_; diff --git a/logstore.cpp b/logstore.cpp index 2c29279..8fba7d6 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -87,7 +87,7 @@ void logtable::init_stasis() { DataPage::register_stasis_page_impl(); // XXX Workaround Stasis' (still broken) default concurrent buffer manager stasis_buffer_manager_size = 1024 * 1024; // 4GB = 2^10 pages: - // stasis_buffer_manager_factory = stasis_buffer_manager_hash_factory; + stasis_buffer_manager_factory = stasis_buffer_manager_hash_factory; Tinit(); @@ -506,6 +506,82 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size } + +template +datatuple * logtable::insertTupleHelper(datatuple *tuple) +{ + //find the previous tuple with same key in the memtree if exists + memTreeComponent::rbtree_t::iterator rbitr = tree_c0->find(tuple); + datatuple * t = 0; + datatuple * pre_t = 0; + if(rbitr != tree_c0->end()) + { + pre_t = *rbitr; + //do the merging + datatuple *new_t = tmerger->merge(pre_t, tuple); + c0_stats->merged_tuples(new_t, tuple, pre_t); + t = new_t; + tree_c0->erase(pre_t); //remove the previous tuple + + tree_c0->insert(new_t); //insert the new tuple + + //update the tree size (+ new_t size - pre_t size) + tree_bytes += ((int64_t)new_t->byte_length() - (int64_t)pre_t->byte_length()); + + } + else //no tuple with same key exists in mem-tree + { + + t = tuple->create_copy(); + + //insert tuple into the rbtree + tree_c0->insert(t); + + tsize++; + tree_bytes += t->byte_length();// + RB_TREE_OVERHEAD; + + } + merge_mgr->wrote_tuple(0, t); // needs to be here; doesn't grab a mutex. + +#ifdef NO_SNOWSHOVEL + //flushing logic + if(tree_bytes >= max_c0_size ) + { + DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes); + + // NOTE: we hold rb_mut across the (blocking on merge) flushTable. Therefore: + // *** Blocking in flushTable is REALLY BAD *** + // Because it blocks readers and writers. + // The merge policy does its best to make sure flushTable does not block. + rwlc_writelock(header_mut); + // the test of tree size needs to be atomic with the flushTable, and flushTable needs a writelock. + if(tree_bytes >= max_c0_size) { + flushTable(); // this needs to hold rb_mut if snowshoveling is disabled, but can't hold rb_mut if snowshoveling is enabled. + } + rwlc_unlock(header_mut); +#endif + return pre_t; +} +template +void logtable::insertManyTuples(datatuple ** tuples, int tuple_count) { + for(int i = 0; i < tuple_count; i++) { + merge_mgr->read_tuple_from_small_component(0, tuples[i]); + } + pthread_mutex_lock(&rb_mut); + int num_old_tups = 0; + pageid_t sum_old_tup_lens = 0; + for(int i = 0; i < tuple_count; i++) { + datatuple * old_tup = insertTupleHelper(tuples[i]); + if(old_tup) { + num_old_tups++; + sum_old_tup_lens += old_tup->byte_length(); + datatuple::freetuple(old_tup); + } + } + pthread_mutex_unlock(&rb_mut); + merge_mgr->read_tuple_from_large_component(0, num_old_tups, sum_old_tup_lens); +} + template void logtable::insertTuple(datatuple *tuple) { @@ -514,55 +590,7 @@ void logtable::insertTuple(datatuple *tuple) datatuple * pre_t = 0; // this is a pointer to any data tuples that we'll be deleting below. We need to update the merge_mgr statistics with it, but have to do so outside of the rb_mut region. pthread_mutex_lock(&rb_mut); - //find the previous tuple with same key in the memtree if exists - memTreeComponent::rbtree_t::iterator rbitr = tree_c0->find(tuple); - datatuple * t = 0; - if(rbitr != tree_c0->end()) - { - pre_t = *rbitr; - //do the merging - datatuple *new_t = tmerger->merge(pre_t, tuple); - c0_stats->merged_tuples(new_t, tuple, pre_t); - t = new_t; - tree_c0->erase(pre_t); //remove the previous tuple - - tree_c0->insert(new_t); //insert the new tuple - - //update the tree size (+ new_t size - pre_t size) - tree_bytes += ((int64_t)new_t->byte_length() - (int64_t)pre_t->byte_length()); - - } - else //no tuple with same key exists in mem-tree - { - - t = tuple->create_copy(); - - //insert tuple into the rbtree - tree_c0->insert(t); - - tsize++; - tree_bytes += t->byte_length();// + RB_TREE_OVERHEAD; - - } - merge_mgr->wrote_tuple(0, t); // needs to be here; doesn't grab a mutex. - -#ifdef NO_SNOWSHOVEL - //flushing logic - if(tree_bytes >= max_c0_size ) - { - DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes); - - // NOTE: we hold rb_mut across the (blocking on merge) flushTable. Therefore: - // *** Blocking in flushTable is REALLY BAD *** - // Because it blocks readers and writers. - // The merge policy does its best to make sure flushTable does not block. - rwlc_writelock(header_mut); - // the test of tree size needs to be atomic with the flushTable, and flushTable needs a writelock. - if(tree_bytes >= max_c0_size) { - flushTable(); // this needs to hold rb_mut if snowshoveling is disabled, but can't hold rb_mut if snowshoveling is enabled. - } - rwlc_unlock(header_mut); -#endif + pre_t = insertTupleHelper(tuple); pthread_mutex_unlock(&rb_mut); // XXX is it OK to move this after the NO_SNOWSHOVEL block? diff --git a/logstore.h b/logstore.h index adbbad3..ef20251 100644 --- a/logstore.h +++ b/logstore.h @@ -43,7 +43,11 @@ public: datatuple * findTuple(int xid, const datatuple::key_t key, size_t keySize); datatuple * findTuple_first(int xid, datatuple::key_t key, size_t keySize); - + +private: + datatuple * insertTupleHelper(datatuple *tuple); +public: + void insertManyTuples(struct datatuple **tuples, int tuple_count); void insertTuple(struct datatuple *tuple); //other class functions diff --git a/mergeManager.cpp b/mergeManager.cpp index c1766b8..b20c7f4 100644 --- a/mergeManager.cpp +++ b/mergeManager.cpp @@ -159,6 +159,9 @@ void mergeManager::tick_based_on_merge_progress(mergeStats *s) { overshoot_fudge *= 2; overshoot_fudge2 *= 4; + if(overshoot_fudge > 0.01 * s->target_size) { overshoot_fudge = (int64_t)(0.01 * (double)s->target_size); } + if(overshoot_fudge2 > 0.01 * s->target_size) { overshoot_fudge2 = (int64_t)(0.01 * (double)s->target_size); } + const double max_c0_sleep = 0.1; const double min_c0_sleep = 0.01; const double max_c1_sleep = 0.5; @@ -185,10 +188,23 @@ void mergeManager::tick_based_on_merge_progress(mergeStats *s) { rwlc_readlock(ltable->header_mut); if(s1->active && s->mergeable_size) { raw_overshoot = (int64_t)(((double)s->target_size) * (s->out_progress - s1->in_progress)); + overshoot = raw_overshoot + overshoot_fudge; + overshoot2 = raw_overshoot + overshoot_fudge2; + bps = s1->bps; } rwlc_unlock(ltable->header_mut); } + if(s->merge_level == 1) { + if(s1->active && s->mergeable_size) { + cur_c1_c2_progress_delta = s1->in_progress - s->out_progress; + } else if(!s->mergeable_size) { + cur_c1_c2_progress_delta = 1; + } else { + // s1 is not active. + cur_c1_c2_progress_delta = 0; + } + } //#define PP_THREAD_INFO #ifdef PP_THREAD_INFO printf("\nMerge thread %d %6f %6f Overshoot: raw=%lld, d=%lld eff=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, c0_out_progress, c0_c1_in_progress, raw_overshoot, overshoot_fudge, overshoot, -1.0, spin, total_sleep); @@ -299,12 +315,12 @@ void mergeManager::read_tuple_from_small_component(int merge_level, datatuple * tick(s); } } -void mergeManager::read_tuple_from_large_component(int merge_level, datatuple * tup) { - if(tup) { +void mergeManager::read_tuple_from_large_component(int merge_level, int tuple_count, pageid_t byte_len) { + if(tuple_count) { mergeStats * s = get_merge_stats(merge_level); - s->num_tuples_in_large++; - s->bytes_in_large += tup->byte_length(); - update_progress(s, tup->byte_length()); + s->num_tuples_in_large += tuple_count; + s->bytes_in_large += byte_len; + update_progress(s, byte_len); } } @@ -351,6 +367,7 @@ void * merge_manager_pretty_print_thread(void * arg) { mergeManager::mergeManager(logtable *ltable): UPDATE_PROGRESS_PERIOD(0.005), + cur_c1_c2_progress_delta(0.0), ltable(ltable), c0(new mergeStats(0, ltable ? ltable->max_c0_size : 10000000)), c1(new mergeStats(1, (int64_t)(ltable ? ((double)(ltable->max_c0_size) * *ltable->R()) : 100000000.0) )), diff --git a/mergeManager.h b/mergeManager.h index 35a8557..e63c6f9 100644 --- a/mergeManager.h +++ b/mergeManager.h @@ -49,12 +49,18 @@ public: void tick_based_on_merge_progress(mergeStats * s); mergeStats* get_merge_stats(int mergeLevel); void read_tuple_from_small_component(int merge_level, datatuple * tup); - void read_tuple_from_large_component(int merge_level, datatuple * tup); + void read_tuple_from_large_component(int merge_level, datatuple * tup) { + if(tup) + read_tuple_from_large_component(merge_level, 1, tup->byte_length()); + } + void read_tuple_from_large_component(int merge_level, int tuple_count, pageid_t byte_len); + void wrote_tuple(int merge_level, datatuple * tup); void finished_merge(int merge_level); void pretty_print(FILE * out); void *pretty_print_thread(); + double cur_c1_c2_progress_delta; private: logtable* ltable; double throttle_seconds; diff --git a/merger.cpp b/merger.cpp index 27184e6..6829a6e 100644 --- a/merger.cpp +++ b/merger.cpp @@ -340,7 +340,11 @@ void *diskMergeThread(void*arg) // 4: do the merge. //create the iterators diskTreeComponent::iterator *itrA = ltable->get_tree_c2()->open_iterator(); +#ifdef NO_SNOWSHOVEL diskTreeComponent::iterator *itrB = ltable->get_tree_c1_mergeable()->open_iterator(); +#else + diskTreeComponent::iterator *itrB = ltable->get_tree_c1_mergeable()->open_iterator(<able->merge_mgr->cur_c1_c2_progress_delta, 0.05, 0 /*XXX*/); +#endif //create a new tree diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats); diff --git a/network.h b/network.h index 3064bba..e666642 100644 --- a/network.h +++ b/network.h @@ -28,25 +28,27 @@ static const network_op_t LOGSTORE_FIRST_RESPONSE_CODE = 1; static const network_op_t LOGSTORE_RESPONSE_SUCCESS = 1; static const network_op_t LOGSTORE_RESPONSE_FAIL = 2; static const network_op_t LOGSTORE_RESPONSE_SENDING_TUPLES = 3; -static const network_op_t LOGSTORE_LAST_RESPONSE_CODE = 3; +static const network_op_t LOGSTORE_RESPONSE_RECEIVING_TUPLES = 3; +static const network_op_t LOGSTORE_LAST_RESPONSE_CODE = 4; //client codes static const network_op_t LOGSTORE_FIRST_REQUEST_CODE = 8; static const network_op_t OP_INSERT = 8; // Create, Update, Delete static const network_op_t OP_FIND = 9; // Read static const network_op_t OP_SCAN = 10; -static const network_op_t OP_DONE = 11; // Please close the connection. -static const network_op_t OP_FLUSH = 12; -static const network_op_t OP_SHUTDOWN = 13; -static const network_op_t OP_STAT_SPACE_USAGE = 14; -static const network_op_t OP_STAT_PERF_REPORT = 15; -static const network_op_t OP_STAT_HISTOGRAM = 16; // Return N approximately equal size partitions (including split points + cardinalities) N=1 estimates table cardinality. +static const network_op_t OP_BULK_INSERT = 11; +static const network_op_t OP_DONE = 12; // Please close the connection. +static const network_op_t OP_FLUSH = 13; +static const network_op_t OP_SHUTDOWN = 14; +static const network_op_t OP_STAT_SPACE_USAGE = 15; +static const network_op_t OP_STAT_PERF_REPORT = 16; +static const network_op_t OP_STAT_HISTOGRAM = 17; // Return N approximately equal size partitions (including split points + cardinalities) N=1 estimates table cardinality. -static const network_op_t OP_DBG_DROP_DATABASE = 17; -static const network_op_t OP_DBG_BLOCKMAP = 18; -static const network_op_t OP_DBG_NOOP = 19; -static const network_op_t LOGSTORE_LAST_REQUEST_CODE = 19; +static const network_op_t OP_DBG_DROP_DATABASE = 18; +static const network_op_t OP_DBG_BLOCKMAP = 19; +static const network_op_t OP_DBG_NOOP = 20; +static const network_op_t LOGSTORE_LAST_REQUEST_CODE = 20; //error codes static const network_op_t LOGSTORE_FIRST_ERROR = 27; @@ -66,10 +68,10 @@ typedef enum { static inline int readfromsocket(FILE * sockf, void *buf, ssize_t count) { ssize_t i = fread_unlocked(buf, sizeof(byte), count, sockf); if(i != count) { - if(feof(sockf)) { + if(feof_unlocked(sockf)) { errno = EOF; return EOF; - } else if(ferror(sockf)) { + } else if(ferror_unlocked(sockf)) { perror("readfromsocket failed"); errno = -1; return -1; @@ -104,10 +106,10 @@ static inline int readfromsocket(int sockd, void *buf, ssize_t count) static inline int writetosocket(FILE * sockf, const void *buf, ssize_t count) { ssize_t i = fwrite_unlocked((byte*)buf, sizeof(byte), count, sockf); if(i != count) { - if(feof(sockf)) { + if(feof_unlocked(sockf)) { errno = EOF; return errno; - } else if(ferror(sockf)) { + } else if(ferror_unlocked(sockf)) { perror("writetosocket failed"); errno = -1; return -1; @@ -167,7 +169,7 @@ static inline network_op_t readopfromsocket(FILE * sockf, logstore_opcode_type t if(!(opisrequest(ret) || opiserror(ret))) { fprintf(stderr, "Read invalid request code %d\n", (int)ret); if(opisresponse(ret)) { - fprintf(stderr, "(also, the request code is a valid response code)\n"); + fprintf(stderr, "(also, the request code is a valid response code)\n"); } ret = LOGSTORE_PROTOCOL_ERROR; } @@ -176,7 +178,7 @@ static inline network_op_t readopfromsocket(FILE * sockf, logstore_opcode_type t if(!(opisresponse(ret) || opiserror(ret))) { fprintf(stderr, "Read invalid response code %d\n", (int)ret); if(opisrequest(ret)) { - fprintf(stderr, "(also, the response code is a valid request code)\n"); + fprintf(stderr, "(also, the response code is a valid request code)\n"); } ret = LOGSTORE_PROTOCOL_ERROR; } @@ -204,7 +206,7 @@ static inline network_op_t readopfromsocket(int sockd, logstore_opcode_type type if(!(opisrequest(ret) || opiserror(ret))) { fprintf(stderr, "Read invalid request code %d\n", (int)ret); if(opisresponse(ret)) { - fprintf(stderr, "(also, the request code is a valid response code)\n"); + fprintf(stderr, "(also, the request code is a valid response code)\n"); } ret = LOGSTORE_PROTOCOL_ERROR; } @@ -213,7 +215,7 @@ static inline network_op_t readopfromsocket(int sockd, logstore_opcode_type type if(!(opisresponse(ret) || opiserror(ret))) { fprintf(stderr, "Read invalid response code %d\n", (int)ret); if(opisrequest(ret)) { - fprintf(stderr, "(also, the response code is a valid request code)\n"); + fprintf(stderr, "(also, the response code is a valid request code)\n"); } ret = LOGSTORE_PROTOCOL_ERROR; } @@ -224,7 +226,8 @@ static inline network_op_t readopfromsocket(int sockd, logstore_opcode_type type } static inline int writeoptosocket(FILE * sockf, network_op_t op) { assert(opiserror(op) || opisrequest(op) || opisresponse(op)); - return writetosocket(sockf, &op, sizeof(network_op_t)); + int ret = writetosocket(sockf, &op, sizeof(network_op_t)); + return ret; } static inline int writeoptosocket(int sockd, network_op_t op) { assert(opiserror(op) || opisrequest(op) || opisresponse(op)); diff --git a/requestDispatch.cpp b/requestDispatch.cpp index 78c9901..5885a48 100644 --- a/requestDispatch.cpp +++ b/requestDispatch.cpp @@ -15,6 +15,29 @@ inline int requestDispatch::op_insert(logtable * ltable, HAND return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS); } template +inline int requestDispatch::op_bulk_insert(logtable *ltable, HANDLE fd) { + int err = writeoptosocket(fd, LOGSTORE_RESPONSE_RECEIVING_TUPLES); + datatuple ** tups = (datatuple **) malloc(sizeof(tups[0]) * 100); + int tups_size = 100; + int cur_tup_count = 0; + while((tups[cur_tup_count] = readtuplefromsocket(fd, &err))) { + cur_tup_count++; + if(cur_tup_count == tups_size) { + ltable->insertManyTuples(tups, cur_tup_count); + for(int i = 0; i < cur_tup_count; i++) { + datatuple::freetuple(tups[i]); + } + cur_tup_count = 0; + } + } + ltable->insertManyTuples(tups, cur_tup_count); + for(int i = 0; i < cur_tup_count; i++) { + datatuple::freetuple(tups[i]); + } + if(!err) err = writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS); + return err; +} +template inline int requestDispatch::op_find(logtable * ltable, HANDLE fd, datatuple * tuple) { //find the tuple datatuple *dt = ltable->findTuple_first(-1, tuple->key(), tuple->keylen()); @@ -444,6 +467,9 @@ int requestDispatch::dispatch_request(network_op_t opcode, datatuple * t size_t limit = readcountfromsocket(fd, &err); if(!err) { err = op_scan(ltable, fd, tuple, tuple2, limit); } } + else if(opcode == OP_BULK_INSERT) { + err = op_bulk_insert(ltable, fd); + } else if(opcode == OP_FLUSH) { err = op_flush(ltable, fd); diff --git a/requestDispatch.h b/requestDispatch.h index c30c9db..ffded6f 100644 --- a/requestDispatch.h +++ b/requestDispatch.h @@ -16,6 +16,7 @@ private: static inline int op_insert(logtable * ltable, HANDLE fd, datatuple * tuple); static inline int op_find(logtable * ltable, HANDLE fd, datatuple * tuple); static inline int op_scan(logtable * ltable, HANDLE fd, datatuple * tuple, datatuple * tuple2, size_t limit); + static inline int op_bulk_insert(logtable * ltable, HANDLE fd); static inline int op_flush(logtable * ltable, HANDLE fd); static inline int op_shutdown(logtable * ltable, HANDLE fd); static inline int op_stat_space_usage(logtable * ltable, HANDLE fd); diff --git a/simpleServer.cpp b/simpleServer.cpp index efe713f..266a69f 100644 --- a/simpleServer.cpp +++ b/simpleServer.cpp @@ -42,8 +42,12 @@ void * simpleServer::worker(int self) { } pthread_mutex_unlock(&thread_mut[self]); FILE * f = fdopen(thread_fd[self], "a+"); + int mybufsize =128*1024; + char * bigbuffer = (char*)malloc(mybufsize); + setbuffer(f, bigbuffer, mybufsize); while(!requestDispatch::dispatch_request(f, ltable)) { } fclose(f); + free(bigbuffer); pthread_mutex_lock(&thread_mut[self]); thread_fd[self] = -1; } diff --git a/tcpclient.cpp b/tcpclient.cpp index 77c6eca..f2a2e1e 100644 --- a/tcpclient.cpp +++ b/tcpclient.cpp @@ -128,6 +128,26 @@ logstore_client_op_returns_many(logstore_handle_t *l, return rcode; } +network_op_t +logstore_client_send_tuple(logstore_handle_t *l, datatuple *t) { + assert(l->server_fsocket != 0); + network_op_t rcode = LOGSTORE_RESPONSE_SUCCESS; + int err; + if(t) { + err = writetupletosocket(l->server_fsocket, t); + } else { + err = writeendofiteratortosocket(l->server_fsocket); + if(!err) { + rcode = readopfromsocket(l->server_fsocket, LOGSTORE_SERVER_RESPONSE); + } + } + if(err) { + close_conn(l); + rcode = LOGSTORE_CONN_CLOSED_ERROR; + } + return rcode; +} + datatuple * logstore_client_next_tuple(logstore_handle_t *l) { assert(l->server_fsocket != 0); // otherwise, then the client forgot to check a return value... diff --git a/tcpclient.h b/tcpclient.h index 40e8eaa..518bc25 100644 --- a/tcpclient.h +++ b/tcpclient.h @@ -25,7 +25,7 @@ uint8_t logstore_client_op_returns_many(logstore_handle_t *l, uint64_t count = (uint64_t)-1); datatuple * logstore_client_next_tuple(logstore_handle_t *l); - +uint8_t logstore_client_send_tuple(logstore_handle_t *l, datatuple *tuple = NULL); int logstore_client_close(logstore_handle_t* l); diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 3ad9c2b..48c4f67 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -8,4 +8,5 @@ IF( HAVE_STASIS ) CREATE_CHECK(check_mergetuple) CREATE_CHECK(check_rbtree) CREATE_CLIENT_EXECUTABLE(check_tcpclient) # XXX should build this on non-stasis machines + CREATE_CLIENT_EXECUTABLE(check_tcpbulkinsert) # XXX should build this on non-stasis machines ENDIF( HAVE_STASIS ) diff --git a/test/check_tcpbulkinsert.cpp b/test/check_tcpbulkinsert.cpp new file mode 100644 index 0000000..da3555b --- /dev/null +++ b/test/check_tcpbulkinsert.cpp @@ -0,0 +1,245 @@ +/* + * check_tcpbulkinsert.cpp + * + * Created on: Aug 27, 2010 + * Author: sears + */ + +#include +#include +#include +#include +#include "logstore.h" +#include +#include +#include +#include +#include +#include +#include + +#include "../tcpclient.h" +#include "../network.h" + +#include "check_util.h" + +#undef begin +#undef end + + +static char * svrname = "localhost"; +static int svrport = 32432; + +void insertProbeIter(size_t NUM_ENTRIES) +{ + srand(1000); + + logstore_handle_t * l = logstore_client_open(svrname, svrport, 100); + + //data generation + typedef std::vector key_v_t; + const static size_t max_partition_size = 100000; + int KEY_LEN = 100; + std::vector *key_v_list = new std::vector; + size_t list_size = NUM_ENTRIES / max_partition_size + 1; + for(size_t i =0; ibegin(), key_arr->end(), &mycmp); + key_v_list->push_back(key_arr); + printf("size partition %llu is %llu\n", (unsigned long long)i+1, (unsigned long long)key_arr->size()); + } + + + + key_v_t * key_arr = new key_v_t; + + std::vector iters; + for(size_t i=0; ibegin())); + } + + int lc = 0; + while(true) + { + int list_index = -1; + for(size_t i=0; iend()) + continue; + + if(list_index == -1 || mycmp(**iters[i], **iters[list_index])) + list_index = i; + } + + if(list_index == -1) + break; + + if(key_arr->size() == 0 || mycmp(key_arr->back(), **iters[list_index])) + key_arr->push_back(**iters[list_index]); + + (*iters[list_index])++; + lc++; + if(lc % max_partition_size == 0) + printf("%llu/%llu completed.\n", (unsigned long long)lc, (unsigned long long)NUM_ENTRIES); + } + + for(size_t i=0; iclear(); + delete (*key_v_list)[i]; + delete iters[i]; + } + key_v_list->clear(); + delete key_v_list; + printf("key arr size: %llu\n", (unsigned long long)key_arr->size()); + + if(key_arr->size() > NUM_ENTRIES) + key_arr->erase(key_arr->begin()+NUM_ENTRIES, key_arr->end()); + + NUM_ENTRIES=key_arr->size(); + + printf("Stage 1: Writing %llu keys\n", (unsigned long long)NUM_ENTRIES); + + struct timeval start_tv, stop_tv, ti_st, ti_end; + double insert_time = 0; + int delcount = 0, upcount = 0; + int64_t datasize = 0; + std::vector dsp; + std::vector del_list; + gettimeofday(&start_tv,0); + + // open a bulk insert stream + network_op_t ret = logstore_client_op_returns_many(l, OP_BULK_INSERT); + assert(ret == LOGSTORE_RESPONSE_RECEIVING_TUPLES); + + for(size_t i = 0; i < NUM_ENTRIES; i++) + { + //prepare the key + len_t keylen = (*key_arr)[i].length()+1; + + //prepare the data + std::string ditem; + getnextdata(ditem, 8192); + len_t datalen = ditem.length()+1; + + datatuple* newtuple = datatuple::create((*key_arr)[i].c_str(), keylen, + ditem.c_str(), datalen); + + datasize += newtuple->byte_length(); + + gettimeofday(&ti_st,0); + + //send the data + ret = logstore_client_send_tuple(l, newtuple); + assert(ret == LOGSTORE_RESPONSE_SUCCESS); + + gettimeofday(&ti_end,0); + insert_time += tv_to_double(ti_end) - tv_to_double(ti_st); + + datatuple::freetuple(newtuple); + + if(i % 10000 == 0 && i > 0) + printf("%llu / %llu inserted.\n", (unsigned long long)i, (unsigned long long)NUM_ENTRIES); + + } + ret = logstore_client_send_tuple(l, NULL); // NULL -> end of stream. + assert(ret == LOGSTORE_RESPONSE_SUCCESS); + gettimeofday(&stop_tv,0); + printf("insert time: %6.1f\n", insert_time); + printf("insert time: %6.1f\n", (tv_to_double(stop_tv) - tv_to_double(start_tv))); + printf("#deletions: %d\n#updates: %d\n", delcount, upcount); + + + + printf("Stage 2: Looking up %llu keys:\n", (unsigned long long)NUM_ENTRIES); + + + int found_tuples=0; + for(int i=NUM_ENTRIES-1; i>=0; i--) + { + int ri = i; + //printf("key index%d\n", i); + //fflush(stdout); + + //get the key + len_t keylen = (*key_arr)[ri].length()+1; + + datatuple* searchtuple = datatuple::create((*key_arr)[ri].c_str(), keylen); + + //find the key with the given tuple + datatuple *dt = logstore_client_op(l, OP_FIND, searchtuple); + + assert(dt!=0); + assert(!dt->isDelete()); + found_tuples++; + assert(dt->keylen() == (*key_arr)[ri].length()+1); + + //free dt + datatuple::freetuple(dt); + dt = 0; + + datatuple::freetuple(searchtuple); + } + printf("found %d\n", found_tuples); + + printf("Stage 3: Initiating scan\n"); + + ret = logstore_client_op_returns_many(l, OP_SCAN, NULL, NULL, 0); // start = NULL stop = NULL limit = NONE + assert(ret == LOGSTORE_RESPONSE_SENDING_TUPLES); + datatuple * tup; + size_t i = 0; + while((tup = logstore_client_next_tuple(l))) { + assert(!tup->isDelete()); + assert(tup->keylen() == (*key_arr)[i].length()+1); + assert(!memcmp(tup->key(), (*key_arr)[i].c_str(), (*key_arr)[i].length())); + datatuple::freetuple(tup); + i++; + } + assert(i == NUM_ENTRIES); + + key_arr->clear(); + delete key_arr; + + logstore_client_close(l); + + gettimeofday(&stop_tv,0); + printf("run time: %6.1f\n", (tv_to_double(stop_tv) - tv_to_double(start_tv))); +} + + + +/** @test + */ +int main(int argc, char* argv[]) +{ + if(argc > 1) { + svrname = argv[1]; + } + if(argc > 2) { + svrport = atoi(argv[2]); + } + //insertProbeIter(25000); + insertProbeIter(100000); + //insertProbeIter(5000); +// insertProbeIter(100); + + /* + insertProbeIter(5000); + insertProbeIter(2500); + insertProbeIter(1000); + insertProbeIter(500); + insertProbeIter(1000); + insertProbeIter(100); + insertProbeIter(10); + */ + + return 0; +} + diff --git a/test/check_util.h b/test/check_util.h index a559a24..41c0a0a 100644 --- a/test/check_util.h +++ b/test/check_util.h @@ -35,8 +35,8 @@ void removeduplicates(std::vector *arr) } void scramble(std::vector *arr) { - for(int i = 0; i < arr->size(); i++) { - int other = rand() % arr->size(); + for(unsigned int i = 0; i < arr->size(); i++) { + unsigned int other = rand() % arr->size(); if(other != i) { std::string s = (*arr)[i]; (*arr)[i] = (*arr)[other];