diff --git a/datapage.cpp b/datapage.cpp index 0434dba..911fc78 100644 --- a/datapage.cpp +++ b/datapage.cpp @@ -86,13 +86,13 @@ void DataPage::initialize(int xid) } template -bool DataPage::append(int xid, TUPLE const & dat) +bool DataPage::append(int xid, TUPLE const * dat) { assert(byte_offset >= HEADER_SIZE); assert(fix_pcount >= 1); //check if there is enough space (for the data length + data) - int32_t blen = dat.byte_length() + sizeof(int32_t); + int32_t blen = dat->byte_length() + sizeof(int32_t); if(PAGE_SIZE * fix_pcount - byte_offset < blen) { //check if the record is too large @@ -118,7 +118,7 @@ bool DataPage::append(int xid, TUPLE const & dat) byte_offset += sizeof(int32_t); //write the data - byte * barr = dat.to_bytes(); + byte * barr = dat->to_bytes(); if(!writebytes(xid, dsize, barr)) //if write fails, undo the previous write { byte_offset -= sizeof(int32_t); @@ -127,7 +127,8 @@ bool DataPage::append(int xid, TUPLE const & dat) if(PAGE_SIZE - (byte_offset % PAGE_SIZE) >= sizeof(int32_t)) { dsize = 0; - writebytes(xid, sizeof(int32_t), (byte*)(&dsize));//this will succeed, since there is enough space on the page + int succ = writebytes(xid, sizeof(int32_t), (byte*)(&dsize));//this will succeed, since there is enough space on the page + assert(succ); } return false; } @@ -138,7 +139,9 @@ bool DataPage::append(int xid, TUPLE const & dat) if(PAGE_SIZE - (byte_offset % PAGE_SIZE) >= sizeof(int32_t)) { dsize = 0; - writebytes(xid, sizeof(int32_t), (byte*)(&dsize));//this will succeed, since there is enough space on the page + int succ = writebytes(xid, sizeof(int32_t), (byte*)(&dsize));//this will succeed, since there is enough space on the page + assert(succ); + } return true; @@ -205,12 +208,11 @@ bool DataPage::recordRead(int xid, typename TUPLE::key_t key, size_t keyS int match = -1; while((*buf=itr.getnext(xid)) != 0) { - match = TUPLE::compare((*buf)->get_key(), key); + match = TUPLE::compare((*buf)->key(), key); if(match<0) //keep searching { - free((*buf)->keylen); - free(*buf); + datatuple::freetuple(*buf); *buf=0; } else if(match==0) //found @@ -219,8 +221,7 @@ bool DataPage::recordRead(int xid, typename TUPLE::key_t key, size_t keyS } else // match > 0, then does not exist { - free((*buf)->keylen); - free(*buf); + datatuple::freetuple(*buf); *buf = 0; break; } @@ -230,12 +231,9 @@ bool DataPage::recordRead(int xid, typename TUPLE::key_t key, size_t keyS } template -void DataPage::readbytes(int xid, int32_t offset, int count, byte **data) +void DataPage::readbytes(int xid, int32_t offset, int count, byte *data) { - if(*data==NULL) - *data = (byte*)malloc(count); - int32_t bytes_copied = 0; while(bytes_copied < count) { @@ -251,7 +249,7 @@ void DataPage::readbytes(int xid, int32_t offset, int count, byte **data) int32_t copy_len = ( (PAGE_SIZE - page_offset < count - bytes_copied ) ? PAGE_SIZE - page_offset : count - bytes_copied); byte * pb_ptr = stasis_page_byte_ptr_from_start(p, page_offset); - memcpy((*data)+bytes_copied, pb_ptr, copy_len); + memcpy((data)+bytes_copied, pb_ptr, copy_len); //release the page unlock(p->rwlatch); @@ -425,10 +423,11 @@ TUPLE* DataPage::RecordIterator::getnext(int xid) readlock(p->rwlatch,0); int32_t *dsize_ptr; + int32_t scratch; if(PAGE_SIZE - (offset % PAGE_SIZE) < sizeof(int32_t)) //int spread in two pages { - dsize_ptr = 0; - dp->readbytes(xid, offset, sizeof(int32_t), (byte**)(&dsize_ptr)); + dsize_ptr = &scratch; + dp->readbytes(xid, offset, sizeof(int32_t), (byte*)dsize_ptr); } else //int in a single page dsize_ptr = (int32_t*)stasis_page_byte_ptr_from_start(p, offset % PAGE_SIZE); @@ -442,10 +441,12 @@ TUPLE* DataPage::RecordIterator::getnext(int xid) return 0; } - byte* tb=0; - dp->readbytes(xid, offset, *dsize_ptr, &tb); + byte* tb = (byte*)malloc(*dsize_ptr); + dp->readbytes(xid, offset, *dsize_ptr, tb); - TUPLE *tup = TUPLE::from_bytes(tb); + TUPLE *tup = TUPLE::from_bytes(tb); // This version of from_bytes does not consume its argument. + + free(tb); offset += *dsize_ptr; @@ -484,15 +485,18 @@ void DataPage::RecordIterator::advance(int xid, int count) readlock(p->rwlatch,0); } - if(pindex == dp->pcount - 1 && (PAGE_SIZE - (offset % PAGE_SIZE) < sizeof(int32_t))) - return; - + if(pindex == dp->pcount - 1 && (PAGE_SIZE - (offset % PAGE_SIZE) < sizeof(int32_t))) { + assert(!p); // XXX Otherwise, was leaking page. do we reach this branch in testing? + return; + } int32_t *dsize_ptr=0; - if(PAGE_SIZE - (offset % PAGE_SIZE) < sizeof(int32_t)) //int spread in two pages - dp->readbytes(xid, offset, sizeof(int32_t), (byte**)(&dsize_ptr)); - else //int in a single page + int32_t scratch; + if(PAGE_SIZE - (offset % PAGE_SIZE) < sizeof(int32_t)) { //int spread in two pages + dsize_ptr = &scratch; + dp->readbytes(xid, offset, sizeof(int32_t), (byte*)dsize_ptr); + } else { //int in a single page dsize_ptr = (int32_t*)stasis_page_byte_ptr_from_start(p, offset % PAGE_SIZE); - + } offset += sizeof(int32_t); if(*dsize_ptr == 0) //no more keys diff --git a/datapage.h b/datapage.h index 9c2a109..8eec35a 100644 --- a/datapage.h +++ b/datapage.h @@ -59,7 +59,7 @@ public: ~DataPage(); - bool append(int xid, TUPLE const & dat); + bool append(int xid, TUPLE const * dat); bool recordRead(int xid, typename TUPLE::key_t key, size_t keySize, TUPLE ** buf); inline uint16_t recordCount(int xid); @@ -89,10 +89,10 @@ private: void incrementPageCount(int xid, pageid_t pid, int add=1); bool writebytes(int xid, int count, byte *data); - inline void readbytes(int xid, int32_t offset, int count, byte **data=0); + inline void readbytes(int xid, int32_t offset, int count, byte *data); private: - int pcount; + int32_t pcount; pageid_t *pidarr; int32_t byte_offset;//points to the next free byte diff --git a/datatuple.h b/datatuple.h index d25ed1a..41899ff 100644 --- a/datatuple.h +++ b/datatuple.h @@ -1,167 +1,147 @@ #ifndef _DATATUPLE_H_ #define _DATATUPLE_H_ - -typedef unsigned char uchar; - #include - -//#define byte unsigned char typedef unsigned char byte; #include - -//#include -//#include -//#include +#include typedef struct datatuple { - typedef uchar* key_t; - typedef uchar* data_t; - static const size_t isize = sizeof(uint32_t); - uint32_t *keylen; //key length should be size of string + 1 for \n - uint32_t *datalen; - key_t key; - data_t data; +public: + typedef uint32_t len_t ; + typedef unsigned char* key_t ; + typedef unsigned char* data_t ; +private: + static const len_t DELETE = ((len_t)0) - 1; + len_t datalen_; + byte* key_; + byte* data_; // aliases key. data_ - 1 should be the \0 terminating key_. + + datatuple* sanity_check() { + assert(keylen() < 3000); + return this; + } +public: + + inline len_t keylen() const { + return data_ - key_; + } + inline len_t datalen() const { + return (datalen_ == DELETE) ? 0 : datalen_; + } + + //returns the length of the byte array representation + len_t byte_length() const { + return sizeof(len_t) + sizeof(len_t) + keylen() + datalen(); + } + static len_t length_from_header(len_t keylen, len_t datalen) { + return keylen + ((datalen == DELETE) ? 0 : datalen); + } + + inline key_t key() const { + return key_; + } + inline data_t data() const { + return data_; + } //this is used by the stl set - bool operator() (const datatuple& lhs, const datatuple& rhs) const - { - //std::basic_string s1(lhs.key); - //std::basic_string s2(rhs.key); - return strcmp((char*)lhs.key,(char*)rhs.key) < 0; - //return (*((int32_t*)lhs.key)) <= (*((int32_t*)rhs.key)); - } - - void clone(const datatuple& tuple) { - //create a copy - - byte * arr = (byte*) malloc(tuple.byte_length()); - - keylen = (uint32_t*) arr; - *keylen = *tuple.keylen; - datalen = (uint32_t*) (arr+isize); - *datalen = *tuple.datalen; - key = (datatuple::key_t) (arr+isize+isize); - memcpy((byte*)key, (byte*)tuple.key, *keylen); - if(!tuple.isDelete()) - { - data = (datatuple::data_t) (arr+isize+isize+ *keylen); - memcpy((byte*)data, (byte*)tuple.data, *datalen); - } - else - data = 0; - } + bool operator() (const datatuple* lhs, const datatuple* rhs) const { + return compare(lhs->key(), rhs->key()) < 0; //strcmp((char*)lhs.key(),(char*)rhs.key()) < 0; + } /** * return -1 if k1 < k2 * 0 if k1 == k2 * 1 of k1 > k2 **/ - static int compare(const key_t k1,const key_t k2) - { - //for char* ending with \0 - return strcmp((char*)k1,(char*)k2); + static int compare(const byte* k1,const byte* k2) { + // XXX string comparison is probably not the right approach. + //for char* ending with \0 + return strcmp((char*)k1,(char*)k2); + } - //for int32_t - //printf("%d\t%d\n",(*((int32_t*)k1)) ,(*((int32_t*)k2))); - //return (*((int32_t*)k1)) <= (*((int32_t*)k2)); - } + inline void setDelete() { + datalen_ = DELETE; + } - void setDelete() - { - *datalen = UINT_MAX; - } + inline bool isDelete() const { + return datalen_ == DELETE; + } - inline bool isDelete() const - { - return *datalen == UINT_MAX; - } + static std::string key_to_str(const byte* k) { + //for strings + return std::string((char*)k); + //for int + /* + std::ostringstream ostr; + ostr << *((int32_t*)k); + return ostr.str(); + */ + } - static std::string key_to_str(const byte* k) - { - //for strings - return std::string((char*)k); - //for int - /* - std::ostringstream ostr; - ostr << *((int32_t*)k); - return ostr.str(); - */ - } + //copy the tuple. does a deep copy of the contents. + datatuple* create_copy() const { + return create(key(), keylen(), data(), datalen_)->sanity_check(); + } - //returns the length of the byte array representation - int32_t byte_length() const{ - static const size_t isize = sizeof(uint32_t); - if(isDelete()) - return isize + *keylen + isize; - else - return isize + *keylen + isize + (*datalen); + + static datatuple* create(const void* key, len_t keylen) { + return create(key, keylen, 0, DELETE)->sanity_check(); + } + static datatuple* create(const void* key, len_t keylen, const void* data, len_t datalen) { + datatuple *ret = (datatuple*)malloc(sizeof(datatuple)); + ret->key_ = (byte*)malloc(length_from_header(keylen, datalen)); + memcpy(ret->key_, key, keylen); + ret->data_ = ret->key_ + keylen; // need to set this even if delete, since it encodes the key length. + if(datalen != DELETE) { + memcpy(ret->data_, data, datalen); + } + ret->datalen_ = datalen; + return ret->sanity_check(); } //format: key length _ data length _ key _ data byte * to_bytes() const { - static const size_t isize = sizeof(uint32_t); - byte * ret; - if(!isDelete()) - ret = (byte*) malloc(isize + *keylen + isize + *datalen); - else - ret = (byte*) malloc(isize + *keylen + isize); - - memcpy(ret, (byte*)(keylen), isize); - memcpy(ret+isize, (byte*)(datalen), isize); - memcpy(ret+isize+isize, key, *keylen); - if(!isDelete()) - memcpy(ret+isize+isize+*keylen, data, *datalen); + byte *ret = (byte*)malloc(byte_length()); + ((len_t*)ret)[0] = keylen(); + ((len_t*)ret)[1] = datalen_; + memcpy(((len_t*)ret)+2, key_, length_from_header(keylen(), datalen_)); return ret; } - //does not copy the data again - //just sets the pointers in the datatuple to - //right positions in the given arr - - static datatuple* from_bytes(const byte * arr) - { - static const size_t isize = sizeof(uint32_t); - datatuple *dt = (datatuple*) malloc(sizeof(datatuple)); + const byte* get_bytes(len_t *keylen, len_t *datalen) const { + *keylen = this->keylen(); + *datalen = datalen_; + return key_; + } - dt->keylen = (uint32_t*) arr; - dt->datalen = (uint32_t*) (arr+isize); - dt->key = (key_t) (arr+isize+isize); - if(!dt->isDelete()) - dt->data = (data_t) (arr+isize+isize+ *(dt->keylen)); - else - dt->data = 0; + //format of buf: key _ data. The caller needs to 'peel' off key length and data length for this call. + static datatuple* from_bytes(len_t keylen, len_t datalen, byte* buf) { + datatuple *dt = (datatuple*) malloc(sizeof(datatuple)); + dt->datalen_ = datalen; + dt->key_ = buf; + dt->data_ = dt->key_ + keylen; + return dt->sanity_check(); + } + static datatuple* from_bytes(byte* buf) { + datatuple *dt = (datatuple*) malloc(sizeof(datatuple)); + len_t keylen = ((len_t*)buf)[0]; + dt->datalen_ = ((len_t*)buf)[1]; + len_t buflen = length_from_header(keylen, dt->datalen_); + dt->key_ = (byte*)malloc(buflen); + memcpy(dt->key_,((len_t*)buf)+2,buflen); + dt->data_ = dt->key_ + keylen; - return dt; - } - /* - static datatuple form_tuple(const byte * arr) - { - static const size_t isize = sizeof(uint32_t); - datatuple dt; + return dt->sanity_check(); + } - dt.keylen = (uint32_t*) arr; - dt.datalen = (uint32_t*) (arr+isize); - dt.key = (key_t) (arr+isize+isize); - if(!dt.isDelete()) - dt.data = (data_t) (arr+isize+isize+ *(dt.keylen)); - else - dt.data = 0; + static inline void freetuple(datatuple* dt) { + free(dt->key_); + free(dt); + } - return dt; - } - */ - - byte * get_key() { return (byte*) key; } - byte * get_data() { return (byte*) data; } - - //releases only the tuple - static void release(datatuple *dt) - { - free(dt); - } - } datatuple; diff --git a/logiterators.h b/logiterators.h index 85f3d07..ee3c9c9 100644 --- a/logiterators.h +++ b/logiterators.h @@ -30,9 +30,8 @@ public: if(done_) { return NULL; } if(first_) { first_ = 0;} else { it_++; } if(it_==itend_) { done_= true; return NULL; } - TUPLE *t = new TUPLE(); - t->clone(*it_); - return t; + + return (*it_)->create_copy(); } diff --git a/logserver.cpp b/logserver.cpp index c7e5bb3..16d7426 100644 --- a/logserver.cpp +++ b/logserver.cpp @@ -45,6 +45,11 @@ void logserver::startserver(logtable *ltable) worker_data->ready_queue = &ready_queue; worker_data->work_queue = &work_queue; +#ifdef STATS_ENABLED + worker_data->num_reqs = 0; +#endif + + worker_data->qlock = qlock; worker_data->selcond = selcond; @@ -340,14 +345,14 @@ void *serverLoop(void *args) } //start listening on the server socket - //second arg is the max number of coonections waiting in queue + //second arg is the max number of connections waiting in queue if(listen(sockfd,SOMAXCONN)==-1) { printf("ERROR on listen.\n"); return 0; } - printf("LSM Server listenning...\n"); + printf("LSM Server listening...\n"); *(sdata->server_socket) = sockfd; int flag, result; @@ -427,22 +432,23 @@ void * thread_work_fn( void * args) } //step 1: read the opcode - uint8_t opcode; - ssize_t n = read(*(item->data->workitem), &opcode, sizeof(uint8_t)); - if(n == 0) { + network_op_t opcode = readopfromsocket(*(item->data->workitem), LOGSTORE_CLIENT_REQUEST); + if(opcode == LOGSTORE_CONN_CLOSED_ERROR) { opcode = OP_DONE; - n = sizeof(uint8_t); printf("Obsolescent client closed connection uncleanly\n"); } - assert( n == sizeof(uint8_t)); - assert( opcode < OP_INVALID ); - if( opcode == OP_DONE ) //close the conn on failure + if( opcode == OP_DONE || (opiserror(opcode))) //close the conn on failure { - pthread_mutex_lock(item->data->qlock); - printf("client done. conn closed. (%d, %d, %d, %d)\n", - n, errno, *(item->data->workitem), item->data->work_queue->size()); - close(*(item->data->workitem)); + pthread_mutex_lock(item->data->qlock); + if(opiserror(opcode)) { + printf("network error. conn closed. (%d, %d, %d, %d)\n", + opcode, errno, *(item->data->workitem), item->data->work_queue->size()); + } else { + printf("client done. conn closed. (%d, %d)\n", + *(item->data->workitem), item->data->work_queue->size()); + } + close(*(item->data->workitem)); if(item->data->work_queue->size() > 0) { @@ -464,30 +470,8 @@ void * thread_work_fn( void * args) } - //step 2: read the tuple from client - datatuple tuple; - tuple.keylen = (uint32_t*)malloc(sizeof(uint32_t)); - tuple.datalen = (uint32_t*)malloc(sizeof(uint32_t)); - - //read the key length - n = read(*(item->data->workitem), tuple.keylen, sizeof(uint32_t)); - assert( n == sizeof(uint32_t)); - //read the data length - n = read(*(item->data->workitem), tuple.datalen, sizeof(uint32_t)); - assert( n == sizeof(uint32_t)); - - //read the key - tuple.key = (byte*) malloc(*tuple.keylen); - readfromsocket(*(item->data->workitem), (char*) tuple.key, *tuple.keylen); - //read the data - if(!tuple.isDelete() && opcode != OP_FIND) - { - tuple.data = (byte*) malloc(*tuple.datalen); - readfromsocket(*(item->data->workitem), (char*) tuple.data, *tuple.datalen); - } - else - tuple.data = 0; - + //step 2: read the tuple from client + datatuple * tuple = readtuplefromsocket(*(item->data->workitem)); //step 3: process the tuple //pthread_mutex_lock(item->data->table_lock); //readlock(item->data->table_lock,0); @@ -500,67 +484,61 @@ void * thread_work_fn( void * args) //pthread_mutex_unlock(item->data->table_lock); //unlock(item->data->table_lock); //step 4: send response - uint8_t rcode = OP_SUCCESS; - n = write(*(item->data->workitem), &rcode, sizeof(uint8_t)); - assert(n == sizeof(uint8_t)); - + uint8_t rcode = LOGSTORE_RESPONSE_SUCCESS; + int err = writeoptosocket(*(item->data->workitem), LOGSTORE_RESPONSE_SUCCESS); + if(err) { + perror("could not respond to client"); + } } else if(opcode == OP_FIND) { //find the tuple - datatuple *dt = item->data->ltable->findTuple(-1, tuple.key, *tuple.keylen); + datatuple *dt = item->data->ltable->findTuple(-1, tuple->key(), tuple->keylen()); //unlock the lsmlock //pthread_mutex_unlock(item->data->table_lock); //unlock(item->data->table_lock); #ifdef STATS_ENABLED - if(dt == 0) - DEBUG("key not found:\t%s\n", datatuple::key_to_str(tuple.key).c_str()); - else if( *dt->datalen != 1024) - DEBUG("data len for\t%s:\t%d\n", datatuple::key_to_str(tuple.key).c_str(), - *dt->datalen); + if(dt == 0) { + DEBUG("key not found:\t%s\n", datatuple::key_to_str(tuple.key()).c_str()); + } else if( dt->datalen() != 1024) { + DEBUG("data len for\t%s:\t%d\n", datatuple::key_to_str(tuple.key()).c_str(), + dt->datalen); + if(datatuple::compare(tuple->key(), dt->key()) != 0) { + DEBUG("key not equal:\t%s\t%s\n", datatuple::key_to_str(tuple.key()).c_str(), + datatuple::key_to_str(dt->key).c_str()); + } - if(datatuple::compare(tuple.key, dt->key) != 0) - DEBUG("key not equal:\t%s\t%s\n", datatuple::key_to_str(tuple.key).c_str(), - datatuple::key_to_str(dt->key).c_str()); - + } #endif - if(dt == 0) //tuple deleted + bool dt_needs_free; + if(dt == 0) //tuple does not exist. { - dt = (datatuple*) malloc(sizeof(datatuple)); - dt->keylen = (uint32_t*) malloc(2*sizeof(uint32_t) + *tuple.keylen); - *dt->keylen = *tuple.keylen; - dt->datalen = dt->keylen + 1; - dt->key = (datatuple::key_t) (dt->datalen+1); - memcpy((byte*) dt->key, (byte*) tuple.key, *tuple.keylen); - dt->setDelete(); + dt = tuple; + dt->setDelete(); + dt_needs_free = false; + } else { + dt_needs_free = true; } //send the reply code - uint8_t rcode = OP_SENDING_TUPLE; - n = write(*(item->data->workitem), &rcode, sizeof(uint8_t)); - assert(n == sizeof(uint8_t)); + int err = writeoptosocket(*(item->data->workitem), LOGSTORE_RESPONSE_SENDING_TUPLES); //send the tuple - writetosocket(*(item->data->workitem), (char*) dt->keylen, dt->byte_length()); + writetupletosocket(*(item->data->workitem), dt); //free datatuple - free(dt->keylen); - free(dt); + if(dt_needs_free) { + datatuple::freetuple(dt); + } } - //close the socket - //close(*(item->data->workitem)); - //free the tuple - free(tuple.keylen); - free(tuple.datalen); - free(tuple.key); - free(tuple.data); + datatuple::freetuple(tuple); - //printf("socket %d: work completed.", *(item->data->workitem)); + //printf("socket %d: work completed.", *(item->data->workitem)); pthread_mutex_lock(item->data->qlock); @@ -619,5 +597,3 @@ void * thread_work_fn( void * args) return NULL; } - - diff --git a/logserver.h b/logserver.h index e11a7f8..1ff2873 100644 --- a/logserver.h +++ b/logserver.h @@ -81,31 +81,14 @@ struct serverth_data std::queue *ready_queue; pthread_cond_t *selcond; - - pthread_mutex_t *qlock; - - + pthread_mutex_t *qlock; }; -void * thread_work_fn( void *); +void * thread_work_fn( void *); class logserver { -public: - //server codes -// static uint8_t OP_SUCCESS; -// static uint8_t OP_FAIL; -// static uint8_t OP_SENDING_TUPLE; -// -// //client codes -// static uint8_t OP_FIND; -// static uint8_t OP_INSERT; -// -// static uint8_t OP_DONE; -// -// static uint8_t OP_INVALID; - public: logserver(int nthreads, int server_port){ this->nthreads = nthreads; diff --git a/logstore.cpp b/logstore.cpp index 0c96af8..a637240 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -8,6 +8,7 @@ #include "logiterators.h" #include "datapage.cpp" +#include #include ///////////////////////////////////////////////////////////////// @@ -292,7 +293,7 @@ recordid logtree::appendPage(int xid, recordid tree, pageid_t & rmLeafID, Page *p = loadPage(xid, tree.page); writelock(p->rwlatch, 0); //logtree_state *s = (logtree_state*)p->impl; - + tree.slot = 0; //tree.size = sizeof(lsmTreeNodeRecord)+keySize; @@ -367,20 +368,27 @@ recordid logtree::appendPage(int xid, recordid tree, pageid_t & rmLeafID, // NOTE: stasis_record_free call goes to slottedFree in slotted.c // this function only reduces the numslots when you call it // with the last slot. so thats why i go backwards here. - for(int i = *stasis_page_slotted_numslots_ptr(p)-1; i>FIRST_SLOT; i--) + printf("slots %d (%d) keysize=%lld\n", (int)*stasis_page_slotted_numslots_ptr(p), (int)FIRST_SLOT+1, (long long int)keySize); + assert(*stasis_page_slotted_numslots_ptr(p) >= FIRST_SLOT+1); + for(int i = *stasis_page_slotted_numslots_ptr(p)-1; i>FIRST_SLOT; i--) { + assert(*stasis_page_slotted_numslots_ptr(p) > FIRST_SLOT+1); const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid,p,i,0); int reclen = readRecordLength(xid, p, i); recordid tmp_rec= {p->id, i, reclen}; stasis_record_free(xid, p, tmp_rec); - } + } //TODO: could change with stasis_slotted_page_initialize(...); + // TODO: fsck? + // stasis_page_slotted_initialize_page(p); + // reinsert first. - recordid pFirstSlot = { p->id, FIRST_SLOT, readRecordLength(xid, p, FIRST_SLOT)}; - - assert(*stasis_page_slotted_numslots_ptr(p) == FIRST_SLOT+1); + if(*stasis_page_slotted_numslots_ptr(p) != FIRST_SLOT+1) { + printf("slots %d (%d)\n", *stasis_page_slotted_numslots_ptr(p), (int)FIRST_SLOT+1); + assert(*stasis_page_slotted_numslots_ptr(p) == FIRST_SLOT+1); + } indexnode_rec *nr = (indexnode_rec*)stasis_record_write_begin(xid, p, pFirstSlot); @@ -833,6 +841,21 @@ logtable::logtable() } +void logtable::tearDownTree(rbtree_ptr_t tree) { + datatuple * t = 0; + for(rbtree_t::iterator delitr = tree->begin(); + delitr != tree->end(); + delitr++) { + if(t) { + datatuple::freetuple(t); + } + t = *delitr; + tree->erase(delitr); + } + if(t) { datatuple::freetuple(t); } + delete tree; +} + logtable::~logtable() { if(tree_c1 != NULL) @@ -842,23 +865,10 @@ logtable::~logtable() if(tree_c0 != NULL) { - for(rbtree_t::iterator delitr=tree_c0->begin(); - delitr != tree_c0->end(); delitr++) - free((*delitr).keylen); - - delete tree_c0; + tearDownTree(tree_c0); } delete tmerger; - - /* - if(rbtree_mut) - delete rbtree_mut; - if(tree_c0) - delete tree_c0; - if(input_needed) - delete input_needed; - */ } recordid logtable::allocTable(int xid) @@ -917,7 +927,7 @@ void logtable::flushTable() while(*mergedata->old_c0) { unlock(mergedata->header_lock); // pthread_mutex_lock(mergedata->rbtree_mut); - if(tree_bytes >= MAX_C0_SIZE) + if(tree_bytes >= max_c0_size) pthread_cond_wait(mergedata->input_needed_cond, mergedata->rbtree_mut); else { @@ -974,13 +984,10 @@ void logtable::flushTable() } -datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize) +datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keySize) { //prepare a search tuple - datatuple search_tuple; - search_tuple.keylen = (uint32_t*)malloc(sizeof(uint32_t)); - *(search_tuple.keylen) = keySize; - search_tuple.key = key; + datatuple *search_tuple = datatuple::create(key, keySize); readlock(mergedata->header_lock,0); pthread_mutex_lock(mergedata->rbtree_mut); @@ -992,10 +999,7 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize) if(rbitr != tree_c0->end()) { DEBUG("tree_c0 size %d\n", tree_c0->size()); - datatuple tuple = *rbitr; - byte *barr = (byte*)malloc(tuple.byte_length()); - memcpy(barr, (byte*)tuple.keylen, tuple.byte_length()); - ret_tuple = datatuple::from_bytes(barr); + ret_tuple = (*rbitr)->create_copy(); } bool done = false; @@ -1006,22 +1010,19 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize) rbitr = (*(mergedata->old_c0))->find(search_tuple); if(rbitr != (*(mergedata->old_c0))->end()) { - datatuple tuple = *rbitr; + datatuple *tuple = *rbitr; - if(tuple.isDelete()) //tuple deleted + if(tuple->isDelete()) //tuple deleted done = true; //return ret_tuple else if(ret_tuple != 0) //merge the two { - datatuple *mtuple = tmerger->merge(&tuple, ret_tuple); //merge the two - free(ret_tuple->keylen); //free tuple from current tree - free(ret_tuple); + datatuple *mtuple = tmerger->merge(tuple, ret_tuple); //merge the two + datatuple::freetuple(ret_tuple); //free tuple from current tree ret_tuple = mtuple; //set return tuple to merge result } else //key first found in old mem tree { - byte *barr = (byte*)malloc(tuple.byte_length()); - memcpy(barr, (byte*)tuple.keylen, tuple.byte_length()); - ret_tuple = datatuple::from_bytes(barr); + ret_tuple = tuple->create_copy(); } //we cannot free tuple from old-tree 'cos it is not a copy } @@ -1042,8 +1043,7 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize) else if(ret_tuple != 0) //merge the two { datatuple *mtuple = tmerger->merge(tuple_c1, ret_tuple); //merge the two - free(ret_tuple->keylen); //free tuple from before - free(ret_tuple); + datatuple::freetuple(ret_tuple); //free tuple from before ret_tuple = mtuple; //set return tuple to merge result } else //found for the first time @@ -1057,8 +1057,7 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize) if(!use_copy) { - free(tuple_c1->keylen); //free tuple from tree c1 - free(tuple_c1); + datatuple::freetuple(tuple_c1); //free tuple from tree c1 } } } @@ -1078,8 +1077,7 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize) else if(ret_tuple != 0) //merge the two { datatuple *mtuple = tmerger->merge(tuple_oc1, ret_tuple); //merge the two - free(ret_tuple->keylen); //free tuple from before - free(ret_tuple); + datatuple::freetuple(ret_tuple); //free tuple from before ret_tuple = mtuple; //set return tuple to merge result } else //found for the first time @@ -1093,8 +1091,7 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize) if(!use_copy) { - free(tuple_oc1->keylen); //free tuple from tree old c1 - free(tuple_oc1); + datatuple::freetuple(tuple_oc1); //free tuple from tree old c1 } } } @@ -1113,31 +1110,25 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize) else if(ret_tuple != 0) { datatuple *mtuple = tmerger->merge(tuple_c2, ret_tuple); //merge the two - free(ret_tuple->keylen); //free tuple from before - free(ret_tuple); + datatuple::freetuple(ret_tuple); //free tuple from before ret_tuple = mtuple; //set return tuple to merge result } else //found for the first time { use_copy = true; ret_tuple = tuple_c2; - //byte *barr = (byte*)malloc(tuple_c2->byte_length()); - //memcpy(barr, (byte*)tuple_c2->keylen, tuple_c2->byte_length()); - //ret_tuple = datatuple::from_bytes(barr); } if(!use_copy) { - free(tuple_c2->keylen); //free tuple from tree c2 - free(tuple_c2); + datatuple::freetuple(tuple_c2); //free tuple from tree c2 } } } //pthread_mutex_unlock(mergedata->rbtree_mut); unlock(mergedata->header_lock); - free(search_tuple.keylen); - + datatuple::freetuple(search_tuple); return ret_tuple; } @@ -1149,10 +1140,7 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize) datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keySize) { //prepare a search tuple - datatuple search_tuple; - search_tuple.keylen = (uint32_t*)malloc(sizeof(uint32_t)); - *(search_tuple.keylen) = keySize; - search_tuple.key = key; + datatuple * search_tuple = datatuple::create(key, keySize); pthread_mutex_lock(mergedata->rbtree_mut); @@ -1163,10 +1151,7 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS if(rbitr != tree_c0->end()) { DEBUG("tree_c0 size %d\n", tree_c0->size()); - datatuple tuple = *rbitr; - byte *barr = (byte*)malloc(tuple.byte_length()); - memcpy(barr, (byte*)tuple.keylen, tuple.byte_length()); - ret_tuple = datatuple::from_bytes(barr); + ret_tuple = (*rbitr)->create_copy(); } else @@ -1179,10 +1164,7 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS rbitr = (*(mergedata->old_c0))->find(search_tuple); if(rbitr != (*(mergedata->old_c0))->end()) { - datatuple tuple = *rbitr; - byte *barr = (byte*)malloc(tuple.byte_length()); - memcpy(barr, (byte*)tuple.keylen, tuple.byte_length()); - ret_tuple = datatuple::from_bytes(barr); + ret_tuple = (*rbitr)->create_copy(); } } @@ -1221,13 +1203,13 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS pthread_mutex_unlock(mergedata->rbtree_mut); - free(search_tuple.keylen); + datatuple::freetuple(search_tuple); return ret_tuple; } -void logtable::insertTuple(struct datatuple &tuple) +void logtable::insertTuple(datatuple *tuple) { //static int count = LATCH_INTERVAL; //static int tsize = 0; //number of tuples @@ -1240,29 +1222,27 @@ void logtable::insertTuple(struct datatuple &tuple) rbtree_t::iterator rbitr = tree_c0->find(tuple); if(rbitr != tree_c0->end()) { - datatuple pre_t = *rbitr; + datatuple *pre_t = *rbitr; //do the merging - datatuple *new_t = tmerger->merge(&pre_t, &tuple); + datatuple *new_t = tmerger->merge(pre_t, tuple); tree_c0->erase(pre_t); //remove the previous tuple - tree_c0->insert( *new_t); //insert the new tuple + tree_c0->insert(new_t); //insert the new tuple //update the tree size (+ new_t size - pre_t size) - tree_bytes += (new_t->byte_length() - pre_t.byte_length()); - - free(pre_t.keylen); //free the previous tuple - free(new_t); // frees the malloc(sizeof(datatuple)) coming from merge + tree_bytes += (new_t->byte_length() - pre_t->byte_length()); + + datatuple::freetuple(pre_t); //free the previous tuple } else //no tuple with same key exists in mem-tree { - datatuple t; - t.clone(tuple); + datatuple *t = tuple->create_copy(); //insert tuple into the rbtree tree_c0->insert(t); tsize++; - tree_bytes += t.byte_length() + RB_TREE_OVERHEAD; + tree_bytes += t->byte_length() + RB_TREE_OVERHEAD; } @@ -1276,7 +1256,7 @@ void logtable::insertTuple(struct datatuple &tuple) } */ - if(tree_bytes >= MAX_C0_SIZE ) + if(tree_bytes >= max_c0_size ) { DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes); pthread_mutex_unlock(mergedata->rbtree_mut); @@ -1300,7 +1280,7 @@ void logtable::insertTuple(struct datatuple &tuple) } -DataPage* logtable::insertTuple(int xid, struct datatuple &tuple, recordid &dpstate, logtree *ltree) +DataPage* logtable::insertTuple(int xid, datatuple *tuple, recordid &dpstate, logtree *ltree) { //create a new data page @@ -1326,8 +1306,8 @@ DataPage* logtable::insertTuple(int xid, struct datatuple &tuple, rec //insert the record key and id of the first page of the datapage to the logtree Tread(xid,ltree->get_tree_state(), &alloc_conf); logtree::appendPage(xid, ltree->get_root_rec(), ltree->lastLeaf, - tuple.get_key(), - *tuple.keylen, + tuple->key(), + tuple->keylen(), ltree->alloc_region, &alloc_conf, dp->get_start_pid() @@ -1507,6 +1487,7 @@ int logtreeIterator::next(int xid, lladdIterator_t *it) } else { + assert(!impl->p); if(impl->t != NULL) free(impl->t); impl->t = 0; diff --git a/logstore.h b/logstore.h index 268793b..d7f1838 100644 --- a/logstore.h +++ b/logstore.h @@ -41,7 +41,8 @@ double tv_to_double(struct timeval tv); struct logtable_mergedata; - +typedef std::set rbtree_t; +typedef rbtree_t* rbtree_ptr_t; typedef struct RegionAllocConf_t { @@ -174,11 +175,11 @@ public: ~logtable(); //user access functions - datatuple * findTuple(int xid, datatuple::key_t key, size_t keySize); + datatuple * findTuple(int xid, const datatuple::key_t key, size_t keySize); datatuple * findTuple_first(int xid, datatuple::key_t key, size_t keySize); - void insertTuple(struct datatuple &tuple); + void insertTuple(struct datatuple *tuple); //other class functions @@ -186,9 +187,11 @@ public: void flushTable(); - DataPage* insertTuple(int xid, struct datatuple &tuple, recordid &dpstate,logtree *ltree); + static inline void tearDownTree(rbtree_ptr_t t); - datatuple * findTuple(int xid, datatuple::key_t key, size_t keySize, logtree *ltree); + DataPage* insertTuple(int xid, datatuple *tuple, recordid &dpstate,logtree *ltree); + + datatuple * findTuple(int xid, const datatuple::key_t key, size_t keySize, logtree *ltree); inline recordid & get_table_rec(){return table_rec;} @@ -198,8 +201,6 @@ public: inline void set_tree_c1(logtree *t){tree_c1=t;} inline void set_tree_c2(logtree *t){tree_c2=t;} - typedef std::set rbtree_t; - typedef rbtree_t* rbtree_ptr_t; inline rbtree_ptr_t get_tree_c0(){return tree_c0;} void set_tree_c0(rbtree_ptr_t newtree){tree_c0 = newtree;} @@ -233,6 +234,8 @@ public: logtable_mergedata * mergedata; + int64_t max_c0_size; + private: diff --git a/merger.cpp b/merger.cpp index 5f2be4f..ba6e54f 100644 --- a/merger.cpp +++ b/merger.cpp @@ -3,12 +3,13 @@ #include "merger.h" #include "logiterators.cpp" #include "datapage.h" + //pageid_t merge_scheduler::C0_MEM_SIZE = 1000 * 1000 * 1000; //template <> struct merger_args; //template <> struct merger_args; inline DataPage* -insertTuple(int xid, DataPage *dp, datatuple &t, +insertTuple(int xid, DataPage *dp, datatuple *t, logtable *ltable, logtree * ltree, recordid & dpstate, @@ -124,8 +125,9 @@ void merge_scheduler::shutdown() } -void merge_scheduler::startlogtable(int index) +void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE) { + logtable * ltable = mergedata[index].first; struct logtable_mergedata *mdata = mergedata[index].second; @@ -158,6 +160,8 @@ void merge_scheduler::startlogtable(int index) recordid * oldridp = new recordid; *oldridp = NULLRID; + ltable->max_c0_size = MAX_C0_SIZE; + logtree ** block1_scratch = new logtree*; *block1_scratch=0; @@ -391,12 +395,12 @@ void* memMergeThread(void*arg) double target_R = *(a->r_i); double new_c1_size = npages * PAGE_SIZE; assert(target_R >= MIN_R); - if( (new_c1_size / MAX_C0_SIZE > target_R) || + if( (new_c1_size / ltable->max_c0_size > target_R) || (a->max_size && new_c1_size > a->max_size ) ) { printf("mmt:\tsignaling C2 for merge\n"); printf("mmt:\tnew_c1_size %.2f\tMAX_C0_SIZE %lld\ta->max_size %lld\t targetr %.2f \n", new_c1_size, - MAX_C0_SIZE, a->max_size, target_R); + ltable->max_c0_size, a->max_size, target_R); // XXX need to report backpressure here! while(*a->out_tree) { @@ -454,22 +458,10 @@ void* memMergeThread(void*arg) //TODO: get the freeing outside of the lock //// ----------- Free in_tree - for(rbtree_t::iterator delitr=deltree->begin(); - delitr != deltree->end(); delitr++) - free((*delitr).keylen); - - delete deltree; + logtable::tearDownTree(deltree); //deltree = 0; - /* - for(rbtree_t::iterator delitr=(*a->in_tree)->begin(); - delitr != (*a->in_tree)->end(); delitr++) - free((*delitr).keylen); - - delete *a->in_tree; - *a->in_tree = 0; - */ } //pthread_mutex_unlock(a->block_ready_mut); @@ -580,7 +572,7 @@ void *diskMergeThread(void*arg) merge_count++; *a->my_tree_size = mergedPages; //update the current optimal R value - *(a->r_i) = std::max(MIN_R, sqrt( (npages * 1.0) / (MAX_C0_SIZE/PAGE_SIZE) ) ); + *(a->r_i) = std::max(MIN_R, sqrt( (npages * 1.0) / (ltable->max_c0_size/PAGE_SIZE) ) ); printf("dmt:\tmerge_count %d\t#written pages: %lld\n optimal r %.2f", merge_count, npages, *(a->r_i)); @@ -664,55 +656,50 @@ int64_t merge_iterators(int xid, DEBUG("tuple\t%lld: keylen %d datalen %d\n", ntuples, *(t2->keylen),*(t2->datalen) ); - while(t1 != 0 && datatuple::compare(t1->key, t2->key) < 0) // t1 is less than t2 + while(t1 != 0 && datatuple::compare(t1->key(), t2->key()) < 0) // t1 is less than t2 { //insert t1 - dp = insertTuple(xid, dp, *t1, ltable, scratch_tree, + dp = insertTuple(xid, dp, t1, ltable, scratch_tree, ltable->get_dpstate2(), dpages, npages); - free(t1->keylen); - free(t1); + datatuple::freetuple(t1); ntuples++; //advance itrA t1 = itrA->getnext(); } - if(t1 != 0 && datatuple::compare(t1->key, t2->key) == 0) + if(t1 != 0 && datatuple::compare(t1->key(), t2->key()) == 0) { datatuple *mtuple = ltable->gettuplemerger()->merge(t1,t2); //insert merged tuple, drop deletes if(dropDeletes && !mtuple->isDelete()) - dp = insertTuple(xid, dp, *mtuple, ltable, scratch_tree, ltable->get_dpstate2(), + dp = insertTuple(xid, dp, mtuple, ltable, scratch_tree, ltable->get_dpstate2(), dpages, npages); - free(t1->keylen); - free(t1); + datatuple::freetuple(t1); t1 = itrA->getnext(); //advance itrA - free(mtuple->keylen); - free(mtuple); + datatuple::freetuple(mtuple); } else { //insert t2 - dp = insertTuple(xid, dp, *t2, ltable, scratch_tree, ltable->get_dpstate2(), + dp = insertTuple(xid, dp, t2, ltable, scratch_tree, ltable->get_dpstate2(), dpages, npages); // cannot free any tuples here; they may still be read through a lookup } - - free(t2->keylen); - free(t2); + + datatuple::freetuple(t2); ntuples++; } while(t1 != 0) // t1 is less than t2 { - dp = insertTuple(xid, dp, *t1, ltable, scratch_tree, ltable->get_dpstate2(), + dp = insertTuple(xid, dp, t1, ltable, scratch_tree, ltable->get_dpstate2(), dpages, npages); - free(t1->keylen); - free(t1); + datatuple::freetuple(t1); ntuples++; //advance itrA t1 = itrA->getnext(); @@ -727,9 +714,8 @@ int64_t merge_iterators(int xid, } - inline DataPage* -insertTuple(int xid, DataPage *dp, datatuple &t, +insertTuple(int xid, DataPage *dp, datatuple *t, logtable *ltable, logtree * ltree, recordid & dpstate, diff --git a/merger.h b/merger.h index 842dc0a..c17e478 100644 --- a/merger.h +++ b/merger.h @@ -7,12 +7,8 @@ #include "logstore.h" #include "logiterators.h" -typedef std::set rbtree_t; -typedef rbtree_t* rbtree_ptr_t; - //TODO: 400 bytes overhead per tuple, this is nuts, check if this is true... static const int RB_TREE_OVERHEAD = 400; -static const int64_t MAX_C0_SIZE = 800 *1024*1024; //max size of c0 static const double MIN_R = 3.0; //T is either logtree or red-black tree template @@ -91,7 +87,7 @@ public: ~merge_scheduler(); int addlogtable(logtable * ltable); - void startlogtable(int index); + void startlogtable(int index, int64_t MAX_C0_SIZE = 100*1024*1024); struct logtable_mergedata *getMergeData(int index){return mergedata[index].second;} diff --git a/network.h b/network.h index e5a46ac..9f28f0c 100644 --- a/network.h +++ b/network.h @@ -9,39 +9,156 @@ #define NETWORK_H_ #include +#include + +typedef uint8_t network_op_t; //server codes -static const uint8_t OP_SUCCESS = 1; -static const uint8_t OP_FAIL = 2; -static const uint8_t OP_SENDING_TUPLE = 3; +static const network_op_t LOGSTORE_FIRST_RESPONSE_CODE = 1; +static const network_op_t LOGSTORE_RESPONSE_SUCCESS = 1; +static const network_op_t LOGSTORE_RESPONSE_FAIL = 2; +static const network_op_t LOGSTORE_RESPONSE_SENDING_TUPLES = 3; +static const network_op_t LOGSTORE_LAST_RESPONSE_CODE = 3; //client codes -static const uint8_t OP_FIND = 4; -static const uint8_t OP_INSERT = 5; +static const network_op_t LOGSTORE_FIRST_REQUEST_CODE = 8; +static const network_op_t OP_INSERT = 8; // Create, Update, Delete +static const network_op_t OP_FIND = 9; // Read -static const uint8_t OP_DONE = 6; +static const network_op_t OP_DONE = 10; // Please close the connection. +static const network_op_t LOGSTORE_LAST_REQUEST_CODE = 10; -static const uint8_t OP_INVALID = 32; +//error codes +static const network_op_t LOGSTORE_FIRST_ERROR = 28; +static const network_op_t LOGSTORE_CONN_CLOSED_ERROR = 28; // Unexpected EOF +static const network_op_t LOGSTORE_SOCKET_ERROR = 29; // The OS returned an error. +static const network_op_t LOGSTORE_REMOTE_ERROR = 30; // The other side didn't like our request +static const network_op_t LOGSTORE_PROTOCOL_ERROR = 31; // The other side responeded with gibberish. +static const network_op_t LOGSTORE_LAST_ERROR = 31; +static const network_op_t OP_INVALID = 32; +typedef enum { + LOGSTORE_CLIENT_REQUEST, + LOGSTORE_SERVER_RESPONSE +} logstore_opcode_type; -static inline void readfromsocket(int sockd, char *buf, int count) +static inline int readfromsocket(int sockd, void *buf, ssize_t count) { - int n = 0; + ssize_t n = 0; + while( n < count ) { - n += read( sockd, buf + n, count - n); + ssize_t i = read( sockd, ((byte*)buf) + n, count - n); + if(i == -1) { + perror("readfromsocket failed"); + return errno; + } else if(i == 0) { + errno = EOF; + return errno; + } + n += i; } + return 0; } -static inline void writetosocket(int sockd, char *buf, int count) +static inline int writetosocket(int sockd, const void *buf, ssize_t count) { - int n = 0; + ssize_t n = 0; + while( n < count ) { - n += write( sockd, buf + n, count - n); + ssize_t i = write( sockd, ((byte*)buf) + n, count - n); + if(i == -1) { + perror("writetosocket failed"); + return errno; + } else if(i == 0) { + errno = EOF; + return errno; + } + n += i; } + return 0; +} + +static inline bool opiserror(network_op_t op) { + return (LOGSTORE_FIRST_ERROR <= op && op <= LOGSTORE_LAST_ERROR); +} +static inline bool opisrequest(network_op_t op) { + return (LOGSTORE_FIRST_REQUEST_CODE <= op && op <= LOGSTORE_LAST_REQUEST_CODE); +} +static inline bool opisresponse(network_op_t op) { + return (LOGSTORE_FIRST_RESPONSE_CODE <= op && op <= LOGSTORE_LAST_RESPONSE_CODE); +} + +static inline network_op_t readopfromsocket(int sockd, logstore_opcode_type type) { + network_op_t ret; + ssize_t n = read(sockd, &ret, sizeof(network_op_t)); + if(n == sizeof(network_op_t)) { + // done. + } else if(n == 0) { // EOF + perror("Socket closed mid request."); + return LOGSTORE_CONN_CLOSED_ERROR; + } else { + assert(n == -1); // sizeof(network_op_t) is 1, so short reads are impossible. + perror("Could not read opcode from socket"); + return LOGSTORE_SOCKET_ERROR; + } + // sanity checking + switch(type) { + case LOGSTORE_CLIENT_REQUEST: { + if(!(opisrequest(ret) || opiserror(ret))) { + fprintf(stderr, "Read invalid request code %d\n", (int)ret); + if(opisresponse(ret)) { + fprintf(stderr, "(also, the request code is a valid response code)\n"); + } + ret = LOGSTORE_PROTOCOL_ERROR; + } + } break; + case LOGSTORE_SERVER_RESPONSE: { + if(!(opisresponse(ret) || opiserror(ret))) { + fprintf(stderr, "Read invalid response code %d\n", (int)ret); + if(opisrequest(ret)) { + fprintf(stderr, "(also, the response code is a valid request code)\n"); + } + ret = LOGSTORE_PROTOCOL_ERROR; + } + + } + } + return ret; +} +static inline int writeoptosocket(int sockd, network_op_t op) { + assert(opiserror(op) || opisrequest(op) || opisresponse(op)); + return writetosocket(sockd, &op, sizeof(network_op_t)); +} + +static inline datatuple* readtuplefromsocket(int sockd) { + + datatuple::len_t keylen, datalen, buflen; + + if( readfromsocket(sockd, &keylen, sizeof(keylen)) ) return NULL; + if( readfromsocket(sockd, &datalen, sizeof(datalen)) ) return NULL; + + buflen = datatuple::length_from_header(keylen, datalen); + byte* bytes = (byte*) malloc(buflen); + + if( readfromsocket(sockd, bytes, buflen) ) return NULL; + + return datatuple::from_bytes(keylen, datalen, bytes); // from_bytes consumes the buffer. +} + +static inline int writetupletosocket(int sockd, const datatuple* tup) { + datatuple::len_t keylen, datalen; + + const byte* buf = tup->get_bytes(&keylen, &datalen); + int err; + if(( err = writetosocket(sockd, &keylen, sizeof(keylen)) )) return err; + if(( err = writetosocket(sockd, &datalen, sizeof(datalen)) )) return err; + if(( err = writetosocket(sockd, buf, datatuple::length_from_header(keylen, datalen)) )) return err; + return 0; + } #endif /* NETWORK_H_ */ diff --git a/server.cpp b/server.cpp index 4f847b0..912b6b2 100644 --- a/server.cpp +++ b/server.cpp @@ -38,19 +38,14 @@ void terminate (int param) exit(0); } -void insertProbeIter(int NUM_ENTRIES) +void initialize_server() { //signal handling void (*prev_fn)(int); prev_fn = signal (SIGINT,terminate); - //if (prev_fn==SIG_IGN) - //signal (SIGTERM,SIG_IGN); - - sync(); - - bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE; + bufferManagerFileHandleType = BUFFER_MANAGER_FILE_HANDLE_PFILE; Tinit(); @@ -59,8 +54,6 @@ void insertProbeIter(int NUM_ENTRIES) mscheduler = new merge_scheduler; logtable ltable; - - int pcount = 40; ltable.set_fixed_page_count(pcount); @@ -71,16 +64,13 @@ void insertProbeIter(int NUM_ENTRIES) int lindex = mscheduler->addlogtable(<able); ltable.setMergeData(mscheduler->getMergeData(lindex)); - mscheduler->startlogtable(lindex); - + int64_t c0_size = 1024 * 1024 * 10; + printf("warning: running w/ tiny c0 for testing"); // XXX + mscheduler->startlogtable(lindex, c0_size); lserver = new logserver(10, 32432); lserver->startserver(<able); - - -// Tdeinit(); - } @@ -90,18 +80,7 @@ void insertProbeIter(int NUM_ENTRIES) */ int main() { - //insertProbeIter(25000); - insertProbeIter(10000); - /* - insertProbeIter(5000); - insertProbeIter(2500); - insertProbeIter(1000); - insertProbeIter(500); - insertProbeIter(1000); - insertProbeIter(100); - insertProbeIter(10); - */ - - return 0; + initialize_server(); + abort(); // can't get here. } diff --git a/tcpclient.cpp b/tcpclient.cpp index a77c59a..a90c378 100644 --- a/tcpclient.cpp +++ b/tcpclient.cpp @@ -78,12 +78,17 @@ logstore_handle_t * logstore_client_open(const char *host, int portnum, int time return ret; } +static inline void close_conn(logstore_handle_t *l) { + printf("read/write err.. conn closed.\n"); + close(l->server_socket); //close the connection + l->server_socket = -1; +} datatuple * logstore_client_op(logstore_handle_t *l, // int *server_socket, // struct sockaddr_in serveraddr, // struct hostent *server, - uint8_t opcode, datatuple &tuple) + uint8_t opcode, datatuple * tuple) { if(l->server_socket < 0) @@ -122,60 +127,27 @@ logstore_client_op(logstore_handle_t *l, } + //send the opcode - int n = write(l->server_socket, (byte*) &opcode, sizeof(uint8_t)); - assert(n == sizeof(uint8_t)); + if( writetosocket(l->server_socket, &opcode, sizeof(opcode)) ) { close_conn(l); return 0; } //send the tuple - n = write(l->server_socket, (byte*) tuple.keylen, sizeof(uint32_t)); - assert( n == sizeof(uint32_t)); + if( writetupletosocket(l->server_socket, tuple) ) { close_conn(l); return 0; } - n = write(l->server_socket, (byte*) tuple.datalen, sizeof(uint32_t)); - assert( n == sizeof(uint32_t)); + network_op_t rcode = readopfromsocket(l->server_socket,LOGSTORE_SERVER_RESPONSE); - writetosocket(l->server_socket, (char*) tuple.key, *tuple.keylen); - if(!tuple.isDelete() && *tuple.datalen != 0) - writetosocket(l->server_socket, (char*) tuple.data, *tuple.datalen); + if( opiserror(rcode) ) { close_conn(l); return 0; } - //printf("\nssocket %d ", *server_socket); - //read the reply code - uint8_t rcode; - n = read(l->server_socket, (byte*) &rcode, sizeof(uint8_t)); - if( n <= 0 ) - { - printf("read err.. conn closed.\n"); - close(l->server_socket); //close the connection - l->server_socket = -1; - return 0; - } - - //printf("rdone\n"); datatuple * ret; - if(rcode == OP_SENDING_TUPLE) - { - datatuple *rcvdtuple = (datatuple*)malloc(sizeof(datatuple)); - //read the keylen - rcvdtuple->keylen = (uint32_t*) malloc(sizeof(uint32_t)); - n = read(l->server_socket, (char*) rcvdtuple->keylen, sizeof(uint32_t)); - assert(n == sizeof(uint32_t)); - //read the datalen - rcvdtuple->datalen = (uint32_t*) malloc(sizeof(uint32_t)); - n = read(l->server_socket, (byte*) rcvdtuple->datalen, sizeof(uint32_t)); - assert(n == sizeof(uint32_t)); - //read key - rcvdtuple->key = (byte*) malloc(*rcvdtuple->keylen); - readfromsocket(l->server_socket, (char*) rcvdtuple->key, *rcvdtuple->keylen); - if(!rcvdtuple->isDelete()) - { - //read key - rcvdtuple->data = (byte*) malloc(*rcvdtuple->datalen); - readfromsocket(l->server_socket, (char*) rcvdtuple->data, *rcvdtuple->datalen); - } - ret = rcvdtuple; - } else if(rcode == OP_SUCCESS) { - ret = &tuple; + if(rcode == LOGSTORE_RESPONSE_SENDING_TUPLES) + { + ret = readtuplefromsocket(l->server_socket); + + } else if(rcode == LOGSTORE_RESPONSE_SUCCESS) { + ret = tuple; } else { + assert(rcode == LOGSTORE_RESPONSE_FAIL); // if this is an invalid response, we should have noticed above ret = 0; } diff --git a/tcpclient.h b/tcpclient.h index 8ca8d61..1458a49 100644 --- a/tcpclient.h +++ b/tcpclient.h @@ -16,7 +16,7 @@ logstore_handle_t * logstore_client_open(const char *host, int portnum, int time datatuple * logstore_client_op(logstore_handle_t* l, uint8_t opcode, - datatuple &tuple); + datatuple *tuple); int logstore_client_close(logstore_handle_t* l); diff --git a/test/check_datapage.cpp b/test/check_datapage.cpp index 10c1e3a..4ab4c03 100644 --- a/test/check_datapage.cpp +++ b/test/check_datapage.cpp @@ -78,23 +78,7 @@ void insertProbeIter(size_t NUM_ENTRIES) for(size_t i = 0; i < NUM_ENTRIES; i++) { //prepare the key - datatuple newtuple; - uint32_t keylen = key_arr[i].length()+1; - newtuple.keylen = &keylen; - - newtuple.key = (datatuple::key_t) malloc(keylen); - for(size_t j=0; jbyte_length(); if(dp==NULL || !dp->append(xid, newtuple)) { dpages++; @@ -145,46 +129,19 @@ void insertProbeIter(size_t NUM_ENTRIES) datatuple *dt=0; while( (dt=itr.getnext(xid)) != NULL) { - assert(*(dt->keylen) == key_arr[tuplenum].length()+1); - assert(*(dt->datalen) == data_arr[tuplenum].length()+1); + assert(dt->keylen() == key_arr[tuplenum].length()+1); + assert(dt->datalen() == data_arr[tuplenum].length()+1); tuplenum++; - free(dt->keylen); - free(dt); + datatuple::freetuple(dt); dt = 0; } } printf("Reads completed.\n"); -/* - - int64_t count = 0; - lladdIterator_t * it = logtreeIterator::open(xid, tree); - - while(logtreeIterator::next(xid, it)) { - byte * key; - byte **key_ptr = &key; - int keysize = logtreeIterator::key(xid, it, (byte**)key_ptr); - - pageid_t *value; - pageid_t **value_ptr = &value; - int valsize = lsmTreeIterator_value(xid, it, (byte**)value_ptr); - //printf("keylen %d key %s\n", keysize, (char*)(key)) ; - assert(valsize == sizeof(pageid_t)); - assert(!mycmp(std::string((char*)key), arr[count]) && !mycmp(arr[count],std::string((char*)key))); - assert(keysize == arr[count].length()+1); - count++; - } - assert(count == NUM_ENTRIES); - - logtreeIterator::close(xid, it); - - - */ - - Tcommit(xid); - Tdeinit(); + Tcommit(xid); + Tdeinit(); } diff --git a/test/check_logtable.cpp b/test/check_logtable.cpp index a24a210..75efa14 100644 --- a/test/check_logtable.cpp +++ b/test/check_logtable.cpp @@ -77,32 +77,10 @@ void insertProbeIter(size_t NUM_ENTRIES) std::vector dsp; for(size_t i = 0; i < NUM_ENTRIES; i++) { - //prepare the key - datatuple newtuple; - uint32_t keylen = key_arr[i].length()+1; - newtuple.keylen = &keylen; - - newtuple.key = (datatuple::key_t) malloc(keylen); - for(size_t j=0; jbyte_length(); if(dp == NULL) { @@ -122,10 +100,7 @@ void insertProbeIter(size_t NUM_ENTRIES) } } - free(newtuple.key); - free(newtuple.data); - - + datatuple::freetuple(newtuple); } printf("\nTREE STRUCTURE\n"); @@ -139,12 +114,7 @@ void insertProbeIter(size_t NUM_ENTRIES) Tcommit(xid); xid = Tbegin(); - - - - printf("Stage 2: Sequentially reading %d tuples\n", NUM_ENTRIES); - size_t tuplenum = 0; treeIterator tree_itr(tree_root); @@ -153,11 +123,10 @@ void insertProbeIter(size_t NUM_ENTRIES) datatuple *dt=0; while( (dt=tree_itr.getnext()) != NULL) { - assert(*(dt->keylen) == key_arr[tuplenum].length()+1); - assert(*(dt->datalen) == data_arr[tuplenum].length()+1); + assert(dt->keylen() == key_arr[tuplenum].length()+1); + assert(dt->datalen() == data_arr[tuplenum].length()+1); tuplenum++; - free(dt->keylen); - free(dt); + datatuple::freetuple(dt); dt = 0; } @@ -173,21 +142,12 @@ void insertProbeIter(size_t NUM_ENTRIES) //randomly pick a key int ri = rand()%key_arr.size(); - //get the key - uint32_t keylen = key_arr[ri].length()+1; - datatuple::key_t rkey = (datatuple::key_t) malloc(keylen); - for(size_t j=0; jkeylen) == key_arr[ri].length()+1); - assert(*(dt->datalen) == data_arr[ri].length()+1); - free(dt->keylen); - free(dt); + assert(dt->keylen() == key_arr[ri].length()+1); + assert(dt->datalen() == data_arr[ri].length()+1); + datatuple::freetuple(dt); dt = 0; } diff --git a/test/check_merge.cpp b/test/check_merge.cpp index 4634252..c5ea608 100644 --- a/test/check_merge.cpp +++ b/test/check_merge.cpp @@ -72,34 +72,14 @@ void insertProbeIter(size_t NUM_ENTRIES) struct timeval start_tv, stop_tv, ti_st, ti_end; double insert_time = 0; - int dpages = 0; - int npages = 0; - DataPage *dp=0; int64_t datasize = 0; std::vector dsp; gettimeofday(&start_tv,0); for(size_t i = 0; i < NUM_ENTRIES; i++) { //prepare the key - datatuple newtuple; - uint32_t keylen = (*key_arr)[i].length()+1; - newtuple.keylen = &keylen; - - newtuple.key = (datatuple::key_t) malloc(keylen); - memcpy((byte*)newtuple.key, (*key_arr)[i].c_str(), keylen); - //for(int j=0; jbyte_length(); gettimeofday(&ti_st,0); ltable.insertTuple(newtuple); gettimeofday(&ti_end,0); insert_time += tv_to_double(ti_end) - tv_to_double(ti_st); - free(newtuple.key); - free(newtuple.data); + datatuple::freetuple(newtuple); } gettimeofday(&stop_tv,0); @@ -155,11 +134,10 @@ void insertProbeIter(size_t NUM_ENTRIES) assert(dt!=0); //if(dt!=0) { - found_tuples++; - assert(*(dt->keylen) == (*key_arr)[ri].length()+1); - assert(*(dt->datalen) == (*data_arr)[ri].length()+1); - free(dt->keylen); - free(dt); + found_tuples++; + assert(dt->keylen() == (*key_arr)[ri].length()+1); + assert(dt->datalen() == (*data_arr)[ri].length()+1); + datatuple::freetuple(dt); } dt = 0; free(rkey); diff --git a/test/check_mergelarge.cpp b/test/check_mergelarge.cpp index 09e38c2..fce9368 100644 --- a/test/check_mergelarge.cpp +++ b/test/check_mergelarge.cpp @@ -68,54 +68,26 @@ void insertProbeIter(size_t NUM_ENTRIES) struct timeval start_tv, stop_tv, ti_st, ti_end; double insert_time = 0; - int dpages = 0; - int npages = 0; - DataPage *dp=0; int64_t datasize = 0; std::vector dsp; gettimeofday(&start_tv,0); for(size_t i = 0; i < NUM_ENTRIES; i++) { - //prepare the key - datatuple newtuple; - uint32_t keylen = (*key_arr)[i].length()+1; - newtuple.keylen = &keylen; - - newtuple.key = (datatuple::key_t) malloc(keylen); - memcpy((byte*)newtuple.key, (*key_arr)[i].c_str(), keylen); - //for(int j=0; jbyte_length(); gettimeofday(&ti_st,0); ltable.insertTuple(newtuple); gettimeofday(&ti_end,0); insert_time += tv_to_double(ti_end) - tv_to_double(ti_st); - free(newtuple.key); - free(newtuple.data); - + datatuple::freetuple(newtuple); } gettimeofday(&stop_tv,0); printf("insert time: %6.1f\n", insert_time); @@ -124,52 +96,6 @@ void insertProbeIter(size_t NUM_ENTRIES) printf("\nTREE STRUCTURE\n"); //ltable.get_tree_c1()->print_tree(xid); printf("datasize: %lld\n", datasize); - //sleep(20); - - /* - //Tcommit(xid); - xid = Tbegin(); - - - printf("Stage 2: Looking up %d keys:\n", NUM_ENTRIES); - - int found_tuples=0; - for(int i=NUM_ENTRIES-1; i>=0; i--) - { - int ri = i; - //printf("key index%d\n", i); - fflush(stdout); - - //get the key - uint32_t keylen = (*key_arr)[ri].length()+1; - datatuple::key_t rkey = (datatuple::key_t) malloc(keylen); - memcpy((byte*)rkey, (*key_arr)[ri].c_str(), keylen); - //for(int j=0; jkeylen) == (*key_arr)[ri].length()+1); - //assert(*(dt->datalen) == (*data_arr)[ri].length()+1); - free(dt->keylen); - free(dt); - } - dt = 0; - free(rkey); - } - printf("found %d\n", found_tuples); - - key_arr->clear(); - //data_arr->clear(); - delete key_arr; - //delete data_arr; - */ mscheduler.shutdown(); printf("merge threads finished.\n"); diff --git a/test/check_mergetuple.cpp b/test/check_mergetuple.cpp index f0e3494..fb3a4eb 100644 --- a/test/check_mergetuple.cpp +++ b/test/check_mergetuple.cpp @@ -21,8 +21,8 @@ void insertProbeIter(size_t NUM_ENTRIES) { srand(1000); - //unlink("storefile.txt"); - //unlink("logfile.txt"); + unlink("storefile.txt"); + unlink("logfile.txt"); sync(); double delete_freq = .05; @@ -91,11 +91,8 @@ void insertProbeIter(size_t NUM_ENTRIES) key_v_list->clear(); delete key_v_list; -// preprandstr(NUM_ENTRIES, data_arr, 10*8192); - printf("key arr size: %d\n", key_arr->size()); - //removeduplicates(key_arr); if(key_arr->size() > NUM_ENTRIES) key_arr->erase(key_arr->begin()+NUM_ENTRIES, key_arr->end()); @@ -138,45 +135,21 @@ void insertProbeIter(size_t NUM_ENTRIES) gettimeofday(&start_tv,0); for(size_t i = 0; i < NUM_ENTRIES; i++) { - //prepare the key - datatuple newtuple; - uint32_t keylen = (*key_arr)[i].length()+1; - newtuple.keylen = &keylen; - - newtuple.key = (datatuple::key_t) malloc(keylen); - memcpy((byte*)newtuple.key, (*key_arr)[i].c_str(), keylen); - //for(int j=0; jbyte_length(); gettimeofday(&ti_st,0); ltable.insertTuple(newtuple); gettimeofday(&ti_end,0); insert_time += tv_to_double(ti_end) - tv_to_double(ti_st); - free(newtuple.key); - free(newtuple.data); + datatuple::freetuple(newtuple); double rval = ((rand() % 100)+.0)/100; if( rval < delete_freq) //delete a key @@ -185,22 +158,15 @@ void insertProbeIter(size_t NUM_ENTRIES) if(del_index >= 0 && std::find(del_list.begin(), del_list.end(), del_index) == del_list.end()) { delcount++; - datatuple deltuple; - keylen = (*key_arr)[del_index].length()+1; - deltuple.keylen = &keylen; - - deltuple.key = (datatuple::key_t) malloc(keylen); - memcpy((byte*)deltuple.key, (*key_arr)[del_index].c_str(), keylen); - deltuple.datalen = &datalen; - deltuple.setDelete(); + datatuple *deltuple = datatuple::create((*key_arr)[del_index].c_str(), (*key_arr)[del_index].length()+1); gettimeofday(&ti_st,0); ltable.insertTuple(deltuple); gettimeofday(&ti_end,0); insert_time += tv_to_double(ti_end) - tv_to_double(ti_st); - free(deltuple.key); + datatuple::freetuple(deltuple); del_list.push_back(del_index); @@ -211,28 +177,17 @@ void insertProbeIter(size_t NUM_ENTRIES) int up_index = i - (rand()%50); //update one of the last inserted 50 elements if(up_index >= 0 && std::find(del_list.begin(), del_list.end(), up_index) == del_list.end()) {//only update non-deleted elements - upcount++; - datatuple uptuple; - keylen = (*key_arr)[up_index].length()+1; - uptuple.keylen = &keylen; - - uptuple.key = (datatuple::key_t) malloc(keylen); - memcpy((byte*)uptuple.key, (*key_arr)[up_index].c_str(), keylen); - getnextdata(ditem, 512); - datalen = ditem.length()+1; - uptuple.datalen = &datalen; - uptuple.data = (datatuple::data_t) malloc(datalen); - memcpy((byte*)uptuple.data, ditem.c_str(), datalen); - + + upcount++; + datatuple *uptuple = datatuple::create((*key_arr)[up_index].c_str(), (*key_arr)[up_index].length()+1, + ditem.c_str(), ditem.length()+1); gettimeofday(&ti_st,0); ltable.insertTuple(uptuple); gettimeofday(&ti_end,0); insert_time += tv_to_double(ti_end) - tv_to_double(ti_st); - free(uptuple.key); - free(uptuple.data); - + datatuple::freetuple(uptuple); } } @@ -244,9 +199,7 @@ void insertProbeIter(size_t NUM_ENTRIES) printf("#deletions: %d\n#updates: %d\n", delcount, upcount); printf("\nTREE STRUCTURE\n"); - //ltable.get_tree_c1()->print_tree(xid); printf("datasize: %lld\n", datasize); - //sleep(20); Tcommit(xid); xid = Tbegin(); @@ -259,16 +212,11 @@ void insertProbeIter(size_t NUM_ENTRIES) for(int i=NUM_ENTRIES-1; i>=0; i--) { int ri = i; - //printf("key index%d\n", i); - fflush(stdout); //get the key uint32_t keylen = (*key_arr)[ri].length()+1; datatuple::key_t rkey = (datatuple::key_t) malloc(keylen); memcpy((byte*)rkey, (*key_arr)[ri].c_str(), keylen); - //for(int j=0; jisDelete()); found_tuples++; - assert(*(dt->keylen) == (*key_arr)[ri].length()+1); - //assert(*(dt->datalen) == (*data_arr)[ri].length()+1); - free(dt->keylen); - free(dt); + assert(dt->keylen() == (*key_arr)[ri].length()+1); + datatuple::freetuple(dt); } else { if(dt!=0) { - assert(*(dt->keylen) == (*key_arr)[ri].length()+1); + assert(dt->keylen() == (*key_arr)[ri].length()+1); assert(dt->isDelete()); - free(dt->keylen); - free(dt); + datatuple::freetuple(dt); } } dt = 0; @@ -326,7 +271,7 @@ void insertProbeIter(size_t NUM_ENTRIES) */ int main() { - //insertProbeIter(25000); +// insertProbeIter(25000); insertProbeIter(400000); /* insertProbeIter(5000); diff --git a/test/check_rbtree.cpp b/test/check_rbtree.cpp index dd45a03..40a52eb 100644 --- a/test/check_rbtree.cpp +++ b/test/check_rbtree.cpp @@ -20,7 +20,8 @@ void insertProbeIter(size_t NUM_ENTRIES) { - + unlink("logfile.txt"); + unlink("storefile.txt"); //data generation std::vector data_arr; std::vector key_arr; @@ -38,45 +39,18 @@ void insertProbeIter(size_t NUM_ENTRIES) if(data_arr.size() > NUM_ENTRIES) data_arr.erase(data_arr.begin()+NUM_ENTRIES, data_arr.end()); - std::set rbtree; + rbtree_t rbtree; + int64_t datasize = 0; std::vector dsp; for(size_t i = 0; i < NUM_ENTRIES; i++) { //prepare the key - datatuple newtuple; - uint32_t keylen = key_arr[i].length()+1; - newtuple.keylen = (uint32_t*)malloc(sizeof(uint32_t)); - *newtuple.keylen = keylen; - - newtuple.key = (datatuple::key_t) malloc(keylen); - for(size_t j=0; jbyte_length(); rbtree.insert(newtuple); - - } printf("\nTREE STRUCTURE\n"); @@ -88,47 +62,29 @@ void insertProbeIter(size_t NUM_ENTRIES) int found_tuples=0; for(int i=NUM_ENTRIES-1; i>=0; i--) { + //find the key with the given tuple int ri = i; - //get the key - uint32_t keylen = key_arr[ri].length()+1; - datatuple::key_t rkey = (datatuple::key_t) malloc(keylen); - for(size_t j=0; jkeylen) == key_arr[ri].length()+1); - assert(*(ret_tuple->datalen) == data_arr[ri].length()+1); - free(barr); - free(ret_tuple); + assert(tuple->keylen() == key_arr[ri].length()+1); + assert(tuple->datalen() == data_arr[ri].length()+1); } else { printf("Not in scratch_tree\n"); } - free(search_tuple.keylen); - free(rkey); + datatuple::freetuple(search_tuple); } printf("found %d\n", found_tuples); } diff --git a/test/check_tcpclient.cpp b/test/check_tcpclient.cpp index 529f17c..dacfa14 100644 --- a/test/check_tcpclient.cpp +++ b/test/check_tcpclient.cpp @@ -26,8 +26,6 @@ static int svrport = 32432; void insertProbeIter(size_t NUM_ENTRIES) { srand(1000); -// std::string servername = svrname; //"sherpa4"; - // int serverport = svrport; //32432; logstore_handle_t * l = logstore_client_open(svrname, svrport, 100); @@ -97,12 +95,8 @@ void insertProbeIter(size_t NUM_ENTRIES) } key_v_list->clear(); delete key_v_list; - -// preprandstr(NUM_ENTRIES, data_arr, 10*8192); - printf("key arr size: %d\n", key_arr->size()); - //removeduplicates(key_arr); if(key_arr->size() > NUM_ENTRIES) key_arr->erase(key_arr->begin()+NUM_ENTRIES, key_arr->end()); @@ -122,35 +116,21 @@ void insertProbeIter(size_t NUM_ENTRIES) for(size_t i = 0; i < NUM_ENTRIES; i++) { //prepare the key - datatuple newtuple; - uint32_t keylen = (*key_arr)[i].length()+1; - newtuple.keylen = &keylen; - - newtuple.key = (datatuple::key_t) malloc(keylen); - memcpy((byte*)newtuple.key, (*key_arr)[i].c_str(), keylen); + datatuple::len_t keylen = (*key_arr)[i].length()+1; //prepare the data std::string ditem; getnextdata(ditem, 8192); - uint32_t datalen = ditem.length()+1; - newtuple.datalen = &datalen; - newtuple.data = (datatuple::data_t) malloc(datalen); - memcpy((byte*)newtuple.data, ditem.c_str(), datalen); + datatuple::len_t datalen = ditem.length()+1; - /* - printf("key: \t, keylen: %u\ndata: datalen: %u\n", - //newtuple.key, - *newtuple.keylen, - //newtuple.data, - *newtuple.datalen); - */ - - datasize += newtuple.byte_length(); + datatuple* newtuple = datatuple::create((*key_arr)[i].c_str(), keylen, + ditem.c_str(), datalen); - gettimeofday(&ti_st,0); + datasize += newtuple->byte_length(); + + gettimeofday(&ti_st,0); //send the data -// datatuple * ret = sendTuple(servername, serverport, OP_INSERT, newtuple); datatuple * ret = logstore_client_op(l, OP_INSERT, newtuple); assert(ret); @@ -158,8 +138,7 @@ void insertProbeIter(size_t NUM_ENTRIES) // insert_time += tv_to_double(ti_end) - tv_to_double(ti_st); insert_time ++; // XXX - free(newtuple.key); - free(newtuple.data); + datatuple::freetuple(newtuple); if(i % 10000 == 0 && i > 0) printf("%d / %d inserted.\n", i, NUM_ENTRIES); @@ -177,56 +156,40 @@ void insertProbeIter(size_t NUM_ENTRIES) int found_tuples=0; for(int i=NUM_ENTRIES-1; i>=0; i--) - { - int ri = i; + { + int ri = i; //printf("key index%d\n", i); - fflush(stdout); - + //fflush(stdout); + //get the key - uint32_t keylen = (*key_arr)[ri].length()+1; - datatuple searchtuple; - searchtuple.keylen = (uint32_t*)malloc(2*sizeof(uint32_t) + keylen); - *searchtuple.keylen = keylen; + datatuple::len_t keylen = (*key_arr)[ri].length()+1; + datatuple::len_t datalen = 0; - searchtuple.datalen = searchtuple.keylen + 1; - *searchtuple.datalen = 0; - - searchtuple.key = (datatuple::key_t)(searchtuple.keylen + 2); - memcpy((byte*)searchtuple.key, (*key_arr)[ri].c_str(), keylen); + datatuple* searchtuple = datatuple::create((*key_arr)[ri].c_str(), keylen); //find the key with the given tuple - datatuple *dt = logstore_client_op(l, OP_FIND, //servername, serverport, OP_FIND, - searchtuple); - + datatuple *dt = logstore_client_op(l, OP_FIND, searchtuple); + assert(dt!=0); assert(!dt->isDelete()); found_tuples++; - assert(*(dt->keylen) == (*key_arr)[ri].length()+1); + assert(dt->keylen() == (*key_arr)[ri].length()+1); //free dt - free(dt->keylen); - free(dt->datalen); - free(dt->key); - free(dt->data); - free(dt); - + datatuple::freetuple(dt); dt = 0; - free(searchtuple.keylen); - + datatuple::freetuple(searchtuple); } printf("found %d\n", found_tuples); key_arr->clear(); - //data_arr->clear(); delete key_arr; - //delete data_arr; - + logstore_client_close(l); gettimeofday(&stop_tv,0); printf("run time: %6.1f\n", -1.0); // XXX (tv_to_double(stop_tv) - tv_to_double(start_tv))); - } diff --git a/tuplemerger.cpp b/tuplemerger.cpp index 0adbf22..a8a02ef 100644 --- a/tuplemerger.cpp +++ b/tuplemerger.cpp @@ -1,6 +1,8 @@ #include "tuplemerger.h" #include "logstore.h" +// XXX make the imputs 'const' +// XXX test / reason about this... datatuple* tuplemerger::merge(datatuple *t1, datatuple *t2) { assert(!t1->isDelete() || !t2->isDelete()); //both cannot be delete @@ -9,11 +11,11 @@ datatuple* tuplemerger::merge(datatuple *t1, datatuple *t2) if(t1->isDelete()) //delete - t2 { - t = datatuple::from_bytes(t2->to_bytes()); + t = t2->create_copy(); } else if(t2->isDelete()) { - t = datatuple::from_bytes(t2->to_bytes()); + t = t2->create_copy(); } else //neither is a delete { @@ -32,26 +34,15 @@ datatuple* tuplemerger::merge(datatuple *t1, datatuple *t2) **/ datatuple* append_merger(datatuple *t1, datatuple *t2) { - static const size_t isize = sizeof(uint32_t); - struct datatuple *t = (datatuple*) malloc(sizeof(datatuple)); - byte *arr = (byte*)malloc(t1->byte_length() + *t2->datalen); - - t->keylen = (uint32_t*) arr; - *(t->keylen) = *(t1->keylen); - - t->datalen = (uint32_t*) (arr+isize); - *(t->datalen) = *(t1->datalen) + *(t2->datalen); - - t->key = (datatuple::key_t) (arr+isize+isize); - memcpy((byte*)t->key, (byte*)t1->key, *(t1->keylen)); - - t->data = (datatuple::data_t) (arr+isize+isize+ *(t1->keylen)); - memcpy((byte*)t->data, (byte*)t1->data, *(t1->datalen)); - memcpy(((byte*)t->data) + *(t1->datalen), (byte*)t2->data, *(t2->datalen)); - - return t; + assert(!(t1->isDelete() || t2->isDelete())); + datatuple::len_t keylen = t1->keylen(); + datatuple::len_t datalen = t1->datalen() + t2->datalen(); + byte * data = (byte*)malloc(datalen); + memcpy(data, t1->data(), t1->datalen()); + memcpy(data + t1->datalen(), t2->data(), t2->datalen()); + return datatuple::create(t1->key(), keylen, data, datalen); } /** @@ -62,23 +53,5 @@ datatuple* append_merger(datatuple *t1, datatuple *t2) **/ datatuple* replace_merger(datatuple *t1, datatuple *t2) { - static const size_t isize = sizeof(uint32_t); - struct datatuple *t = (datatuple*) malloc(sizeof(datatuple)); - - byte *arr = (byte*)malloc(t2->byte_length()); - - t->keylen = (uint32_t*) arr; - *(t->keylen) = *(t2->keylen); - - t->datalen = (uint32_t*) (arr+isize); - *(t->datalen) = *(t2->datalen); - - t->key = (datatuple::key_t) (arr+isize+isize); - memcpy((byte*)t->key, (byte*)t2->key, *(t2->keylen)); - - t->data = (datatuple::data_t) (arr+isize+isize+ *(t2->keylen)); - memcpy((byte*)t->data, (byte*)t2->data, *(t2->datalen)); - - return t; - + return t2->create_copy(); }