Reworked memory allocation and network protocol. This gargantuan commit also reduces the default size of C0, and removes quite a bit of redundant logic and error handling.

The tests now pass, except that check_merge never terminates (it takes too long) and check_mergelarge still is not passing.

For better luck running this version of the code, turn off stasis' concurrent buffer manager.  We're doing something bad that leads to deadlocks with the concurrent buffer 
manager.  Another (the same?) bug less-frequently leads to page corruption with the old stasis buffer manager.




git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@556 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-02-10 21:49:50 +00:00
parent 3c297d1a66
commit 940a6da6fe
22 changed files with 549 additions and 915 deletions

View file

@ -86,13 +86,13 @@ void DataPage<TUPLE>::initialize(int xid)
}
template <class TUPLE>
bool DataPage<TUPLE>::append(int xid, TUPLE const & dat)
bool DataPage<TUPLE>::append(int xid, TUPLE const * dat)
{
assert(byte_offset >= HEADER_SIZE);
assert(fix_pcount >= 1);
//check if there is enough space (for the data length + data)
int32_t blen = dat.byte_length() + sizeof(int32_t);
int32_t blen = dat->byte_length() + sizeof(int32_t);
if(PAGE_SIZE * fix_pcount - byte_offset < blen)
{
//check if the record is too large
@ -118,7 +118,7 @@ bool DataPage<TUPLE>::append(int xid, TUPLE const & dat)
byte_offset += sizeof(int32_t);
//write the data
byte * barr = dat.to_bytes();
byte * barr = dat->to_bytes();
if(!writebytes(xid, dsize, barr)) //if write fails, undo the previous write
{
byte_offset -= sizeof(int32_t);
@ -127,7 +127,8 @@ bool DataPage<TUPLE>::append(int xid, TUPLE const & dat)
if(PAGE_SIZE - (byte_offset % PAGE_SIZE) >= sizeof(int32_t))
{
dsize = 0;
writebytes(xid, sizeof(int32_t), (byte*)(&dsize));//this will succeed, since there is enough space on the page
int succ = writebytes(xid, sizeof(int32_t), (byte*)(&dsize));//this will succeed, since there is enough space on the page
assert(succ);
}
return false;
}
@ -138,7 +139,9 @@ bool DataPage<TUPLE>::append(int xid, TUPLE const & dat)
if(PAGE_SIZE - (byte_offset % PAGE_SIZE) >= sizeof(int32_t))
{
dsize = 0;
writebytes(xid, sizeof(int32_t), (byte*)(&dsize));//this will succeed, since there is enough space on the page
int succ = writebytes(xid, sizeof(int32_t), (byte*)(&dsize));//this will succeed, since there is enough space on the page
assert(succ);
}
return true;
@ -205,12 +208,11 @@ bool DataPage<TUPLE>::recordRead(int xid, typename TUPLE::key_t key, size_t keyS
int match = -1;
while((*buf=itr.getnext(xid)) != 0)
{
match = TUPLE::compare((*buf)->get_key(), key);
match = TUPLE::compare((*buf)->key(), key);
if(match<0) //keep searching
{
free((*buf)->keylen);
free(*buf);
datatuple::freetuple(*buf);
*buf=0;
}
else if(match==0) //found
@ -219,8 +221,7 @@ bool DataPage<TUPLE>::recordRead(int xid, typename TUPLE::key_t key, size_t keyS
}
else // match > 0, then does not exist
{
free((*buf)->keylen);
free(*buf);
datatuple::freetuple(*buf);
*buf = 0;
break;
}
@ -230,12 +231,9 @@ bool DataPage<TUPLE>::recordRead(int xid, typename TUPLE::key_t key, size_t keyS
}
template <class TUPLE>
void DataPage<TUPLE>::readbytes(int xid, int32_t offset, int count, byte **data)
void DataPage<TUPLE>::readbytes(int xid, int32_t offset, int count, byte *data)
{
if(*data==NULL)
*data = (byte*)malloc(count);
int32_t bytes_copied = 0;
while(bytes_copied < count)
{
@ -251,7 +249,7 @@ void DataPage<TUPLE>::readbytes(int xid, int32_t offset, int count, byte **data)
int32_t copy_len = ( (PAGE_SIZE - page_offset < count - bytes_copied ) ? PAGE_SIZE - page_offset : count - bytes_copied);
byte * pb_ptr = stasis_page_byte_ptr_from_start(p, page_offset);
memcpy((*data)+bytes_copied, pb_ptr, copy_len);
memcpy((data)+bytes_copied, pb_ptr, copy_len);
//release the page
unlock(p->rwlatch);
@ -425,10 +423,11 @@ TUPLE* DataPage<TUPLE>::RecordIterator::getnext(int xid)
readlock(p->rwlatch,0);
int32_t *dsize_ptr;
int32_t scratch;
if(PAGE_SIZE - (offset % PAGE_SIZE) < sizeof(int32_t)) //int spread in two pages
{
dsize_ptr = 0;
dp->readbytes(xid, offset, sizeof(int32_t), (byte**)(&dsize_ptr));
dsize_ptr = &scratch;
dp->readbytes(xid, offset, sizeof(int32_t), (byte*)dsize_ptr);
}
else //int in a single page
dsize_ptr = (int32_t*)stasis_page_byte_ptr_from_start(p, offset % PAGE_SIZE);
@ -442,10 +441,12 @@ TUPLE* DataPage<TUPLE>::RecordIterator::getnext(int xid)
return 0;
}
byte* tb=0;
dp->readbytes(xid, offset, *dsize_ptr, &tb);
byte* tb = (byte*)malloc(*dsize_ptr);
dp->readbytes(xid, offset, *dsize_ptr, tb);
TUPLE *tup = TUPLE::from_bytes(tb);
TUPLE *tup = TUPLE::from_bytes(tb); // This version of from_bytes does not consume its argument.
free(tb);
offset += *dsize_ptr;
@ -484,15 +485,18 @@ void DataPage<TUPLE>::RecordIterator::advance(int xid, int count)
readlock(p->rwlatch,0);
}
if(pindex == dp->pcount - 1 && (PAGE_SIZE - (offset % PAGE_SIZE) < sizeof(int32_t)))
return;
if(pindex == dp->pcount - 1 && (PAGE_SIZE - (offset % PAGE_SIZE) < sizeof(int32_t))) {
assert(!p); // XXX Otherwise, was leaking page. do we reach this branch in testing?
return;
}
int32_t *dsize_ptr=0;
if(PAGE_SIZE - (offset % PAGE_SIZE) < sizeof(int32_t)) //int spread in two pages
dp->readbytes(xid, offset, sizeof(int32_t), (byte**)(&dsize_ptr));
else //int in a single page
int32_t scratch;
if(PAGE_SIZE - (offset % PAGE_SIZE) < sizeof(int32_t)) { //int spread in two pages
dsize_ptr = &scratch;
dp->readbytes(xid, offset, sizeof(int32_t), (byte*)dsize_ptr);
} else { //int in a single page
dsize_ptr = (int32_t*)stasis_page_byte_ptr_from_start(p, offset % PAGE_SIZE);
}
offset += sizeof(int32_t);
if(*dsize_ptr == 0) //no more keys

View file

@ -59,7 +59,7 @@ public:
~DataPage();
bool append(int xid, TUPLE const & dat);
bool append(int xid, TUPLE const * dat);
bool recordRead(int xid, typename TUPLE::key_t key, size_t keySize, TUPLE ** buf);
inline uint16_t recordCount(int xid);
@ -89,10 +89,10 @@ private:
void incrementPageCount(int xid, pageid_t pid, int add=1);
bool writebytes(int xid, int count, byte *data);
inline void readbytes(int xid, int32_t offset, int count, byte **data=0);
inline void readbytes(int xid, int32_t offset, int count, byte *data);
private:
int pcount;
int32_t pcount;
pageid_t *pidarr;
int32_t byte_offset;//points to the next free byte

View file

@ -1,167 +1,147 @@
#ifndef _DATATUPLE_H_
#define _DATATUPLE_H_
typedef unsigned char uchar;
#include <string>
//#define byte unsigned char
typedef unsigned char byte;
#include <cstring>
//#include <stdio.h>
//#include <stdlib.h>
//#include <errno.h>
#include <assert.h>
typedef struct datatuple
{
typedef uchar* key_t;
typedef uchar* data_t;
static const size_t isize = sizeof(uint32_t);
uint32_t *keylen; //key length should be size of string + 1 for \n
uint32_t *datalen;
key_t key;
data_t data;
public:
typedef uint32_t len_t ;
typedef unsigned char* key_t ;
typedef unsigned char* data_t ;
private:
static const len_t DELETE = ((len_t)0) - 1;
len_t datalen_;
byte* key_;
byte* data_; // aliases key. data_ - 1 should be the \0 terminating key_.
datatuple* sanity_check() {
assert(keylen() < 3000);
return this;
}
public:
inline len_t keylen() const {
return data_ - key_;
}
inline len_t datalen() const {
return (datalen_ == DELETE) ? 0 : datalen_;
}
//returns the length of the byte array representation
len_t byte_length() const {
return sizeof(len_t) + sizeof(len_t) + keylen() + datalen();
}
static len_t length_from_header(len_t keylen, len_t datalen) {
return keylen + ((datalen == DELETE) ? 0 : datalen);
}
inline key_t key() const {
return key_;
}
inline data_t data() const {
return data_;
}
//this is used by the stl set
bool operator() (const datatuple& lhs, const datatuple& rhs) const
{
//std::basic_string<uchar> s1(lhs.key);
//std::basic_string<uchar> s2(rhs.key);
return strcmp((char*)lhs.key,(char*)rhs.key) < 0;
//return (*((int32_t*)lhs.key)) <= (*((int32_t*)rhs.key));
}
void clone(const datatuple& tuple) {
//create a copy
byte * arr = (byte*) malloc(tuple.byte_length());
keylen = (uint32_t*) arr;
*keylen = *tuple.keylen;
datalen = (uint32_t*) (arr+isize);
*datalen = *tuple.datalen;
key = (datatuple::key_t) (arr+isize+isize);
memcpy((byte*)key, (byte*)tuple.key, *keylen);
if(!tuple.isDelete())
{
data = (datatuple::data_t) (arr+isize+isize+ *keylen);
memcpy((byte*)data, (byte*)tuple.data, *datalen);
}
else
data = 0;
}
bool operator() (const datatuple* lhs, const datatuple* rhs) const {
return compare(lhs->key(), rhs->key()) < 0; //strcmp((char*)lhs.key(),(char*)rhs.key()) < 0;
}
/**
* return -1 if k1 < k2
* 0 if k1 == k2
* 1 of k1 > k2
**/
static int compare(const key_t k1,const key_t k2)
{
//for char* ending with \0
return strcmp((char*)k1,(char*)k2);
static int compare(const byte* k1,const byte* k2) {
// XXX string comparison is probably not the right approach.
//for char* ending with \0
return strcmp((char*)k1,(char*)k2);
}
//for int32_t
//printf("%d\t%d\n",(*((int32_t*)k1)) ,(*((int32_t*)k2)));
//return (*((int32_t*)k1)) <= (*((int32_t*)k2));
}
inline void setDelete() {
datalen_ = DELETE;
}
void setDelete()
{
*datalen = UINT_MAX;
}
inline bool isDelete() const {
return datalen_ == DELETE;
}
inline bool isDelete() const
{
return *datalen == UINT_MAX;
}
static std::string key_to_str(const byte* k) {
//for strings
return std::string((char*)k);
//for int
/*
std::ostringstream ostr;
ostr << *((int32_t*)k);
return ostr.str();
*/
}
static std::string key_to_str(const byte* k)
{
//for strings
return std::string((char*)k);
//for int
/*
std::ostringstream ostr;
ostr << *((int32_t*)k);
return ostr.str();
*/
}
//copy the tuple. does a deep copy of the contents.
datatuple* create_copy() const {
return create(key(), keylen(), data(), datalen_)->sanity_check();
}
//returns the length of the byte array representation
int32_t byte_length() const{
static const size_t isize = sizeof(uint32_t);
if(isDelete())
return isize + *keylen + isize;
else
return isize + *keylen + isize + (*datalen);
static datatuple* create(const void* key, len_t keylen) {
return create(key, keylen, 0, DELETE)->sanity_check();
}
static datatuple* create(const void* key, len_t keylen, const void* data, len_t datalen) {
datatuple *ret = (datatuple*)malloc(sizeof(datatuple));
ret->key_ = (byte*)malloc(length_from_header(keylen, datalen));
memcpy(ret->key_, key, keylen);
ret->data_ = ret->key_ + keylen; // need to set this even if delete, since it encodes the key length.
if(datalen != DELETE) {
memcpy(ret->data_, data, datalen);
}
ret->datalen_ = datalen;
return ret->sanity_check();
}
//format: key length _ data length _ key _ data
byte * to_bytes() const {
static const size_t isize = sizeof(uint32_t);
byte * ret;
if(!isDelete())
ret = (byte*) malloc(isize + *keylen + isize + *datalen);
else
ret = (byte*) malloc(isize + *keylen + isize);
memcpy(ret, (byte*)(keylen), isize);
memcpy(ret+isize, (byte*)(datalen), isize);
memcpy(ret+isize+isize, key, *keylen);
if(!isDelete())
memcpy(ret+isize+isize+*keylen, data, *datalen);
byte *ret = (byte*)malloc(byte_length());
((len_t*)ret)[0] = keylen();
((len_t*)ret)[1] = datalen_;
memcpy(((len_t*)ret)+2, key_, length_from_header(keylen(), datalen_));
return ret;
}
//does not copy the data again
//just sets the pointers in the datatuple to
//right positions in the given arr
static datatuple* from_bytes(const byte * arr)
{
static const size_t isize = sizeof(uint32_t);
datatuple *dt = (datatuple*) malloc(sizeof(datatuple));
const byte* get_bytes(len_t *keylen, len_t *datalen) const {
*keylen = this->keylen();
*datalen = datalen_;
return key_;
}
dt->keylen = (uint32_t*) arr;
dt->datalen = (uint32_t*) (arr+isize);
dt->key = (key_t) (arr+isize+isize);
if(!dt->isDelete())
dt->data = (data_t) (arr+isize+isize+ *(dt->keylen));
else
dt->data = 0;
//format of buf: key _ data. The caller needs to 'peel' off key length and data length for this call.
static datatuple* from_bytes(len_t keylen, len_t datalen, byte* buf) {
datatuple *dt = (datatuple*) malloc(sizeof(datatuple));
dt->datalen_ = datalen;
dt->key_ = buf;
dt->data_ = dt->key_ + keylen;
return dt->sanity_check();
}
static datatuple* from_bytes(byte* buf) {
datatuple *dt = (datatuple*) malloc(sizeof(datatuple));
len_t keylen = ((len_t*)buf)[0];
dt->datalen_ = ((len_t*)buf)[1];
len_t buflen = length_from_header(keylen, dt->datalen_);
dt->key_ = (byte*)malloc(buflen);
memcpy(dt->key_,((len_t*)buf)+2,buflen);
dt->data_ = dt->key_ + keylen;
return dt;
}
/*
static datatuple form_tuple(const byte * arr)
{
static const size_t isize = sizeof(uint32_t);
datatuple dt;
return dt->sanity_check();
}
dt.keylen = (uint32_t*) arr;
dt.datalen = (uint32_t*) (arr+isize);
dt.key = (key_t) (arr+isize+isize);
if(!dt.isDelete())
dt.data = (data_t) (arr+isize+isize+ *(dt.keylen));
else
dt.data = 0;
static inline void freetuple(datatuple* dt) {
free(dt->key_);
free(dt);
}
return dt;
}
*/
byte * get_key() { return (byte*) key; }
byte * get_data() { return (byte*) data; }
//releases only the tuple
static void release(datatuple *dt)
{
free(dt);
}
} datatuple;

View file

@ -30,9 +30,8 @@ public:
if(done_) { return NULL; }
if(first_) { first_ = 0;} else { it_++; }
if(it_==itend_) { done_= true; return NULL; }
TUPLE *t = new TUPLE();
t->clone(*it_);
return t;
return (*it_)->create_copy();
}

View file

@ -45,6 +45,11 @@ void logserver::startserver(logtable *ltable)
worker_data->ready_queue = &ready_queue;
worker_data->work_queue = &work_queue;
#ifdef STATS_ENABLED
worker_data->num_reqs = 0;
#endif
worker_data->qlock = qlock;
worker_data->selcond = selcond;
@ -340,14 +345,14 @@ void *serverLoop(void *args)
}
//start listening on the server socket
//second arg is the max number of coonections waiting in queue
//second arg is the max number of connections waiting in queue
if(listen(sockfd,SOMAXCONN)==-1)
{
printf("ERROR on listen.\n");
return 0;
}
printf("LSM Server listenning...\n");
printf("LSM Server listening...\n");
*(sdata->server_socket) = sockfd;
int flag, result;
@ -427,22 +432,23 @@ void * thread_work_fn( void * args)
}
//step 1: read the opcode
uint8_t opcode;
ssize_t n = read(*(item->data->workitem), &opcode, sizeof(uint8_t));
if(n == 0) {
network_op_t opcode = readopfromsocket(*(item->data->workitem), LOGSTORE_CLIENT_REQUEST);
if(opcode == LOGSTORE_CONN_CLOSED_ERROR) {
opcode = OP_DONE;
n = sizeof(uint8_t);
printf("Obsolescent client closed connection uncleanly\n");
}
assert( n == sizeof(uint8_t));
assert( opcode < OP_INVALID );
if( opcode == OP_DONE ) //close the conn on failure
if( opcode == OP_DONE || (opiserror(opcode))) //close the conn on failure
{
pthread_mutex_lock(item->data->qlock);
printf("client done. conn closed. (%d, %d, %d, %d)\n",
n, errno, *(item->data->workitem), item->data->work_queue->size());
close(*(item->data->workitem));
pthread_mutex_lock(item->data->qlock);
if(opiserror(opcode)) {
printf("network error. conn closed. (%d, %d, %d, %d)\n",
opcode, errno, *(item->data->workitem), item->data->work_queue->size());
} else {
printf("client done. conn closed. (%d, %d)\n",
*(item->data->workitem), item->data->work_queue->size());
}
close(*(item->data->workitem));
if(item->data->work_queue->size() > 0)
{
@ -464,30 +470,8 @@ void * thread_work_fn( void * args)
}
//step 2: read the tuple from client
datatuple tuple;
tuple.keylen = (uint32_t*)malloc(sizeof(uint32_t));
tuple.datalen = (uint32_t*)malloc(sizeof(uint32_t));
//read the key length
n = read(*(item->data->workitem), tuple.keylen, sizeof(uint32_t));
assert( n == sizeof(uint32_t));
//read the data length
n = read(*(item->data->workitem), tuple.datalen, sizeof(uint32_t));
assert( n == sizeof(uint32_t));
//read the key
tuple.key = (byte*) malloc(*tuple.keylen);
readfromsocket(*(item->data->workitem), (char*) tuple.key, *tuple.keylen);
//read the data
if(!tuple.isDelete() && opcode != OP_FIND)
{
tuple.data = (byte*) malloc(*tuple.datalen);
readfromsocket(*(item->data->workitem), (char*) tuple.data, *tuple.datalen);
}
else
tuple.data = 0;
//step 2: read the tuple from client
datatuple * tuple = readtuplefromsocket(*(item->data->workitem));
//step 3: process the tuple
//pthread_mutex_lock(item->data->table_lock);
//readlock(item->data->table_lock,0);
@ -500,67 +484,61 @@ void * thread_work_fn( void * args)
//pthread_mutex_unlock(item->data->table_lock);
//unlock(item->data->table_lock);
//step 4: send response
uint8_t rcode = OP_SUCCESS;
n = write(*(item->data->workitem), &rcode, sizeof(uint8_t));
assert(n == sizeof(uint8_t));
uint8_t rcode = LOGSTORE_RESPONSE_SUCCESS;
int err = writeoptosocket(*(item->data->workitem), LOGSTORE_RESPONSE_SUCCESS);
if(err) {
perror("could not respond to client");
}
}
else if(opcode == OP_FIND)
{
//find the tuple
datatuple *dt = item->data->ltable->findTuple(-1, tuple.key, *tuple.keylen);
datatuple *dt = item->data->ltable->findTuple(-1, tuple->key(), tuple->keylen());
//unlock the lsmlock
//pthread_mutex_unlock(item->data->table_lock);
//unlock(item->data->table_lock);
#ifdef STATS_ENABLED
if(dt == 0)
DEBUG("key not found:\t%s\n", datatuple::key_to_str(tuple.key).c_str());
else if( *dt->datalen != 1024)
DEBUG("data len for\t%s:\t%d\n", datatuple::key_to_str(tuple.key).c_str(),
*dt->datalen);
if(dt == 0) {
DEBUG("key not found:\t%s\n", datatuple::key_to_str(tuple.key()).c_str());
} else if( dt->datalen() != 1024) {
DEBUG("data len for\t%s:\t%d\n", datatuple::key_to_str(tuple.key()).c_str(),
dt->datalen);
if(datatuple::compare(tuple->key(), dt->key()) != 0) {
DEBUG("key not equal:\t%s\t%s\n", datatuple::key_to_str(tuple.key()).c_str(),
datatuple::key_to_str(dt->key).c_str());
}
if(datatuple::compare(tuple.key, dt->key) != 0)
DEBUG("key not equal:\t%s\t%s\n", datatuple::key_to_str(tuple.key).c_str(),
datatuple::key_to_str(dt->key).c_str());
}
#endif
if(dt == 0) //tuple deleted
bool dt_needs_free;
if(dt == 0) //tuple does not exist.
{
dt = (datatuple*) malloc(sizeof(datatuple));
dt->keylen = (uint32_t*) malloc(2*sizeof(uint32_t) + *tuple.keylen);
*dt->keylen = *tuple.keylen;
dt->datalen = dt->keylen + 1;
dt->key = (datatuple::key_t) (dt->datalen+1);
memcpy((byte*) dt->key, (byte*) tuple.key, *tuple.keylen);
dt->setDelete();
dt = tuple;
dt->setDelete();
dt_needs_free = false;
} else {
dt_needs_free = true;
}
//send the reply code
uint8_t rcode = OP_SENDING_TUPLE;
n = write(*(item->data->workitem), &rcode, sizeof(uint8_t));
assert(n == sizeof(uint8_t));
int err = writeoptosocket(*(item->data->workitem), LOGSTORE_RESPONSE_SENDING_TUPLES);
//send the tuple
writetosocket(*(item->data->workitem), (char*) dt->keylen, dt->byte_length());
writetupletosocket(*(item->data->workitem), dt);
//free datatuple
free(dt->keylen);
free(dt);
if(dt_needs_free) {
datatuple::freetuple(dt);
}
}
//close the socket
//close(*(item->data->workitem));
//free the tuple
free(tuple.keylen);
free(tuple.datalen);
free(tuple.key);
free(tuple.data);
datatuple::freetuple(tuple);
//printf("socket %d: work completed.", *(item->data->workitem));
//printf("socket %d: work completed.", *(item->data->workitem));
pthread_mutex_lock(item->data->qlock);
@ -619,5 +597,3 @@ void * thread_work_fn( void * args)
return NULL;
}

View file

@ -81,31 +81,14 @@ struct serverth_data
std::queue<int> *ready_queue;
pthread_cond_t *selcond;
pthread_mutex_t *qlock;
pthread_mutex_t *qlock;
};
void * thread_work_fn( void *);
void * thread_work_fn( void *);
class logserver
{
public:
//server codes
// static uint8_t OP_SUCCESS;
// static uint8_t OP_FAIL;
// static uint8_t OP_SENDING_TUPLE;
//
// //client codes
// static uint8_t OP_FIND;
// static uint8_t OP_INSERT;
//
// static uint8_t OP_DONE;
//
// static uint8_t OP_INVALID;
public:
logserver(int nthreads, int server_port){
this->nthreads = nthreads;

View file

@ -8,6 +8,7 @@
#include "logiterators.h"
#include "datapage.cpp"
#include <stasis/page.h>
#include <stasis/page/slotted.h>
/////////////////////////////////////////////////////////////////
@ -292,7 +293,7 @@ recordid logtree::appendPage(int xid, recordid tree, pageid_t & rmLeafID,
Page *p = loadPage(xid, tree.page);
writelock(p->rwlatch, 0);
//logtree_state *s = (logtree_state*)p->impl;
tree.slot = 0;
//tree.size = sizeof(lsmTreeNodeRecord)+keySize;
@ -367,20 +368,27 @@ recordid logtree::appendPage(int xid, recordid tree, pageid_t & rmLeafID,
// NOTE: stasis_record_free call goes to slottedFree in slotted.c
// this function only reduces the numslots when you call it
// with the last slot. so thats why i go backwards here.
for(int i = *stasis_page_slotted_numslots_ptr(p)-1; i>FIRST_SLOT; i--)
printf("slots %d (%d) keysize=%lld\n", (int)*stasis_page_slotted_numslots_ptr(p), (int)FIRST_SLOT+1, (long long int)keySize);
assert(*stasis_page_slotted_numslots_ptr(p) >= FIRST_SLOT+1);
for(int i = *stasis_page_slotted_numslots_ptr(p)-1; i>FIRST_SLOT; i--)
{
assert(*stasis_page_slotted_numslots_ptr(p) > FIRST_SLOT+1);
const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid,p,i,0);
int reclen = readRecordLength(xid, p, i);
recordid tmp_rec= {p->id, i, reclen};
stasis_record_free(xid, p, tmp_rec);
}
}
//TODO: could change with stasis_slotted_page_initialize(...);
// TODO: fsck?
// stasis_page_slotted_initialize_page(p);
// reinsert first.
recordid pFirstSlot = { p->id, FIRST_SLOT, readRecordLength(xid, p, FIRST_SLOT)};
assert(*stasis_page_slotted_numslots_ptr(p) == FIRST_SLOT+1);
if(*stasis_page_slotted_numslots_ptr(p) != FIRST_SLOT+1) {
printf("slots %d (%d)\n", *stasis_page_slotted_numslots_ptr(p), (int)FIRST_SLOT+1);
assert(*stasis_page_slotted_numslots_ptr(p) == FIRST_SLOT+1);
}
indexnode_rec *nr
= (indexnode_rec*)stasis_record_write_begin(xid, p, pFirstSlot);
@ -833,6 +841,21 @@ logtable::logtable()
}
void logtable::tearDownTree(rbtree_ptr_t tree) {
datatuple * t = 0;
for(rbtree_t::iterator delitr = tree->begin();
delitr != tree->end();
delitr++) {
if(t) {
datatuple::freetuple(t);
}
t = *delitr;
tree->erase(delitr);
}
if(t) { datatuple::freetuple(t); }
delete tree;
}
logtable::~logtable()
{
if(tree_c1 != NULL)
@ -842,23 +865,10 @@ logtable::~logtable()
if(tree_c0 != NULL)
{
for(rbtree_t::iterator delitr=tree_c0->begin();
delitr != tree_c0->end(); delitr++)
free((*delitr).keylen);
delete tree_c0;
tearDownTree(tree_c0);
}
delete tmerger;
/*
if(rbtree_mut)
delete rbtree_mut;
if(tree_c0)
delete tree_c0;
if(input_needed)
delete input_needed;
*/
}
recordid logtable::allocTable(int xid)
@ -917,7 +927,7 @@ void logtable::flushTable()
while(*mergedata->old_c0) {
unlock(mergedata->header_lock);
// pthread_mutex_lock(mergedata->rbtree_mut);
if(tree_bytes >= MAX_C0_SIZE)
if(tree_bytes >= max_c0_size)
pthread_cond_wait(mergedata->input_needed_cond, mergedata->rbtree_mut);
else
{
@ -974,13 +984,10 @@ void logtable::flushTable()
}
datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize)
datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keySize)
{
//prepare a search tuple
datatuple search_tuple;
search_tuple.keylen = (uint32_t*)malloc(sizeof(uint32_t));
*(search_tuple.keylen) = keySize;
search_tuple.key = key;
datatuple *search_tuple = datatuple::create(key, keySize);
readlock(mergedata->header_lock,0);
pthread_mutex_lock(mergedata->rbtree_mut);
@ -992,10 +999,7 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize)
if(rbitr != tree_c0->end())
{
DEBUG("tree_c0 size %d\n", tree_c0->size());
datatuple tuple = *rbitr;
byte *barr = (byte*)malloc(tuple.byte_length());
memcpy(barr, (byte*)tuple.keylen, tuple.byte_length());
ret_tuple = datatuple::from_bytes(barr);
ret_tuple = (*rbitr)->create_copy();
}
bool done = false;
@ -1006,22 +1010,19 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize)
rbitr = (*(mergedata->old_c0))->find(search_tuple);
if(rbitr != (*(mergedata->old_c0))->end())
{
datatuple tuple = *rbitr;
datatuple *tuple = *rbitr;
if(tuple.isDelete()) //tuple deleted
if(tuple->isDelete()) //tuple deleted
done = true; //return ret_tuple
else if(ret_tuple != 0) //merge the two
{
datatuple *mtuple = tmerger->merge(&tuple, ret_tuple); //merge the two
free(ret_tuple->keylen); //free tuple from current tree
free(ret_tuple);
datatuple *mtuple = tmerger->merge(tuple, ret_tuple); //merge the two
datatuple::freetuple(ret_tuple); //free tuple from current tree
ret_tuple = mtuple; //set return tuple to merge result
}
else //key first found in old mem tree
{
byte *barr = (byte*)malloc(tuple.byte_length());
memcpy(barr, (byte*)tuple.keylen, tuple.byte_length());
ret_tuple = datatuple::from_bytes(barr);
ret_tuple = tuple->create_copy();
}
//we cannot free tuple from old-tree 'cos it is not a copy
}
@ -1042,8 +1043,7 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize)
else if(ret_tuple != 0) //merge the two
{
datatuple *mtuple = tmerger->merge(tuple_c1, ret_tuple); //merge the two
free(ret_tuple->keylen); //free tuple from before
free(ret_tuple);
datatuple::freetuple(ret_tuple); //free tuple from before
ret_tuple = mtuple; //set return tuple to merge result
}
else //found for the first time
@ -1057,8 +1057,7 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize)
if(!use_copy)
{
free(tuple_c1->keylen); //free tuple from tree c1
free(tuple_c1);
datatuple::freetuple(tuple_c1); //free tuple from tree c1
}
}
}
@ -1078,8 +1077,7 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize)
else if(ret_tuple != 0) //merge the two
{
datatuple *mtuple = tmerger->merge(tuple_oc1, ret_tuple); //merge the two
free(ret_tuple->keylen); //free tuple from before
free(ret_tuple);
datatuple::freetuple(ret_tuple); //free tuple from before
ret_tuple = mtuple; //set return tuple to merge result
}
else //found for the first time
@ -1093,8 +1091,7 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize)
if(!use_copy)
{
free(tuple_oc1->keylen); //free tuple from tree old c1
free(tuple_oc1);
datatuple::freetuple(tuple_oc1); //free tuple from tree old c1
}
}
}
@ -1113,31 +1110,25 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize)
else if(ret_tuple != 0)
{
datatuple *mtuple = tmerger->merge(tuple_c2, ret_tuple); //merge the two
free(ret_tuple->keylen); //free tuple from before
free(ret_tuple);
datatuple::freetuple(ret_tuple); //free tuple from before
ret_tuple = mtuple; //set return tuple to merge result
}
else //found for the first time
{
use_copy = true;
ret_tuple = tuple_c2;
//byte *barr = (byte*)malloc(tuple_c2->byte_length());
//memcpy(barr, (byte*)tuple_c2->keylen, tuple_c2->byte_length());
//ret_tuple = datatuple::from_bytes(barr);
}
if(!use_copy)
{
free(tuple_c2->keylen); //free tuple from tree c2
free(tuple_c2);
datatuple::freetuple(tuple_c2); //free tuple from tree c2
}
}
}
//pthread_mutex_unlock(mergedata->rbtree_mut);
unlock(mergedata->header_lock);
free(search_tuple.keylen);
datatuple::freetuple(search_tuple);
return ret_tuple;
}
@ -1149,10 +1140,7 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize)
datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keySize)
{
//prepare a search tuple
datatuple search_tuple;
search_tuple.keylen = (uint32_t*)malloc(sizeof(uint32_t));
*(search_tuple.keylen) = keySize;
search_tuple.key = key;
datatuple * search_tuple = datatuple::create(key, keySize);
pthread_mutex_lock(mergedata->rbtree_mut);
@ -1163,10 +1151,7 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS
if(rbitr != tree_c0->end())
{
DEBUG("tree_c0 size %d\n", tree_c0->size());
datatuple tuple = *rbitr;
byte *barr = (byte*)malloc(tuple.byte_length());
memcpy(barr, (byte*)tuple.keylen, tuple.byte_length());
ret_tuple = datatuple::from_bytes(barr);
ret_tuple = (*rbitr)->create_copy();
}
else
@ -1179,10 +1164,7 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS
rbitr = (*(mergedata->old_c0))->find(search_tuple);
if(rbitr != (*(mergedata->old_c0))->end())
{
datatuple tuple = *rbitr;
byte *barr = (byte*)malloc(tuple.byte_length());
memcpy(barr, (byte*)tuple.keylen, tuple.byte_length());
ret_tuple = datatuple::from_bytes(barr);
ret_tuple = (*rbitr)->create_copy();
}
}
@ -1221,13 +1203,13 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS
pthread_mutex_unlock(mergedata->rbtree_mut);
free(search_tuple.keylen);
datatuple::freetuple(search_tuple);
return ret_tuple;
}
void logtable::insertTuple(struct datatuple &tuple)
void logtable::insertTuple(datatuple *tuple)
{
//static int count = LATCH_INTERVAL;
//static int tsize = 0; //number of tuples
@ -1240,29 +1222,27 @@ void logtable::insertTuple(struct datatuple &tuple)
rbtree_t::iterator rbitr = tree_c0->find(tuple);
if(rbitr != tree_c0->end())
{
datatuple pre_t = *rbitr;
datatuple *pre_t = *rbitr;
//do the merging
datatuple *new_t = tmerger->merge(&pre_t, &tuple);
datatuple *new_t = tmerger->merge(pre_t, tuple);
tree_c0->erase(pre_t); //remove the previous tuple
tree_c0->insert( *new_t); //insert the new tuple
tree_c0->insert(new_t); //insert the new tuple
//update the tree size (+ new_t size - pre_t size)
tree_bytes += (new_t->byte_length() - pre_t.byte_length());
free(pre_t.keylen); //free the previous tuple
free(new_t); // frees the malloc(sizeof(datatuple)) coming from merge
tree_bytes += (new_t->byte_length() - pre_t->byte_length());
datatuple::freetuple(pre_t); //free the previous tuple
}
else //no tuple with same key exists in mem-tree
{
datatuple t;
t.clone(tuple);
datatuple *t = tuple->create_copy();
//insert tuple into the rbtree
tree_c0->insert(t);
tsize++;
tree_bytes += t.byte_length() + RB_TREE_OVERHEAD;
tree_bytes += t->byte_length() + RB_TREE_OVERHEAD;
}
@ -1276,7 +1256,7 @@ void logtable::insertTuple(struct datatuple &tuple)
}
*/
if(tree_bytes >= MAX_C0_SIZE )
if(tree_bytes >= max_c0_size )
{
DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes);
pthread_mutex_unlock(mergedata->rbtree_mut);
@ -1300,7 +1280,7 @@ void logtable::insertTuple(struct datatuple &tuple)
}
DataPage<datatuple>* logtable::insertTuple(int xid, struct datatuple &tuple, recordid &dpstate, logtree *ltree)
DataPage<datatuple>* logtable::insertTuple(int xid, datatuple *tuple, recordid &dpstate, logtree *ltree)
{
//create a new data page
@ -1326,8 +1306,8 @@ DataPage<datatuple>* logtable::insertTuple(int xid, struct datatuple &tuple, rec
//insert the record key and id of the first page of the datapage to the logtree
Tread(xid,ltree->get_tree_state(), &alloc_conf);
logtree::appendPage(xid, ltree->get_root_rec(), ltree->lastLeaf,
tuple.get_key(),
*tuple.keylen,
tuple->key(),
tuple->keylen(),
ltree->alloc_region,
&alloc_conf,
dp->get_start_pid()
@ -1507,6 +1487,7 @@ int logtreeIterator::next(int xid, lladdIterator_t *it)
}
else
{
assert(!impl->p);
if(impl->t != NULL)
free(impl->t);
impl->t = 0;

View file

@ -41,7 +41,8 @@ double tv_to_double(struct timeval tv);
struct logtable_mergedata;
typedef std::set<datatuple*, datatuple> rbtree_t;
typedef rbtree_t* rbtree_ptr_t;
typedef struct RegionAllocConf_t
{
@ -174,11 +175,11 @@ public:
~logtable();
//user access functions
datatuple * findTuple(int xid, datatuple::key_t key, size_t keySize);
datatuple * findTuple(int xid, const datatuple::key_t key, size_t keySize);
datatuple * findTuple_first(int xid, datatuple::key_t key, size_t keySize);
void insertTuple(struct datatuple &tuple);
void insertTuple(struct datatuple *tuple);
//other class functions
@ -186,9 +187,11 @@ public:
void flushTable();
DataPage<datatuple>* insertTuple(int xid, struct datatuple &tuple, recordid &dpstate,logtree *ltree);
static inline void tearDownTree(rbtree_ptr_t t);
datatuple * findTuple(int xid, datatuple::key_t key, size_t keySize, logtree *ltree);
DataPage<datatuple>* insertTuple(int xid, datatuple *tuple, recordid &dpstate,logtree *ltree);
datatuple * findTuple(int xid, const datatuple::key_t key, size_t keySize, logtree *ltree);
inline recordid & get_table_rec(){return table_rec;}
@ -198,8 +201,6 @@ public:
inline void set_tree_c1(logtree *t){tree_c1=t;}
inline void set_tree_c2(logtree *t){tree_c2=t;}
typedef std::set<datatuple, datatuple> rbtree_t;
typedef rbtree_t* rbtree_ptr_t;
inline rbtree_ptr_t get_tree_c0(){return tree_c0;}
void set_tree_c0(rbtree_ptr_t newtree){tree_c0 = newtree;}
@ -233,6 +234,8 @@ public:
logtable_mergedata * mergedata;
int64_t max_c0_size;
private:

View file

@ -3,12 +3,13 @@
#include "merger.h"
#include "logiterators.cpp"
#include "datapage.h"
//pageid_t merge_scheduler::C0_MEM_SIZE = 1000 * 1000 * 1000;
//template <> struct merger_args<rbtree_t>;
//template <> struct merger_args<logtree>;
inline DataPage<datatuple>*
insertTuple(int xid, DataPage<datatuple> *dp, datatuple &t,
insertTuple(int xid, DataPage<datatuple> *dp, datatuple *t,
logtable *ltable,
logtree * ltree,
recordid & dpstate,
@ -124,8 +125,9 @@ void merge_scheduler::shutdown()
}
void merge_scheduler::startlogtable(int index)
void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
{
logtable * ltable = mergedata[index].first;
struct logtable_mergedata *mdata = mergedata[index].second;
@ -158,6 +160,8 @@ void merge_scheduler::startlogtable(int index)
recordid * oldridp = new recordid;
*oldridp = NULLRID;
ltable->max_c0_size = MAX_C0_SIZE;
logtree ** block1_scratch = new logtree*;
*block1_scratch=0;
@ -391,12 +395,12 @@ void* memMergeThread(void*arg)
double target_R = *(a->r_i);
double new_c1_size = npages * PAGE_SIZE;
assert(target_R >= MIN_R);
if( (new_c1_size / MAX_C0_SIZE > target_R) ||
if( (new_c1_size / ltable->max_c0_size > target_R) ||
(a->max_size && new_c1_size > a->max_size ) )
{
printf("mmt:\tsignaling C2 for merge\n");
printf("mmt:\tnew_c1_size %.2f\tMAX_C0_SIZE %lld\ta->max_size %lld\t targetr %.2f \n", new_c1_size,
MAX_C0_SIZE, a->max_size, target_R);
ltable->max_c0_size, a->max_size, target_R);
// XXX need to report backpressure here!
while(*a->out_tree) {
@ -454,22 +458,10 @@ void* memMergeThread(void*arg)
//TODO: get the freeing outside of the lock
//// ----------- Free in_tree
for(rbtree_t::iterator delitr=deltree->begin();
delitr != deltree->end(); delitr++)
free((*delitr).keylen);
delete deltree;
logtable::tearDownTree(deltree);
//deltree = 0;
/*
for(rbtree_t::iterator delitr=(*a->in_tree)->begin();
delitr != (*a->in_tree)->end(); delitr++)
free((*delitr).keylen);
delete *a->in_tree;
*a->in_tree = 0;
*/
}
//pthread_mutex_unlock(a->block_ready_mut);
@ -580,7 +572,7 @@ void *diskMergeThread(void*arg)
merge_count++;
*a->my_tree_size = mergedPages;
//update the current optimal R value
*(a->r_i) = std::max(MIN_R, sqrt( (npages * 1.0) / (MAX_C0_SIZE/PAGE_SIZE) ) );
*(a->r_i) = std::max(MIN_R, sqrt( (npages * 1.0) / (ltable->max_c0_size/PAGE_SIZE) ) );
printf("dmt:\tmerge_count %d\t#written pages: %lld\n optimal r %.2f", merge_count, npages, *(a->r_i));
@ -664,55 +656,50 @@ int64_t merge_iterators(int xid,
DEBUG("tuple\t%lld: keylen %d datalen %d\n",
ntuples, *(t2->keylen),*(t2->datalen) );
while(t1 != 0 && datatuple::compare(t1->key, t2->key) < 0) // t1 is less than t2
while(t1 != 0 && datatuple::compare(t1->key(), t2->key()) < 0) // t1 is less than t2
{
//insert t1
dp = insertTuple(xid, dp, *t1, ltable, scratch_tree,
dp = insertTuple(xid, dp, t1, ltable, scratch_tree,
ltable->get_dpstate2(),
dpages, npages);
free(t1->keylen);
free(t1);
datatuple::freetuple(t1);
ntuples++;
//advance itrA
t1 = itrA->getnext();
}
if(t1 != 0 && datatuple::compare(t1->key, t2->key) == 0)
if(t1 != 0 && datatuple::compare(t1->key(), t2->key()) == 0)
{
datatuple *mtuple = ltable->gettuplemerger()->merge(t1,t2);
//insert merged tuple, drop deletes
if(dropDeletes && !mtuple->isDelete())
dp = insertTuple(xid, dp, *mtuple, ltable, scratch_tree, ltable->get_dpstate2(),
dp = insertTuple(xid, dp, mtuple, ltable, scratch_tree, ltable->get_dpstate2(),
dpages, npages);
free(t1->keylen);
free(t1);
datatuple::freetuple(t1);
t1 = itrA->getnext(); //advance itrA
free(mtuple->keylen);
free(mtuple);
datatuple::freetuple(mtuple);
}
else
{
//insert t2
dp = insertTuple(xid, dp, *t2, ltable, scratch_tree, ltable->get_dpstate2(),
dp = insertTuple(xid, dp, t2, ltable, scratch_tree, ltable->get_dpstate2(),
dpages, npages);
// cannot free any tuples here; they may still be read through a lookup
}
free(t2->keylen);
free(t2);
datatuple::freetuple(t2);
ntuples++;
}
while(t1 != 0) // t1 is less than t2
{
dp = insertTuple(xid, dp, *t1, ltable, scratch_tree, ltable->get_dpstate2(),
dp = insertTuple(xid, dp, t1, ltable, scratch_tree, ltable->get_dpstate2(),
dpages, npages);
free(t1->keylen);
free(t1);
datatuple::freetuple(t1);
ntuples++;
//advance itrA
t1 = itrA->getnext();
@ -727,9 +714,8 @@ int64_t merge_iterators(int xid,
}
inline DataPage<datatuple>*
insertTuple(int xid, DataPage<datatuple> *dp, datatuple &t,
insertTuple(int xid, DataPage<datatuple> *dp, datatuple *t,
logtable *ltable,
logtree * ltree,
recordid & dpstate,

View file

@ -7,12 +7,8 @@
#include "logstore.h"
#include "logiterators.h"
typedef std::set<datatuple, datatuple> rbtree_t;
typedef rbtree_t* rbtree_ptr_t;
//TODO: 400 bytes overhead per tuple, this is nuts, check if this is true...
static const int RB_TREE_OVERHEAD = 400;
static const int64_t MAX_C0_SIZE = 800 *1024*1024; //max size of c0
static const double MIN_R = 3.0;
//T is either logtree or red-black tree
template <class T>
@ -91,7 +87,7 @@ public:
~merge_scheduler();
int addlogtable(logtable * ltable);
void startlogtable(int index);
void startlogtable(int index, int64_t MAX_C0_SIZE = 100*1024*1024);
struct logtable_mergedata *getMergeData(int index){return mergedata[index].second;}

143
network.h
View file

@ -9,39 +9,156 @@
#define NETWORK_H_
#include <stdio.h>
#include <errno.h>
typedef uint8_t network_op_t;
//server codes
static const uint8_t OP_SUCCESS = 1;
static const uint8_t OP_FAIL = 2;
static const uint8_t OP_SENDING_TUPLE = 3;
static const network_op_t LOGSTORE_FIRST_RESPONSE_CODE = 1;
static const network_op_t LOGSTORE_RESPONSE_SUCCESS = 1;
static const network_op_t LOGSTORE_RESPONSE_FAIL = 2;
static const network_op_t LOGSTORE_RESPONSE_SENDING_TUPLES = 3;
static const network_op_t LOGSTORE_LAST_RESPONSE_CODE = 3;
//client codes
static const uint8_t OP_FIND = 4;
static const uint8_t OP_INSERT = 5;
static const network_op_t LOGSTORE_FIRST_REQUEST_CODE = 8;
static const network_op_t OP_INSERT = 8; // Create, Update, Delete
static const network_op_t OP_FIND = 9; // Read
static const uint8_t OP_DONE = 6;
static const network_op_t OP_DONE = 10; // Please close the connection.
static const network_op_t LOGSTORE_LAST_REQUEST_CODE = 10;
static const uint8_t OP_INVALID = 32;
//error codes
static const network_op_t LOGSTORE_FIRST_ERROR = 28;
static const network_op_t LOGSTORE_CONN_CLOSED_ERROR = 28; // Unexpected EOF
static const network_op_t LOGSTORE_SOCKET_ERROR = 29; // The OS returned an error.
static const network_op_t LOGSTORE_REMOTE_ERROR = 30; // The other side didn't like our request
static const network_op_t LOGSTORE_PROTOCOL_ERROR = 31; // The other side responeded with gibberish.
static const network_op_t LOGSTORE_LAST_ERROR = 31;
static const network_op_t OP_INVALID = 32;
typedef enum {
LOGSTORE_CLIENT_REQUEST,
LOGSTORE_SERVER_RESPONSE
} logstore_opcode_type;
static inline void readfromsocket(int sockd, char *buf, int count)
static inline int readfromsocket(int sockd, void *buf, ssize_t count)
{
int n = 0;
ssize_t n = 0;
while( n < count )
{
n += read( sockd, buf + n, count - n);
ssize_t i = read( sockd, ((byte*)buf) + n, count - n);
if(i == -1) {
perror("readfromsocket failed");
return errno;
} else if(i == 0) {
errno = EOF;
return errno;
}
n += i;
}
return 0;
}
static inline void writetosocket(int sockd, char *buf, int count)
static inline int writetosocket(int sockd, const void *buf, ssize_t count)
{
int n = 0;
ssize_t n = 0;
while( n < count )
{
n += write( sockd, buf + n, count - n);
ssize_t i = write( sockd, ((byte*)buf) + n, count - n);
if(i == -1) {
perror("writetosocket failed");
return errno;
} else if(i == 0) {
errno = EOF;
return errno;
}
n += i;
}
return 0;
}
static inline bool opiserror(network_op_t op) {
return (LOGSTORE_FIRST_ERROR <= op && op <= LOGSTORE_LAST_ERROR);
}
static inline bool opisrequest(network_op_t op) {
return (LOGSTORE_FIRST_REQUEST_CODE <= op && op <= LOGSTORE_LAST_REQUEST_CODE);
}
static inline bool opisresponse(network_op_t op) {
return (LOGSTORE_FIRST_RESPONSE_CODE <= op && op <= LOGSTORE_LAST_RESPONSE_CODE);
}
static inline network_op_t readopfromsocket(int sockd, logstore_opcode_type type) {
network_op_t ret;
ssize_t n = read(sockd, &ret, sizeof(network_op_t));
if(n == sizeof(network_op_t)) {
// done.
} else if(n == 0) { // EOF
perror("Socket closed mid request.");
return LOGSTORE_CONN_CLOSED_ERROR;
} else {
assert(n == -1); // sizeof(network_op_t) is 1, so short reads are impossible.
perror("Could not read opcode from socket");
return LOGSTORE_SOCKET_ERROR;
}
// sanity checking
switch(type) {
case LOGSTORE_CLIENT_REQUEST: {
if(!(opisrequest(ret) || opiserror(ret))) {
fprintf(stderr, "Read invalid request code %d\n", (int)ret);
if(opisresponse(ret)) {
fprintf(stderr, "(also, the request code is a valid response code)\n");
}
ret = LOGSTORE_PROTOCOL_ERROR;
}
} break;
case LOGSTORE_SERVER_RESPONSE: {
if(!(opisresponse(ret) || opiserror(ret))) {
fprintf(stderr, "Read invalid response code %d\n", (int)ret);
if(opisrequest(ret)) {
fprintf(stderr, "(also, the response code is a valid request code)\n");
}
ret = LOGSTORE_PROTOCOL_ERROR;
}
}
}
return ret;
}
static inline int writeoptosocket(int sockd, network_op_t op) {
assert(opiserror(op) || opisrequest(op) || opisresponse(op));
return writetosocket(sockd, &op, sizeof(network_op_t));
}
static inline datatuple* readtuplefromsocket(int sockd) {
datatuple::len_t keylen, datalen, buflen;
if( readfromsocket(sockd, &keylen, sizeof(keylen)) ) return NULL;
if( readfromsocket(sockd, &datalen, sizeof(datalen)) ) return NULL;
buflen = datatuple::length_from_header(keylen, datalen);
byte* bytes = (byte*) malloc(buflen);
if( readfromsocket(sockd, bytes, buflen) ) return NULL;
return datatuple::from_bytes(keylen, datalen, bytes); // from_bytes consumes the buffer.
}
static inline int writetupletosocket(int sockd, const datatuple* tup) {
datatuple::len_t keylen, datalen;
const byte* buf = tup->get_bytes(&keylen, &datalen);
int err;
if(( err = writetosocket(sockd, &keylen, sizeof(keylen)) )) return err;
if(( err = writetosocket(sockd, &datalen, sizeof(datalen)) )) return err;
if(( err = writetosocket(sockd, buf, datatuple::length_from_header(keylen, datalen)) )) return err;
return 0;
}
#endif /* NETWORK_H_ */

View file

@ -38,19 +38,14 @@ void terminate (int param)
exit(0);
}
void insertProbeIter(int NUM_ENTRIES)
void initialize_server()
{
//signal handling
void (*prev_fn)(int);
prev_fn = signal (SIGINT,terminate);
//if (prev_fn==SIG_IGN)
//signal (SIGTERM,SIG_IGN);
sync();
bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE;
bufferManagerFileHandleType = BUFFER_MANAGER_FILE_HANDLE_PFILE;
Tinit();
@ -59,8 +54,6 @@ void insertProbeIter(int NUM_ENTRIES)
mscheduler = new merge_scheduler;
logtable ltable;
int pcount = 40;
ltable.set_fixed_page_count(pcount);
@ -71,16 +64,13 @@ void insertProbeIter(int NUM_ENTRIES)
int lindex = mscheduler->addlogtable(&ltable);
ltable.setMergeData(mscheduler->getMergeData(lindex));
mscheduler->startlogtable(lindex);
int64_t c0_size = 1024 * 1024 * 10;
printf("warning: running w/ tiny c0 for testing"); // XXX
mscheduler->startlogtable(lindex, c0_size);
lserver = new logserver(10, 32432);
lserver->startserver(&ltable);
// Tdeinit();
}
@ -90,18 +80,7 @@ void insertProbeIter(int NUM_ENTRIES)
*/
int main()
{
//insertProbeIter(25000);
insertProbeIter(10000);
/*
insertProbeIter(5000);
insertProbeIter(2500);
insertProbeIter(1000);
insertProbeIter(500);
insertProbeIter(1000);
insertProbeIter(100);
insertProbeIter(10);
*/
return 0;
initialize_server();
abort(); // can't get here.
}

View file

@ -78,12 +78,17 @@ logstore_handle_t * logstore_client_open(const char *host, int portnum, int time
return ret;
}
static inline void close_conn(logstore_handle_t *l) {
printf("read/write err.. conn closed.\n");
close(l->server_socket); //close the connection
l->server_socket = -1;
}
datatuple *
logstore_client_op(logstore_handle_t *l,
// int *server_socket,
// struct sockaddr_in serveraddr,
// struct hostent *server,
uint8_t opcode, datatuple &tuple)
uint8_t opcode, datatuple * tuple)
{
if(l->server_socket < 0)
@ -122,60 +127,27 @@ logstore_client_op(logstore_handle_t *l,
}
//send the opcode
int n = write(l->server_socket, (byte*) &opcode, sizeof(uint8_t));
assert(n == sizeof(uint8_t));
if( writetosocket(l->server_socket, &opcode, sizeof(opcode)) ) { close_conn(l); return 0; }
//send the tuple
n = write(l->server_socket, (byte*) tuple.keylen, sizeof(uint32_t));
assert( n == sizeof(uint32_t));
if( writetupletosocket(l->server_socket, tuple) ) { close_conn(l); return 0; }
n = write(l->server_socket, (byte*) tuple.datalen, sizeof(uint32_t));
assert( n == sizeof(uint32_t));
network_op_t rcode = readopfromsocket(l->server_socket,LOGSTORE_SERVER_RESPONSE);
writetosocket(l->server_socket, (char*) tuple.key, *tuple.keylen);
if(!tuple.isDelete() && *tuple.datalen != 0)
writetosocket(l->server_socket, (char*) tuple.data, *tuple.datalen);
if( opiserror(rcode) ) { close_conn(l); return 0; }
//printf("\nssocket %d ", *server_socket);
//read the reply code
uint8_t rcode;
n = read(l->server_socket, (byte*) &rcode, sizeof(uint8_t));
if( n <= 0 )
{
printf("read err.. conn closed.\n");
close(l->server_socket); //close the connection
l->server_socket = -1;
return 0;
}
//printf("rdone\n");
datatuple * ret;
if(rcode == OP_SENDING_TUPLE)
{
datatuple *rcvdtuple = (datatuple*)malloc(sizeof(datatuple));
//read the keylen
rcvdtuple->keylen = (uint32_t*) malloc(sizeof(uint32_t));
n = read(l->server_socket, (char*) rcvdtuple->keylen, sizeof(uint32_t));
assert(n == sizeof(uint32_t));
//read the datalen
rcvdtuple->datalen = (uint32_t*) malloc(sizeof(uint32_t));
n = read(l->server_socket, (byte*) rcvdtuple->datalen, sizeof(uint32_t));
assert(n == sizeof(uint32_t));
//read key
rcvdtuple->key = (byte*) malloc(*rcvdtuple->keylen);
readfromsocket(l->server_socket, (char*) rcvdtuple->key, *rcvdtuple->keylen);
if(!rcvdtuple->isDelete())
{
//read key
rcvdtuple->data = (byte*) malloc(*rcvdtuple->datalen);
readfromsocket(l->server_socket, (char*) rcvdtuple->data, *rcvdtuple->datalen);
}
ret = rcvdtuple;
} else if(rcode == OP_SUCCESS) {
ret = &tuple;
if(rcode == LOGSTORE_RESPONSE_SENDING_TUPLES)
{
ret = readtuplefromsocket(l->server_socket);
} else if(rcode == LOGSTORE_RESPONSE_SUCCESS) {
ret = tuple;
} else {
assert(rcode == LOGSTORE_RESPONSE_FAIL); // if this is an invalid response, we should have noticed above
ret = 0;
}

View file

@ -16,7 +16,7 @@ logstore_handle_t * logstore_client_open(const char *host, int portnum, int time
datatuple * logstore_client_op(logstore_handle_t* l,
uint8_t opcode,
datatuple &tuple);
datatuple *tuple);
int logstore_client_close(logstore_handle_t* l);

View file

@ -78,23 +78,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
for(size_t i = 0; i < NUM_ENTRIES; i++)
{
//prepare the key
datatuple newtuple;
uint32_t keylen = key_arr[i].length()+1;
newtuple.keylen = &keylen;
newtuple.key = (datatuple::key_t) malloc(keylen);
for(size_t j=0; j<keylen-1; j++)
newtuple.key[j] = key_arr[i][j];
newtuple.key[keylen-1]='\0';
//prepare the data
uint32_t datalen = data_arr[i].length()+1;
newtuple.datalen = &datalen;
newtuple.data = (datatuple::data_t) malloc(datalen);
for(size_t j=0; j<datalen-1; j++)
newtuple.data[j] = data_arr[i][j];
newtuple.data[datalen-1]='\0';
datatuple *newtuple = datatuple::create(key_arr[i].c_str(), key_arr[i].length()+1, data_arr[i].c_str(), data_arr[i].length()+1);
/*
printf("key: \t, keylen: %u\ndata: datalen: %u\n",
@ -103,7 +87,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
//newtuple.data,
*newtuple.datalen);
*/
datasize += newtuple.byte_length();
datasize += newtuple->byte_length();
if(dp==NULL || !dp->append(xid, newtuple))
{
dpages++;
@ -145,46 +129,19 @@ void insertProbeIter(size_t NUM_ENTRIES)
datatuple *dt=0;
while( (dt=itr.getnext(xid)) != NULL)
{
assert(*(dt->keylen) == key_arr[tuplenum].length()+1);
assert(*(dt->datalen) == data_arr[tuplenum].length()+1);
assert(dt->keylen() == key_arr[tuplenum].length()+1);
assert(dt->datalen() == data_arr[tuplenum].length()+1);
tuplenum++;
free(dt->keylen);
free(dt);
datatuple::freetuple(dt);
dt = 0;
}
}
printf("Reads completed.\n");
/*
int64_t count = 0;
lladdIterator_t * it = logtreeIterator::open(xid, tree);
while(logtreeIterator::next(xid, it)) {
byte * key;
byte **key_ptr = &key;
int keysize = logtreeIterator::key(xid, it, (byte**)key_ptr);
pageid_t *value;
pageid_t **value_ptr = &value;
int valsize = lsmTreeIterator_value(xid, it, (byte**)value_ptr);
//printf("keylen %d key %s\n", keysize, (char*)(key)) ;
assert(valsize == sizeof(pageid_t));
assert(!mycmp(std::string((char*)key), arr[count]) && !mycmp(arr[count],std::string((char*)key)));
assert(keysize == arr[count].length()+1);
count++;
}
assert(count == NUM_ENTRIES);
logtreeIterator::close(xid, it);
*/
Tcommit(xid);
Tdeinit();
Tcommit(xid);
Tdeinit();
}

View file

@ -77,32 +77,10 @@ void insertProbeIter(size_t NUM_ENTRIES)
std::vector<pageid_t> dsp;
for(size_t i = 0; i < NUM_ENTRIES; i++)
{
//prepare the key
datatuple newtuple;
uint32_t keylen = key_arr[i].length()+1;
newtuple.keylen = &keylen;
newtuple.key = (datatuple::key_t) malloc(keylen);
for(size_t j=0; j<keylen-1; j++)
newtuple.key[j] = key_arr[i][j];
newtuple.key[keylen-1]='\0';
//prepare the tuple
datatuple* newtuple = datatuple::create(key_arr[i].c_str(), key_arr[i].length()+1, data_arr[i].c_str(), data_arr[i].length()+1);
//prepare the data
uint32_t datalen = data_arr[i].length()+1;
newtuple.datalen = &datalen;
newtuple.data = (datatuple::data_t) malloc(datalen);
for(size_t j=0; j<datalen-1; j++)
newtuple.data[j] = data_arr[i][j];
newtuple.data[datalen-1]='\0';
// printf("key: \t, keylen: %u\ndata: datalen: %u\n",
//newtuple.key,
// *newtuple.keylen,
//newtuple.data,
// *newtuple.datalen);
datasize += newtuple.byte_length();
datasize += newtuple->byte_length();
if(dp == NULL)
{
@ -122,10 +100,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
}
}
free(newtuple.key);
free(newtuple.data);
datatuple::freetuple(newtuple);
}
printf("\nTREE STRUCTURE\n");
@ -139,12 +114,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
Tcommit(xid);
xid = Tbegin();
printf("Stage 2: Sequentially reading %d tuples\n", NUM_ENTRIES);
size_t tuplenum = 0;
treeIterator<datatuple> tree_itr(tree_root);
@ -153,11 +123,10 @@ void insertProbeIter(size_t NUM_ENTRIES)
datatuple *dt=0;
while( (dt=tree_itr.getnext()) != NULL)
{
assert(*(dt->keylen) == key_arr[tuplenum].length()+1);
assert(*(dt->datalen) == data_arr[tuplenum].length()+1);
assert(dt->keylen() == key_arr[tuplenum].length()+1);
assert(dt->datalen() == data_arr[tuplenum].length()+1);
tuplenum++;
free(dt->keylen);
free(dt);
datatuple::freetuple(dt);
dt = 0;
}
@ -173,21 +142,12 @@ void insertProbeIter(size_t NUM_ENTRIES)
//randomly pick a key
int ri = rand()%key_arr.size();
//get the key
uint32_t keylen = key_arr[ri].length()+1;
datatuple::key_t rkey = (datatuple::key_t) malloc(keylen);
for(size_t j=0; j<keylen-1; j++)
rkey[j] = key_arr[ri][j];
rkey[keylen-1]='\0';
//find the key with the given tuple
datatuple *dt = ltable.findTuple(xid, rkey, keylen, lt);
datatuple *dt = ltable.findTuple(xid, (const datatuple::key_t) key_arr[ri].c_str(), (size_t)key_arr[ri].length()+1, lt);
assert(dt!=0);
assert(*(dt->keylen) == key_arr[ri].length()+1);
assert(*(dt->datalen) == data_arr[ri].length()+1);
free(dt->keylen);
free(dt);
assert(dt->keylen() == key_arr[ri].length()+1);
assert(dt->datalen() == data_arr[ri].length()+1);
datatuple::freetuple(dt);
dt = 0;
}

View file

@ -72,34 +72,14 @@ void insertProbeIter(size_t NUM_ENTRIES)
struct timeval start_tv, stop_tv, ti_st, ti_end;
double insert_time = 0;
int dpages = 0;
int npages = 0;
DataPage<datatuple> *dp=0;
int64_t datasize = 0;
std::vector<pageid_t> dsp;
gettimeofday(&start_tv,0);
for(size_t i = 0; i < NUM_ENTRIES; i++)
{
//prepare the key
datatuple newtuple;
uint32_t keylen = (*key_arr)[i].length()+1;
newtuple.keylen = &keylen;
newtuple.key = (datatuple::key_t) malloc(keylen);
memcpy((byte*)newtuple.key, (*key_arr)[i].c_str(), keylen);
//for(int j=0; j<keylen-1; j++)
// newtuple.key[j] = (*key_arr)[i][j];
//newtuple.key[keylen-1]='\0';
datatuple *newtuple = datatuple::create((*key_arr)[i].c_str(), (*key_arr)[i].length()+1,(*data_arr)[i].c_str(), (*data_arr)[i].length()+1);
//prepare the data
uint32_t datalen = (*data_arr)[i].length()+1;
newtuple.datalen = &datalen;
newtuple.data = (datatuple::data_t) malloc(datalen);
memcpy((byte*)newtuple.data, (*data_arr)[i].c_str(), datalen);
// for(int j=0; j<datalen-1; j++)
// newtuple.data[j] = (*data_arr)[i][j];
// newtuple.data[datalen-1]='\0';
/*
printf("key: \t, keylen: %u\ndata: datalen: %u\n",
//newtuple.key,
@ -108,15 +88,14 @@ void insertProbeIter(size_t NUM_ENTRIES)
*newtuple.datalen);
*/
datasize += newtuple.byte_length();
datasize += newtuple->byte_length();
gettimeofday(&ti_st,0);
ltable.insertTuple(newtuple);
gettimeofday(&ti_end,0);
insert_time += tv_to_double(ti_end) - tv_to_double(ti_st);
free(newtuple.key);
free(newtuple.data);
datatuple::freetuple(newtuple);
}
gettimeofday(&stop_tv,0);
@ -155,11 +134,10 @@ void insertProbeIter(size_t NUM_ENTRIES)
assert(dt!=0);
//if(dt!=0)
{
found_tuples++;
assert(*(dt->keylen) == (*key_arr)[ri].length()+1);
assert(*(dt->datalen) == (*data_arr)[ri].length()+1);
free(dt->keylen);
free(dt);
found_tuples++;
assert(dt->keylen() == (*key_arr)[ri].length()+1);
assert(dt->datalen() == (*data_arr)[ri].length()+1);
datatuple::freetuple(dt);
}
dt = 0;
free(rkey);

View file

@ -68,54 +68,26 @@ void insertProbeIter(size_t NUM_ENTRIES)
struct timeval start_tv, stop_tv, ti_st, ti_end;
double insert_time = 0;
int dpages = 0;
int npages = 0;
DataPage<datatuple> *dp=0;
int64_t datasize = 0;
std::vector<pageid_t> dsp;
gettimeofday(&start_tv,0);
for(size_t i = 0; i < NUM_ENTRIES; i++)
{
//prepare the key
datatuple newtuple;
uint32_t keylen = (*key_arr)[i].length()+1;
newtuple.keylen = &keylen;
newtuple.key = (datatuple::key_t) malloc(keylen);
memcpy((byte*)newtuple.key, (*key_arr)[i].c_str(), keylen);
//for(int j=0; j<keylen-1; j++)
// newtuple.key[j] = (*key_arr)[i][j];
//newtuple.key[keylen-1]='\0';
//prepare the data
std::string ditem;
getnextdata(ditem, 10*8192);
uint32_t datalen = ditem.length()+1;
newtuple.datalen = &datalen;
newtuple.data = (datatuple::data_t) malloc(datalen);
memcpy((byte*)newtuple.data, ditem.c_str(), datalen);
// for(int j=0; j<datalen-1; j++)
// newtuple.data[j] = (*data_arr)[i][j];
// newtuple.data[datalen-1]='\0';
/*
printf("key: \t, keylen: %u\ndata: datalen: %u\n",
//newtuple.key,
*newtuple.keylen,
//newtuple.data,
*newtuple.datalen);
*/
datasize += newtuple.byte_length();
//prepare the tuple
datatuple *newtuple = datatuple::create((*key_arr)[i].c_str(), (*key_arr)[i].length()+1, ditem.c_str(), ditem.length()+1);
datasize += newtuple->byte_length();
gettimeofday(&ti_st,0);
ltable.insertTuple(newtuple);
gettimeofday(&ti_end,0);
insert_time += tv_to_double(ti_end) - tv_to_double(ti_st);
free(newtuple.key);
free(newtuple.data);
datatuple::freetuple(newtuple);
}
gettimeofday(&stop_tv,0);
printf("insert time: %6.1f\n", insert_time);
@ -124,52 +96,6 @@ void insertProbeIter(size_t NUM_ENTRIES)
printf("\nTREE STRUCTURE\n");
//ltable.get_tree_c1()->print_tree(xid);
printf("datasize: %lld\n", datasize);
//sleep(20);
/*
//Tcommit(xid);
xid = Tbegin();
printf("Stage 2: Looking up %d keys:\n", NUM_ENTRIES);
int found_tuples=0;
for(int i=NUM_ENTRIES-1; i>=0; i--)
{
int ri = i;
//printf("key index%d\n", i);
fflush(stdout);
//get the key
uint32_t keylen = (*key_arr)[ri].length()+1;
datatuple::key_t rkey = (datatuple::key_t) malloc(keylen);
memcpy((byte*)rkey, (*key_arr)[ri].c_str(), keylen);
//for(int j=0; j<keylen-1; j++)
//rkey[j] = (*key_arr)[ri][j];
//rkey[keylen-1]='\0';
//find the key with the given tuple
datatuple *dt = ltable.findTuple(xid, rkey, keylen);
assert(dt!=0);
//if(dt!=0)
{
found_tuples++;
assert(*(dt->keylen) == (*key_arr)[ri].length()+1);
//assert(*(dt->datalen) == (*data_arr)[ri].length()+1);
free(dt->keylen);
free(dt);
}
dt = 0;
free(rkey);
}
printf("found %d\n", found_tuples);
key_arr->clear();
//data_arr->clear();
delete key_arr;
//delete data_arr;
*/
mscheduler.shutdown();
printf("merge threads finished.\n");

View file

@ -21,8 +21,8 @@
void insertProbeIter(size_t NUM_ENTRIES)
{
srand(1000);
//unlink("storefile.txt");
//unlink("logfile.txt");
unlink("storefile.txt");
unlink("logfile.txt");
sync();
double delete_freq = .05;
@ -91,11 +91,8 @@ void insertProbeIter(size_t NUM_ENTRIES)
key_v_list->clear();
delete key_v_list;
// preprandstr(NUM_ENTRIES, data_arr, 10*8192);
printf("key arr size: %d\n", key_arr->size());
//removeduplicates(key_arr);
if(key_arr->size() > NUM_ENTRIES)
key_arr->erase(key_arr->begin()+NUM_ENTRIES, key_arr->end());
@ -138,45 +135,21 @@ void insertProbeIter(size_t NUM_ENTRIES)
gettimeofday(&start_tv,0);
for(size_t i = 0; i < NUM_ENTRIES; i++)
{
//prepare the key
datatuple newtuple;
uint32_t keylen = (*key_arr)[i].length()+1;
newtuple.keylen = &keylen;
newtuple.key = (datatuple::key_t) malloc(keylen);
memcpy((byte*)newtuple.key, (*key_arr)[i].c_str(), keylen);
//for(int j=0; j<keylen-1; j++)
// newtuple.key[j] = (*key_arr)[i][j];
//newtuple.key[keylen-1]='\0';
//prepare the data
std::string ditem;
getnextdata(ditem, 8192);
uint32_t datalen = ditem.length()+1;
newtuple.datalen = &datalen;
newtuple.data = (datatuple::data_t) malloc(datalen);
memcpy((byte*)newtuple.data, ditem.c_str(), datalen);
// for(int j=0; j<datalen-1; j++)
// newtuple.data[j] = (*data_arr)[i][j];
// newtuple.data[datalen-1]='\0';
//prepare the key
datatuple *newtuple = datatuple::create((*key_arr)[i].c_str(), (*key_arr)[i].length()+1, ditem.c_str(), ditem.length()+1);
/*
printf("key: \t, keylen: %u\ndata: datalen: %u\n",
//newtuple.key,
*newtuple.keylen,
//newtuple.data,
*newtuple.datalen);
*/
datasize += newtuple.byte_length();
datasize += newtuple->byte_length();
gettimeofday(&ti_st,0);
ltable.insertTuple(newtuple);
gettimeofday(&ti_end,0);
insert_time += tv_to_double(ti_end) - tv_to_double(ti_st);
free(newtuple.key);
free(newtuple.data);
datatuple::freetuple(newtuple);
double rval = ((rand() % 100)+.0)/100;
if( rval < delete_freq) //delete a key
@ -185,22 +158,15 @@ void insertProbeIter(size_t NUM_ENTRIES)
if(del_index >= 0 && std::find(del_list.begin(), del_list.end(), del_index) == del_list.end())
{
delcount++;
datatuple deltuple;
keylen = (*key_arr)[del_index].length()+1;
deltuple.keylen = &keylen;
deltuple.key = (datatuple::key_t) malloc(keylen);
memcpy((byte*)deltuple.key, (*key_arr)[del_index].c_str(), keylen);
deltuple.datalen = &datalen;
deltuple.setDelete();
datatuple *deltuple = datatuple::create((*key_arr)[del_index].c_str(), (*key_arr)[del_index].length()+1);
gettimeofday(&ti_st,0);
ltable.insertTuple(deltuple);
gettimeofday(&ti_end,0);
insert_time += tv_to_double(ti_end) - tv_to_double(ti_st);
free(deltuple.key);
datatuple::freetuple(deltuple);
del_list.push_back(del_index);
@ -211,28 +177,17 @@ void insertProbeIter(size_t NUM_ENTRIES)
int up_index = i - (rand()%50); //update one of the last inserted 50 elements
if(up_index >= 0 && std::find(del_list.begin(), del_list.end(), up_index) == del_list.end())
{//only update non-deleted elements
upcount++;
datatuple uptuple;
keylen = (*key_arr)[up_index].length()+1;
uptuple.keylen = &keylen;
uptuple.key = (datatuple::key_t) malloc(keylen);
memcpy((byte*)uptuple.key, (*key_arr)[up_index].c_str(), keylen);
getnextdata(ditem, 512);
datalen = ditem.length()+1;
uptuple.datalen = &datalen;
uptuple.data = (datatuple::data_t) malloc(datalen);
memcpy((byte*)uptuple.data, ditem.c_str(), datalen);
upcount++;
datatuple *uptuple = datatuple::create((*key_arr)[up_index].c_str(), (*key_arr)[up_index].length()+1,
ditem.c_str(), ditem.length()+1);
gettimeofday(&ti_st,0);
ltable.insertTuple(uptuple);
gettimeofday(&ti_end,0);
insert_time += tv_to_double(ti_end) - tv_to_double(ti_st);
free(uptuple.key);
free(uptuple.data);
datatuple::freetuple(uptuple);
}
}
@ -244,9 +199,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
printf("#deletions: %d\n#updates: %d\n", delcount, upcount);
printf("\nTREE STRUCTURE\n");
//ltable.get_tree_c1()->print_tree(xid);
printf("datasize: %lld\n", datasize);
//sleep(20);
Tcommit(xid);
xid = Tbegin();
@ -259,16 +212,11 @@ void insertProbeIter(size_t NUM_ENTRIES)
for(int i=NUM_ENTRIES-1; i>=0; i--)
{
int ri = i;
//printf("key index%d\n", i);
fflush(stdout);
//get the key
uint32_t keylen = (*key_arr)[ri].length()+1;
datatuple::key_t rkey = (datatuple::key_t) malloc(keylen);
memcpy((byte*)rkey, (*key_arr)[ri].c_str(), keylen);
//for(int j=0; j<keylen-1; j++)
//rkey[j] = (*key_arr)[ri][j];
//rkey[keylen-1]='\0';
//find the key with the given tuple
datatuple *dt = ltable.findTuple(xid, rkey, keylen);
@ -278,19 +226,16 @@ void insertProbeIter(size_t NUM_ENTRIES)
assert(dt!=0);
assert(!dt->isDelete());
found_tuples++;
assert(*(dt->keylen) == (*key_arr)[ri].length()+1);
//assert(*(dt->datalen) == (*data_arr)[ri].length()+1);
free(dt->keylen);
free(dt);
assert(dt->keylen() == (*key_arr)[ri].length()+1);
datatuple::freetuple(dt);
}
else
{
if(dt!=0)
{
assert(*(dt->keylen) == (*key_arr)[ri].length()+1);
assert(dt->keylen() == (*key_arr)[ri].length()+1);
assert(dt->isDelete());
free(dt->keylen);
free(dt);
datatuple::freetuple(dt);
}
}
dt = 0;
@ -326,7 +271,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
*/
int main()
{
//insertProbeIter(25000);
// insertProbeIter(25000);
insertProbeIter(400000);
/*
insertProbeIter(5000);

View file

@ -20,7 +20,8 @@
void insertProbeIter(size_t NUM_ENTRIES)
{
unlink("logfile.txt");
unlink("storefile.txt");
//data generation
std::vector<std::string> data_arr;
std::vector<std::string> key_arr;
@ -38,45 +39,18 @@ void insertProbeIter(size_t NUM_ENTRIES)
if(data_arr.size() > NUM_ENTRIES)
data_arr.erase(data_arr.begin()+NUM_ENTRIES, data_arr.end());
std::set<datatuple, datatuple> rbtree;
rbtree_t rbtree;
int64_t datasize = 0;
std::vector<pageid_t> dsp;
for(size_t i = 0; i < NUM_ENTRIES; i++)
{
//prepare the key
datatuple newtuple;
uint32_t keylen = key_arr[i].length()+1;
newtuple.keylen = (uint32_t*)malloc(sizeof(uint32_t));
*newtuple.keylen = keylen;
newtuple.key = (datatuple::key_t) malloc(keylen);
for(size_t j=0; j<keylen-1; j++)
newtuple.key[j] = key_arr[i][j];
newtuple.key[keylen-1]='\0';
datatuple *newtuple = datatuple::create(key_arr[i].c_str(), key_arr[i].length()+1,data_arr[i].c_str(), data_arr[i].length()+1);
//prepare the data
uint32_t datalen = data_arr[i].length()+1;
newtuple.datalen = (uint32_t*)malloc(sizeof(uint32_t));
*newtuple.datalen = datalen;
newtuple.data = (datatuple::data_t) malloc(datalen);
for(size_t j=0; j<datalen-1; j++)
newtuple.data[j] = data_arr[i][j];
newtuple.data[datalen-1]='\0';
/*
printf("key: \t, keylen: %u\ndata: datalen: %u\n",
//newtuple.key,
*newtuple.keylen,
//newtuple.data,
*newtuple.datalen);
*/
datasize += newtuple.byte_length();
datasize += newtuple->byte_length();
rbtree.insert(newtuple);
}
printf("\nTREE STRUCTURE\n");
@ -88,47 +62,29 @@ void insertProbeIter(size_t NUM_ENTRIES)
int found_tuples=0;
for(int i=NUM_ENTRIES-1; i>=0; i--)
{
//find the key with the given tuple
int ri = i;
//get the key
uint32_t keylen = key_arr[ri].length()+1;
datatuple::key_t rkey = (datatuple::key_t) malloc(keylen);
for(size_t j=0; j<keylen-1; j++)
rkey[j] = key_arr[ri][j];
rkey[keylen-1]='\0';
//find the key with the given tuple
//prepare a search tuple
datatuple search_tuple;
search_tuple.keylen = (uint32_t*)malloc(sizeof(uint32_t));
*(search_tuple.keylen) = keylen;
search_tuple.key = rkey;
datatuple *search_tuple = datatuple::create(key_arr[ri].c_str(), key_arr[ri].length()+1);
datatuple *ret_tuple=0;
//step 1: look in tree_c0
rbtree_t::iterator rbitr = rbtree.find(search_tuple);
if(rbitr != rbtree.end())
{
datatuple tuple = *rbitr;
byte *barr = tuple.to_bytes();
ret_tuple = datatuple::from_bytes(barr);
datatuple *tuple = *rbitr;
found_tuples++;
assert(*(ret_tuple->keylen) == key_arr[ri].length()+1);
assert(*(ret_tuple->datalen) == data_arr[ri].length()+1);
free(barr);
free(ret_tuple);
assert(tuple->keylen() == key_arr[ri].length()+1);
assert(tuple->datalen() == data_arr[ri].length()+1);
}
else
{
printf("Not in scratch_tree\n");
}
free(search_tuple.keylen);
free(rkey);
datatuple::freetuple(search_tuple);
}
printf("found %d\n", found_tuples);
}

View file

@ -26,8 +26,6 @@ static int svrport = 32432;
void insertProbeIter(size_t NUM_ENTRIES)
{
srand(1000);
// std::string servername = svrname; //"sherpa4";
// int serverport = svrport; //32432;
logstore_handle_t * l = logstore_client_open(svrname, svrport, 100);
@ -97,12 +95,8 @@ void insertProbeIter(size_t NUM_ENTRIES)
}
key_v_list->clear();
delete key_v_list;
// preprandstr(NUM_ENTRIES, data_arr, 10*8192);
printf("key arr size: %d\n", key_arr->size());
//removeduplicates(key_arr);
if(key_arr->size() > NUM_ENTRIES)
key_arr->erase(key_arr->begin()+NUM_ENTRIES, key_arr->end());
@ -122,35 +116,21 @@ void insertProbeIter(size_t NUM_ENTRIES)
for(size_t i = 0; i < NUM_ENTRIES; i++)
{
//prepare the key
datatuple newtuple;
uint32_t keylen = (*key_arr)[i].length()+1;
newtuple.keylen = &keylen;
newtuple.key = (datatuple::key_t) malloc(keylen);
memcpy((byte*)newtuple.key, (*key_arr)[i].c_str(), keylen);
datatuple::len_t keylen = (*key_arr)[i].length()+1;
//prepare the data
std::string ditem;
getnextdata(ditem, 8192);
uint32_t datalen = ditem.length()+1;
newtuple.datalen = &datalen;
newtuple.data = (datatuple::data_t) malloc(datalen);
memcpy((byte*)newtuple.data, ditem.c_str(), datalen);
datatuple::len_t datalen = ditem.length()+1;
/*
printf("key: \t, keylen: %u\ndata: datalen: %u\n",
//newtuple.key,
*newtuple.keylen,
//newtuple.data,
*newtuple.datalen);
*/
datasize += newtuple.byte_length();
datatuple* newtuple = datatuple::create((*key_arr)[i].c_str(), keylen,
ditem.c_str(), datalen);
gettimeofday(&ti_st,0);
datasize += newtuple->byte_length();
gettimeofday(&ti_st,0);
//send the data
// datatuple * ret = sendTuple(servername, serverport, OP_INSERT, newtuple);
datatuple * ret = logstore_client_op(l, OP_INSERT, newtuple);
assert(ret);
@ -158,8 +138,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
// insert_time += tv_to_double(ti_end) - tv_to_double(ti_st);
insert_time ++; // XXX
free(newtuple.key);
free(newtuple.data);
datatuple::freetuple(newtuple);
if(i % 10000 == 0 && i > 0)
printf("%d / %d inserted.\n", i, NUM_ENTRIES);
@ -177,56 +156,40 @@ void insertProbeIter(size_t NUM_ENTRIES)
int found_tuples=0;
for(int i=NUM_ENTRIES-1; i>=0; i--)
{
int ri = i;
{
int ri = i;
//printf("key index%d\n", i);
fflush(stdout);
//fflush(stdout);
//get the key
uint32_t keylen = (*key_arr)[ri].length()+1;
datatuple searchtuple;
searchtuple.keylen = (uint32_t*)malloc(2*sizeof(uint32_t) + keylen);
*searchtuple.keylen = keylen;
datatuple::len_t keylen = (*key_arr)[ri].length()+1;
datatuple::len_t datalen = 0;
searchtuple.datalen = searchtuple.keylen + 1;
*searchtuple.datalen = 0;
searchtuple.key = (datatuple::key_t)(searchtuple.keylen + 2);
memcpy((byte*)searchtuple.key, (*key_arr)[ri].c_str(), keylen);
datatuple* searchtuple = datatuple::create((*key_arr)[ri].c_str(), keylen);
//find the key with the given tuple
datatuple *dt = logstore_client_op(l, OP_FIND, //servername, serverport, OP_FIND,
searchtuple);
datatuple *dt = logstore_client_op(l, OP_FIND, searchtuple);
assert(dt!=0);
assert(!dt->isDelete());
found_tuples++;
assert(*(dt->keylen) == (*key_arr)[ri].length()+1);
assert(dt->keylen() == (*key_arr)[ri].length()+1);
//free dt
free(dt->keylen);
free(dt->datalen);
free(dt->key);
free(dt->data);
free(dt);
datatuple::freetuple(dt);
dt = 0;
free(searchtuple.keylen);
datatuple::freetuple(searchtuple);
}
printf("found %d\n", found_tuples);
key_arr->clear();
//data_arr->clear();
delete key_arr;
//delete data_arr;
logstore_client_close(l);
gettimeofday(&stop_tv,0);
printf("run time: %6.1f\n", -1.0); // XXX (tv_to_double(stop_tv) - tv_to_double(start_tv)));
}

View file

@ -1,6 +1,8 @@
#include "tuplemerger.h"
#include "logstore.h"
// XXX make the imputs 'const'
// XXX test / reason about this...
datatuple* tuplemerger::merge(datatuple *t1, datatuple *t2)
{
assert(!t1->isDelete() || !t2->isDelete()); //both cannot be delete
@ -9,11 +11,11 @@ datatuple* tuplemerger::merge(datatuple *t1, datatuple *t2)
if(t1->isDelete()) //delete - t2
{
t = datatuple::from_bytes(t2->to_bytes());
t = t2->create_copy();
}
else if(t2->isDelete())
{
t = datatuple::from_bytes(t2->to_bytes());
t = t2->create_copy();
}
else //neither is a delete
{
@ -32,26 +34,15 @@ datatuple* tuplemerger::merge(datatuple *t1, datatuple *t2)
**/
datatuple* append_merger(datatuple *t1, datatuple *t2)
{
static const size_t isize = sizeof(uint32_t);
struct datatuple *t = (datatuple*) malloc(sizeof(datatuple));
byte *arr = (byte*)malloc(t1->byte_length() + *t2->datalen);
t->keylen = (uint32_t*) arr;
*(t->keylen) = *(t1->keylen);
t->datalen = (uint32_t*) (arr+isize);
*(t->datalen) = *(t1->datalen) + *(t2->datalen);
t->key = (datatuple::key_t) (arr+isize+isize);
memcpy((byte*)t->key, (byte*)t1->key, *(t1->keylen));
t->data = (datatuple::data_t) (arr+isize+isize+ *(t1->keylen));
memcpy((byte*)t->data, (byte*)t1->data, *(t1->datalen));
memcpy(((byte*)t->data) + *(t1->datalen), (byte*)t2->data, *(t2->datalen));
return t;
assert(!(t1->isDelete() || t2->isDelete()));
datatuple::len_t keylen = t1->keylen();
datatuple::len_t datalen = t1->datalen() + t2->datalen();
byte * data = (byte*)malloc(datalen);
memcpy(data, t1->data(), t1->datalen());
memcpy(data + t1->datalen(), t2->data(), t2->datalen());
return datatuple::create(t1->key(), keylen, data, datalen);
}
/**
@ -62,23 +53,5 @@ datatuple* append_merger(datatuple *t1, datatuple *t2)
**/
datatuple* replace_merger(datatuple *t1, datatuple *t2)
{
static const size_t isize = sizeof(uint32_t);
struct datatuple *t = (datatuple*) malloc(sizeof(datatuple));
byte *arr = (byte*)malloc(t2->byte_length());
t->keylen = (uint32_t*) arr;
*(t->keylen) = *(t2->keylen);
t->datalen = (uint32_t*) (arr+isize);
*(t->datalen) = *(t2->datalen);
t->key = (datatuple::key_t) (arr+isize+isize);
memcpy((byte*)t->key, (byte*)t2->key, *(t2->keylen));
t->data = (datatuple::data_t) (arr+isize+isize+ *(t2->keylen));
memcpy((byte*)t->data, (byte*)t2->data, *(t2->datalen));
return t;
return t2->create_copy();
}