From 940a6da6fe826a61326ec4b4885230685541d6f0 Mon Sep 17 00:00:00 2001 From: sears Date: Wed, 10 Feb 2010 21:49:50 +0000 Subject: [PATCH] Reworked memory allocation and network protocol. This gargantuan commit also reduces the default size of C0, and removes quite a bit of redundant logic and error handling. The tests now pass, except that check_merge never terminates (it takes too long) and check_mergelarge still is not passing. For better luck running this version of the code, turn off stasis' concurrent buffer manager. We're doing something bad that leads to deadlocks with the concurrent buffer manager. Another (the same?) bug less-frequently leads to page corruption with the old stasis buffer manager. git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@556 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- datapage.cpp | 58 ++++----- datapage.h | 6 +- datatuple.h | 242 +++++++++++++++++--------------------- logiterators.h | 5 +- logserver.cpp | 126 ++++++++------------ logserver.h | 21 +--- logstore.cpp | 147 ++++++++++------------- logstore.h | 17 +-- merger.cpp | 60 ++++------ merger.h | 6 +- network.h | 143 ++++++++++++++++++++-- server.cpp | 35 ++---- tcpclient.cpp | 64 +++------- tcpclient.h | 2 +- test/check_datapage.cpp | 57 ++------- test/check_logtable.cpp | 62 ++-------- test/check_merge.cpp | 36 ++---- test/check_mergelarge.cpp | 86 +------------- test/check_mergetuple.cpp | 93 +++------------ test/check_rbtree.cpp | 68 ++--------- test/check_tcpclient.cpp | 79 ++++--------- tuplemerger.cpp | 51 ++------ 22 files changed, 549 insertions(+), 915 deletions(-) diff --git a/datapage.cpp b/datapage.cpp index 0434dba..911fc78 100644 --- a/datapage.cpp +++ b/datapage.cpp @@ -86,13 +86,13 @@ void DataPage::initialize(int xid) } template -bool DataPage::append(int xid, TUPLE const & dat) +bool DataPage::append(int xid, TUPLE const * dat) { assert(byte_offset >= HEADER_SIZE); assert(fix_pcount >= 1); //check if there is enough space (for the data 
length + data) - int32_t blen = dat.byte_length() + sizeof(int32_t); + int32_t blen = dat->byte_length() + sizeof(int32_t); if(PAGE_SIZE * fix_pcount - byte_offset < blen) { //check if the record is too large @@ -118,7 +118,7 @@ bool DataPage::append(int xid, TUPLE const & dat) byte_offset += sizeof(int32_t); //write the data - byte * barr = dat.to_bytes(); + byte * barr = dat->to_bytes(); if(!writebytes(xid, dsize, barr)) //if write fails, undo the previous write { byte_offset -= sizeof(int32_t); @@ -127,7 +127,8 @@ bool DataPage::append(int xid, TUPLE const & dat) if(PAGE_SIZE - (byte_offset % PAGE_SIZE) >= sizeof(int32_t)) { dsize = 0; - writebytes(xid, sizeof(int32_t), (byte*)(&dsize));//this will succeed, since there is enough space on the page + int succ = writebytes(xid, sizeof(int32_t), (byte*)(&dsize));//this will succeed, since there is enough space on the page + assert(succ); } return false; } @@ -138,7 +139,9 @@ bool DataPage::append(int xid, TUPLE const & dat) if(PAGE_SIZE - (byte_offset % PAGE_SIZE) >= sizeof(int32_t)) { dsize = 0; - writebytes(xid, sizeof(int32_t), (byte*)(&dsize));//this will succeed, since there is enough space on the page + int succ = writebytes(xid, sizeof(int32_t), (byte*)(&dsize));//this will succeed, since there is enough space on the page + assert(succ); + } return true; @@ -205,12 +208,11 @@ bool DataPage::recordRead(int xid, typename TUPLE::key_t key, size_t keyS int match = -1; while((*buf=itr.getnext(xid)) != 0) { - match = TUPLE::compare((*buf)->get_key(), key); + match = TUPLE::compare((*buf)->key(), key); if(match<0) //keep searching { - free((*buf)->keylen); - free(*buf); + datatuple::freetuple(*buf); *buf=0; } else if(match==0) //found @@ -219,8 +221,7 @@ bool DataPage::recordRead(int xid, typename TUPLE::key_t key, size_t keyS } else // match > 0, then does not exist { - free((*buf)->keylen); - free(*buf); + datatuple::freetuple(*buf); *buf = 0; break; } @@ -230,12 +231,9 @@ bool DataPage::recordRead(int xid, 
typename TUPLE::key_t key, size_t keyS } template -void DataPage::readbytes(int xid, int32_t offset, int count, byte **data) +void DataPage::readbytes(int xid, int32_t offset, int count, byte *data) { - if(*data==NULL) - *data = (byte*)malloc(count); - int32_t bytes_copied = 0; while(bytes_copied < count) { @@ -251,7 +249,7 @@ void DataPage::readbytes(int xid, int32_t offset, int count, byte **data) int32_t copy_len = ( (PAGE_SIZE - page_offset < count - bytes_copied ) ? PAGE_SIZE - page_offset : count - bytes_copied); byte * pb_ptr = stasis_page_byte_ptr_from_start(p, page_offset); - memcpy((*data)+bytes_copied, pb_ptr, copy_len); + memcpy((data)+bytes_copied, pb_ptr, copy_len); //release the page unlock(p->rwlatch); @@ -425,10 +423,11 @@ TUPLE* DataPage::RecordIterator::getnext(int xid) readlock(p->rwlatch,0); int32_t *dsize_ptr; + int32_t scratch; if(PAGE_SIZE - (offset % PAGE_SIZE) < sizeof(int32_t)) //int spread in two pages { - dsize_ptr = 0; - dp->readbytes(xid, offset, sizeof(int32_t), (byte**)(&dsize_ptr)); + dsize_ptr = &scratch; + dp->readbytes(xid, offset, sizeof(int32_t), (byte*)dsize_ptr); } else //int in a single page dsize_ptr = (int32_t*)stasis_page_byte_ptr_from_start(p, offset % PAGE_SIZE); @@ -442,10 +441,12 @@ TUPLE* DataPage::RecordIterator::getnext(int xid) return 0; } - byte* tb=0; - dp->readbytes(xid, offset, *dsize_ptr, &tb); + byte* tb = (byte*)malloc(*dsize_ptr); + dp->readbytes(xid, offset, *dsize_ptr, tb); - TUPLE *tup = TUPLE::from_bytes(tb); + TUPLE *tup = TUPLE::from_bytes(tb); // This version of from_bytes does not consume its argument. + + free(tb); offset += *dsize_ptr; @@ -484,15 +485,18 @@ void DataPage::RecordIterator::advance(int xid, int count) readlock(p->rwlatch,0); } - if(pindex == dp->pcount - 1 && (PAGE_SIZE - (offset % PAGE_SIZE) < sizeof(int32_t))) - return; - + if(pindex == dp->pcount - 1 && (PAGE_SIZE - (offset % PAGE_SIZE) < sizeof(int32_t))) { + assert(!p); // XXX Otherwise, was leaking page. 
do we reach this branch in testing? + return; + } int32_t *dsize_ptr=0; - if(PAGE_SIZE - (offset % PAGE_SIZE) < sizeof(int32_t)) //int spread in two pages - dp->readbytes(xid, offset, sizeof(int32_t), (byte**)(&dsize_ptr)); - else //int in a single page + int32_t scratch; + if(PAGE_SIZE - (offset % PAGE_SIZE) < sizeof(int32_t)) { //int spread in two pages + dsize_ptr = &scratch; + dp->readbytes(xid, offset, sizeof(int32_t), (byte*)dsize_ptr); + } else { //int in a single page dsize_ptr = (int32_t*)stasis_page_byte_ptr_from_start(p, offset % PAGE_SIZE); - + } offset += sizeof(int32_t); if(*dsize_ptr == 0) //no more keys diff --git a/datapage.h b/datapage.h index 9c2a109..8eec35a 100644 --- a/datapage.h +++ b/datapage.h @@ -59,7 +59,7 @@ public: ~DataPage(); - bool append(int xid, TUPLE const & dat); + bool append(int xid, TUPLE const * dat); bool recordRead(int xid, typename TUPLE::key_t key, size_t keySize, TUPLE ** buf); inline uint16_t recordCount(int xid); @@ -89,10 +89,10 @@ private: void incrementPageCount(int xid, pageid_t pid, int add=1); bool writebytes(int xid, int count, byte *data); - inline void readbytes(int xid, int32_t offset, int count, byte **data=0); + inline void readbytes(int xid, int32_t offset, int count, byte *data); private: - int pcount; + int32_t pcount; pageid_t *pidarr; int32_t byte_offset;//points to the next free byte diff --git a/datatuple.h b/datatuple.h index d25ed1a..41899ff 100644 --- a/datatuple.h +++ b/datatuple.h @@ -1,167 +1,147 @@ #ifndef _DATATUPLE_H_ #define _DATATUPLE_H_ - -typedef unsigned char uchar; - #include - -//#define byte unsigned char typedef unsigned char byte; #include - -//#include -//#include -//#include +#include typedef struct datatuple { - typedef uchar* key_t; - typedef uchar* data_t; - static const size_t isize = sizeof(uint32_t); - uint32_t *keylen; //key length should be size of string + 1 for \n - uint32_t *datalen; - key_t key; - data_t data; +public: + typedef uint32_t len_t ; + typedef unsigned 
char* key_t ; + typedef unsigned char* data_t ; +private: + static const len_t DELETE = ((len_t)0) - 1; + len_t datalen_; + byte* key_; + byte* data_; // aliases key. data_ - 1 should be the \0 terminating key_. + + datatuple* sanity_check() { + assert(keylen() < 3000); + return this; + } +public: + + inline len_t keylen() const { + return data_ - key_; + } + inline len_t datalen() const { + return (datalen_ == DELETE) ? 0 : datalen_; + } + + //returns the length of the byte array representation + len_t byte_length() const { + return sizeof(len_t) + sizeof(len_t) + keylen() + datalen(); + } + static len_t length_from_header(len_t keylen, len_t datalen) { + return keylen + ((datalen == DELETE) ? 0 : datalen); + } + + inline key_t key() const { + return key_; + } + inline data_t data() const { + return data_; + } //this is used by the stl set - bool operator() (const datatuple& lhs, const datatuple& rhs) const - { - //std::basic_string s1(lhs.key); - //std::basic_string s2(rhs.key); - return strcmp((char*)lhs.key,(char*)rhs.key) < 0; - //return (*((int32_t*)lhs.key)) <= (*((int32_t*)rhs.key)); - } - - void clone(const datatuple& tuple) { - //create a copy - - byte * arr = (byte*) malloc(tuple.byte_length()); - - keylen = (uint32_t*) arr; - *keylen = *tuple.keylen; - datalen = (uint32_t*) (arr+isize); - *datalen = *tuple.datalen; - key = (datatuple::key_t) (arr+isize+isize); - memcpy((byte*)key, (byte*)tuple.key, *keylen); - if(!tuple.isDelete()) - { - data = (datatuple::data_t) (arr+isize+isize+ *keylen); - memcpy((byte*)data, (byte*)tuple.data, *datalen); - } - else - data = 0; - } + bool operator() (const datatuple* lhs, const datatuple* rhs) const { + return compare(lhs->key(), rhs->key()) < 0; //strcmp((char*)lhs.key(),(char*)rhs.key()) < 0; + } /** * return -1 if k1 < k2 * 0 if k1 == k2 * 1 of k1 > k2 **/ - static int compare(const key_t k1,const key_t k2) - { - //for char* ending with \0 - return strcmp((char*)k1,(char*)k2); + static int compare(const byte* 
k1,const byte* k2) { + // XXX string comparison is probably not the right approach. + //for char* ending with \0 + return strcmp((char*)k1,(char*)k2); + } - //for int32_t - //printf("%d\t%d\n",(*((int32_t*)k1)) ,(*((int32_t*)k2))); - //return (*((int32_t*)k1)) <= (*((int32_t*)k2)); - } + inline void setDelete() { + datalen_ = DELETE; + } - void setDelete() - { - *datalen = UINT_MAX; - } + inline bool isDelete() const { + return datalen_ == DELETE; + } - inline bool isDelete() const - { - return *datalen == UINT_MAX; - } + static std::string key_to_str(const byte* k) { + //for strings + return std::string((char*)k); + //for int + /* + std::ostringstream ostr; + ostr << *((int32_t*)k); + return ostr.str(); + */ + } - static std::string key_to_str(const byte* k) - { - //for strings - return std::string((char*)k); - //for int - /* - std::ostringstream ostr; - ostr << *((int32_t*)k); - return ostr.str(); - */ - } + //copy the tuple. does a deep copy of the contents. + datatuple* create_copy() const { + return create(key(), keylen(), data(), datalen_)->sanity_check(); + } - //returns the length of the byte array representation - int32_t byte_length() const{ - static const size_t isize = sizeof(uint32_t); - if(isDelete()) - return isize + *keylen + isize; - else - return isize + *keylen + isize + (*datalen); + + static datatuple* create(const void* key, len_t keylen) { + return create(key, keylen, 0, DELETE)->sanity_check(); + } + static datatuple* create(const void* key, len_t keylen, const void* data, len_t datalen) { + datatuple *ret = (datatuple*)malloc(sizeof(datatuple)); + ret->key_ = (byte*)malloc(length_from_header(keylen, datalen)); + memcpy(ret->key_, key, keylen); + ret->data_ = ret->key_ + keylen; // need to set this even if delete, since it encodes the key length. 
+ if(datalen != DELETE) { + memcpy(ret->data_, data, datalen); + } + ret->datalen_ = datalen; + return ret->sanity_check(); } //format: key length _ data length _ key _ data byte * to_bytes() const { - static const size_t isize = sizeof(uint32_t); - byte * ret; - if(!isDelete()) - ret = (byte*) malloc(isize + *keylen + isize + *datalen); - else - ret = (byte*) malloc(isize + *keylen + isize); - - memcpy(ret, (byte*)(keylen), isize); - memcpy(ret+isize, (byte*)(datalen), isize); - memcpy(ret+isize+isize, key, *keylen); - if(!isDelete()) - memcpy(ret+isize+isize+*keylen, data, *datalen); + byte *ret = (byte*)malloc(byte_length()); + ((len_t*)ret)[0] = keylen(); + ((len_t*)ret)[1] = datalen_; + memcpy(((len_t*)ret)+2, key_, length_from_header(keylen(), datalen_)); return ret; } - //does not copy the data again - //just sets the pointers in the datatuple to - //right positions in the given arr - - static datatuple* from_bytes(const byte * arr) - { - static const size_t isize = sizeof(uint32_t); - datatuple *dt = (datatuple*) malloc(sizeof(datatuple)); + const byte* get_bytes(len_t *keylen, len_t *datalen) const { + *keylen = this->keylen(); + *datalen = datalen_; + return key_; + } - dt->keylen = (uint32_t*) arr; - dt->datalen = (uint32_t*) (arr+isize); - dt->key = (key_t) (arr+isize+isize); - if(!dt->isDelete()) - dt->data = (data_t) (arr+isize+isize+ *(dt->keylen)); - else - dt->data = 0; + //format of buf: key _ data. The caller needs to 'peel' off key length and data length for this call. 
+ static datatuple* from_bytes(len_t keylen, len_t datalen, byte* buf) { + datatuple *dt = (datatuple*) malloc(sizeof(datatuple)); + dt->datalen_ = datalen; + dt->key_ = buf; + dt->data_ = dt->key_ + keylen; + return dt->sanity_check(); + } + static datatuple* from_bytes(byte* buf) { + datatuple *dt = (datatuple*) malloc(sizeof(datatuple)); + len_t keylen = ((len_t*)buf)[0]; + dt->datalen_ = ((len_t*)buf)[1]; + len_t buflen = length_from_header(keylen, dt->datalen_); + dt->key_ = (byte*)malloc(buflen); + memcpy(dt->key_,((len_t*)buf)+2,buflen); + dt->data_ = dt->key_ + keylen; - return dt; - } - /* - static datatuple form_tuple(const byte * arr) - { - static const size_t isize = sizeof(uint32_t); - datatuple dt; + return dt->sanity_check(); + } - dt.keylen = (uint32_t*) arr; - dt.datalen = (uint32_t*) (arr+isize); - dt.key = (key_t) (arr+isize+isize); - if(!dt.isDelete()) - dt.data = (data_t) (arr+isize+isize+ *(dt.keylen)); - else - dt.data = 0; + static inline void freetuple(datatuple* dt) { + free(dt->key_); + free(dt); + } - return dt; - } - */ - - byte * get_key() { return (byte*) key; } - byte * get_data() { return (byte*) data; } - - //releases only the tuple - static void release(datatuple *dt) - { - free(dt); - } - } datatuple; diff --git a/logiterators.h b/logiterators.h index 85f3d07..ee3c9c9 100644 --- a/logiterators.h +++ b/logiterators.h @@ -30,9 +30,8 @@ public: if(done_) { return NULL; } if(first_) { first_ = 0;} else { it_++; } if(it_==itend_) { done_= true; return NULL; } - TUPLE *t = new TUPLE(); - t->clone(*it_); - return t; + + return (*it_)->create_copy(); } diff --git a/logserver.cpp b/logserver.cpp index c7e5bb3..16d7426 100644 --- a/logserver.cpp +++ b/logserver.cpp @@ -45,6 +45,11 @@ void logserver::startserver(logtable *ltable) worker_data->ready_queue = &ready_queue; worker_data->work_queue = &work_queue; +#ifdef STATS_ENABLED + worker_data->num_reqs = 0; +#endif + + worker_data->qlock = qlock; worker_data->selcond = selcond; @@ -340,14 
+345,14 @@ void *serverLoop(void *args) } //start listening on the server socket - //second arg is the max number of coonections waiting in queue + //second arg is the max number of connections waiting in queue if(listen(sockfd,SOMAXCONN)==-1) { printf("ERROR on listen.\n"); return 0; } - printf("LSM Server listenning...\n"); + printf("LSM Server listening...\n"); *(sdata->server_socket) = sockfd; int flag, result; @@ -427,22 +432,23 @@ void * thread_work_fn( void * args) } //step 1: read the opcode - uint8_t opcode; - ssize_t n = read(*(item->data->workitem), &opcode, sizeof(uint8_t)); - if(n == 0) { + network_op_t opcode = readopfromsocket(*(item->data->workitem), LOGSTORE_CLIENT_REQUEST); + if(opcode == LOGSTORE_CONN_CLOSED_ERROR) { opcode = OP_DONE; - n = sizeof(uint8_t); printf("Obsolescent client closed connection uncleanly\n"); } - assert( n == sizeof(uint8_t)); - assert( opcode < OP_INVALID ); - if( opcode == OP_DONE ) //close the conn on failure + if( opcode == OP_DONE || (opiserror(opcode))) //close the conn on failure { - pthread_mutex_lock(item->data->qlock); - printf("client done. conn closed. (%d, %d, %d, %d)\n", - n, errno, *(item->data->workitem), item->data->work_queue->size()); - close(*(item->data->workitem)); + pthread_mutex_lock(item->data->qlock); + if(opiserror(opcode)) { + printf("network error. conn closed. (%d, %d, %d, %d)\n", + opcode, errno, *(item->data->workitem), item->data->work_queue->size()); + } else { + printf("client done. conn closed. 
(%d, %d)\n", + *(item->data->workitem), item->data->work_queue->size()); + } + close(*(item->data->workitem)); if(item->data->work_queue->size() > 0) { @@ -464,30 +470,8 @@ void * thread_work_fn( void * args) } - //step 2: read the tuple from client - datatuple tuple; - tuple.keylen = (uint32_t*)malloc(sizeof(uint32_t)); - tuple.datalen = (uint32_t*)malloc(sizeof(uint32_t)); - - //read the key length - n = read(*(item->data->workitem), tuple.keylen, sizeof(uint32_t)); - assert( n == sizeof(uint32_t)); - //read the data length - n = read(*(item->data->workitem), tuple.datalen, sizeof(uint32_t)); - assert( n == sizeof(uint32_t)); - - //read the key - tuple.key = (byte*) malloc(*tuple.keylen); - readfromsocket(*(item->data->workitem), (char*) tuple.key, *tuple.keylen); - //read the data - if(!tuple.isDelete() && opcode != OP_FIND) - { - tuple.data = (byte*) malloc(*tuple.datalen); - readfromsocket(*(item->data->workitem), (char*) tuple.data, *tuple.datalen); - } - else - tuple.data = 0; - + //step 2: read the tuple from client + datatuple * tuple = readtuplefromsocket(*(item->data->workitem)); //step 3: process the tuple //pthread_mutex_lock(item->data->table_lock); //readlock(item->data->table_lock,0); @@ -500,67 +484,61 @@ void * thread_work_fn( void * args) //pthread_mutex_unlock(item->data->table_lock); //unlock(item->data->table_lock); //step 4: send response - uint8_t rcode = OP_SUCCESS; - n = write(*(item->data->workitem), &rcode, sizeof(uint8_t)); - assert(n == sizeof(uint8_t)); - + uint8_t rcode = LOGSTORE_RESPONSE_SUCCESS; + int err = writeoptosocket(*(item->data->workitem), LOGSTORE_RESPONSE_SUCCESS); + if(err) { + perror("could not respond to client"); + } } else if(opcode == OP_FIND) { //find the tuple - datatuple *dt = item->data->ltable->findTuple(-1, tuple.key, *tuple.keylen); + datatuple *dt = item->data->ltable->findTuple(-1, tuple->key(), tuple->keylen()); //unlock the lsmlock //pthread_mutex_unlock(item->data->table_lock); 
//unlock(item->data->table_lock); #ifdef STATS_ENABLED - if(dt == 0) - DEBUG("key not found:\t%s\n", datatuple::key_to_str(tuple.key).c_str()); - else if( *dt->datalen != 1024) - DEBUG("data len for\t%s:\t%d\n", datatuple::key_to_str(tuple.key).c_str(), - *dt->datalen); + if(dt == 0) { + DEBUG("key not found:\t%s\n", datatuple::key_to_str(tuple.key()).c_str()); + } else if( dt->datalen() != 1024) { + DEBUG("data len for\t%s:\t%d\n", datatuple::key_to_str(tuple.key()).c_str(), + dt->datalen); + if(datatuple::compare(tuple->key(), dt->key()) != 0) { + DEBUG("key not equal:\t%s\t%s\n", datatuple::key_to_str(tuple.key()).c_str(), + datatuple::key_to_str(dt->key).c_str()); + } - if(datatuple::compare(tuple.key, dt->key) != 0) - DEBUG("key not equal:\t%s\t%s\n", datatuple::key_to_str(tuple.key).c_str(), - datatuple::key_to_str(dt->key).c_str()); - + } #endif - if(dt == 0) //tuple deleted + bool dt_needs_free; + if(dt == 0) //tuple does not exist. { - dt = (datatuple*) malloc(sizeof(datatuple)); - dt->keylen = (uint32_t*) malloc(2*sizeof(uint32_t) + *tuple.keylen); - *dt->keylen = *tuple.keylen; - dt->datalen = dt->keylen + 1; - dt->key = (datatuple::key_t) (dt->datalen+1); - memcpy((byte*) dt->key, (byte*) tuple.key, *tuple.keylen); - dt->setDelete(); + dt = tuple; + dt->setDelete(); + dt_needs_free = false; + } else { + dt_needs_free = true; } //send the reply code - uint8_t rcode = OP_SENDING_TUPLE; - n = write(*(item->data->workitem), &rcode, sizeof(uint8_t)); - assert(n == sizeof(uint8_t)); + int err = writeoptosocket(*(item->data->workitem), LOGSTORE_RESPONSE_SENDING_TUPLES); //send the tuple - writetosocket(*(item->data->workitem), (char*) dt->keylen, dt->byte_length()); + writetupletosocket(*(item->data->workitem), dt); //free datatuple - free(dt->keylen); - free(dt); + if(dt_needs_free) { + datatuple::freetuple(dt); + } } - //close the socket - //close(*(item->data->workitem)); - //free the tuple - free(tuple.keylen); - free(tuple.datalen); - free(tuple.key); - 
free(tuple.data); + datatuple::freetuple(tuple); - //printf("socket %d: work completed.", *(item->data->workitem)); + //printf("socket %d: work completed.", *(item->data->workitem)); pthread_mutex_lock(item->data->qlock); @@ -619,5 +597,3 @@ void * thread_work_fn( void * args) return NULL; } - - diff --git a/logserver.h b/logserver.h index e11a7f8..1ff2873 100644 --- a/logserver.h +++ b/logserver.h @@ -81,31 +81,14 @@ struct serverth_data std::queue *ready_queue; pthread_cond_t *selcond; - - pthread_mutex_t *qlock; - - + pthread_mutex_t *qlock; }; -void * thread_work_fn( void *); +void * thread_work_fn( void *); class logserver { -public: - //server codes -// static uint8_t OP_SUCCESS; -// static uint8_t OP_FAIL; -// static uint8_t OP_SENDING_TUPLE; -// -// //client codes -// static uint8_t OP_FIND; -// static uint8_t OP_INSERT; -// -// static uint8_t OP_DONE; -// -// static uint8_t OP_INVALID; - public: logserver(int nthreads, int server_port){ this->nthreads = nthreads; diff --git a/logstore.cpp b/logstore.cpp index 0c96af8..a637240 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -8,6 +8,7 @@ #include "logiterators.h" #include "datapage.cpp" +#include #include ///////////////////////////////////////////////////////////////// @@ -292,7 +293,7 @@ recordid logtree::appendPage(int xid, recordid tree, pageid_t & rmLeafID, Page *p = loadPage(xid, tree.page); writelock(p->rwlatch, 0); //logtree_state *s = (logtree_state*)p->impl; - + tree.slot = 0; //tree.size = sizeof(lsmTreeNodeRecord)+keySize; @@ -367,20 +368,27 @@ recordid logtree::appendPage(int xid, recordid tree, pageid_t & rmLeafID, // NOTE: stasis_record_free call goes to slottedFree in slotted.c // this function only reduces the numslots when you call it // with the last slot. so thats why i go backwards here. 
- for(int i = *stasis_page_slotted_numslots_ptr(p)-1; i>FIRST_SLOT; i--) + printf("slots %d (%d) keysize=%lld\n", (int)*stasis_page_slotted_numslots_ptr(p), (int)FIRST_SLOT+1, (long long int)keySize); + assert(*stasis_page_slotted_numslots_ptr(p) >= FIRST_SLOT+1); + for(int i = *stasis_page_slotted_numslots_ptr(p)-1; i>FIRST_SLOT; i--) { + assert(*stasis_page_slotted_numslots_ptr(p) > FIRST_SLOT+1); const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid,p,i,0); int reclen = readRecordLength(xid, p, i); recordid tmp_rec= {p->id, i, reclen}; stasis_record_free(xid, p, tmp_rec); - } + } //TODO: could change with stasis_slotted_page_initialize(...); + // TODO: fsck? + // stasis_page_slotted_initialize_page(p); + // reinsert first. - recordid pFirstSlot = { p->id, FIRST_SLOT, readRecordLength(xid, p, FIRST_SLOT)}; - - assert(*stasis_page_slotted_numslots_ptr(p) == FIRST_SLOT+1); + if(*stasis_page_slotted_numslots_ptr(p) != FIRST_SLOT+1) { + printf("slots %d (%d)\n", *stasis_page_slotted_numslots_ptr(p), (int)FIRST_SLOT+1); + assert(*stasis_page_slotted_numslots_ptr(p) == FIRST_SLOT+1); + } indexnode_rec *nr = (indexnode_rec*)stasis_record_write_begin(xid, p, pFirstSlot); @@ -833,6 +841,21 @@ logtable::logtable() } +void logtable::tearDownTree(rbtree_ptr_t tree) { + datatuple * t = 0; + for(rbtree_t::iterator delitr = tree->begin(); + delitr != tree->end(); + delitr++) { + if(t) { + datatuple::freetuple(t); + } + t = *delitr; + tree->erase(delitr); + } + if(t) { datatuple::freetuple(t); } + delete tree; +} + logtable::~logtable() { if(tree_c1 != NULL) @@ -842,23 +865,10 @@ logtable::~logtable() if(tree_c0 != NULL) { - for(rbtree_t::iterator delitr=tree_c0->begin(); - delitr != tree_c0->end(); delitr++) - free((*delitr).keylen); - - delete tree_c0; + tearDownTree(tree_c0); } delete tmerger; - - /* - if(rbtree_mut) - delete rbtree_mut; - if(tree_c0) - delete tree_c0; - if(input_needed) - delete input_needed; - */ } recordid logtable::allocTable(int xid) @@ -917,7 
+927,7 @@ void logtable::flushTable() while(*mergedata->old_c0) { unlock(mergedata->header_lock); // pthread_mutex_lock(mergedata->rbtree_mut); - if(tree_bytes >= MAX_C0_SIZE) + if(tree_bytes >= max_c0_size) pthread_cond_wait(mergedata->input_needed_cond, mergedata->rbtree_mut); else { @@ -974,13 +984,10 @@ void logtable::flushTable() } -datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize) +datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keySize) { //prepare a search tuple - datatuple search_tuple; - search_tuple.keylen = (uint32_t*)malloc(sizeof(uint32_t)); - *(search_tuple.keylen) = keySize; - search_tuple.key = key; + datatuple *search_tuple = datatuple::create(key, keySize); readlock(mergedata->header_lock,0); pthread_mutex_lock(mergedata->rbtree_mut); @@ -992,10 +999,7 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize) if(rbitr != tree_c0->end()) { DEBUG("tree_c0 size %d\n", tree_c0->size()); - datatuple tuple = *rbitr; - byte *barr = (byte*)malloc(tuple.byte_length()); - memcpy(barr, (byte*)tuple.keylen, tuple.byte_length()); - ret_tuple = datatuple::from_bytes(barr); + ret_tuple = (*rbitr)->create_copy(); } bool done = false; @@ -1006,22 +1010,19 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize) rbitr = (*(mergedata->old_c0))->find(search_tuple); if(rbitr != (*(mergedata->old_c0))->end()) { - datatuple tuple = *rbitr; + datatuple *tuple = *rbitr; - if(tuple.isDelete()) //tuple deleted + if(tuple->isDelete()) //tuple deleted done = true; //return ret_tuple else if(ret_tuple != 0) //merge the two { - datatuple *mtuple = tmerger->merge(&tuple, ret_tuple); //merge the two - free(ret_tuple->keylen); //free tuple from current tree - free(ret_tuple); + datatuple *mtuple = tmerger->merge(tuple, ret_tuple); //merge the two + datatuple::freetuple(ret_tuple); //free tuple from current tree ret_tuple = mtuple; //set return tuple to merge result } else 
//key first found in old mem tree { - byte *barr = (byte*)malloc(tuple.byte_length()); - memcpy(barr, (byte*)tuple.keylen, tuple.byte_length()); - ret_tuple = datatuple::from_bytes(barr); + ret_tuple = tuple->create_copy(); } //we cannot free tuple from old-tree 'cos it is not a copy } @@ -1042,8 +1043,7 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize) else if(ret_tuple != 0) //merge the two { datatuple *mtuple = tmerger->merge(tuple_c1, ret_tuple); //merge the two - free(ret_tuple->keylen); //free tuple from before - free(ret_tuple); + datatuple::freetuple(ret_tuple); //free tuple from before ret_tuple = mtuple; //set return tuple to merge result } else //found for the first time @@ -1057,8 +1057,7 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize) if(!use_copy) { - free(tuple_c1->keylen); //free tuple from tree c1 - free(tuple_c1); + datatuple::freetuple(tuple_c1); //free tuple from tree c1 } } } @@ -1078,8 +1077,7 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize) else if(ret_tuple != 0) //merge the two { datatuple *mtuple = tmerger->merge(tuple_oc1, ret_tuple); //merge the two - free(ret_tuple->keylen); //free tuple from before - free(ret_tuple); + datatuple::freetuple(ret_tuple); //free tuple from before ret_tuple = mtuple; //set return tuple to merge result } else //found for the first time @@ -1093,8 +1091,7 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize) if(!use_copy) { - free(tuple_oc1->keylen); //free tuple from tree old c1 - free(tuple_oc1); + datatuple::freetuple(tuple_oc1); //free tuple from tree old c1 } } } @@ -1113,31 +1110,25 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize) else if(ret_tuple != 0) { datatuple *mtuple = tmerger->merge(tuple_c2, ret_tuple); //merge the two - free(ret_tuple->keylen); //free tuple from before - free(ret_tuple); + datatuple::freetuple(ret_tuple); //free tuple 
from before ret_tuple = mtuple; //set return tuple to merge result } else //found for the first time { use_copy = true; ret_tuple = tuple_c2; - //byte *barr = (byte*)malloc(tuple_c2->byte_length()); - //memcpy(barr, (byte*)tuple_c2->keylen, tuple_c2->byte_length()); - //ret_tuple = datatuple::from_bytes(barr); } if(!use_copy) { - free(tuple_c2->keylen); //free tuple from tree c2 - free(tuple_c2); + datatuple::freetuple(tuple_c2); //free tuple from tree c2 } } } //pthread_mutex_unlock(mergedata->rbtree_mut); unlock(mergedata->header_lock); - free(search_tuple.keylen); - + datatuple::freetuple(search_tuple); return ret_tuple; } @@ -1149,10 +1140,7 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize) datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keySize) { //prepare a search tuple - datatuple search_tuple; - search_tuple.keylen = (uint32_t*)malloc(sizeof(uint32_t)); - *(search_tuple.keylen) = keySize; - search_tuple.key = key; + datatuple * search_tuple = datatuple::create(key, keySize); pthread_mutex_lock(mergedata->rbtree_mut); @@ -1163,10 +1151,7 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS if(rbitr != tree_c0->end()) { DEBUG("tree_c0 size %d\n", tree_c0->size()); - datatuple tuple = *rbitr; - byte *barr = (byte*)malloc(tuple.byte_length()); - memcpy(barr, (byte*)tuple.keylen, tuple.byte_length()); - ret_tuple = datatuple::from_bytes(barr); + ret_tuple = (*rbitr)->create_copy(); } else @@ -1179,10 +1164,7 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS rbitr = (*(mergedata->old_c0))->find(search_tuple); if(rbitr != (*(mergedata->old_c0))->end()) { - datatuple tuple = *rbitr; - byte *barr = (byte*)malloc(tuple.byte_length()); - memcpy(barr, (byte*)tuple.keylen, tuple.byte_length()); - ret_tuple = datatuple::from_bytes(barr); + ret_tuple = (*rbitr)->create_copy(); } } @@ -1221,13 +1203,13 @@ datatuple * logtable::findTuple_first(int 
xid, datatuple::key_t key, size_t keyS pthread_mutex_unlock(mergedata->rbtree_mut); - free(search_tuple.keylen); + datatuple::freetuple(search_tuple); return ret_tuple; } -void logtable::insertTuple(struct datatuple &tuple) +void logtable::insertTuple(datatuple *tuple) { //static int count = LATCH_INTERVAL; //static int tsize = 0; //number of tuples @@ -1240,29 +1222,27 @@ void logtable::insertTuple(struct datatuple &tuple) rbtree_t::iterator rbitr = tree_c0->find(tuple); if(rbitr != tree_c0->end()) { - datatuple pre_t = *rbitr; + datatuple *pre_t = *rbitr; //do the merging - datatuple *new_t = tmerger->merge(&pre_t, &tuple); + datatuple *new_t = tmerger->merge(pre_t, tuple); tree_c0->erase(pre_t); //remove the previous tuple - tree_c0->insert( *new_t); //insert the new tuple + tree_c0->insert(new_t); //insert the new tuple //update the tree size (+ new_t size - pre_t size) - tree_bytes += (new_t->byte_length() - pre_t.byte_length()); - - free(pre_t.keylen); //free the previous tuple - free(new_t); // frees the malloc(sizeof(datatuple)) coming from merge + tree_bytes += (new_t->byte_length() - pre_t->byte_length()); + + datatuple::freetuple(pre_t); //free the previous tuple } else //no tuple with same key exists in mem-tree { - datatuple t; - t.clone(tuple); + datatuple *t = tuple->create_copy(); //insert tuple into the rbtree tree_c0->insert(t); tsize++; - tree_bytes += t.byte_length() + RB_TREE_OVERHEAD; + tree_bytes += t->byte_length() + RB_TREE_OVERHEAD; } @@ -1276,7 +1256,7 @@ void logtable::insertTuple(struct datatuple &tuple) } */ - if(tree_bytes >= MAX_C0_SIZE ) + if(tree_bytes >= max_c0_size ) { DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes); pthread_mutex_unlock(mergedata->rbtree_mut); @@ -1300,7 +1280,7 @@ void logtable::insertTuple(struct datatuple &tuple) } -DataPage* logtable::insertTuple(int xid, struct datatuple &tuple, recordid &dpstate, logtree *ltree) +DataPage* logtable::insertTuple(int xid, datatuple *tuple, recordid 
&dpstate, logtree *ltree) { //create a new data page @@ -1326,8 +1306,8 @@ DataPage* logtable::insertTuple(int xid, struct datatuple &tuple, rec //insert the record key and id of the first page of the datapage to the logtree Tread(xid,ltree->get_tree_state(), &alloc_conf); logtree::appendPage(xid, ltree->get_root_rec(), ltree->lastLeaf, - tuple.get_key(), - *tuple.keylen, + tuple->key(), + tuple->keylen(), ltree->alloc_region, &alloc_conf, dp->get_start_pid() @@ -1507,6 +1487,7 @@ int logtreeIterator::next(int xid, lladdIterator_t *it) } else { + assert(!impl->p); if(impl->t != NULL) free(impl->t); impl->t = 0; diff --git a/logstore.h b/logstore.h index 268793b..d7f1838 100644 --- a/logstore.h +++ b/logstore.h @@ -41,7 +41,8 @@ double tv_to_double(struct timeval tv); struct logtable_mergedata; - +typedef std::set rbtree_t; +typedef rbtree_t* rbtree_ptr_t; typedef struct RegionAllocConf_t { @@ -174,11 +175,11 @@ public: ~logtable(); //user access functions - datatuple * findTuple(int xid, datatuple::key_t key, size_t keySize); + datatuple * findTuple(int xid, const datatuple::key_t key, size_t keySize); datatuple * findTuple_first(int xid, datatuple::key_t key, size_t keySize); - void insertTuple(struct datatuple &tuple); + void insertTuple(struct datatuple *tuple); //other class functions @@ -186,9 +187,11 @@ public: void flushTable(); - DataPage* insertTuple(int xid, struct datatuple &tuple, recordid &dpstate,logtree *ltree); + static inline void tearDownTree(rbtree_ptr_t t); - datatuple * findTuple(int xid, datatuple::key_t key, size_t keySize, logtree *ltree); + DataPage* insertTuple(int xid, datatuple *tuple, recordid &dpstate,logtree *ltree); + + datatuple * findTuple(int xid, const datatuple::key_t key, size_t keySize, logtree *ltree); inline recordid & get_table_rec(){return table_rec;} @@ -198,8 +201,6 @@ public: inline void set_tree_c1(logtree *t){tree_c1=t;} inline void set_tree_c2(logtree *t){tree_c2=t;} - typedef std::set rbtree_t; - typedef rbtree_t* 
rbtree_ptr_t; inline rbtree_ptr_t get_tree_c0(){return tree_c0;} void set_tree_c0(rbtree_ptr_t newtree){tree_c0 = newtree;} @@ -233,6 +234,8 @@ public: logtable_mergedata * mergedata; + int64_t max_c0_size; + private: diff --git a/merger.cpp b/merger.cpp index 5f2be4f..ba6e54f 100644 --- a/merger.cpp +++ b/merger.cpp @@ -3,12 +3,13 @@ #include "merger.h" #include "logiterators.cpp" #include "datapage.h" + //pageid_t merge_scheduler::C0_MEM_SIZE = 1000 * 1000 * 1000; //template <> struct merger_args; //template <> struct merger_args; inline DataPage* -insertTuple(int xid, DataPage *dp, datatuple &t, +insertTuple(int xid, DataPage *dp, datatuple *t, logtable *ltable, logtree * ltree, recordid & dpstate, @@ -124,8 +125,9 @@ void merge_scheduler::shutdown() } -void merge_scheduler::startlogtable(int index) +void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE) { + logtable * ltable = mergedata[index].first; struct logtable_mergedata *mdata = mergedata[index].second; @@ -158,6 +160,8 @@ void merge_scheduler::startlogtable(int index) recordid * oldridp = new recordid; *oldridp = NULLRID; + ltable->max_c0_size = MAX_C0_SIZE; + logtree ** block1_scratch = new logtree*; *block1_scratch=0; @@ -391,12 +395,12 @@ void* memMergeThread(void*arg) double target_R = *(a->r_i); double new_c1_size = npages * PAGE_SIZE; assert(target_R >= MIN_R); - if( (new_c1_size / MAX_C0_SIZE > target_R) || + if( (new_c1_size / ltable->max_c0_size > target_R) || (a->max_size && new_c1_size > a->max_size ) ) { printf("mmt:\tsignaling C2 for merge\n"); printf("mmt:\tnew_c1_size %.2f\tMAX_C0_SIZE %lld\ta->max_size %lld\t targetr %.2f \n", new_c1_size, - MAX_C0_SIZE, a->max_size, target_R); + ltable->max_c0_size, a->max_size, target_R); // XXX need to report backpressure here! 
while(*a->out_tree) { @@ -454,22 +458,10 @@ void* memMergeThread(void*arg) //TODO: get the freeing outside of the lock //// ----------- Free in_tree - for(rbtree_t::iterator delitr=deltree->begin(); - delitr != deltree->end(); delitr++) - free((*delitr).keylen); - - delete deltree; + logtable::tearDownTree(deltree); //deltree = 0; - /* - for(rbtree_t::iterator delitr=(*a->in_tree)->begin(); - delitr != (*a->in_tree)->end(); delitr++) - free((*delitr).keylen); - - delete *a->in_tree; - *a->in_tree = 0; - */ } //pthread_mutex_unlock(a->block_ready_mut); @@ -580,7 +572,7 @@ void *diskMergeThread(void*arg) merge_count++; *a->my_tree_size = mergedPages; //update the current optimal R value - *(a->r_i) = std::max(MIN_R, sqrt( (npages * 1.0) / (MAX_C0_SIZE/PAGE_SIZE) ) ); + *(a->r_i) = std::max(MIN_R, sqrt( (npages * 1.0) / (ltable->max_c0_size/PAGE_SIZE) ) ); printf("dmt:\tmerge_count %d\t#written pages: %lld\n optimal r %.2f", merge_count, npages, *(a->r_i)); @@ -664,55 +656,50 @@ int64_t merge_iterators(int xid, DEBUG("tuple\t%lld: keylen %d datalen %d\n", ntuples, *(t2->keylen),*(t2->datalen) ); - while(t1 != 0 && datatuple::compare(t1->key, t2->key) < 0) // t1 is less than t2 + while(t1 != 0 && datatuple::compare(t1->key(), t2->key()) < 0) // t1 is less than t2 { //insert t1 - dp = insertTuple(xid, dp, *t1, ltable, scratch_tree, + dp = insertTuple(xid, dp, t1, ltable, scratch_tree, ltable->get_dpstate2(), dpages, npages); - free(t1->keylen); - free(t1); + datatuple::freetuple(t1); ntuples++; //advance itrA t1 = itrA->getnext(); } - if(t1 != 0 && datatuple::compare(t1->key, t2->key) == 0) + if(t1 != 0 && datatuple::compare(t1->key(), t2->key()) == 0) { datatuple *mtuple = ltable->gettuplemerger()->merge(t1,t2); //insert merged tuple, drop deletes if(dropDeletes && !mtuple->isDelete()) - dp = insertTuple(xid, dp, *mtuple, ltable, scratch_tree, ltable->get_dpstate2(), + dp = insertTuple(xid, dp, mtuple, ltable, scratch_tree, ltable->get_dpstate2(), dpages, npages); - 
free(t1->keylen); - free(t1); + datatuple::freetuple(t1); t1 = itrA->getnext(); //advance itrA - free(mtuple->keylen); - free(mtuple); + datatuple::freetuple(mtuple); } else { //insert t2 - dp = insertTuple(xid, dp, *t2, ltable, scratch_tree, ltable->get_dpstate2(), + dp = insertTuple(xid, dp, t2, ltable, scratch_tree, ltable->get_dpstate2(), dpages, npages); // cannot free any tuples here; they may still be read through a lookup } - - free(t2->keylen); - free(t2); + + datatuple::freetuple(t2); ntuples++; } while(t1 != 0) // t1 is less than t2 { - dp = insertTuple(xid, dp, *t1, ltable, scratch_tree, ltable->get_dpstate2(), + dp = insertTuple(xid, dp, t1, ltable, scratch_tree, ltable->get_dpstate2(), dpages, npages); - free(t1->keylen); - free(t1); + datatuple::freetuple(t1); ntuples++; //advance itrA t1 = itrA->getnext(); @@ -727,9 +714,8 @@ int64_t merge_iterators(int xid, } - inline DataPage* -insertTuple(int xid, DataPage *dp, datatuple &t, +insertTuple(int xid, DataPage *dp, datatuple *t, logtable *ltable, logtree * ltree, recordid & dpstate, diff --git a/merger.h b/merger.h index 842dc0a..c17e478 100644 --- a/merger.h +++ b/merger.h @@ -7,12 +7,8 @@ #include "logstore.h" #include "logiterators.h" -typedef std::set rbtree_t; -typedef rbtree_t* rbtree_ptr_t; - //TODO: 400 bytes overhead per tuple, this is nuts, check if this is true... 
static const int RB_TREE_OVERHEAD = 400; -static const int64_t MAX_C0_SIZE = 800 *1024*1024; //max size of c0 static const double MIN_R = 3.0; //T is either logtree or red-black tree template @@ -91,7 +87,7 @@ public: ~merge_scheduler(); int addlogtable(logtable * ltable); - void startlogtable(int index); + void startlogtable(int index, int64_t MAX_C0_SIZE = 100*1024*1024); struct logtable_mergedata *getMergeData(int index){return mergedata[index].second;} diff --git a/network.h b/network.h index e5a46ac..9f28f0c 100644 --- a/network.h +++ b/network.h @@ -9,39 +9,156 @@ #define NETWORK_H_ #include +#include + +typedef uint8_t network_op_t; //server codes -static const uint8_t OP_SUCCESS = 1; -static const uint8_t OP_FAIL = 2; -static const uint8_t OP_SENDING_TUPLE = 3; +static const network_op_t LOGSTORE_FIRST_RESPONSE_CODE = 1; +static const network_op_t LOGSTORE_RESPONSE_SUCCESS = 1; +static const network_op_t LOGSTORE_RESPONSE_FAIL = 2; +static const network_op_t LOGSTORE_RESPONSE_SENDING_TUPLES = 3; +static const network_op_t LOGSTORE_LAST_RESPONSE_CODE = 3; //client codes -static const uint8_t OP_FIND = 4; -static const uint8_t OP_INSERT = 5; +static const network_op_t LOGSTORE_FIRST_REQUEST_CODE = 8; +static const network_op_t OP_INSERT = 8; // Create, Update, Delete +static const network_op_t OP_FIND = 9; // Read -static const uint8_t OP_DONE = 6; +static const network_op_t OP_DONE = 10; // Please close the connection. +static const network_op_t LOGSTORE_LAST_REQUEST_CODE = 10; -static const uint8_t OP_INVALID = 32; +//error codes +static const network_op_t LOGSTORE_FIRST_ERROR = 28; +static const network_op_t LOGSTORE_CONN_CLOSED_ERROR = 28; // Unexpected EOF +static const network_op_t LOGSTORE_SOCKET_ERROR = 29; // The OS returned an error. +static const network_op_t LOGSTORE_REMOTE_ERROR = 30; // The other side didn't like our request +static const network_op_t LOGSTORE_PROTOCOL_ERROR = 31; // The other side responeded with gibberish. 
+static const network_op_t LOGSTORE_LAST_ERROR = 31; +static const network_op_t OP_INVALID = 32; +typedef enum { + LOGSTORE_CLIENT_REQUEST, + LOGSTORE_SERVER_RESPONSE +} logstore_opcode_type; -static inline void readfromsocket(int sockd, char *buf, int count) +static inline int readfromsocket(int sockd, void *buf, ssize_t count) { - int n = 0; + ssize_t n = 0; + while( n < count ) { - n += read( sockd, buf + n, count - n); + ssize_t i = read( sockd, ((byte*)buf) + n, count - n); + if(i == -1) { + perror("readfromsocket failed"); + return errno; + } else if(i == 0) { + errno = EOF; + return errno; + } + n += i; } + return 0; } -static inline void writetosocket(int sockd, char *buf, int count) +static inline int writetosocket(int sockd, const void *buf, ssize_t count) { - int n = 0; + ssize_t n = 0; + while( n < count ) { - n += write( sockd, buf + n, count - n); + ssize_t i = write( sockd, ((byte*)buf) + n, count - n); + if(i == -1) { + perror("writetosocket failed"); + return errno; + } else if(i == 0) { + errno = EOF; + return errno; + } + n += i; } + return 0; +} + +static inline bool opiserror(network_op_t op) { + return (LOGSTORE_FIRST_ERROR <= op && op <= LOGSTORE_LAST_ERROR); +} +static inline bool opisrequest(network_op_t op) { + return (LOGSTORE_FIRST_REQUEST_CODE <= op && op <= LOGSTORE_LAST_REQUEST_CODE); +} +static inline bool opisresponse(network_op_t op) { + return (LOGSTORE_FIRST_RESPONSE_CODE <= op && op <= LOGSTORE_LAST_RESPONSE_CODE); +} + +static inline network_op_t readopfromsocket(int sockd, logstore_opcode_type type) { + network_op_t ret; + ssize_t n = read(sockd, &ret, sizeof(network_op_t)); + if(n == sizeof(network_op_t)) { + // done. + } else if(n == 0) { // EOF + perror("Socket closed mid request."); + return LOGSTORE_CONN_CLOSED_ERROR; + } else { + assert(n == -1); // sizeof(network_op_t) is 1, so short reads are impossible. 
+ perror("Could not read opcode from socket"); + return LOGSTORE_SOCKET_ERROR; + } + // sanity checking + switch(type) { + case LOGSTORE_CLIENT_REQUEST: { + if(!(opisrequest(ret) || opiserror(ret))) { + fprintf(stderr, "Read invalid request code %d\n", (int)ret); + if(opisresponse(ret)) { + fprintf(stderr, "(also, the request code is a valid response code)\n"); + } + ret = LOGSTORE_PROTOCOL_ERROR; + } + } break; + case LOGSTORE_SERVER_RESPONSE: { + if(!(opisresponse(ret) || opiserror(ret))) { + fprintf(stderr, "Read invalid response code %d\n", (int)ret); + if(opisrequest(ret)) { + fprintf(stderr, "(also, the response code is a valid request code)\n"); + } + ret = LOGSTORE_PROTOCOL_ERROR; + } + + } + } + return ret; +} +static inline int writeoptosocket(int sockd, network_op_t op) { + assert(opiserror(op) || opisrequest(op) || opisresponse(op)); + return writetosocket(sockd, &op, sizeof(network_op_t)); +} + +static inline datatuple* readtuplefromsocket(int sockd) { + + datatuple::len_t keylen, datalen, buflen; + + if( readfromsocket(sockd, &keylen, sizeof(keylen)) ) return NULL; + if( readfromsocket(sockd, &datalen, sizeof(datalen)) ) return NULL; + + buflen = datatuple::length_from_header(keylen, datalen); + byte* bytes = (byte*) malloc(buflen); + + if( readfromsocket(sockd, bytes, buflen) ) return NULL; + + return datatuple::from_bytes(keylen, datalen, bytes); // from_bytes consumes the buffer. 
+} + +static inline int writetupletosocket(int sockd, const datatuple* tup) { + datatuple::len_t keylen, datalen; + + const byte* buf = tup->get_bytes(&keylen, &datalen); + int err; + if(( err = writetosocket(sockd, &keylen, sizeof(keylen)) )) return err; + if(( err = writetosocket(sockd, &datalen, sizeof(datalen)) )) return err; + if(( err = writetosocket(sockd, buf, datatuple::length_from_header(keylen, datalen)) )) return err; + return 0; + } #endif /* NETWORK_H_ */ diff --git a/server.cpp b/server.cpp index 4f847b0..912b6b2 100644 --- a/server.cpp +++ b/server.cpp @@ -38,19 +38,14 @@ void terminate (int param) exit(0); } -void insertProbeIter(int NUM_ENTRIES) +void initialize_server() { //signal handling void (*prev_fn)(int); prev_fn = signal (SIGINT,terminate); - //if (prev_fn==SIG_IGN) - //signal (SIGTERM,SIG_IGN); - - sync(); - - bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE; + bufferManagerFileHandleType = BUFFER_MANAGER_FILE_HANDLE_PFILE; Tinit(); @@ -59,8 +54,6 @@ void insertProbeIter(int NUM_ENTRIES) mscheduler = new merge_scheduler; logtable ltable; - - int pcount = 40; ltable.set_fixed_page_count(pcount); @@ -71,16 +64,13 @@ void insertProbeIter(int NUM_ENTRIES) int lindex = mscheduler->addlogtable(<able); ltable.setMergeData(mscheduler->getMergeData(lindex)); - mscheduler->startlogtable(lindex); - + int64_t c0_size = 1024 * 1024 * 10; + printf("warning: running w/ tiny c0 for testing"); // XXX + mscheduler->startlogtable(lindex, c0_size); lserver = new logserver(10, 32432); lserver->startserver(<able); - - -// Tdeinit(); - } @@ -90,18 +80,7 @@ void insertProbeIter(int NUM_ENTRIES) */ int main() { - //insertProbeIter(25000); - insertProbeIter(10000); - /* - insertProbeIter(5000); - insertProbeIter(2500); - insertProbeIter(1000); - insertProbeIter(500); - insertProbeIter(1000); - insertProbeIter(100); - insertProbeIter(10); - */ - - return 0; + initialize_server(); + abort(); // can't get here. 
} diff --git a/tcpclient.cpp b/tcpclient.cpp index a77c59a..a90c378 100644 --- a/tcpclient.cpp +++ b/tcpclient.cpp @@ -78,12 +78,17 @@ logstore_handle_t * logstore_client_open(const char *host, int portnum, int time return ret; } +static inline void close_conn(logstore_handle_t *l) { + printf("read/write err.. conn closed.\n"); + close(l->server_socket); //close the connection + l->server_socket = -1; +} datatuple * logstore_client_op(logstore_handle_t *l, // int *server_socket, // struct sockaddr_in serveraddr, // struct hostent *server, - uint8_t opcode, datatuple &tuple) + uint8_t opcode, datatuple * tuple) { if(l->server_socket < 0) @@ -122,60 +127,27 @@ logstore_client_op(logstore_handle_t *l, } + //send the opcode - int n = write(l->server_socket, (byte*) &opcode, sizeof(uint8_t)); - assert(n == sizeof(uint8_t)); + if( writetosocket(l->server_socket, &opcode, sizeof(opcode)) ) { close_conn(l); return 0; } //send the tuple - n = write(l->server_socket, (byte*) tuple.keylen, sizeof(uint32_t)); - assert( n == sizeof(uint32_t)); + if( writetupletosocket(l->server_socket, tuple) ) { close_conn(l); return 0; } - n = write(l->server_socket, (byte*) tuple.datalen, sizeof(uint32_t)); - assert( n == sizeof(uint32_t)); + network_op_t rcode = readopfromsocket(l->server_socket,LOGSTORE_SERVER_RESPONSE); - writetosocket(l->server_socket, (char*) tuple.key, *tuple.keylen); - if(!tuple.isDelete() && *tuple.datalen != 0) - writetosocket(l->server_socket, (char*) tuple.data, *tuple.datalen); + if( opiserror(rcode) ) { close_conn(l); return 0; } - //printf("\nssocket %d ", *server_socket); - //read the reply code - uint8_t rcode; - n = read(l->server_socket, (byte*) &rcode, sizeof(uint8_t)); - if( n <= 0 ) - { - printf("read err.. 
conn closed.\n"); - close(l->server_socket); //close the connection - l->server_socket = -1; - return 0; - } - - //printf("rdone\n"); datatuple * ret; - if(rcode == OP_SENDING_TUPLE) - { - datatuple *rcvdtuple = (datatuple*)malloc(sizeof(datatuple)); - //read the keylen - rcvdtuple->keylen = (uint32_t*) malloc(sizeof(uint32_t)); - n = read(l->server_socket, (char*) rcvdtuple->keylen, sizeof(uint32_t)); - assert(n == sizeof(uint32_t)); - //read the datalen - rcvdtuple->datalen = (uint32_t*) malloc(sizeof(uint32_t)); - n = read(l->server_socket, (byte*) rcvdtuple->datalen, sizeof(uint32_t)); - assert(n == sizeof(uint32_t)); - //read key - rcvdtuple->key = (byte*) malloc(*rcvdtuple->keylen); - readfromsocket(l->server_socket, (char*) rcvdtuple->key, *rcvdtuple->keylen); - if(!rcvdtuple->isDelete()) - { - //read key - rcvdtuple->data = (byte*) malloc(*rcvdtuple->datalen); - readfromsocket(l->server_socket, (char*) rcvdtuple->data, *rcvdtuple->datalen); - } - ret = rcvdtuple; - } else if(rcode == OP_SUCCESS) { - ret = &tuple; + if(rcode == LOGSTORE_RESPONSE_SENDING_TUPLES) + { + ret = readtuplefromsocket(l->server_socket); + + } else if(rcode == LOGSTORE_RESPONSE_SUCCESS) { + ret = tuple; } else { + assert(rcode == LOGSTORE_RESPONSE_FAIL); // if this is an invalid response, we should have noticed above ret = 0; } diff --git a/tcpclient.h b/tcpclient.h index 8ca8d61..1458a49 100644 --- a/tcpclient.h +++ b/tcpclient.h @@ -16,7 +16,7 @@ logstore_handle_t * logstore_client_open(const char *host, int portnum, int time datatuple * logstore_client_op(logstore_handle_t* l, uint8_t opcode, - datatuple &tuple); + datatuple *tuple); int logstore_client_close(logstore_handle_t* l); diff --git a/test/check_datapage.cpp b/test/check_datapage.cpp index 10c1e3a..4ab4c03 100644 --- a/test/check_datapage.cpp +++ b/test/check_datapage.cpp @@ -78,23 +78,7 @@ void insertProbeIter(size_t NUM_ENTRIES) for(size_t i = 0; i < NUM_ENTRIES; i++) { //prepare the key - datatuple newtuple; - uint32_t 
keylen = key_arr[i].length()+1; - newtuple.keylen = &keylen; - - newtuple.key = (datatuple::key_t) malloc(keylen); - for(size_t j=0; jbyte_length(); if(dp==NULL || !dp->append(xid, newtuple)) { dpages++; @@ -145,46 +129,19 @@ void insertProbeIter(size_t NUM_ENTRIES) datatuple *dt=0; while( (dt=itr.getnext(xid)) != NULL) { - assert(*(dt->keylen) == key_arr[tuplenum].length()+1); - assert(*(dt->datalen) == data_arr[tuplenum].length()+1); + assert(dt->keylen() == key_arr[tuplenum].length()+1); + assert(dt->datalen() == data_arr[tuplenum].length()+1); tuplenum++; - free(dt->keylen); - free(dt); + datatuple::freetuple(dt); dt = 0; } } printf("Reads completed.\n"); -/* - - int64_t count = 0; - lladdIterator_t * it = logtreeIterator::open(xid, tree); - - while(logtreeIterator::next(xid, it)) { - byte * key; - byte **key_ptr = &key; - int keysize = logtreeIterator::key(xid, it, (byte**)key_ptr); - - pageid_t *value; - pageid_t **value_ptr = &value; - int valsize = lsmTreeIterator_value(xid, it, (byte**)value_ptr); - //printf("keylen %d key %s\n", keysize, (char*)(key)) ; - assert(valsize == sizeof(pageid_t)); - assert(!mycmp(std::string((char*)key), arr[count]) && !mycmp(arr[count],std::string((char*)key))); - assert(keysize == arr[count].length()+1); - count++; - } - assert(count == NUM_ENTRIES); - - logtreeIterator::close(xid, it); - - - */ - - Tcommit(xid); - Tdeinit(); + Tcommit(xid); + Tdeinit(); } diff --git a/test/check_logtable.cpp b/test/check_logtable.cpp index a24a210..75efa14 100644 --- a/test/check_logtable.cpp +++ b/test/check_logtable.cpp @@ -77,32 +77,10 @@ void insertProbeIter(size_t NUM_ENTRIES) std::vector dsp; for(size_t i = 0; i < NUM_ENTRIES; i++) { - //prepare the key - datatuple newtuple; - uint32_t keylen = key_arr[i].length()+1; - newtuple.keylen = &keylen; - - newtuple.key = (datatuple::key_t) malloc(keylen); - for(size_t j=0; jbyte_length(); if(dp == NULL) { @@ -122,10 +100,7 @@ void insertProbeIter(size_t NUM_ENTRIES) } } - free(newtuple.key); 
- free(newtuple.data); - - + datatuple::freetuple(newtuple); } printf("\nTREE STRUCTURE\n"); @@ -139,12 +114,7 @@ void insertProbeIter(size_t NUM_ENTRIES) Tcommit(xid); xid = Tbegin(); - - - - printf("Stage 2: Sequentially reading %d tuples\n", NUM_ENTRIES); - size_t tuplenum = 0; treeIterator tree_itr(tree_root); @@ -153,11 +123,10 @@ void insertProbeIter(size_t NUM_ENTRIES) datatuple *dt=0; while( (dt=tree_itr.getnext()) != NULL) { - assert(*(dt->keylen) == key_arr[tuplenum].length()+1); - assert(*(dt->datalen) == data_arr[tuplenum].length()+1); + assert(dt->keylen() == key_arr[tuplenum].length()+1); + assert(dt->datalen() == data_arr[tuplenum].length()+1); tuplenum++; - free(dt->keylen); - free(dt); + datatuple::freetuple(dt); dt = 0; } @@ -173,21 +142,12 @@ void insertProbeIter(size_t NUM_ENTRIES) //randomly pick a key int ri = rand()%key_arr.size(); - //get the key - uint32_t keylen = key_arr[ri].length()+1; - datatuple::key_t rkey = (datatuple::key_t) malloc(keylen); - for(size_t j=0; jkeylen) == key_arr[ri].length()+1); - assert(*(dt->datalen) == data_arr[ri].length()+1); - free(dt->keylen); - free(dt); + assert(dt->keylen() == key_arr[ri].length()+1); + assert(dt->datalen() == data_arr[ri].length()+1); + datatuple::freetuple(dt); dt = 0; } diff --git a/test/check_merge.cpp b/test/check_merge.cpp index 4634252..c5ea608 100644 --- a/test/check_merge.cpp +++ b/test/check_merge.cpp @@ -72,34 +72,14 @@ void insertProbeIter(size_t NUM_ENTRIES) struct timeval start_tv, stop_tv, ti_st, ti_end; double insert_time = 0; - int dpages = 0; - int npages = 0; - DataPage *dp=0; int64_t datasize = 0; std::vector dsp; gettimeofday(&start_tv,0); for(size_t i = 0; i < NUM_ENTRIES; i++) { //prepare the key - datatuple newtuple; - uint32_t keylen = (*key_arr)[i].length()+1; - newtuple.keylen = &keylen; - - newtuple.key = (datatuple::key_t) malloc(keylen); - memcpy((byte*)newtuple.key, (*key_arr)[i].c_str(), keylen); - //for(int j=0; jbyte_length(); gettimeofday(&ti_st,0); 
ltable.insertTuple(newtuple); gettimeofday(&ti_end,0); insert_time += tv_to_double(ti_end) - tv_to_double(ti_st); - free(newtuple.key); - free(newtuple.data); + datatuple::freetuple(newtuple); } gettimeofday(&stop_tv,0); @@ -155,11 +134,10 @@ void insertProbeIter(size_t NUM_ENTRIES) assert(dt!=0); //if(dt!=0) { - found_tuples++; - assert(*(dt->keylen) == (*key_arr)[ri].length()+1); - assert(*(dt->datalen) == (*data_arr)[ri].length()+1); - free(dt->keylen); - free(dt); + found_tuples++; + assert(dt->keylen() == (*key_arr)[ri].length()+1); + assert(dt->datalen() == (*data_arr)[ri].length()+1); + datatuple::freetuple(dt); } dt = 0; free(rkey); diff --git a/test/check_mergelarge.cpp b/test/check_mergelarge.cpp index 09e38c2..fce9368 100644 --- a/test/check_mergelarge.cpp +++ b/test/check_mergelarge.cpp @@ -68,54 +68,26 @@ void insertProbeIter(size_t NUM_ENTRIES) struct timeval start_tv, stop_tv, ti_st, ti_end; double insert_time = 0; - int dpages = 0; - int npages = 0; - DataPage *dp=0; int64_t datasize = 0; std::vector dsp; gettimeofday(&start_tv,0); for(size_t i = 0; i < NUM_ENTRIES; i++) { - //prepare the key - datatuple newtuple; - uint32_t keylen = (*key_arr)[i].length()+1; - newtuple.keylen = &keylen; - - newtuple.key = (datatuple::key_t) malloc(keylen); - memcpy((byte*)newtuple.key, (*key_arr)[i].c_str(), keylen); - //for(int j=0; jbyte_length(); gettimeofday(&ti_st,0); ltable.insertTuple(newtuple); gettimeofday(&ti_end,0); insert_time += tv_to_double(ti_end) - tv_to_double(ti_st); - free(newtuple.key); - free(newtuple.data); - + datatuple::freetuple(newtuple); } gettimeofday(&stop_tv,0); printf("insert time: %6.1f\n", insert_time); @@ -124,52 +96,6 @@ void insertProbeIter(size_t NUM_ENTRIES) printf("\nTREE STRUCTURE\n"); //ltable.get_tree_c1()->print_tree(xid); printf("datasize: %lld\n", datasize); - //sleep(20); - - /* - //Tcommit(xid); - xid = Tbegin(); - - - printf("Stage 2: Looking up %d keys:\n", NUM_ENTRIES); - - int found_tuples=0; - for(int 
i=NUM_ENTRIES-1; i>=0; i--) - { - int ri = i; - //printf("key index%d\n", i); - fflush(stdout); - - //get the key - uint32_t keylen = (*key_arr)[ri].length()+1; - datatuple::key_t rkey = (datatuple::key_t) malloc(keylen); - memcpy((byte*)rkey, (*key_arr)[ri].c_str(), keylen); - //for(int j=0; jkeylen) == (*key_arr)[ri].length()+1); - //assert(*(dt->datalen) == (*data_arr)[ri].length()+1); - free(dt->keylen); - free(dt); - } - dt = 0; - free(rkey); - } - printf("found %d\n", found_tuples); - - key_arr->clear(); - //data_arr->clear(); - delete key_arr; - //delete data_arr; - */ mscheduler.shutdown(); printf("merge threads finished.\n"); diff --git a/test/check_mergetuple.cpp b/test/check_mergetuple.cpp index f0e3494..fb3a4eb 100644 --- a/test/check_mergetuple.cpp +++ b/test/check_mergetuple.cpp @@ -21,8 +21,8 @@ void insertProbeIter(size_t NUM_ENTRIES) { srand(1000); - //unlink("storefile.txt"); - //unlink("logfile.txt"); + unlink("storefile.txt"); + unlink("logfile.txt"); sync(); double delete_freq = .05; @@ -91,11 +91,8 @@ void insertProbeIter(size_t NUM_ENTRIES) key_v_list->clear(); delete key_v_list; -// preprandstr(NUM_ENTRIES, data_arr, 10*8192); - printf("key arr size: %d\n", key_arr->size()); - //removeduplicates(key_arr); if(key_arr->size() > NUM_ENTRIES) key_arr->erase(key_arr->begin()+NUM_ENTRIES, key_arr->end()); @@ -138,45 +135,21 @@ void insertProbeIter(size_t NUM_ENTRIES) gettimeofday(&start_tv,0); for(size_t i = 0; i < NUM_ENTRIES; i++) { - //prepare the key - datatuple newtuple; - uint32_t keylen = (*key_arr)[i].length()+1; - newtuple.keylen = &keylen; - - newtuple.key = (datatuple::key_t) malloc(keylen); - memcpy((byte*)newtuple.key, (*key_arr)[i].c_str(), keylen); - //for(int j=0; jbyte_length(); gettimeofday(&ti_st,0); ltable.insertTuple(newtuple); gettimeofday(&ti_end,0); insert_time += tv_to_double(ti_end) - tv_to_double(ti_st); - free(newtuple.key); - free(newtuple.data); + datatuple::freetuple(newtuple); double rval = ((rand() % 100)+.0)/100; 
if( rval < delete_freq) //delete a key @@ -185,22 +158,15 @@ void insertProbeIter(size_t NUM_ENTRIES) if(del_index >= 0 && std::find(del_list.begin(), del_list.end(), del_index) == del_list.end()) { delcount++; - datatuple deltuple; - keylen = (*key_arr)[del_index].length()+1; - deltuple.keylen = &keylen; - - deltuple.key = (datatuple::key_t) malloc(keylen); - memcpy((byte*)deltuple.key, (*key_arr)[del_index].c_str(), keylen); - deltuple.datalen = &datalen; - deltuple.setDelete(); + datatuple *deltuple = datatuple::create((*key_arr)[del_index].c_str(), (*key_arr)[del_index].length()+1); gettimeofday(&ti_st,0); ltable.insertTuple(deltuple); gettimeofday(&ti_end,0); insert_time += tv_to_double(ti_end) - tv_to_double(ti_st); - free(deltuple.key); + datatuple::freetuple(deltuple); del_list.push_back(del_index); @@ -211,28 +177,17 @@ void insertProbeIter(size_t NUM_ENTRIES) int up_index = i - (rand()%50); //update one of the last inserted 50 elements if(up_index >= 0 && std::find(del_list.begin(), del_list.end(), up_index) == del_list.end()) {//only update non-deleted elements - upcount++; - datatuple uptuple; - keylen = (*key_arr)[up_index].length()+1; - uptuple.keylen = &keylen; - - uptuple.key = (datatuple::key_t) malloc(keylen); - memcpy((byte*)uptuple.key, (*key_arr)[up_index].c_str(), keylen); - getnextdata(ditem, 512); - datalen = ditem.length()+1; - uptuple.datalen = &datalen; - uptuple.data = (datatuple::data_t) malloc(datalen); - memcpy((byte*)uptuple.data, ditem.c_str(), datalen); - + + upcount++; + datatuple *uptuple = datatuple::create((*key_arr)[up_index].c_str(), (*key_arr)[up_index].length()+1, + ditem.c_str(), ditem.length()+1); gettimeofday(&ti_st,0); ltable.insertTuple(uptuple); gettimeofday(&ti_end,0); insert_time += tv_to_double(ti_end) - tv_to_double(ti_st); - free(uptuple.key); - free(uptuple.data); - + datatuple::freetuple(uptuple); } } @@ -244,9 +199,7 @@ void insertProbeIter(size_t NUM_ENTRIES) printf("#deletions: %d\n#updates: %d\n", delcount, 
upcount); printf("\nTREE STRUCTURE\n"); - //ltable.get_tree_c1()->print_tree(xid); printf("datasize: %lld\n", datasize); - //sleep(20); Tcommit(xid); xid = Tbegin(); @@ -259,16 +212,11 @@ void insertProbeIter(size_t NUM_ENTRIES) for(int i=NUM_ENTRIES-1; i>=0; i--) { int ri = i; - //printf("key index%d\n", i); - fflush(stdout); //get the key uint32_t keylen = (*key_arr)[ri].length()+1; datatuple::key_t rkey = (datatuple::key_t) malloc(keylen); memcpy((byte*)rkey, (*key_arr)[ri].c_str(), keylen); - //for(int j=0; jisDelete()); found_tuples++; - assert(*(dt->keylen) == (*key_arr)[ri].length()+1); - //assert(*(dt->datalen) == (*data_arr)[ri].length()+1); - free(dt->keylen); - free(dt); + assert(dt->keylen() == (*key_arr)[ri].length()+1); + datatuple::freetuple(dt); } else { if(dt!=0) { - assert(*(dt->keylen) == (*key_arr)[ri].length()+1); + assert(dt->keylen() == (*key_arr)[ri].length()+1); assert(dt->isDelete()); - free(dt->keylen); - free(dt); + datatuple::freetuple(dt); } } dt = 0; @@ -326,7 +271,7 @@ void insertProbeIter(size_t NUM_ENTRIES) */ int main() { - //insertProbeIter(25000); +// insertProbeIter(25000); insertProbeIter(400000); /* insertProbeIter(5000); diff --git a/test/check_rbtree.cpp b/test/check_rbtree.cpp index dd45a03..40a52eb 100644 --- a/test/check_rbtree.cpp +++ b/test/check_rbtree.cpp @@ -20,7 +20,8 @@ void insertProbeIter(size_t NUM_ENTRIES) { - + unlink("logfile.txt"); + unlink("storefile.txt"); //data generation std::vector data_arr; std::vector key_arr; @@ -38,45 +39,18 @@ void insertProbeIter(size_t NUM_ENTRIES) if(data_arr.size() > NUM_ENTRIES) data_arr.erase(data_arr.begin()+NUM_ENTRIES, data_arr.end()); - std::set rbtree; + rbtree_t rbtree; + int64_t datasize = 0; std::vector dsp; for(size_t i = 0; i < NUM_ENTRIES; i++) { //prepare the key - datatuple newtuple; - uint32_t keylen = key_arr[i].length()+1; - newtuple.keylen = (uint32_t*)malloc(sizeof(uint32_t)); - *newtuple.keylen = keylen; - - newtuple.key = (datatuple::key_t) 
malloc(keylen); - for(size_t j=0; jbyte_length(); rbtree.insert(newtuple); - - } printf("\nTREE STRUCTURE\n"); @@ -88,47 +62,29 @@ void insertProbeIter(size_t NUM_ENTRIES) int found_tuples=0; for(int i=NUM_ENTRIES-1; i>=0; i--) { + //find the key with the given tuple int ri = i; - //get the key - uint32_t keylen = key_arr[ri].length()+1; - datatuple::key_t rkey = (datatuple::key_t) malloc(keylen); - for(size_t j=0; jkeylen) == key_arr[ri].length()+1); - assert(*(ret_tuple->datalen) == data_arr[ri].length()+1); - free(barr); - free(ret_tuple); + assert(tuple->keylen() == key_arr[ri].length()+1); + assert(tuple->datalen() == data_arr[ri].length()+1); } else { printf("Not in scratch_tree\n"); } - free(search_tuple.keylen); - free(rkey); + datatuple::freetuple(search_tuple); } printf("found %d\n", found_tuples); } diff --git a/test/check_tcpclient.cpp b/test/check_tcpclient.cpp index 529f17c..dacfa14 100644 --- a/test/check_tcpclient.cpp +++ b/test/check_tcpclient.cpp @@ -26,8 +26,6 @@ static int svrport = 32432; void insertProbeIter(size_t NUM_ENTRIES) { srand(1000); -// std::string servername = svrname; //"sherpa4"; - // int serverport = svrport; //32432; logstore_handle_t * l = logstore_client_open(svrname, svrport, 100); @@ -97,12 +95,8 @@ void insertProbeIter(size_t NUM_ENTRIES) } key_v_list->clear(); delete key_v_list; - -// preprandstr(NUM_ENTRIES, data_arr, 10*8192); - printf("key arr size: %d\n", key_arr->size()); - //removeduplicates(key_arr); if(key_arr->size() > NUM_ENTRIES) key_arr->erase(key_arr->begin()+NUM_ENTRIES, key_arr->end()); @@ -122,35 +116,21 @@ void insertProbeIter(size_t NUM_ENTRIES) for(size_t i = 0; i < NUM_ENTRIES; i++) { //prepare the key - datatuple newtuple; - uint32_t keylen = (*key_arr)[i].length()+1; - newtuple.keylen = &keylen; - - newtuple.key = (datatuple::key_t) malloc(keylen); - memcpy((byte*)newtuple.key, (*key_arr)[i].c_str(), keylen); + datatuple::len_t keylen = (*key_arr)[i].length()+1; //prepare the data std::string ditem; 
getnextdata(ditem, 8192); - uint32_t datalen = ditem.length()+1; - newtuple.datalen = &datalen; - newtuple.data = (datatuple::data_t) malloc(datalen); - memcpy((byte*)newtuple.data, ditem.c_str(), datalen); + datatuple::len_t datalen = ditem.length()+1; - /* - printf("key: \t, keylen: %u\ndata: datalen: %u\n", - //newtuple.key, - *newtuple.keylen, - //newtuple.data, - *newtuple.datalen); - */ - - datasize += newtuple.byte_length(); + datatuple* newtuple = datatuple::create((*key_arr)[i].c_str(), keylen, + ditem.c_str(), datalen); - gettimeofday(&ti_st,0); + datasize += newtuple->byte_length(); + + gettimeofday(&ti_st,0); //send the data -// datatuple * ret = sendTuple(servername, serverport, OP_INSERT, newtuple); datatuple * ret = logstore_client_op(l, OP_INSERT, newtuple); assert(ret); @@ -158,8 +138,7 @@ void insertProbeIter(size_t NUM_ENTRIES) // insert_time += tv_to_double(ti_end) - tv_to_double(ti_st); insert_time ++; // XXX - free(newtuple.key); - free(newtuple.data); + datatuple::freetuple(newtuple); if(i % 10000 == 0 && i > 0) printf("%d / %d inserted.\n", i, NUM_ENTRIES); @@ -177,56 +156,40 @@ void insertProbeIter(size_t NUM_ENTRIES) int found_tuples=0; for(int i=NUM_ENTRIES-1; i>=0; i--) - { - int ri = i; + { + int ri = i; //printf("key index%d\n", i); - fflush(stdout); - + //fflush(stdout); + //get the key - uint32_t keylen = (*key_arr)[ri].length()+1; - datatuple searchtuple; - searchtuple.keylen = (uint32_t*)malloc(2*sizeof(uint32_t) + keylen); - *searchtuple.keylen = keylen; + datatuple::len_t keylen = (*key_arr)[ri].length()+1; + datatuple::len_t datalen = 0; - searchtuple.datalen = searchtuple.keylen + 1; - *searchtuple.datalen = 0; - - searchtuple.key = (datatuple::key_t)(searchtuple.keylen + 2); - memcpy((byte*)searchtuple.key, (*key_arr)[ri].c_str(), keylen); + datatuple* searchtuple = datatuple::create((*key_arr)[ri].c_str(), keylen); //find the key with the given tuple - datatuple *dt = logstore_client_op(l, OP_FIND, //servername, serverport, 
OP_FIND, - searchtuple); - + datatuple *dt = logstore_client_op(l, OP_FIND, searchtuple); + assert(dt!=0); assert(!dt->isDelete()); found_tuples++; - assert(*(dt->keylen) == (*key_arr)[ri].length()+1); + assert(dt->keylen() == (*key_arr)[ri].length()+1); //free dt - free(dt->keylen); - free(dt->datalen); - free(dt->key); - free(dt->data); - free(dt); - + datatuple::freetuple(dt); dt = 0; - free(searchtuple.keylen); - + datatuple::freetuple(searchtuple); } printf("found %d\n", found_tuples); key_arr->clear(); - //data_arr->clear(); delete key_arr; - //delete data_arr; - + logstore_client_close(l); gettimeofday(&stop_tv,0); printf("run time: %6.1f\n", -1.0); // XXX (tv_to_double(stop_tv) - tv_to_double(start_tv))); - } diff --git a/tuplemerger.cpp b/tuplemerger.cpp index 0adbf22..a8a02ef 100644 --- a/tuplemerger.cpp +++ b/tuplemerger.cpp @@ -1,6 +1,8 @@ #include "tuplemerger.h" #include "logstore.h" +// XXX make the inputs 'const' +// XXX test / reason about this... datatuple* tuplemerger::merge(datatuple *t1, datatuple *t2) { assert(!t1->isDelete() || !t2->isDelete()); //both cannot be delete @@ -9,11 +11,11 @@ datatuple* tuplemerger::merge(datatuple *t1, datatuple *t2) if(t1->isDelete()) //delete - t2 { - t = datatuple::from_bytes(t2->to_bytes()); + t = t2->create_copy(); } else if(t2->isDelete()) { - t = datatuple::from_bytes(t2->to_bytes()); + t = t2->create_copy(); } else //neither is a delete { @@ -32,26 +34,15 @@ datatuple* tuplemerger::merge(datatuple *t1, datatuple *t2) **/ datatuple* append_merger(datatuple *t1, datatuple *t2) { - static const size_t isize = sizeof(uint32_t); - struct datatuple *t = (datatuple*) malloc(sizeof(datatuple)); - byte *arr = (byte*)malloc(t1->byte_length() + *t2->datalen); - - t->keylen = (uint32_t*) arr; - *(t->keylen) = *(t1->keylen); - - t->datalen = (uint32_t*) (arr+isize); - *(t->datalen) = *(t1->datalen) + *(t2->datalen); - - t->key = (datatuple::key_t) (arr+isize+isize); - memcpy((byte*)t->key, (byte*)t1->key, 
*(t1->keylen)); - - t->data = (datatuple::data_t) (arr+isize+isize+ *(t1->keylen)); - memcpy((byte*)t->data, (byte*)t1->data, *(t1->datalen)); - memcpy(((byte*)t->data) + *(t1->datalen), (byte*)t2->data, *(t2->datalen)); - - return t; + assert(!(t1->isDelete() || t2->isDelete())); + datatuple::len_t keylen = t1->keylen(); + datatuple::len_t datalen = t1->datalen() + t2->datalen(); + byte * data = (byte*)malloc(datalen); + memcpy(data, t1->data(), t1->datalen()); + memcpy(data + t1->datalen(), t2->data(), t2->datalen()); + return datatuple::create(t1->key(), keylen, data, datalen); } /** @@ -62,23 +53,5 @@ datatuple* append_merger(datatuple *t1, datatuple *t2) **/ datatuple* replace_merger(datatuple *t1, datatuple *t2) { - static const size_t isize = sizeof(uint32_t); - struct datatuple *t = (datatuple*) malloc(sizeof(datatuple)); - - byte *arr = (byte*)malloc(t2->byte_length()); - - t->keylen = (uint32_t*) arr; - *(t->keylen) = *(t2->keylen); - - t->datalen = (uint32_t*) (arr+isize); - *(t->datalen) = *(t2->datalen); - - t->key = (datatuple::key_t) (arr+isize+isize); - memcpy((byte*)t->key, (byte*)t2->key, *(t2->keylen)); - - t->data = (datatuple::data_t) (arr+isize+isize+ *(t2->keylen)); - memcpy((byte*)t->data, (byte*)t2->data, *(t2->datalen)); - - return t; - + return t2->create_copy(); }